图像处理算法-OpenCV图像翻转 flip SIMD版(ippicv)复现

说明

本文以 OpenCV 中的图像翻转 cv::flip 为切入点,选取内部 ippicv 提供的图像翻转函数 ippiMirror(具体为 ippiMirror_8u_C1R)进行复现,以此加深对 SIMD(SSE)指令应用的理解。

ippicv(IPP for Computer Vision)是 Intel Integrated Performance Primitives(Intel IPP)中专门面向图像处理和计算机视觉场景的一个裁剪子集,由 Intel 官方以预编译第三方库的形式集成到 OpenCV 中,用来为部分核心算子提供基于 SSE/AVX 等指令集的高性能实现。它通常以静态库和头文件的形式出现,在开启 WITH_IPP=ON 构建选项后,会自动被下载并链接到 OpenCV 中,为常见图像操作(卷积、插值、几何变换等)实现更好的性能。

当前复现代码支持水平翻转或水平+垂直翻转。在有了这个版本后,复现完整的ippiMirror_8u_C1R也就只是时间问题。

关键词

OpenCV;SIMD ;图像处理;性能优化;图像翻转;流式存储;非临时存储;内存带宽瓶颈;内存受限算法;ippicv;ippiMirror;cv::flip
Keywords: OpenCV; SIMD; image processing; performance optimization; image flipping; streaming store; non-temporal store; memory bandwidth bottleneck; memory-bound algorithm; ippicv; ippiMirror; cv::flip

复现

