Ctorch开发日志——矩阵乘法优化及数学原理

随着项目的推进,本作者遇到了目前最棘手的问题,即矩阵乘法的优化

但是有句话说得好

“你越棘手,我越兴奋”

那么,如下是本作者如何把(O(MNK))(O(n^3)))的朴素矩阵乘法一步一步优化到(O(n^{2.81})) 的全过程

测试环境

macOS Tahoe 26 Beta 2
M3 Pro 11核
18GB
CLion & Cmake
计时器:ctime
矩阵:1024 * 1024 @ 1024 * 1024
为保证准确,时间均为5次测量取平均值

朴素算法实现 & 测速

朴素实现的数学原理

其实就是把矩阵乘的数学公式重写一遍:

[A={left[ a_{ij}right]_{m times n}} ]

[B={left[ b_{ij}right]_{n times s}} ]

[C= {A times B}=left[ c_{ij}right]_{m times s} ]

[= left[ sum limits_{k=1}^{n}a_{ik}b_{kj}right]_{m times s} ]

注意:矩阵乘法不满足交换律
只有左矩阵的列数与右矩阵的行数相同的两个矩阵才能相乘
乘积矩阵的行数等于左矩阵的行数,列数等于右矩阵的列数

概括一下就是

乘积矩阵第i行第j列处的元素等于左矩阵的第i行与右矩阵的第j列对应元素乘积之和

那么,这个很简单,上代码吧
为了测试,所有的矩阵保证满足乘法条件且为2维

时间复杂度 (O(n^3))

// 原始版本(未优化) void matrix_mult(float* A, float* B, float* C, int N) {     for (int i = 0; i < N; i++)         for (int j = 0; j < N; j++)             for (int k = 0; k < N; k++)                 C[i*N + j] += A[i*N + k] * B[k*N + j]; } 

在实际测试中,此算法跑出了1898.46ms优秀成绩

优化一:循环优化

你可能会疑惑,循环优化是什么
故名思义,就是对原有的ijk的循环重新更换顺序为ikj

一个更大的问题来了,凭什么仅仅改变顺序就快了许多

那么不妨看看访问顺序

算法1(朴素实现):

在这个顺序中,最内层循环是k,它遍历A的一行和B的一列。
对于A的访问是连续的(因为A[i][k]在内存中是按行存储的,所以k增加时是连续访问),
但是B的访问是不连续的(因为B[k][j]在内存中是按行存储,k增加时访问的是不同行的同一列,所以是跳跃访问)。这样对B的访问会导致缓存失效。

so,真正影响到速度的,就是缓存,在顺序读取中,缓存可以加载一整行,不必跳跃元素访问

那么,有没有一种循环顺序,使得对三个数组均为顺序访问呢
有的兄弟,有的,让我们欢迎仍为(O(n^{3}))的优化算法出场
——“ikj”循环优化

在这个顺序中,最内层循环是j。对于A的访问,固定i和k,所以每次内层循环A[i][k]是常数。对于B的访问,是B[k][j],由于j是连续的,所以B的访问是连续的(因为同一行连续列)。同时,C的访问也是连续的(C[i][j])。这样,所有的内存访问都是连续的,因此性能更好。

可以自行验证,对于三个数组,均为顺序访问
给出如下代码:
时间复杂度 (O(n^3))

// 优化后(行优先访问) void matrix_mult_opt1(float* A, float* B, float* C, int N) {     for (int i = 0; i < N; i++)         for (int k = 0; k < N; k++)  // k循环提到中间             for (int j = 0; j < N; j++)                 C[i*N + j] += A[i*N + k] * B[k*N + j]; } 

在实际测试中,此算法跑出了1462.89ms的优秀成绩
提升:1898.46ms-1462.89ms = 435.57ms 提高22.9%

进阶提升 优化二:矩阵分块算法

顾名思义,分块算法即是把矩阵分为多个小矩阵,对每个矩阵操作后再组合出结果,类似分块算法

那么,它为什么快呢

