同时使用 Teigha 低级和高级多线程 API

简介

复杂的多线程解决方案可以同时结合使用 Teigha 低级和高级多线程 API。这种灵活性使您能够以跨平台的方式构建强大的多线程解决方案,并能显著简化将第三方多线程解决方案移植到基于 Teigha 的应用程序的过程。

高级 API(由 Teigha Kernel 提供的线程池服务)构建在 Teigha Kernel 低级 API 之上。这些 API 之前已作介绍:

ThreadPool.tx 模块和 ThreadsCounter 单例

加载线程池服务模块后,您无需在每次需要调用其中的内容时都传递 OdRxThreadPoolService 指针:

// Load thread pool module
OdRxThreadPoolServicePtr pThreadPool = ::odrxDynamicLinker()->loadApp(OdThreadPoolModuleName);
if (pThreadPool.isNull())
  throw OdError(eNullPtr); // ThreadPool.tx not found.

// . . .

OdRxThreadPoolService *pThreadPool = odThreadsCounter().getThreadPoolService();

可以调用 ThreadsCounter 单例的 getThreadPoolService() 方法来访问代码中任何位置的已加载模块接口。只要线程池服务模块处于活动状态且未卸载,此指针就可用。

注册外部线程的替代方法

如 Teigha 多线程低级 API 系列文章所述,您始终可以调用 ThreadsCounter::increase 方法在 Teigha 中注册外部线程,并调用 ThreadsCounter::decrease 方法注销外部启动的线程:

odThreadsCounter().increase(1, (unsigned int*)&threadId, threadAttributes);
// . . .
odThreadsCounter().decrease(1, (unsigned int*)&threadId);

或者,您可以使用线程池服务模块的 OdRxThreadPoolService::registerExternalThreads 和 OdRxThreadPoolService::unregisterExternalThreads 方法:

m_pThreadPool->registerExternalThreads(1, (unsigned int*)&threadId, threadAttributes);
// . . .
m_pThreadPool->unregisterExternalThreads(1, (unsigned int*)&threadId);

或者从存储在 Threads Counter 单例中的线程池服务模块中获取:

if (odThreadsCounter().getThreadPoolService())
  odThreadsCounter().getThreadPoolService()->registerExternalThreads(1, (unsigned int*)&threadId, threadAttributes);
// . . .
if (odThreadsCounter().getThreadPoolService())
  odThreadsCounter().getThreadPoolService()->unregisterExternalThreads(1, (unsigned int*)&threadId);

实际上,所有这些方法都执行相同的任务。OdRxThreadPoolService::registerExternalThreads 和 OdRxThreadPoolService::unregisterExternalThreads 方法在内部调用 Threads Counter 单例的 ThreadsCounter::increase 和 ThreadsCounter::decrease 方法。

使用线程池服务模块的优势

在操作系统中,线程是原子对象。当您创建一个新线程时,操作系统需要额外的时间来分配所需的系统资源,并初始化和准备线程以运行。Teigha 线程池服务始终将一些预分配的线程保持在暂停状态以供重用,因此在 Teigha 线程中运行新任务要快得多,因为它们不需要额外的资源分配和线程准备——它们只需取消暂停即可在线程内执行新任务。

OdApcThread 对象

之前的文章系列《Teigha 多线程低级 API》展示了客户端应用程序如何实现 Teigha API 与基于其他 API(通常是第三方 API)的外部线程之间的通信。我们通过运行 Windows API 线程演示了这一点,但这远非一个跨平台解决方案。线程池服务模块已经为操作系统线程提供了跨平台包装器。客户端应用程序可以将 OdApcThread 类的这些对象与任何其他操作系统线程实现一起使用。新线程从池中获取(参见上一节),因此它们在运行线程作业之前不需要长时间的准备。这意味着没有理由在客户端缓存创建的 OdApcThread 对象;这不会带来任何性能提升,因为 OdApcThread 对象已在线程池服务端缓存。

使用 Teigha 线程的简单示例

为了演示在工作示例中使用 OdApcThread 对象,我们将使用 Teigha 多线程低级 API 中的示例。在 Teigha 多线程高级 API 文章中,我们调用了 OdApcQueue 对象来在一组线程中运行单独的任务。作为演示,我们创建一个简单的队列对象来存储一组正在运行的线程,这将帮助我们为一组正在运行的线程提供一个单一的等待方法,因为我们要求只有在所有数据处理完成后才处理多线程任务的结果:

// Simple multithread queue
class SimpleMultiThreadsQueue
{
  OdRxThreadPoolService *m_pThreadPool;
  OdArray<OdApcThreadPtr> m_runningThreads;
  public:
    SimpleMultiThreadsQueue(OdRxThreadPoolService *pThreadPool) : m_pThreadPool(pThreadPool) {}
    ~SimpleMultiThreadsQueue() { wait(); }

    void runNewThread(OdApcEntryPointVoidParam runFcn, OdApcParamType fcnArg, OdUInt32 threadAttributes)
    {
      m_runningThreads.push_back(m_pThreadPool->newThread());
      unsigned int threadId = m_runningThreads.last()->getId();
      m_pThreadPool->registerExternalThreads(1, (unsigned int*)&threadId, threadAttributes);
      m_runningThreads.last()->asyncProcCall(runFcn, fcnArg);
    }
    void wait()
    {
      OdArray<unsigned int, OdMemoryAllocator<unsigned int> > threadIds;
      while (!m_runningThreads.isEmpty())
      {
        m_runningThreads.last()->wait();
        threadIds.push_back(m_runningThreads.last()->getId());
        m_runningThreads.removeLast();
      }
      if (!threadIds.isEmpty())
        m_pThreadPool->unregisterExternalThreads(threadIds.size(), threadIds.getPtr());
    }
};

我们的 SimpleMultiThreadsQueue 类不仅存储正在运行的线程集并提供 wait() 方法,它还调用 OdRxThreadPoolService::registerExternalThreads 和 OdRxThreadPoolService::unregisterExternalThreads 方法来注册和注销我们外部启动的线程。实际上,这就是 OdApcQueue 对象在内部所做的事情。现在我们可以在主示例函数中创建我们的 SimpleMultiThreadsQueue 对象:

// Load thread pool module
OdRxThreadPoolServicePtr pThreadPool = ::odrxDynamicLinker()->loadApp(OdThreadPoolModuleName);
if (pThreadPool.isNull())
  throw OdError(eNullPtr); // ThreadPool.tx not found.

// Create simple windows threads manager
SimpleMultiThreadsQueue mThreadQueue(pThreadPool);

我们还需要对 RenderDbToImageCaller 和 ProcessImageCaller 类进行小幅重新设计,以使用 SimpleMultiThreadsQueue 对象而不是上一篇文章中的 SimpleWinThreadsPool 对象来运行线程:

// Thread running method implementation
class RenderDbToImageCaller : public OdRxObject
{
  OdString m_inputFile;
  OdGiRasterImagePtr *m_pOutputImage;
  RenderDbToImageContext *m_pThreadCtx;
  public:
    RenderDbToImageCaller *setup(OdString inputFile, OdGiRasterImagePtr *pOutputImage, RenderDbToImageContext *pThreadCtx)
    { m_inputFile = inputFile; m_pOutputImage = pOutputImage; m_pThreadCtx = pThreadCtx;
      return this; }
    static void entryPoint(OdApcParamType pArg)
    {
      ::odThreadsCounter().startThread();
      RenderDbToImageCaller *pCaller = (RenderDbToImageCaller*)pArg;
      OdDbDatabasePtr pDb = pCaller->m_pThreadCtx->m_pServices->readFile(pCaller->m_inputFile);
      if (!pDb.isNull())
        *(pCaller->m_pOutputImage) = ::renderDbToImage(pDb, pCaller->m_pThreadCtx->m_pRenderDevice, 
          pCaller->m_pThreadCtx->m_picWidth, pCaller->m_pThreadCtx->m_picHeight);
      ::odThreadsCounter().stopThread();
    }
    RenderDbToImageCaller *run(SimpleMultiThreadsQueue &threadQueue)
    {
      threadQueue.runNewThread(entryPoint, (OdApcParamType)this, 
        ThreadsCounter::kMtLoadingAttributes | ThreadsCounter::kMtRegenAttributes);
      return this;
    }
};

ProcessImageCaller 类内部也进行了相同的更改:

// Thread running method implementation
class ProcessImageCaller : public OdRxObject
{
  OdSmartPtr<ProcessedRasterImage> m_pProcImage;
  OdUInt32 m_scanLineFrom, m_nScanLines;
  public:
    ProcessImageCaller *setup(ProcessedRasterImage *pProcImage, OdUInt32 scanLineFrom, OdUInt32 nScanLines)
    { m_pProcImage = pProcImage; m_scanLineFrom = scanLineFrom; m_nScanLines = nScanLines;
      return this; }
    static void entryPoint(OdApcParamType pArg)
    {
      ::odThreadsCounter().startThread();
      ProcessImageCaller *pCaller = (ProcessImageCaller*)pArg;
      pCaller->m_pProcImage->process(pCaller->m_scanLineFrom, pCaller->m_nScanLines);
      ::odThreadsCounter().stopThread();
    }
    ProcessImageCaller *run(SimpleMultiThreadsQueue &threadQueue)
    {
      threadQueue.runNewThread(entryPoint, (OdApcParamType)this, ThreadsCounter::kNoAttributes);
      return this;
    }
};

与之前一样,我们在线程执行函数中调用 ThreadsCounter::startThread 和 ThreadsCounter::stopThread 方法,以在需要时分配/释放内部 Teigha 每线程资源。本文稍后将描述 entryPoint 方法原型的细微差别。

现在我们可以修改主示例函数,以调用 SimpleMultiThreadsQueue 对象而不是上一篇文章中的 SimpleWinThreadsPool 对象:

// Init performance timer
OdPerfTimerWrapper perfTimer;

// Create "render database to image" context shareable between threads
RenderDbToImageContext renderDbContext;
renderDbContext.setup(OdWinOpenGLModuleName, 1024, 1024, &svcs);

// Locked per-thread data structures
OdRxObjectPtrArray lockedObjects;

// Start timing for "render database to image" process
perfTimer.getTimer()->start();

// Run loading and rendering process
for (OdUInt32 nInput = 0; nInput < generatedRasters.size(); nInput++)
{
  OdString inputFileName(argv[2 + nInput]);
  lockedObjects.push_back(
    OdRxObjectImpl<RenderDbToImageCaller>::createObject()->
      setup(inputFileName, &generatedRasters[nInput], &renderDbContext)->
        run(mThreadQueue));
}

// Wait threads completion
mThreadQueue.wait();
lockedObjects.clear();

// Output timing for "render database to image" process
perfTimer.getTimer()->stop();
odPrintConsoleString(L"%u files loaded and rendered in %f seconds\n", generatedRasters.size(), perfTimer.getTimer()->countedSec());

多线程图像处理部分也进行了相同的更改:

// Create final raster image generator
OdSmartPtr<GeneratedRasterImage> pGenImage = OdRxObjectImpl<GeneratedRasterImage>::createObject();
pGenImage->configureImage(generatedRasters[0], generatedRasters);
    
// Create container for processed final raster image 
OdSmartPtr<ProcessedRasterImage> pProcImage = OdRxObjectImpl<ProcessedRasterImage>::createObject();
pProcImage->configureImage(pGenImage);

// Start timer for measure raster image processing
perfTimer.getTimer()->start();

// Run threads for raster image processing
const OdUInt32 nScanlinesPerThread = pProcImage->pixelHeight() / 4;
for (OdUInt32 nThread = 0; nThread < 4; nThread++)
{
  OdUInt32 nScanlinesPerThisThread = nScanlinesPerThread;
  if (nThread == 3) // Height can be not dividable by 2, so last thread can have onto one scanline less.
    nScanlinesPerThisThread = pProcImage->pixelHeight() - nScanlinesPerThread * 3;
  lockedObjects.push_back(
    OdRxObjectImpl<ProcessImageCaller>::createObject()->
      setup(pProcImage, nScanlinesPerThread * nThread, nScanlinesPerThisThread)->
        run(mThreadQueue));
}

// Wait threads completion
mThreadQueue.wait();
lockedObjects.clear();

// Output measurement for raster image processing process
perfTimer.getTimer()->stop();
odPrintConsoleString(L"Final raster image processed in %f seconds\n", perfTimer.getTimer()->countedSec());