#include <tmmintrin.h> #include <smmintrin.h> #include <immintrin.h>  #include <stdint.h> #include <stddef.h> #include <string.h>  #ifdef _MSC_VER     #define FORCE_INLINE __forceinline #else     #define FORCE_INLINE inline __attribute__((always_inline)) #endif  #ifdef _MSC_VER #include <intrin.h> static void cpuid(int info[4], int func_id) { __cpuid(info, func_id); } #else static void cpuid(int info[4], int func_id) { __asm__ __volatile__("cpuid":"=a"(info[0]),"=b"(info[1]),"=c"(info[2]),"=d"(info[3]):"a"(func_id)); } #endif  // 返回:0=无SSSE3,1=SSSE3,2=AVX2 int detect_simd_level(void){     int info[4] = {0};     cpuid(info, 0);     if (info[0] < 7) {         cpuid(info, 1);         int ssse3 = (info[2] >> 9) & 1; // ECX bit 9         return ssse3 ? 1 : 0;     } else {         cpuid(info, 1);         int ssse3 = (info[2] >> 9) & 1;         cpuid(info, 7);         int avx2  = (info[1] >> 5) & 1; // EBX bit 5         return avx2 ? 2 : (ssse3 ? 1 : 0);     } }  // ------------------------------------------------------------------------- // 图像翻转 // ------------------------------------------------------------------------- // 16字节翻转掩码: [15, 14, ... 0] static const __m128i mask_flip_128 = _mm_setr_epi8(         15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 );  // 8字节翻转掩码: 低8字节翻转 [7...0],高8字节设为 -1 (保持不变或清零,取决于具体指令) // 在 SSE shuffle 中,高位 1 (0x80) 会将结果置零。 // 这里我们只关心低64位的结果,高位会被 _mm_storel_epi64 / cast 忽略。 static const __m128i mask_flip_64 = _mm_setr_epi8(         7, 6, 5, 4, 3, 2, 1, 0, -1, -1, -1, -1, -1, -1, -1, -1 );  // Switch-Jump Table 处理尾部 1-7 字节 FORCE_INLINE void copy_tail_reversed_switch(const uint8_t* src, uint8_t* dst_end_ptr, int n) {     // src: 指向剩余数据的开头     // dst_end_ptr: 指向剩余数据在目标内存中的尾部+1位置     // 写入方向:从后向前。 dst_end_ptr[-1] = src[0]      switch (n) {         case 7: dst_end_ptr[-7] = src[6]; [[fallthrough]];         case 6: dst_end_ptr[-6] = src[5]; [[fallthrough]];         case 5: dst_end_ptr[-5] = src[4]; [[fallthrough]];         case 4: dst_end_ptr[-4] = src[3]; [[fallthrough]];         case 3: dst_end_ptr[-3] = src[2]; [[fallthrough]];         case 2: dst_end_ptr[-2] = src[1]; [[fallthrough]];         case 1: dst_end_ptr[-1] = src[0];     } }   /**  * @brief 处理翻转操作的尾部数据 (16/8/1-7 字节)  * @param srow        [in/out] 源数据当前位置指针(会被修改)  * @param drow_end    [in/out] 目标数据末尾位置指针(会被修改)  * @param w           [in/out] 剩余宽度(会被修改)  * @param mask_flip_128 [in] 128位翻转掩码  * @param mask_flip_64  [in] 64位翻转掩码  */ FORCE_INLINE void process_tail_reversed(     const uint8_t*& srow,     uint8_t*& drow_end,     int& w,     const __m128i& mask_flip_128,     const __m128i& mask_flip_64) {     // 16字节处理     if (w >= 16) {         __m128i v0 = _mm_loadu_si128((const __m128i*)(srow));         _mm_storeu_si128((__m128i*)(drow_end - 16), _mm_shuffle_epi8(v0, mask_flip_128));         srow += 16;         drow_end -= 16;         w -= 16;     }      // 8字节处理     if (w >= 8) {         __m128i v0 = _mm_loadl_epi64((const __m128i*)srow);         v0 = _mm_shuffle_epi8(v0, mask_flip_64);         *(int64_t*)(drow_end - 8) = _mm_cvtsi128_si64(v0);         srow += 8;         drow_end -= 8;         w -= 8;     }      // 1-7 字节尾部处理     if (w > 0) {         copy_tail_reversed_switch(srow, drow_end, w);     } }  /**  * @brief 8位单通道图像翻转  * @param pSrc        [in] 源图像的起始地址 (左上角).  * @param srcStep     [in] 源图像的行步长 (字节).  * @param pDst        [out] 目标图像的起始地址.  * @param dstStep     [in] 目标图像的行步长 (字节).  * @param width       [in] 宽度 (像素数).  * @param height      [in] 高度 (像素数).  * @param reverseRows [in] 垂直翻转标志位.  * - 0: 仅水平镜像 (行序不变).  * - 1: 水平镜像 + 垂直翻转 (即旋转 180 度).  * @return int64_t    返回实际使用的目标行步长 (dst_stride_actual).  * - 如果 reverseRows=0, 返回 dstStep.  * - 如果 reverseRows=1, 返回 -dstStep (用于指示反向遍历).  */ int64_t ippiMirror_8u_C1_simd(         const uint8_t *pSrc,         int srcStep,         uint8_t *pDst,         int dstStep,         int width,         int height,         int reverseRows) {     // 初始化步长和起始位置     int64_t dst_stride_actual = reverseRows ? -dstStep : dstStep;      int64_t dst_start_offset = reverseRows ? (int64_t)dstStep * (height - 1) : 0;      // 这里的 dst_ptr_base 指向要处理的第一行的"起始"位置     uint8_t* dst_ptr_base = pDst + dst_start_offset;      // ---------------------------------------------------------------------     // 检查 1: 对齐检查     //     // 因为使用了反向写入 (stream store backwards),只要写入的"起始点"(即行尾)是对齐的,     // 那么 ptr-16, ptr-32 就都是对齐的。     //     // 这样即使 pDst (行首) 不对齐,只要 pDst + width 对齐,也可以走优化路径。     // 剩余的不对齐部分会落入最后的 switch case 处理,这是安全的。     // ---------------------------------------------------------------------      // 计算当前行逻辑上的"末尾+1"地址 (也是反向写入的起始地址)     uint8_t* const dst_row_end_addr = dst_ptr_base + width;      // 16字节对齐     bool is_aligned = (((uintptr_t)pSrc | (uintptr_t)srcStep | (uintptr_t)dst_row_end_addr | (uintptr_t)dst_stride_actual) & 0xF) == 0;      if (!is_aligned)     {         // 分支 1: 通用/未对齐路径         // 可以在这里根据 src/dst 对齐情况做更细致的 if/else,         // 但核心都是 load/shuffle/store。         // 这里使用 _mm_loadu_si128 (unaligned) 统一处理,这在现代 CPU 上性能几乎等同于拆分处理。          const uint8_t* sptr = pSrc;         uint8_t* dptr_end = dst_row_end_addr; // 指向当前行尾+1          for (int i = 0; i < height; ++i) {             int w = width;             const uint8_t* srow = sptr;             uint8_t* drow_end = dptr_end;              // 32字节循环 (Loop Unrolling x2)             while (w >= 32) {                 __m128i v0 = _mm_loadu_si128((const __m128i*)(srow));                 __m128i v1 = _mm_loadu_si128((const __m128i*)(srow + 16));                  v0 = _mm_shuffle_epi8(v0, mask_flip_128);                 v1 = _mm_shuffle_epi8(v1, mask_flip_128);                  // 注意:drow_end 是向后退的                 _mm_storeu_si128((__m128i*)(drow_end - 16), v0);                 _mm_storeu_si128((__m128i*)(drow_end - 32), v1);                  srow += 32;                 drow_end -= 32;                 w -= 32;             }              // 尾部处理             process_tail_reversed(srow, drow_end, w, mask_flip_128, mask_flip_64);              sptr += srcStep;             dptr_end += dst_stride_actual; // 移动到下一行         }          return dst_stride_actual;     }      // ---------------------------------------------------------------------     // 检查 2: 大数据量 Streaming 检查     // ---------------------------------------------------------------------     int64_t total_size = (int64_t)(srcStep + dstStep) * height;     if (total_size > 0x100000)     {         // 分支 2: 流式写入路径 (Streaming Stores)         const uint8_t* sptr = pSrc;         uint8_t* dptr_end = dst_row_end_addr;          for (int i = 0; i < height; ++i) {             int w = width;             const uint8_t* srow = sptr;             uint8_t* drow_end = dptr_end;              // 128字节循环 (8x16)             while (w >= 128) {                 // 流水线加载                 __m128i v0 = _mm_load_si128((const __m128i*)(srow + 0));                 __m128i v1 = _mm_load_si128((const __m128i*)(srow + 16));                 __m128i v2 = _mm_load_si128((const __m128i*)(srow + 32));                 __m128i v3 = _mm_load_si128((const __m128i*)(srow + 48));                 __m128i v4 = _mm_load_si128((const __m128i*)(srow + 64));                 __m128i v5 = _mm_load_si128((const __m128i*)(srow + 80));                 __m128i v6 = _mm_load_si128((const __m128i*)(srow + 96));                 __m128i v7 = _mm_load_si128((const __m128i*)(srow + 112));                  // 翻转                 v0 = _mm_shuffle_epi8(v0, mask_flip_128);                 v1 = _mm_shuffle_epi8(v1, mask_flip_128);                 v2 = _mm_shuffle_epi8(v2, mask_flip_128);                 v3 = _mm_shuffle_epi8(v3, mask_flip_128);                 v4 = _mm_shuffle_epi8(v4, mask_flip_128);                 v5 = _mm_shuffle_epi8(v5, mask_flip_128);                 v6 = _mm_shuffle_epi8(v6, mask_flip_128);                 v7 = _mm_shuffle_epi8(v7, mask_flip_128);                  // 流式写入 (绕过缓存)                 _mm_stream_si128((__m128i*)(drow_end - 16), v0);                 _mm_stream_si128((__m128i*)(drow_end - 32), v1);                 _mm_stream_si128((__m128i*)(drow_end - 48), v2);                 _mm_stream_si128((__m128i*)(drow_end - 64), v3);                 _mm_stream_si128((__m128i*)(drow_end - 80), v4);                 _mm_stream_si128((__m128i*)(drow_end - 96), v5);                 _mm_stream_si128((__m128i*)(drow_end - 112), v6);                 _mm_stream_si128((__m128i*)(drow_end - 128), v7);                  srow += 128;                 drow_end -= 128;                 w -= 128;             }              // 32字节循环             while (w >= 32) {                 __m128i v0 = _mm_load_si128((const __m128i*)(srow));                 __m128i v1 = _mm_load_si128((const __m128i*)(srow + 16));                  v0 = _mm_shuffle_epi8(v0, mask_flip_128);                 v1 = _mm_shuffle_epi8(v1, mask_flip_128);                  _mm_stream_si128((__m128i*)(drow_end - 16), v0);                 _mm_stream_si128((__m128i*)(drow_end - 32), v1);                  srow += 32;                 drow_end -= 32;                 w -= 32;             }              // 尾部处理             process_tail_reversed(srow, drow_end, w, mask_flip_128, mask_flip_64);              sptr += srcStep;             dptr_end += dst_stride_actual;         }         _mm_sfence(); // 保证 stream store 完成         return dst_stride_actual;     }      // ---------------------------------------------------------------------     // 分支 3: 已对齐且小数据量 (Aligned Cache-Friendly)     // ---------------------------------------------------------------------     if (width > 31)     {         const uint8_t* sptr = pSrc;         uint8_t* dptr_end = dst_row_end_addr;          for (int i = 0; i < height; ++i) {             int w = width;             const uint8_t* srow = sptr;             uint8_t* drow_end = dptr_end;              // 96 字节循环 (3x32)             while (w >= 96) {                 // Load 32 * 3                 __m128i v0 = _mm_load_si128((const __m128i*)(srow));                 __m128i v1 = _mm_load_si128((const __m128i*)(srow + 16));                 __m128i v2 = _mm_load_si128((const __m128i*)(srow + 32));                 __m128i v3 = _mm_load_si128((const __m128i*)(srow + 48));                 __m128i v4 = _mm_load_si128((const __m128i*)(srow + 64));                 __m128i v5 = _mm_load_si128((const __m128i*)(srow + 80));                  // Store 32 * 3                 _mm_store_si128((__m128i*)(drow_end - 16), _mm_shuffle_epi8(v0, mask_flip_128));                 _mm_store_si128((__m128i*)(drow_end - 32), _mm_shuffle_epi8(v1, mask_flip_128));                 _mm_store_si128((__m128i*)(drow_end - 48), _mm_shuffle_epi8(v2, mask_flip_128));                 _mm_store_si128((__m128i*)(drow_end - 64), _mm_shuffle_epi8(v3, mask_flip_128));                 _mm_store_si128((__m128i*)(drow_end - 80), _mm_shuffle_epi8(v4, mask_flip_128));                 _mm_store_si128((__m128i*)(drow_end - 96), _mm_shuffle_epi8(v5, mask_flip_128));                  srow += 96;                 drow_end -= 96;                 w -= 96;             }              // 32 字节循环             while (w >= 32) {                 __m128i v0 = _mm_load_si128((const __m128i*)(srow));                 __m128i v1 = _mm_load_si128((const __m128i*)(srow + 16));                 _mm_store_si128((__m128i*)(drow_end - 16), _mm_shuffle_epi8(v0, mask_flip_128));                 _mm_store_si128((__m128i*)(drow_end - 32), _mm_shuffle_epi8(v1, mask_flip_128));                 srow += 32; drow_end -= 32; w -= 32;             }              // 尾部处理             process_tail_reversed(srow, drow_end, w, mask_flip_128, mask_flip_64);              sptr += srcStep;             dptr_end += dst_stride_actual;         }         return dst_stride_actual;     }      // 最后的 fallback (如果 aligned 但 width 很小 < 32)     {         const uint8_t* sptr = pSrc;         uint8_t* dptr_end = dst_row_end_addr;          for (int i = 0; i < height; ++i) {             int w = width;             const uint8_t* srow = sptr;             uint8_t* drow_end = dptr_end;              // 尾部处理             process_tail_reversed(srow, drow_end, w, mask_flip_128, mask_flip_64);              sptr += srcStep;             dptr_end += dst_stride_actual;         }     }      return dst_stride_actual; } 

