Unity JobSystem使用及技巧

什么是JobSystem

并行编程

在游戏开发过程中我们经常会遇到要处理大量数据计算的需求,因此为了充分发挥硬件的多核性能,我们会需要用到并行编程,多线程编程也是并行编程的一种。

线程是在进程内的,是共享进程内存的执行流,线程上下文切换的开销是相当高的,大概有2000的CPU Circle,同时会导致缓存失效,导致万级别的CPU Circle,Job System的设计使用了线程池,一开始先将大量的计算任务分配下去尽量减少线程的执行流被打断,也降低了一些thread的切换开销。
Unity JobSystem使用及技巧

Unreal Unity大部分都是这种模型,分配了一些work thread 然后其他的线程往这个线程塞Task,相比fixed thread模式性能好一些,多出了Task的概念,Unity里称这个为Job。

建议看看Games104并行架构部分

Unity JobSystem

通常Unity在一个线程上执行代码,该线程默认在程序开始时运行,称为主线程。我们在主线程使用JobSystem的API,去给worker线程下发任务,就是使用多线程

通常Unity JobSystem会和Burst编译器一起使用,Burst会把IL变成使用LLVM优化的CPU代码,执行效率可以说大幅提升,但是使用Burst时候debug会变得困难,会缺少一些报错的堆栈,此时关闭burst可以看到一些堆栈,更方便debug。
虽然并行编程有着种种的技巧,比如,线程之间沟通交流数据有需要加锁、原子操作等等的数据交换等操作。但是Unity为了让我们更容易的编写多线程代码,

通过一些规则的制定,规避了一些复杂行为,同时也限制了一些功能,必要时这些功能也可以通过添加attribute、或者使用指针的方式来打破一些规则。
规定包括但不限于:

  • 不允许访问静态变量
  • 不允许在Job里调度子Job
  • 只能向Job里传递值类型,并且是通过拷贝的方式从主线程将数据传输进Job,当Job运行结束数据会拷贝回主线程,我们可以在主线程的job对象访问Job的执行结果。
  • 不允许在Native容器里添加托管类型
  • 不允许使用指针
  • 不允许多个Job同时写入同一个地方
  • 不允许在Job里分配额外内存

可以查看 官方文档

应用场景

基本上所有需要处理数据计算的场景都可以使用,我们可以用它做大量的游戏逻辑的计算,
我们也可以用它来做一些编辑器下的工具,可以达到加速的效果。

细节

接口

unity官方提供了一系列的接口,写一个Struct实现接口便可以执行多线程代码,提供的接口包括:

  • IJob:一个线程
  • IJobParallelFor:多线程,使用时传入一个数组,根据数组长度会划分出任务数量,每个任务的索引就是数组元素的索引
  • IJobParallelForTransform:并行访问Transform组件的,这是unity自己实现的比较特殊的读写Transform信息的Job,实测下来用起来貌似worker还是一个在动,但是经过Burst编译后快不少。
  • IJobFor:几乎没用

IJobParallelFor是最常用的,对数据源中的每一项都调用一次 Execute 方法。Execute 方法中有一个整数参数。该索引用于访问和操作作业实现中的数据源的单个元素。

容器

Job使用的数据都需要使用Unity提供的Native容器,我们在主线程将要计算的数据装进NativeContainer里然后再传进Job。
主要会使用的容器就是NativeArray,其实就是一个原生的数组类型,其他的容器这里暂时不提
这些容器还要指定分配器,分配器包括

  • Allocator.Temp: 最快的配置。将其用于生命周期为一帧或更少的分配。从主线程传数据给Job时,不能使用Temp分配器。
  • Allocator.TempJob: 分配比 慢Temp但比 快Persistent。在四帧的生命周期内使用它进行线程安全分配。
  • Allocator.Persistent: 最慢的分配,但只要你需要它就可以持续,如果有必要,可以贯穿应用程序的整个生命周期。它是直接调用malloc. 较长的作业可以使用此 NativeContainer 分配类型。

容器在实现Job的Struct里可以打标记,包括ReadOnly、WriteOnly,一方面可以提升性能,另一方面有时候会有读写冲突的情况,此时应该尽量多标记ReadOnly,避免一些数据冲突。

创建 使用

官方文档已经说的很好。
https://docs.unity3d.com/Manual/JobSystemCreatingJobs.html
对于ParallelFor的Schedule多了一些参数,innerloopBatchCount这个参数可以留意一下,可以理解为一个线程次性拿走多少任务。