在计算机中,共有三种CPU缓存以及普通内存,即L1、L2、L3 Cache和内存
前三种的速度要比内存快很多很多,大概只有一个CPU周期的延迟,而普通内存可以达到上百周期延迟
而比他们更快的就是寄存器,直接接触CPU,0延迟
唯一的问题是,寄存器的大小只够存储单个值

新的问题来了,怎样把一个巨大的矩阵放到只有128k-1m的L1缓存中呢

聪明的你一定想到了把原矩阵划分为多个小矩阵,每一个都能放到L1内进行运算
那么恭喜你,你已经知道了矩阵分块算法的原理

更确切的说,先定义一个常数 (blocks in N^{+}) 作为划分的单位矩阵的行列,
对于原矩阵A,我们把其中的 ({blocks times blocks}) 个元素划分为一个新矩阵,记为(a^{'}_{cdots}),我们定义:

[ a^{'}_{11} = begin{pmatrix} a_{11} & cdots & a_{1blocks} \ vdots & ddots &vdots \ a_{blocks1} & cdots & a_{blocks_{ }blocks} end{pmatrix} ]

其余以此类推,新矩阵(A^{'})即变为

[A^{'}=begin{pmatrix} a^{'}_{11} & cdots & a^{'}_{1n} \ vdots & ddots & vdots \ a^{'}_{m1} & cdots & a^{'}_{mn} end{pmatrix} ]

正如同我们可以把 (f(x)) 中的 (x) 替换为任意多项式(函数),我们同样也可以把矩阵中的每个元素换为一个矩阵,其运算规则仍然成立


so,显而易见的,分块矩阵算法有如下公式:

对于整体而言,

[begin{array}{c} C={A^{'}times B^{'}}=left[ c_{ij}right]_{m times s} = left[ sum limits_{k=1}^{n}a^{'}_{ik}b^{'}_{kj}right]_{m times s} end{array}]

其中:

[ a^{'}_{ij} = begin{pmatrix} a_{[(i-1) times blocks]{[(j-1)times blocks]}} & cdots & a_{[(i-1) times blocks]{[jtimes blocks]}} \ vdots & ddots &vdots \ a_{[itimes blocks]{[(j-1)times blocks]}} & cdots & a_{[itimes blocks]{[jtimes blocks]}} end{pmatrix} ]

其中的每个乘法 (a'{ik} times b'{kj}) 是子矩阵乘法
这里为了方便看,默认原矩阵的行列均为 blocks 的倍数
那么下一个很自然的问题就是

若行列不为blocks的倍数,怎么办

分两种情况:

  1. $ min(m,n) < blocks $
  2. $ exists m,n nmid blocks $

对于1,直接执行普通矩阵乘法即可,因为整个矩阵均可放于L1、L2 Cache中
对于2,我们定义分块矩阵的大小为$ p,q $

[p = min(blocks,M- i times blocks) ]

[q = min(blocks,N - j times blocks) ]

其中,

[M,N为被分块矩阵的行列 ]

[i,j为分块矩阵a^{'}_{ij}的下标 ]

至此,分块矩阵的全部问题已经解决

给出如下代码:
时间复杂度(O(n^3))

void block_mult(float* A, float* B, float* C, int N, int BLOCK) {     // 清除结果矩阵     memset(C, 0, N*N*sizeof(float));     // 三层分块循环     for (int i0 = 0; i0 < N; i0 += BLOCK) {         int i_end = min(i0 + BLOCK, N);  // 计算行边界         for (int k0 = 0; k0 < N; k0 += BLOCK) {             int k_end = min(k0 + BLOCK, N);  // 计算中间维度边界             for (int j0 = 0; j0 < N; j0 += BLOCK) {                 int j_end = min(j0 + BLOCK, N);  // 计算列边界                 // 核心计算:只处理完整块内的元素                 for (int i = i0; i < i_end; i++) {                     for (int k = k0; k < k_end; k++) {                         float a_val = A[i*N + k];  // 一次加载A元素                         // 内层循环:连续访问B和C                         for (int j = j0; j < j_end; j++) {                             C[i*N + j] += a_val * B[k*N + j];                         }                     }                 }             }         }     } } 

由于矩阵过小时,分块算法优势不大,且会增加调用开销,因此,这里的测试,(m,n)为2048

实测结果:(blocks = 512) 时,用时 11515.2ms

而不使用分块仅循环优化的算法 用时 16028.9ms
朴素实现 用时 18053.45ms
提升:16028.9ms-11515.2ms = (4513.2ms) 提高:(28.1%)

高手过招 优化三 :并行与SIMD

何为并行与SIMD
并行:多线程同时处理多个分块
SIMD:乘加一体,即一条CPU指令同时处理乘与加
我们这里使用Apple的AMX(Apple Matrix协处理器)(也属于CPU的一部分,并非GPU优化)
对于x86架构和其余ARM架构的处理器,可以使用AVX、AVX_512、SSE等SIMP指令集

它的特性有:

  • Apple Silicon芯片(M1/M2/M3等)内置的专用矩阵运算单元
  • 可并行处理大量16位浮点(FP16)或整数(INT8)运算

每个AMX单元包含:

  • 8个32KB的寄存器文件
  • 可同时执行2048次乘加运算(16x16x8矩阵)
  • 专用数据通路减少内存访问延迟

以及自动分块
如下是使用AMX的SIMP的代码:
时间复杂度:(O(n^3))

// 使用Apple的AMX加速BLAS库             cblas_sgemm(CblasRowMajor,   // 行主序存储                         CblasNoTrans,   // 不转置A                         CblasNoTrans,   // 不转置B                         M,              // A的行数                         N,              // B的列数                         K,              // 公共维度                         1.0f,           // alpha系数                         a_data,         // A数据指针                         K,              // A的列步幅(lda)                         b_data,         // B数据指针                         N,              // B的列步幅(ldb)                         0.0f,           // beta系数                         r_data,         // 结果数据指针                         N);             // 结果的列步幅(ldc) 

那么,本次优化最吓人、最恐怖的一次数据来了:
实测数据:202.831ms(4096*4096)
而标准分块+循环优化算法,在2048*2048时,就已经11515.2ms
提升:11312.37 ms 提高:98.2%

数学手段 优化4:Strassen算法 & 变种

温馨提示:到这里已经是高等数学内容了(实不相瞒,前面其实也是),有点小烧脑,不过欢迎各位继续跟作者一起尝试,本作者大约花了3小时搞完这一部分的证明

介绍:

Strassen算法是一种通过数学变换减少乘法次数的高效矩阵乘/卷积算法

简要推导:

我们设有如下两个矩阵相乘:

[begin{pmatrix} a_{11} & a_{12} \ a_{21} & a_{22} end{pmatrix} times begin{pmatrix} b_{11} & b_{12} \ b_{21} & b_{22} end{pmatrix} = begin{pmatrix} c_{11} & c_{12} \ c_{21} &c_{22} end{pmatrix} ]

传统计算需要8次乘法:

[c_{11} = a_{11}times b_{11} + a_{12}times b_{21} \ ]

[c_{12} = a_{11}times b_{12} + a_{12}times b_{22} \ ]

[c_{21} = a_{21}times b_{11} + a_{22}times b_{21} \ ]

[c_{22} = a_{21}times b_{12} + a_{22}times b_{22} \ ]

而Strassen算法只需7次乘
接下来,是Strassen算法最精妙绝伦的一步:
作者定义了7个矩阵:

[begin{align*} M_1 &= (a_{11} + a_{22})(b_{11} + b_{22}) \ M_2 &= (a_{21} + a_{22})b_{11} \ M_3 &= a_{11}(b_{12} - b_{22}) \ M_4 &= a_{22}(b_{21} - b_{11}) \ M_5 &= (a_{11} + a_{12})b_{22} \ M_6 &= (a_{21} - a_{11})(b_{11} + b_{12}) \ M_7 &= (a_{12} - a_{22})(b_{21} + b_{22}) end{align*} ]

真正让人惊讶的是下一步:
作者构建的7个矩阵,可以通过有限次的组合成为结果矩阵的一个元素
什么意思呢,让我们尝试展开其中一项:

[c_{11} = M_1 + M_4 - M_5 + M_7 \ ]

[begin{align*} &=(a_{11} + a_{22})(b_{11} + b_{22}) + a_{22}(b_{21} - b_{11}) - (a_{11} + a_{12})b_{22} + (a_{12} - a_{22})(b_{21} + b_{22}) \ end{align*}]

[begin{align*} = & a_{11}b_{11} + cancel{a_{11}b_{22}} + cancel{a_{22}b_{11}} + cancel{a_{22}b_{22}} \ &+ cancel{a_{22}b_{21}} - cancel{a_{22}b_{11}} \ &- cancel{a_{11}b_{22}} - a_{12}b_{22} \ &+ a_{12}b_{21} + cancel{a_{12}b_{22}} - cancel{a_{22}b_{21}} - cancel{a_{22}b_{22}} \ = & a_{11}b_{11} + a_{12}b_{21} end{align*}]

由此,可以类似的推出(c_{12}、c_{21}、c_{22})均与正常计算一致
最后的结果矩阵为

[C= begin{pmatrix} M_{1}+M_{4}-M_{5}+M_{7} & M_{3}+M_{5} \ M_{2}+M_{4} & M_{1}-M_{2}+M_{3}+M_{6} end{pmatrix} ]

接下来,我们证明其对于n>2时仍然成立:
由于( n=2 )时,我们已经证明其正确
所以,在n>2时,我们采取分块
将原矩阵分为4块,此时我们将其中的子矩阵看为一个元素
那么此时又回归到了标准的2x2的Strassen算法
由此在$n,m mid 2 $时Strassen算法正确
那么,下一个很自然的问题就是

(n,m nmid {2} ,结论是否成立)

我们的做法是,将矩阵分块,分为几个(2^k times 2^k)的子矩阵以及几个符合矩阵乘规则的小矩阵
显然由于前文的分块算法的正确性,此时的分块仍然正确,对于几个(2^k times 2^k)的矩阵,我们使用Strassen算法进行计算
现在来计算一下Strassen算法的时间复杂度:
设$ T(n) $ 为计算 $ n times n $ 矩阵乘法的时间:
(T(n) = 7Tleft(frac{n}{2}right) + O(n^2))

  • $ 7T(n/2) $:7 个子问题递归计算
  • $O(n^2) $:矩阵加减法开销(共 18 次加减法)

根据主定理1,
$ a = 7,b = 2,f(n)=Theta(n^2) $
(log _b a = log_27 approx 2.807)
由于(log _b a > 2),所以(f(n) = O({n^{log_b a-epsilon})} = O(n^{log_27}) approx O(n^{2.807}))
更精确的,复杂度为(Theta(n^{2.807}))

具体的算法为:
1.先将矩阵AB分块,分成大小为 ({blocks times blocks}) 的若干块以及几个任意大小的子块
2.对于 ({blocks times blocks}) 的子块,我们使用Strassen算法计算
3.在递归过程中,若方阵大小(因为Strassen算法开始时为方阵)n = 128,则使用循环优化的矩阵乘法
4.否则,继续按照Strassen算法递归计算直至n = 128
5.对于不是 ({blocks times blocks}) 的子块,使用循环优化的矩阵乘法计算
给出如下代码:
近似时间复杂度(O(n^{2.81}))

const int BLOCK_SIZE = 2048; const int STRASSEN_THRESHOLD = 128;  // 标准矩阵乘法 (用于小矩阵和边界处理) void standard_matmul(const float* A, const float* B, float* C, int n, int m, int p, int lda, int ldb, int ldc) {     for (int i = 0; i < n; ++i) {         for (int k = 0; k < m; ++k) {             float a = A[i * lda + k];             for (int j = 0; j < p; ++j) {                 C[i * ldc + j] += a * B[k * ldb + j];             }         }     } }  // 循环优化矩阵乘法 (n=128时使用) void optimized_matmul(const float* A, const float* B, float* C, int n, int lda, int ldb, int ldc) {     for (int i = 0; i < n; ++i) {         for (int k = 0; k < n; ++k) {             float a = A[i * lda + k];             for (int j = 0; j < n; ++j) {                 C[i * ldc + j] += a * B[k * ldb + j];             }         }     } }  // 矩阵加法 void matrix_add(const float* A, const float* B, float* C, int n, int lda, int ldb, int ldc) {     for (int i = 0; i < n; ++i) {         for (int j = 0; j < n; ++j) {             C[i * ldc + j] = A[i * lda + j] + B[i * ldb + j];         }     } }  // 矩阵减法 void matrix_subtract(const float* A, const float* B, float* C, int n, int lda, int ldb, int ldc) {     for (int i = 0; i < n; ++i) {         for (int j = 0; j < n; ++j) {             C[i * ldc + j] = A[i * lda + j] - B[i * ldb + j];         }     } }  // 结果累加到目标矩阵 void matrix_add_to_target(float* T, const float* S, int n, int ldt, int lds) {     for (int i = 0; i < n; ++i) {         for (int j = 0; j < n; ++j) {             T[i * ldt + j] += S[i * lds + j];         }     } }  // Strassen 矩阵乘法 (递归实现) void strassen_matmul(const float* A, const float* B, float* C, int n, int lda, int ldb, int ldc) {     // 递归基: n <= 128 使用优化乘法     if (n <= STRASSEN_THRESHOLD) {         optimized_matmul(A, B, C, n, lda, ldb, ldc);         return;     }      int half = n / 2;     // 定义子矩阵指针     const float* A11 = A;     const float* A12 = A + half;     const float* A21 = A + half * lda;     const float* A22 = A + half * lda + half;          const float* B11 = B;     const float* B12 = B + half;     const float* B21 = B + half * ldb;     const float* B22 = B + half * ldb + half;          float* C11 = C;     float* C12 = C + half;     float* C21 = C + half * ldc;     float* C22 = C + half * ldc + half;      // 分配临时矩阵     std::vector<float> S1(half * half);     std::vector<float> S2(half * half);     std::vector<float> S3(half * half);     std::vector<float> S4(half * half);     std::vector<float> S5(half * half);     std::vector<float> S6(half * half);     std::vector<float> S7(half * half);     std::vector<float> S8(half * half);     std::vector<float> S9(half * half);     std::vector<float> S10(half * half);          std::vector<float> P1(half * half);     std::vector<float> P2(half * half);     std::vector<float> P3(half * half);     std::vector<float> P4(half * half);     std::vector<float> P5(half * half);     std::vector<float> P6(half * half);     std::vector<float> P7(half * half);      // 计算S矩阵     matrix_subtract(B12, B22, S1.data(), half, ldb, ldb, half);    // S1 = B12 - B22     matrix_add(A11, A12, S2.data(), half, lda, lda, half);         // S2 = A11 + A12     matrix_add(A21, A22, S3.data(), half, lda, lda, half);         // S3 = A21 + A22     matrix_subtract(B21, B11, S4.data(), half, ldb, ldb, half);    // S4 = B21 - B11     matrix_add(A11, A22, S5.data(), half, lda, lda, half);         // S5 = A11 + A22     matrix_add(B11, B22, S6.data(), half, ldb, ldb, half);         // S6 = B11 + B22     matrix_subtract(A12, A22, S7.data(), half, lda, lda, half);    // S7 = A12 - A22     matrix_add(B21, B22, S8.data(), half, ldb, ldb, half);         // S8 = B21 + B22     matrix_subtract(A11, A21, S9.data(), half, lda, lda, half);    // S9 = A11 - A21     matrix_add(B11, B12, S10.data(), half, ldb, ldb, half);        // S10 = B11 + B12      // 递归计算P矩阵     strassen_matmul(A11, S1.data(), P1.data(), half, lda, half, half);      // P1 = A11 * S1     strassen_matmul(S2.data(), B22, P2.data(), half, half, ldb, half);      // P2 = S2 * B22     strassen_matmul(S3.data(), B11, P3.data(), half, half, ldb, half);      // P3 = S3 * B11     strassen_matmul(A22, S4.data(), P4.data(), half, lda, half, half);      // P4 = A22 * S4     strassen_matmul(S5.data(), S6.data(), P5.data(), half, half, half, half); // P5 = S5 * S6     strassen_matmul(S7.data(), S8.data(), P6.data(), half, half, half, half); // P6 = S7 * S8     strassen_matmul(S9.data(), S10.data(), P7.data(), half, half, half, half);// P7 = S9 * S10      // 组合结果矩阵 (累加到C)     // C11 = P5 + P4 - P2 + P6     matrix_add_to_target(C11, P5.data(), half, ldc, half);     matrix_add_to_target(C11, P4.data(), half, ldc, half);     matrix_add_to_target(C11, P6.data(), half, ldc, half);     for (int i = 0; i < half; ++i) {         for (int j = 0; j < half; ++j) {             C11[i * ldc + j] -= P2[i * half + j];         }     }          // C12 = P1 + P2     matrix_add_to_target(C12, P1.data(), half, ldc, half);     matrix_add_to_target(C12, P2.data(), half, ldc, half);          // C21 = P3 + P4     matrix_add_to_target(C21, P3.data(), half, ldc, half);     matrix_add_to_target(C21, P4.data(), half, ldc, half);          // C22 = P5 + P1 - P3 - P7     matrix_add_to_target(C22, P5.data(), half, ldc, half);     matrix_add_to_target(C22, P1.data(), half, ldc, half);     for (int i = 0; i < half; ++i) {         for (int j = 0; j < half; ++j) {             C22[i * ldc + j] -= (P3[i * half + j] + P7[i * half + j]);         }     } }  // 分块矩阵乘法 void matrix_multiply(const float* A, const float* B, float* C, int n, int m, int p, int lda, int ldb, int ldc) {     // 初始化输出矩阵为0     std::memset(C, 0, n * ldc * sizeof(float));          // 分块处理     for (int i = 0; i < n; i += BLOCK_SIZE) {         int i_end = std::min(i + BLOCK_SIZE, n);         int i_size = i_end - i;                  for (int k = 0; k < m; k += BLOCK_SIZE) {             int k_end = std::min(k + BLOCK_SIZE, m);             int k_size = k_end - k;                          for (int j = 0; j < p; j += BLOCK_SIZE) {                 int j_end = std::min(j + BLOCK_SIZE, p);                 int j_size = j_end - j;                                  // 当前块指针                 const float* A_block = A + i * lda + k;                 const float* B_block = B + k * ldb + j;                 float* C_block = C + i * ldc + j;                                  // 完整块使用Strassen算法                 if (i_size == BLOCK_SIZE && k_size == BLOCK_SIZE && j_size == BLOCK_SIZE) {                     strassen_matmul(A_block, B_block, C_block, BLOCK_SIZE, lda, ldb, ldc);                 }                  // 非完整块使用标准乘法                 else {                     standard_matmul(A_block, B_block, C_block, i_size, k_size, j_size, lda, ldb, ldc);                 }             }         }     } } } 

当然,实际测试中,我们使用Ctorch框架的Tensor类Op::Add,与此代码会略有差异
最终测试结果:1005.65ms
P.S.提升不明显的原因是矩阵过小,如果使用Tranformer架构的巨型矩阵测试,(O(n^{2.81}))的优势会非常明显

最终的方案:

我们使用多函数策略:
1.若dim<128 此时的拷贝开销已经大于AMX的优化,因此使用循环优化
2.128<dim<4096 AMX的最优区间,使用纯AMX
3.dim>4096 分块,对于能够分为(2^k)的块,使用Strassen算法,递归到2048使用AMX
对于不是(2^k)的块,直接使用AMX计算,同时,每个分块使用单一线程

最后的结果:

测试矩阵:16384163842((2^{14})
标准:30251ms
循环优化:24580ms
分块:20498ms
最终方案(SIMP+多线程+分块+Strassen):8267.97ms

最后

如果你希望既追求高性能又追求简洁的框架,那么Ctorch将是你的最优选择
尽管这个项目还在开发中,但是可以先小小的期待一下
欢迎贡献,如有错误,请各位不吝赐教,谢谢
2025.8.2

发表评论

评论已关闭。

相关文章