功能测试和性能测试

功能测试代码如下

#include <memory> #include <vector> #include <opencv2/opencv.hpp>  #include "ippiflip.h"  // 辅助函数 static bool equalMat(const cv::Mat& a, const cv::Mat& b) {     cv::Mat diff;     cv::absdiff(a, b, diff);     return cv::countNonZero(diff) == 0; }  // 核心测试函数 - 支持src和dst都可以有padding static bool run_case(int rows, int cols,                      int src_pad_bytes, int dst_pad_bytes,                      bool hv,                      const char* desc = nullptr) {     // 创建带padding的源图像     cv::Mat src_full(rows, cols + src_pad_bytes, CV_8UC1);     cv::RNG rng(12345u + rows * 131 + cols * 17 + src_pad_bytes * 7 + dst_pad_bytes * 3);     rng.fill(src_full, cv::RNG::UNIFORM, 0, 256);     cv::Mat src = src_full(cv::Rect(0, 0, cols, rows));      // 创建带padding的目标图像     cv::Mat dst_full(rows, cols + dst_pad_bytes, CV_8UC1, cv::Scalar::all(0));     cv::Mat dst = dst_full(cv::Rect(0, 0, cols, rows));      // OpenCV参考实现     cv::Mat cvref;     const int reverseRows = hv ? 1 : 0;      if (hv)         cv::flip(src, cvref, -1); // H+V     else         cv::flip(src, cvref, 1);  // 仅H      // 调用SIMD实现     int64_t usedStep = ippiMirror_8u_C1_simd(             src.data, static_cast<int>(src.step),             dst.data, static_cast<int>(dst.step),             cols, rows, reverseRows);     (void)usedStep;      // 验证结果     if (!equalMat(dst, cvref)) {         fprintf(stderr, "FAILED: %sn", desc ? desc : "");         fprintf(stderr, "  Size: %dx%d, src_pad=%d, dst_pad=%d, mode=%sn",                 rows, cols, src_pad_bytes, dst_pad_bytes, hv ? "H+V" : "H");         return false;     }     return true; }  // 测试1: 分辨率测试 static int test_typical_resolutions() {     printf("n=== Test Suite 1: Typical Resolutions ===n");      struct Case {         int r, c, src_pad, dst_pad;         const char* desc;     };      std::vector<Case> cases = {             {1080, 1920, 0, 0, "1080p no padding"},             {1080, 1920, 7, 0, "1080p src padded"},             {1080, 1920, 0, 11, "1080p dst padded"},             {1080, 1920, 7, 11, "1080p both padded"},             {480, 641, 3, 5, "480p odd width, both padded"},             {720, 1281, 11, 13, "720p odd width, both padded"},     };      int passed = 0, total = 0;     for (const auto& cs : cases) {         // 测试仅H翻转         total++;         if (run_case(cs.r, cs.c, cs.src_pad, cs.dst_pad, false, cs.desc)) {             passed++;         }          // 测试H+V翻转         total++;         if (run_case(cs.r, cs.c, cs.src_pad, cs.dst_pad, true, cs.desc)) {             passed++;         }     }      printf("Result: %d/%d passedn", passed, total);     return passed == total ? 0 : 1; }   // 测试2: 小尺寸穷举测试 (width <= 64) static int test_small_sizes() {     printf("n=== Test Suite 2: Small Size Exhaustive (w<=64, h<=8) ===n");      int passed = 0, total = 0;      // 测试小宽度 1-64, 小高度 1-8     for (int h = 1; h <= 8; ++h) {         for (int w = 1; w <= 64; ++w) {             // 每种尺寸测试几种padding组合             for (int pad_combo = 0; pad_combo < 3; ++pad_combo) {                 int src_pad = (pad_combo == 0) ? 0 : (w % 7 + 1);                 int dst_pad = (pad_combo == 1) ? 0 : (w % 5 + 1);                  // 测试H翻转                 total++;                 char desc[128];                 snprintf(desc, sizeof(desc), "Small %dx%d", h, w);                 if (run_case(h, w, src_pad, dst_pad, false, desc)) {                     passed++;                 }                  // 测试H+V翻转                 total++;                 if (run_case(h, w, src_pad, dst_pad, true, desc)) {                     passed++;                 }             }         }     }      printf("Result: %d/%d passedn", passed, total);     return passed == total ? 0 : 1; }   // 测试3: 对齐边界测试 (测试关键宽度边界) static int test_alignment_boundaries() {     printf("n=== Test Suite 3: Alignment Boundaries ===n");      int passed = 0, total = 0;      // 关键边界: 16, 31, 32, 63, 64, 127, 128等SIMD相关边界     std::vector<int> critical_widths = {             15, 16, 17,        // 16字节边界             31, 32, 33,        // 32字节边界             63, 64, 65,        // 64字节边界             127, 128, 129,     // 128字节边界             255, 256, 257,     // 256字节边界     };      std::vector<int> heights = {1, 2, 7, 8, 15, 16, 100};      for (int w : critical_widths) {         for (int h : heights) {             // 测试不同padding组合             for (int src_pad = 0; src_pad <= 3; src_pad += 3) {                 for (int dst_pad = 0; dst_pad <= 5; dst_pad += 5) {                     total++;                     char desc[128];                     snprintf(desc, sizeof(desc), "Boundary %dx%d", h, w);                     if (run_case(h, w, src_pad, dst_pad, false, desc)) {                         passed++;                     }                      total++;                     if (run_case(h, w, src_pad, dst_pad, true, desc)) {                         passed++;                     }                 }             }         }     }      printf("Result: %d/%d passedn", passed, total);     return passed == total ? 0 : 1; }   // 测试4: 大尺寸测试 (包含4K) static int test_large_sizes() {     printf("n=== Test Suite 4: Large Sizes (Including 4K) ===n");      struct LargeCase {         int r, c;         const char* desc;     };      std::vector<LargeCase> cases = {             {1080, 1920, "1080p (Full HD)"},             {1440, 2560, "1440p (2K)"},             {2160, 3840, "2160p (4K)"},     };      int passed = 0, total = 0;      for (const auto& cs : cases) {         cv::Mat big(cs.r, cs.c, CV_8UC1);         cv::randu(big, 0, 256);          // 测试H翻转         {             cv::Mat out_icv(cs.r, cs.c, CV_8UC1);             cv::Mat out_cv;              ippiMirror_8u_C1_simd(                     big.data, static_cast<int>(big.step),                     out_icv.data, static_cast<int>(out_icv.step),                     cs.c, cs.r, 0);              cv::flip(big, out_cv, 1);              total++;             if (equalMat(out_icv, out_cv)) {                 passed++;             } else {                 fprintf(stderr, "FAILED: %s H-flipn", cs.desc);             }         }          // 测试H+V翻转         {             cv::Mat out_icv(cs.r, cs.c, CV_8UC1);             cv::Mat out_cv;              ippiMirror_8u_C1_simd(                     big.data, static_cast<int>(big.step),                     out_icv.data, static_cast<int>(out_icv.step),                     cs.c, cs.r, 1);              cv::flip(big, out_cv, -1);              total++;             if (equalMat(out_icv, out_cv)) {                 passed++;             } else {                 fprintf(stderr, "FAILED: %s H+V-flipn", cs.desc);             }         }     }      printf("Result: %d/%d passedn", passed, total);     return passed == total ? 0 : 1; }   // 测试5: 随机压力测试 static int test_random_stress() {     printf("n=== Test Suite 5: Random Stress Test ===n");      int passed = 0, total = 0;     const int num_tests = 100;      cv::RNG rng(0xDEADBEEF);      for (int i = 0; i < num_tests; ++i) {         // 随机生成测试参数         int w = rng.uniform(1, 2048);         int h = rng.uniform(1, 1024);         int src_pad = rng.uniform(0, 32);         int dst_pad = rng.uniform(0, 32);         bool hv = (rng.uniform(0, 2) == 1);          total++;         char desc[128];         snprintf(desc, sizeof(desc), "Random #%d: %dx%d", i, h, w);         if (run_case(h, w, src_pad, dst_pad, hv, desc)) {             passed++;         }     }      printf("Result: %d/%d passedn", passed, total);     return passed == total ? 0 : 1; }   int main() {     printf("SIMD Level: %d (0=none, 1=SSSE3, 2=AVX2)n", detect_simd_level());      int failed_suites = 0;      // 运行所有测试     failed_suites += test_typical_resolutions();     failed_suites += test_small_sizes();     failed_suites += test_alignment_boundaries();     failed_suites += test_large_sizes();     failed_suites += test_random_stress();      // 总结     printf("n");     if (failed_suites == 0) {         printf("ALL TESTS PASSEDn");     } else {         printf("%d TEST SUITE(S) FAILEDn", failed_suites);     }      return failed_suites == 0 ? 0 : 1; } 