Job之间互相依赖

https://docs.unity3d.com/Manual/JobSystemJobDependencies.html

其实执行了一个Job之后,在主线再执行另一个Job也不会性能差很多,并且易于debug,可以断点查看多个阶段执行过程中Job的数据情况,但是追求完美还是可以把依赖填上。

性能测试比较

笔者曾经做过简单的使用Job和不用Job的对比,通过打上Unity Profiler的标记,可以方便的在图表里查看运行开销。

Profiler.BeginSample("Your Target Profiler Name"); // your code Profiler.EndSample(); 

IJob

using System.Collections; using System.Collections.Generic; using Unity.Collections; using Unity.Jobs;  using UnityEngine; using Unity.Burst; [BurstCompile]  public class JobTest : MonoBehaviour {      public bool useJob;     // Update is called once per frame     void Update()     {         float startTime = Time.realtimeSinceStartup;         if (useJob)         {             NativeArray<int> result = new NativeArray<int>(1, Allocator.TempJob);//four frame allocate             MyJobSystem0 job0 = new MyJobSystem0();             job0.a = 0;             job0.b = 1;             job0.result = result;             JobHandle handle = job0.Schedule();             handle.Complete();             result.Dispose();             Debug.Log(("Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");         }         else         {             var index = 0;             for(int i = 0; i < 1000000; i++)             {                 index++;             }             Debug.Log(("Not Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");         }     }      } [BurstCompile]  public struct MyJobSystem0 : IJob {     public int a;     public int b;     public NativeArray<int> result;      public void Execute()     {         var index = 0;         for(int i = 0; i < 1000000; i++)         {             index++;         }         result[0] = a + b;     } } 

使用IJob执行一项复杂的工作,没有使用job跑了2-4ms,使用job也是跑了2-4 ms,但是使用了job+burst,这个for循环的速度就变得只有0.2-0.8 ms了,burst对此优化挺大的。

IJobParallelFor

using System; using System.Collections; using System.Collections.Generic; using Unity.Collections; using Unity.Jobs; using UnityEngine;  public class JobForTest : MonoBehaviour {     public bool useJob;     public int dataCount;     private NativeArray<float> a;      private NativeArray<float> b;      private NativeArray<float> result;      private List<float> noJobA;      private List<float> noJobB;      private List<float> noJobResult;     // Update is called once per frame     private void Start()     {         a = new NativeArray<float>(dataCount, Allocator.Persistent);         b = new NativeArray<float>(dataCount, Allocator.Persistent);         result = new NativeArray<float>(dataCount, Allocator.Persistent);         noJobA = new List<float>();         noJobB = new List<float>();         noJobResult = new List<float>();                  for (int i = 0; i < dataCount; ++i)         {             a[i] = 1.0f;             b[i] = 2.0f;             noJobA.Add(1.0f);             noJobB.Add(2.0f);             noJobResult.Add(0.0f);         }     }      void Update()     {         float startTime = Time.realtimeSinceStartup;         if (useJob)         {             MyParallelJob jobData = new MyParallelJob();             jobData.a = a;               jobData.b = b;             jobData.result = result;             // 调度作业,为结果数组中的每个索引执行一个 Execute 方法,且每个处理批次只处理一项             JobHandle handle = jobData.Schedule(result.Length, 1);             // 等待作业完成             handle.Complete();                          Debug.Log(("Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");          }         else         {              for(int i = 0; i < dataCount; i++)             {                 noJobA[i] = 1;                 noJobB[i] = 2;                 noJobResult[i] = noJobA[i]+noJobB[i];             }             Debug.Log(("Not Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");         }     }      private void OnDestroy()     {         // 释放数组分配的内存         a.Dispose();         b.Dispose();         result.Dispose();     } }  // 将两个浮点值相加的作业 public struct MyParallelJob : IJobParallelFor {     [ReadOnly]     public NativeArray<float> a;     [ReadOnly]     public NativeArray<float> b;     public NativeArray<float> result;      public void Execute(int i)     {         result[i] = a[i] + b[i];     } } 

普通for寻找两个list,遍历list元素然后相加,数据量10万,每一个批次这里是处理1个execute, 不开job 2.48ms,开job 1.34ms,job开了burst就0.28ms。

IJobParalForTransform

using Unity.Burst; using Unity.Collections; using Unity.Jobs; using Unity.Mathematics; using UnityEngine; using UnityEngine.Jobs;  public class TransformJobs : MonoBehaviour {     public bool useJob;     public int dataCount = 100;     //public int batchCount;     // 用于存储transform的NativeArray     private TransformAccessArray m_TransformsAccessArray;     private NativeArray<Vector3> m_Velocities;      private PositionUpdateJob m_Job;     private JobHandle m_PositionJobHandle;     private GameObject[] sphereGameObjects;      //[BurstCompile]     struct PositionUpdateJob : IJobParallelForTransform     {         // 给每个物体设置一个速度         [ReadOnly]         public NativeArray<Vector3> velocity;          public float deltaTime;          // 实现IJobParallelForTransform的结构体中Execute方法第二个参数可以获取到Transform         public void Execute(int i, TransformAccess transform)         {             transform.position += velocity[i] * deltaTime;         }     }      void Start()     {         m_Velocities = new NativeArray<Vector3>(dataCount, Allocator.Persistent);          // 用代码生成一个球体,作为复制的模板         var sphere = GameObject.CreatePrimitive(PrimitiveType.Sphere);         // 关闭阴影         var renderer = sphere.GetComponent<MeshRenderer>();         renderer.shadowCastingMode = UnityEngine.Rendering.ShadowCastingMode.Off;         renderer.receiveShadows = false;          // 关闭碰撞体         var collider = sphere.GetComponent<Collider>();         collider.enabled = false;          // 保存transform的数组,用于生成transform的Native Array         var transforms = new Transform[dataCount];         sphereGameObjects = new GameObject[dataCount];         int row = (int)Mathf.Sqrt(dataCount);         // 生成1W个球         for (int i = 0; i < row; i++)         {             for (int j = 0; j < row; j++)             {                 var go = GameObject.Instantiate(sphere);                 go.transform.position = new Vector3(j, 0, i);                 sphereGameObjects[i * row + j] = go;                 transforms[i*row+j] = go.transform;                 m_Velocities[i*row+j] = new Vector3(0.1f * j, 0, 0.1f * j);             }         }          m_TransformsAccessArray = new TransformAccessArray(transforms);     }      void Update()     {         //float startTime = Time.realtimeSinceStartup;         if (useJob)         {             // 实例化一个job,传入数据             m_Job = new PositionUpdateJob()             {                 deltaTime = Time.deltaTime,                 velocity = m_Velocities,             };              // 调度job执行             m_PositionJobHandle = m_Job.Schedule(m_TransformsAccessArray);             //Debug.Log(("Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");         }         else         {             for (int i = 0; i < dataCount; ++i)             {                 sphereGameObjects[i].transform.position +=  m_Velocities[i] * Time.deltaTime;             }             //Debug.Log(("Not Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");         }             }      // 保证当前帧内Job执行完毕     private void LateUpdate()     {         m_PositionJobHandle.Complete();     }      // OnDestroy中释放NativeArray的内存     private void OnDestroy()     {         m_Velocities.Dispose();         m_TransformsAccessArray.Dispose();     } }  

100+vec3,不用job 0.02ms,用job +burst 0.02ms
1600+vec3,不用job 0.31ms,用job 0.07ms +burst 0.04ms
1万+vec3,不用job 2.23ms,用job 0.35ms + burst 0.12ms
1万+float3,不用job 2.55ms,用job 0.4ms
100万+float3,不用job 199ms ,用job 40ms + burst 31ms
100万+vec3,不用job 189ms ,用job 35ms + burst 31ms

高级技巧

使用特定的数学库中的实现

unity特定的数学库中的数据类型可以获取simd优化,比如vector3就可以换成float3,但是缺少的数学库,就要自己解决了,所以我一般就vector3。

在合适的时机Schedule和Complete

拥有作业所需的数据后就立即在作业上调用 Schedule,并仅在需要结果时才开始在作业上调用 Complete。最好是调度当前不与正在运行的任何其他作业竞争的、不需要等待的作业。例如,如果在一帧结束和下一帧开始之间的一段时间没有作业正在运行,并且可以接受一帧延迟,则可以在一帧结束时调度作业,并在下一帧中使用其结果。另一方面,如果游戏占满了与其他作业的转换期,但在帧中的其他位置存在大量未充分利用的时段,那么在这个时段调度作业会更加有效。

在单线程里运行JobSystem

IJobParallelForExtensions可以调用Run方法,会将所有的Job放到一个Thread里执行,之前我们提到了Schedule的innerloopBatchCount参数,将它调到和数据源一样大,也是在一个Thread里执行,
当我们的数据量小于1000,分配线程可能都觉得费劲,用单线程的JobSystem配合Burst效果可能更好。
需要注意的是,如果我们出现了并行写入问题(多个Thread同时写一个位置),在单线程模式下是不会报错的。

使用NativeDisableUnsafePtrRestriction

Unity JobSystem使用及技巧

打上这个标记后可以在Job里使用Unsafe代码块,使用指针
有多个好处

  • 可以不需要拷贝数组就把主线程的数据塞进子线程,对数据量大,需要频繁调用的可以考虑
  • 可以包装一些托管内存,比如我这里就包装了一个二维数组,每个containsTriangleIndex其实是一个int的NativeArray
    Unity JobSystem使用及技巧

如果struct里有NativeArray,这个struct放进NativeArray的时候会过不了安全检查。
我这里是在主线程维护好了这些动态的数组,然后再传进了这个结构的。
在unsafe代码块里,Native容器相关的API中有GetUnsafePtr可以获得指针。

SamplePointRayTriangleJob samplePointRayTriangleJob = new SamplePointRayTriangleJob();   samplePointRayTriangleJob.meshTriangles = jobMeshTriangles;   samplePointRayTriangleJob.randomDirs = jobRandomDirs;   samplePointRayTriangleJob.useGrid = useGrid;   samplePointRayTriangleJob.allStartPoints = startPoints;   samplePointRayTriangleJob.allTriangleBoundsJobDatas = (TriangleBoundsJobData*)triangleBoundsJobDatas.GetUnsafePtr(); 

NativeDisableParallelForRestriction并行写入

Unity JobSystem使用及技巧

打上这个标记后,多个Thread同时数组的同一个地方进行写入,unity不会阻拦,但是自己也要处理好逻辑问题。

举个例子:下面这篇文章里
https://blog.csdn.net/n5/article/details/123742777
在Parallel Job里面进行光栅化三角形时,多个三角形有可能并行访问depth buffer/frame buffer的相同地方。这在多线程编程中属于race conditions,Job system内部会检测出来,会直接报错。

IndexOutOfRangeException: Index 219108 is out of restricted IJobParallelFor range [4392…4392] in ReadWriteBuffer.
ReadWriteBuffers are restricted to only read & write the element at the job index. You can use double buffering strategies to avoid race conditions due to reading & writing in parallel to the same elements from a job.

NativeDisableContainerSafetyRestriction

使用这个Attribute可以在子线程分配一块内存,比如我这里每个子线程是创建了一个数组来接受光线三角形求交,一根光线击中了多少个点,一个子任务会执行许多次光线遍历Mesh
Unity JobSystem使用及技巧

这个主要是博主在Github上学习Unity官方的MeshApiExample项目看到的案例,有点像StaticBatch
可以查看这个链接:把整个场景的Mesh合并

DeallocateOnJobCompletion

容器在job结束之后自动释放
这个博主用的很少 基本都是主动释放
可能在用非并行Job的时候 接受外面的NativeArray后自己不想管释放之类的。
可以查看一个github上别人的案例看看:案例

自定义Native容器

https://docs.unity3d.com/Manual/job-system-custom-nativecontainer-example.html

思考

JobSystem与ComputeShader相比 优势

JobSystem主要是利用CPU来降低计算负载,在数量级上远远比不上GPU,在前面的性能测试中数据到万以上就相当吃力了。
ComputeShader是利用GPU来降低计算负载,,现在GPU Driven的技术也逐渐越来越多。

思考这两个的取舍主要应该看业务逻辑的数据流向,如果我们的数据是从CPU发起的,那么在把数据从CPU拷贝到GPU也是肯定是不如在CPU内做拷贝要快的,
如果我们的计算的数据最后是给CPU做下步计算的,如果用GPU做计算就会出现CPU等GPU的回读问题,数据若停留在GPU,那么ComputeShader自然好。

另外就是考虑两个后端的硬件特性,CPU高主频,处理复杂的逻辑,大量的循环、分支判断上比GPU要有优势,数量级上则GPU更有优势。

最后也可以考虑一下易用性问题,如果用到了很多原本在CPU里的数学库,在JobSystem里都是可以直接用的,ComputeShader的话则需要自己实现一版,不过脚手架这种东西属于见仁见智,
只要自己方便就好。

2023.3.21
flyingziming

发表评论

相关文章