![]()
【USparkle專欄】如果你深懷絕技,愛“搞點研究”,樂于分享也博采眾長,我們期待你的加入,讓智慧的火花碰撞交織,讓知識的傳遞生生不息!
這是侑虎科技第1917篇文章,感謝作者南京周潤發供稿。歡迎轉發分享,未經作者授權請勿轉載。如果您有任何獨到的見解或者發現也歡迎聯系我們,一起探討。(QQ群:793972859)
作者主頁:
https://www.zhihu.com/people/xu-chen-71-65
當有持續時間短,又比較雜的異步任務時,可以使用ThreadPool,用固定數量的工作線程執行任務,不每次都創建新線程。UE4和UE5的線程池有很大區別,UE4線程池會真的創建很多線程,而UE5主要線程池底層復用了TaskGraph的線程,線程池只是邏輯上的概念。
一、創建線程池
線程池在FEngineLoop::PreInitPreStartupScreen函數中創建。
GThreadPool
類型為FQueuedLowLevelThreadPool,是UE5中的新實現,線程數量由FPlatformMisc::NumberOfWorkerThreadsToSpawn()確定。
GIOThreadPool
類型為FQueuedThreadPool,線程數量由FPlatformMisc::NumberOfIOWorkerThreadsToSpawn()確定,Client為4,Server為2。
GBackgroundPriorityThreadPool
類型為FQueuedThreadPool,Client為2,Server為1。
GLargeThreadPool
類型為FQueuedLowLevelThreadPool,數量由FPlatformMisc::NumberOfCoresIncludingHyperthreads()確定。
二、使用線程池
雖然線程池實現比Runnable復雜,但使用方式也比較簡單。
1. Async函數
最常見用法,Async函數可設置EAsyncExecution::ThreadPool參數,指定任務在ThreadPool里執行。
![]()
函數內部會創建TAsyncQueuedWork封裝Function和Promise,然后使用AddQueuedWork接口把任務加到GThreadPool中。
AddQueuedWork是線程池最重要的接口。
![]()
2. AsyncPool函數
與Async類似,但可以指定線程池和Work優先級。
![]()
3.手動調用AddQueuedWork
AddQueuedWork函數只需要接受IQueuedWork作為參數,TAsyncQueuedWork只是一個子類,我們可以創建子類,做自定義操作,這樣也能指定使用哪個線程池。
比如引擎中Encode LightMap的操作,就使用了FAsyncEncode類:
![]()
三、線程池實現
1.類型定義
類型定義可分為線程池,線程池線程,任務。
1. 線程池
FQueuedThreadPool:線程池基類,定義了線程池的接口。
Allocate:創建線程池,類型為FQueuedThreadPoolBase。
Create:創建若干工作線程。
AddQueuedWork:向線程池添加任務。
RetractQueuedWork:撤回任務。
AddQueuedWork和RetractQueuedWork是線程池提供給外部調用的主要接口,注意會在多線程中被調用。
FQueuedThreadPool有多種實現:
FQueuedThreadPoolBase
最常用,線程池的基礎實現,GIOThreadPool和GBackgroundPriorityThreadPool都會使用。
成員:
FThreadPoolPriorityQueue QueuedWork:待處理任務的隊列。
TArray QueuedThreads:等待接收任務的空閑線程。
TArray AllThreads:所有工作線程。
FCriticalSection* SynchQueue:保護任務隊列的CriticalSection,因為任務隊列會被多線程修改。
FQueuedLowLevelThreadPool
底層線程使用TaskGraph的ThreadPool,UE5中GThreadPool的默認實現。
FQueuedThreadPoolWrapper
FQueuedThreadPoolDynamicWrapper
FQueuedThreadPoolTaskGraphWrapper
2. 線程池線程
FQueuedThread:繼承自FRunnable,表示線程池中的工作線程。可以想象,它大部分時間都處于idle狀態,當有任務來時才工作。
成員:
DoWorkEvent:通知線程有任務要執行的Event。
QueuedWork:當前線程正在執行的Work。
Thread:Runnable對應的線程。
函數:
Run:主函數,可認為是一個等待、執行任務的循環。
DoWork:由ThreadPool調用,傳入一個任務并執行。
3. 任務
IQueuedWork:可排隊任務的基類接口,供線程池使用。
接口:
DoThreadedWork:執行任務。
IQueuedWork有多種實現:
TAsyncQueuedWork
最常用,Async和AsyncPool函數中使用。
DoThreadedWork:通過SetPromise執行任務。
FAsyncTaskBase
可操作內容更多。
DoThreadedWork:通過Task執行任務。
類圖如下:
![]()
常用部分已高亮顯示
2. FQueuedThreadPoolBase
線程池創建
FQueuedThreadPoolBase是默認線程池,FQueuedThreadPool::Allocate函數中構造。
![]()
線程池通過Create函數初始化,主要工作是創建InNumQueuedThreads數量的工作線程,使用FQueuedThread類封裝,并把創建的線程加入QueuedThreads和AllThreads容器中,QueuedThreads中存儲了當前線程池中處于空閑狀態的線程。還要創建CriticalSection對象SynchQueue,用于保護對QueuedWork和QueuedThreads的訪問。
![]()
FQueuedThread
FQueuedThread繼承自FRunnable,是一個可運行任務的抽象,其Create函數如下。首先創建DoWorkEvent,用于做多線程同步,然后創建一個底層的Thread。線程創建好后進入Run方法,初始沒有任務,線程在DoWorkEvent上等待,處于休眠狀態。
![]()
添加任務
觀察AddQueuedWork函數,添加任務時分成了兩種情況。
如果線程池中尚有空閑線程,即下圖中的情況1,QueuedThreads中有元素,那么把任務分配給其中一個線程即可,這里還有一個細節,QueuedThreads采用棧管理,先進后出,這可以更好利用CPU Cache,因為這個Thread可能剛運行過,同時也可以避免數組中的元素移動。得到Thread后,調用DoWork方法添加任務。
另一種情況是所有線程都在忙碌,QueuedThreads中沒有元素,這時只能把InQueuedWork暫存到QueuedWork中,等線程執行完之前任務后再做處理。
![]()
FQueuedThread::DoWork方法用于通知一個Thread要執行任務了,首先把InQueuedWork設置到其QueuedWork屬性上,然后執行DoWorkEvent的Trigger方法,喚醒該Thread。注意這里加了一個MemoryBarrier,是為了避免CPU指令亂序優化導致1071行在1074行之后執行,導致錯誤。
![]()
執行任務
執行任務通過屬性的Run函數實現。Thread一開始會在DoWorkEvent上等待,被DoWork函數喚醒后,會獲取之前被賦值的QueuedWork,執行DoThreadedWork函數,這里是真正執行任務。執行完成后再調用ThreadPool的ReturnToPoolOrGetNextJob函數,嘗試獲取暫存的QueuedWork并執行,若沒有就把Thread歸還到QueuedThreads中,之后在DoWorkEvent上等待,進入休眠狀態。
![]()
![]()
流程圖示:
![]()
3. TAsyncQueuedWork
線程池中的任務,包裝了一個Function對象,DoThreadWork函數中使用給Promise SetValue的形式來執行Function。
![]()
以上就是UE線程池常用的FQueuedThreadPoolBase,FQueuedThread,TAsyncQueuedWork組合。
以下內容是UE5的改動。
4. FQueuedLowLevelThreadPool
在UE5中,非Editor模式下GThreadPool實現變成了FQueuedLowLevelThreadPool。底層使用了TaskGraph,相關內容放在后面看,這里只分析與線程池相關的部分。
UE希望把多線程操作盡量放在TaskGraph里,這樣好管理。CPU物理核心數量是有限的,如果TaskGraph和ThreadPool都創建了核心數量的線程,其實在各自管理,兩邊線程都跑滿就會產生更多的CPU調度開銷。
Create
其實不需要Create了,因為自己不創建線程,初始化在構造函數里完成,主要任務是獲取LowLevelTasks::FScheduler單例。
![]()
FQueuedThreadPool::Create只是實現一下純虛函數。
![]()
LowLevelTasks::Fscheduler管理了TaskGraph中的Workers線程,包括ForegroundWorkers和BackgroundWorkers,向Worker線程分發任務,細節后面再看。
5. AddQueuedWork
![]()
首先創建FQueuedWorkInternalData對象來存儲QueuedWork相關數據,然后設置到InQueuedWork.InternalData屬性。
FQueuedWorkInternalData類包裝了一個LowLevelTasks::FTask,FTask用于把QueuedWork包裝成TaskGraph里可執行的東西。Retract函數用于取消任務,但線程池場景下不需要考慮取消。
![]()
Task.Init函數調用有點繞,464行先把InQueuedWork包裝成一個Lambda函數,然后在Init實現里面再把Lambda包裝到另一個TFunction里面。這樣就把InQueuedWork存到Task里面了,往后操作只和TaskGraph有關,與線程池無關了。
![]()
FScheduler::TryLaunch把Task添加到任務隊列中,等待Worker線程來消費。
![]()
6. 執行任務
TaskGraph中Worker線程的Run函數會循環獲取任務執行,細節放后面TashGraph里看,這里只看一個調用棧。
下圖中1的位置是Worker線程取Task,2的位置是執行InQueuedWork->DoThreadedWork(),終于又回到了線程池。
![]()
總體來看,FQueuedLowLevelThreadPool其實就是TaskGraph,和Async函數中傳EAsyncExecution::TaskGraph是一個效果。
7. FQueuedThreadPoolWrapper
不是真正的線程池,而是另一個線程池的包裝,任務都會轉發過去。UE5 Editor下GThreadPool就會設置成這個,包裝了GLargeThreadPool,目的為共用GLargeThreadPool中的線程,類似FQueuedLowLevelThreadPool共用TaskGraph的線程,因為Editor下后臺任務更多,因此單獨使用了GLargeThreadPool。這么做的目的還是減少線程創建。
![]()
主要成員
FQueuedThreadPool* WrappedQueuedThreadPool; 包裝的ThreadPool。
TArray WorkPool; Work集合。
TMap ScheduledWork; 當前正在被執行的Work。
std::atomic MaxConcurrency; 最多允許多少Work在后臺線程池中運行。
std::atomic CurrentConcurrency; 當前在后臺線程池中運行的Work。
FScheduledWork
成員中出現了FScheduledWork類型,它是一個容器,存儲了真正的IQueuedWork,同時也是IQueuedWork的子類,有DoThreadedWork接口。
![]()
其中128行執行了異步任務,131行通知FQueuedThreadPoolWrapper任務執行完,可調度下個任務,會在下面介紹。
初始化
構造函數如下,主要接受一個線程池作為后臺線程池,InMaxConcurrency表示最多同時在后臺線程池中執行多少個任務。
![]()
AddQueuedWork
AddQueuedWork首先把任務加到QueuedWork中,然后執行Schedule函數,默認參數為空。
![]()
Schedule函數最重要的是下面幾行。首先從QueuedWork中獲取要執行的任務,然后遞增CurrentConcurrency。接著通過AllocateWork獲取一個FScheduledWork對象,并把InnerWork封裝在里面,然后把FScheduledWork交給后臺線程池運行。
WorkPool容器就緩存了已創建的FScheduledWork對象,AllocateWork會首先從中獲取,沒有再創建,避免性能上的浪費。
![]()
執行
FScheduledWork執行完DoThreadedWork后,會調用Release,繼續讓線程池執行剩余任務,并把自己重置,加入WorkPool中,等待下次使用。
![]()
圖示如下:
![]()
文末,再次感謝南京周潤發 的分享, 作者主頁:https://www.zhihu.com/people/xu-chen-71-65, 如果您有任何獨到的見解或者發現也歡迎聯系我們,一起探討。(QQ群: 793972859 )。
近期精彩回顧
特別聲明:以上內容(如有圖片或視頻亦包括在內)為自媒體平臺“網易號”用戶上傳并發布,本平臺僅提供信息存儲服務。
Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.