功能测试全部通过

性能测试部分代码如下

static void BM_ICV_1024_Horizontal(benchmark::State& state) {     cv::Mat src = createBenchmarkImage(1024);     cv::Mat dst = src.clone();      for (auto _ : state) {         ippiMirror_8u_C1_simd(             src.data,             static_cast<int>(src.step),             dst.data,             static_cast<int>(dst.step),             src.cols,             src.rows,             0  // reverseRows=0: 仅水平翻转         );         benchmark::DoNotOptimize(dst.data);     }      state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) *                            src.total() * src.elemSize()); }  static void BM_OpenCV_1024_Horizontal(benchmark::State& state) {     cv::Mat src = createBenchmarkImage(1024);     cv::Mat dst;      for (auto _ : state) {         cv::flip(src, dst, 1);  // flipCode=1: 水平翻转         benchmark::DoNotOptimize(dst.data);     }      state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) *                            src.total() * src.elemSize()); } 

性能测试结果如下

BM_ICV_1024_Horizontal         28117 ns        27832 ns        23579 bytes_per_second=35.0878Gi/s BM_OpenCV_1024_Horizontal      28629 ns        29157 ns        23579 bytes_per_second=33.4929Gi/s BM_ICV_1024_Both               30761 ns        30762 ns        24889 bytes_per_second=31.7462Gi/s BM_OpenCV_1024_Both            29216 ns        29157 ns        23579 bytes_per_second=33.4929Gi/s BM_ICV_2048_Horizontal        129590 ns       131138 ns         5600 bytes_per_second=29.7872Gi/s BM_OpenCV_2048_Horizontal     127283 ns       125558 ns         5600 bytes_per_second=31.1111Gi/s BM_ICV_2048_Both              127280 ns       128348 ns         5600 bytes_per_second=30.4348Gi/s BM_OpenCV_2048_Both           131635 ns       131138 ns         5600 bytes_per_second=29.7872Gi/s 

这里挑选了一个测试结果。另外在正式执行BM_ICV_1024_Horizontal之前先进行了BM_OpenCV_1024_Horizontal热身,因为发现第一项测试总是只有20+Gi/s。

实际性能测试中互有胜负,且差距很小。由此可见,可以认为成功复现了该算法。

总结

通过对 ippicv 中 owniFlipCopy_8u_C1 函数的复现,我们深入探索了 SIMD 在图像处理优化方面的技术实践。
真正的高性能代码需要综合考虑 CPU 架构、缓存层次、内存带宽、指令延迟等多个因素。

Streaming Stores 流式存储

主要作用如下:

  1. 避免 RFO,减少无意义的目的缓冲区读取,直接省带宽

在之前自己实现过一版图像翻转SSE,但发现被内存带宽所限制。这次复现后,对这个问题也有了更清晰的了解。

对于图像翻转这种操作,计算密度极低(只做简单的 shuffle),内存读写密度极高。CPU 的运算速度远快于内存供数速度,因此它是一个典型的 Memory-Bound(内存受限) 算法。

内存带宽瓶颈正是代码中引入 Streaming Stores(流式存储)的最主要原因,在这里,减少数据访存延时至关重要。

在普通的内存写入中,CPU 遵循 RFO (Read-For-Ownership) ,可以简单概括为:

CPU 想要写一个 cache line。 CPU 先从内存把这块旧数据读进缓存(占用带宽)(实际可以读其他缓存,但这里重点是内存)。 修改缓存中的数据。 稍后将脏数据写回内存。 

而 Streaming Store (Non-temporal Store):

CPU 告诉内存控制器:“我要覆盖这整块数据,别管旧数据了”。 CPU 跳过读取步骤,直接将 Write Combining Buffer 中的数据刷入内存。 

省掉了一次无效的内存读取带宽。

要真正避免 RFO、发挥带宽优势,通常需要满足:

  • 按 cache line 对齐(比如 64B 对齐);
  • 写入模式是密集连续的,让 write-combining buffer 可以凑满一整行再写出;
  • 不要混杂频繁访问同一行的小写/读,不然硬件可能仍然需要获取这行数据。
  1. 减少缓存污染,把 cache 留给真正需要复用的数据

普通 store 会把写入的数据放进 L1/L2 cache,但对这段代码来说,dst 写完就行了,后面不会再读同一块 dst

再次读取一般是后面的处理 pipeline,而不是这一次函数调用

  1. 利用写合并缓冲区,让连续的大块写更高效,提升内存吞吐
  • 多次 _mm_stream_si128(16B)写向同一 cache line 区域
  • 硬件在 WC buffer 里先合成完整/半完整的 cache line
  • 再一次性写出

最终减少总线事务数量、提高有效带宽

特点

反向写入设计:行尾向行首写入,只要行尾对齐,就能保证连续的 SIMD 写入是对齐的,从而巧妙规避了目标首地址(pDst)不对齐带来的性能损耗,最大化了 Aligned Store 指令的覆盖率。
核心算子优化:利用 SSSE3 的 _mm_shuffle_epi8 替代位运算,实现寄存器内字节乱序/翻转。
指令级并行:通过 128/32 字节的多级循环展开(Loop Unrolling)和批量加载,掩盖指令延迟,充分利用 CPU 流水线性能。
流式写入:针对大数据量场景,使用 _mm_stream_si128 绕过缓存直接写入内存,减少了数据访存延时,提升了内存带宽利用率,显著提升吞吐量。
Switch-Fallthrough 模式:利用 Switch-Case 的穿透特性(Fallthrough)处理 1~7 字节的尾部数据,避免了复杂的条件判断分支。

发表评论

评论已关闭。

相关文章