您可以在之前的文章 Teigha Multithreading High-Level API 和 Teigha Multithreading Low-Level API 中获取有关这些代码示例的更多信息。

两种 OdApcThread::asyncProcCall 方法原型之间的区别

void asyncProcCall(OdApcEntryPointVoidParam ep, OdApcParamType parameter)
virtual void asyncProcCall( OdApcEntryPointVoidParam ep, OdApcParamType parameter ) = 0;

线程入口点(可以作为 ep 参数传递)应如下所示:

void entryPoint(OdApcParamType pArg);

此原型在我们的示例 RenderDbToImageCaller 和 ProcessImageCaller 类中用作静态类成员。

OdApcParamType 在 Teigha API 中定义为:

typedef ptrdiff_t OdApcParamType;

这是一个经典的线程执行函数原型,通常期望一个 void* 参数。这里的 ptrdiff_t 类型与 void* 具有相同的含义,因此您可以为 32 位库配置传递一个 32 位值,或为 64 位库配置传递一个 64 位值。

void asyncProcCall(OdApcEntryPointRxObjParam ep, OdRxObject* parameter)
virtual void asyncProcCall( OdApcEntryPointRxObjParam ep, OdRxObject* parameter ) = 0;

线程入口点(可以作为 ep 参数传递)应如下所示:

void entryPoint(OdRxObject* pArg);

此原型与带有 OdApcParamType 参数的 asyncProcCall 方法有一个显著区别。OdRxObject 类具有引用计数机制,因此基于 OdRxObject 的类将在线程作业完成期间保持活动状态。

此功能正是我们再次简化演示示例所需的,因为 RenderDbToImageCaller 和 ProcessImageCaller 类已经是基于 OdRxObject 的。

调整示例

在我们的示例中,我们尝试使用 OdRxObjects 数组(OdRxObjectPtrArray lockedObjects;)来解决每线程分配对象的问题,该数组会锁定所有每线程数据,直到所有多线程操作完成,然后清除此数组以解锁并释放所有每线程数据。此任务可以使用带有 OdRxObject* 参数的 OdApcThread::asyncProcCall 方法来解决。

首先修改 SimpleMultiThreadsQueue::runNewThread 方法以使用所描述的线程执行函数原型:

void runNewThread(OdApcEntryPointRxObjParam runFcn, OdRxObject *fcnArg, OdUInt32 threadAttributes)

接下来修改 RenderDbToImageCaller 和 ProcessImageCaller 类中的 entryPoint 方法:

static void entryPoint(OdRxObject* pArg)

在 RenderDbToImageCaller::run 方法中,移除线程函数参数到 OdApcParamType 类型的显式类型转换:

threadQueue.runNewThread(entryPoint, this, 
    ThreadsCounter::kMtLoadingAttributes | ThreadsCounter::kMtRegenAttributes);

在 ProcessImageCaller::run 方法中执行相同的操作:

threadQueue.runNewThread(entryPoint, this, ThreadsCounter::kNoAttributes);

现在可以从主示例函数中移除锁定对象数组的创建和清除,并且可以简化运行线程:

OdRxObjectImpl<RenderDbToImageCaller>::createObject()->
  setup(inputFileName, &generatedRasters[nInput], &renderDbContext)->
    run(mThreadQueue);

多线程图像处理运行循环也一样:

OdRxObjectImpl<ProcessImageCaller>::createObject()->
  setup(pProcImage, nScanlinesPerThread * nThread, nScanlinesPerThisThread)->
    run(mThreadQueue);

结论

这是关于 Teigha 多线程 API 的最后一篇文章。这些系列文章共同提供了足够的信息,可用于使用 Teigha 库开始进行基本的多线程编程,根据项目具体情况选择最佳的多线程解决方案组织方式,并为良好的性能和源代码设计找到最佳解决方案。当然,这些文章无法突出所有 API 机会。例如,我们没有描述从嵌套线程(使用低级和高级 API)在主应用程序线程内启动任务执行的可能性,这在某些高度专业化的解决方案中可能很有用。此外,线程池服务模块 API 提供了许多辅助类来简化多线程源代码,例如 for_each 模板集。使用这些扩展功能和这些类需要高级编程经验,可以独立研究。

今天就开始行动

免费试用 ODA 软件 60 天。
无风险,无需信用卡。

免费试用