Teighaの低レベルおよび高レベルマルチスレッドAPIを併用する

はじめに

複雑なマルチスレッドソリューションでは、Teighaの低レベルおよび高レベルマルチスレッドAPIを組み合わせることができます。この柔軟性により、クロスプラットフォームで強力なマルチスレッドソリューションを構築でき、サードパーティのマルチスレッドソリューションをTeighaベースのアプリケーションに移植する作業を大幅に簡素化できます。

高レベルAPI(Teigha Kernelが提供するスレッドプールサービス)は、Teigha Kernelの低レベルAPIの上に構築されています。これらのAPIについては以前に説明しました。

ThreadPool.txモジュールとThreadsCounterシングルトン

スレッドプールサービスモジュールをロードした後、そこから何かを呼び出すたびにOdRxThreadPoolServiceポインタを渡す必要はありません。

// Load thread pool module
OdRxThreadPoolServicePtr pThreadPool = ::odrxDynamicLinker()->loadApp(OdThreadPoolModuleName);
if (pThreadPool.isNull())
  throw OdError(eNullPtr); // ThreadPool.tx not found.

// . . .

OdRxThreadPoolService *pThreadPool = odThreadsCounter().getThreadPoolService();

ThreadsCounterシングルトンのgetThreadPoolService()メソッドを呼び出すことで、コード内の任意の場所でロードされたモジュールインターフェースにアクセスできます。このポインタは、スレッドプールサービスモジュールがアクティブでアンロードされていない間利用可能です。

外部スレッドを登録するための代替方法

Teighaマルチスレッド低レベルAPIの記事シリーズで説明されているように、Teighaで外部スレッドを登録するにはThreadsCounter::increaseメソッドを、外部で開始されたスレッドの登録を解除するにはThreadsCounter::decreaseメソッドを常に呼び出すことができます。

odThreadsCounter().increase(1, (unsigned int*)&threadId, threadAttributes);
// . . .
odThreadsCounter().decrease(1, (unsigned int*)&threadId);

あるいは、スレッドプールサービスモジュールのOdRxThreadPoolService::registerExternalThreadsおよびOdRxThreadPoolService::unregisterExternalThreadsメソッドを使用することもできます。

m_pThreadPool->registerExternalThreads(1, (unsigned int*)&threadId, threadAttributes);
// . . .
m_pThreadPool->unregisterExternalThreads(1, (unsigned int*)&threadId);

または、Threads Counterシングルトン内に格納されているスレッドプールサービスモジュールから。

if (odThreadsCounter().getThreadPoolService())
  odThreadsCounter().getThreadPoolService()->registerExternalThreads(1, (unsigned int*)&threadId, threadAttributes);
// . . .
if (odThreadsCounter().getThreadPoolService())
  odThreadsCounter().getThreadPoolService()->unregisterExternalThreads(1, (unsigned int*)&threadId);

実際には、これらのメソッドはすべて同じ処理を行います。OdRxThreadPoolService::registerExternalThreadsおよびOdRxThreadPoolService::unregisterExternalThreadsメソッドは、内部的にThreads CounterシングルトンのThreadsCounter::increaseおよびThreadsCounter::decreaseメソッドを呼び出します。

スレッドプールサービスモジュールを使用する利点

オペレーティングシステムでは、スレッドはアトミックなオブジェクトです。新しいスレッドを作成すると、オペレーティングシステムは必要なシステムリソースを割り当て、スレッドを実行するために初期化および準備するために追加の時間を必要とします。Teighaスレッドプールサービスは、常にいくつかの事前割り当てされたスレッドを一時停止状態で保持し、それらを再利用するため、Teighaスレッドで新しいタスクを実行する方がはるかに高速です。これは、追加のリソース割り当てやスレッドの準備が不要であり、単に一時停止を解除してスレッド内で新しいタスクを実行するだけで済むためです。

OdApcThreadオブジェクト

以前の連載記事「Teighaマルチスレッド低レベルAPI」では、クライアントアプリケーションがTeigha APIと、他のAPI(多くの場合サードパーティAPI)に基づいた外部スレッドとの間の通信をどのように有効にできるかを示しました。これをWindows APIスレッドの実行で実証しましたが、これはクロスプラットフォームソリューションとはかけ離れています。スレッドプールサービスモジュールは、オペレーティングシステムスレッド用のクロスプラットフォームラッパーをすでに提供しています。OdApcThreadクラスのこれらのオブジェクトは、他のどのオペレーティングシステムスレッド実装でもクライアントアプリケーションによって使用できます。新しいスレッドはプールから取得されるため(前セクションを参照)、スレッドジョブを実行する前に長い準備は必要ありません。これは、作成されたOdApcThreadオブジェクトをクライアント側でキャッシュする理由がないことを意味します。OdApcThreadオブジェクトはすでにスレッドプールサービス側でキャッシュされているため、これによりパフォーマンス上の利点が得られることはありません。

Teighaスレッドの使用例

動作する例の中でOdApcThreadオブジェクトを使用する方法を示すために、Teighaマルチスレッド低レベルAPIの例を使用します。Teighaマルチスレッド高レベルAPIの記事では、一連のスレッドで個別のタスクを実行するためにOdApcQueueオブジェクトを呼び出しました。デモンストレーションとして、実行中のスレッドのセットを格納するシンプルなキューオブジェクトを作成します。これにより、すべてのデータ処理が完了したときにのみマルチスレッドタスクの結果を処理する必要があるため、実行中のスレッドのセットに対して単一の待機メソッドを提供できます。

// Simple multithread queue
class SimpleMultiThreadsQueue
{
  OdRxThreadPoolService *m_pThreadPool;
  OdArray<OdApcThreadPtr> m_runningThreads;
  public:
    SimpleMultiThreadsQueue(OdRxThreadPoolService *pThreadPool) : m_pThreadPool(pThreadPool) {}
    ~SimpleMultiThreadsQueue() { wait(); }

    void runNewThread(OdApcEntryPointVoidParam runFcn, OdApcParamType fcnArg, OdUInt32 threadAttributes)
    {
      m_runningThreads.push_back(m_pThreadPool->newThread());
      unsigned int threadId = m_runningThreads.last()->getId();
      m_pThreadPool->registerExternalThreads(1, (unsigned int*)&threadId, threadAttributes);
      m_runningThreads.last()->asyncProcCall(runFcn, fcnArg);
    }
    void wait()
    {
      OdArray<unsigned int, OdMemoryAllocator<unsigned int> > threadIds;
      while (!m_runningThreads.isEmpty())
      {
        m_runningThreads.last()->wait();
        threadIds.push_back(m_runningThreads.last()->getId());
        m_runningThreads.removeLast();
      }
      if (!threadIds.isEmpty())
        m_pThreadPool->unregisterExternalThreads(threadIds.size(), threadIds.getPtr());
    }
};

私たちのSimpleMultiThreadsQueueクラスは、実行中のスレッドのセットを格納し、wait()メソッドを提供するだけでなく、外部で開始されたスレッドを登録および登録解除するために、OdRxThreadPoolService::registerExternalThreadsおよびOdRxThreadPoolService::unregisterExternalThreadsメソッドを呼び出します。実際、これはOdApcQueueオブジェクトが内部で行っていることです。これで、メインの例の関数内にSimpleMultiThreadsQueueオブジェクトを作成できます。

// Load thread pool module
OdRxThreadPoolServicePtr pThreadPool = ::odrxDynamicLinker()->loadApp(OdThreadPoolModuleName);
if (pThreadPool.isNull())
  throw OdError(eNullPtr); // ThreadPool.tx not found.

// Create simple windows threads manager
SimpleMultiThreadsQueue mThreadQueue(pThreadPool);

また、以前の記事のSimpleWinThreadsPoolオブジェクトの代わりにSimpleMultiThreadsQueueオブジェクトを使用してスレッドを実行するために、RenderDbToImageCallerおよびProcessImageCallerクラスに小さな再設計が必要です。

// Thread running method implementation
class RenderDbToImageCaller : public OdRxObject
{
  OdString m_inputFile;
  OdGiRasterImagePtr *m_pOutputImage;
  RenderDbToImageContext *m_pThreadCtx;
  public:
    RenderDbToImageCaller *setup(OdString inputFile, OdGiRasterImagePtr *pOutputImage, RenderDbToImageContext *pThreadCtx)
    { m_inputFile = inputFile; m_pOutputImage = pOutputImage; m_pThreadCtx = pThreadCtx;
      return this; }
    static void entryPoint(OdApcParamType pArg)
    {
      ::odThreadsCounter().startThread();
      RenderDbToImageCaller *pCaller = (RenderDbToImageCaller*)pArg;
      OdDbDatabasePtr pDb = pCaller->m_pThreadCtx->m_pServices->readFile(pCaller->m_inputFile);
      if (!pDb.isNull())
        *(pCaller->m_pOutputImage) = ::renderDbToImage(pDb, pCaller->m_pThreadCtx->m_pRenderDevice, 
          pCaller->m_pThreadCtx->m_picWidth, pCaller->m_pThreadCtx->m_picHeight);
      ::odThreadsCounter().stopThread();
    }
    RenderDbToImageCaller *run(SimpleMultiThreadsQueue &threadQueue)
    {
      threadQueue.runNewThread(entryPoint, (OdApcParamType)this, 
        ThreadsCounter::kMtLoadingAttributes | ThreadsCounter::kMtRegenAttributes);
      return this;
    }
};

ProcessImageCallerクラス内でも同様の変更を行います。

// Thread running method implementation
class ProcessImageCaller : public OdRxObject
{
  OdSmartPtr<ProcessedRasterImage> m_pProcImage;
  OdUInt32 m_scanLineFrom, m_nScanLines;
  public:
    ProcessImageCaller *setup(ProcessedRasterImage *pProcImage, OdUInt32 scanLineFrom, OdUInt32 nScanLines)
    { m_pProcImage = pProcImage; m_scanLineFrom = scanLineFrom; m_nScanLines = nScanLines;
      return this; }
    static void entryPoint(OdApcParamType pArg)
    {
      ::odThreadsCounter().startThread();
      ProcessImageCaller *pCaller = (ProcessImageCaller*)pArg;
      pCaller->m_pProcImage->process(pCaller->m_scanLineFrom, pCaller->m_nScanLines);
      ::odThreadsCounter().stopThread();
    }
    ProcessImageCaller *run(SimpleMultiThreadsQueue &threadQueue)
    {
      threadQueue.runNewThread(entryPoint, (OdApcParamType)this, ThreadsCounter::kNoAttributes);
      return this;
    }
};

以前と同様に、必要に応じて内部のTeighaスレッドごとのリソースを割り当て/解放するために、スレッド実行関数内でThreadsCounter::startThreadおよびThreadsCounter::stopThreadメソッドを呼び出します。entryPointメソッドのプロトタイプにおける小さな違いについては、この記事の後半で説明します。

これで、メインの例の関数を修正して、以前の記事のSimpleWinThreadsPoolオブジェクトの代わりにSimpleMultiThreadsQueueオブジェクトを呼び出すことができます。

// Init performance timer
OdPerfTimerWrapper perfTimer;

// Create "render database to image" context shareable between threads
RenderDbToImageContext renderDbContext;
renderDbContext.setup(OdWinOpenGLModuleName, 1024, 1024, &svcs);

// Locked per-thread data structures
OdRxObjectPtrArray lockedObjects;

// Start timing for "render database to image" process
perfTimer.getTimer()->start();

// Run loading and rendering process
for (OdUInt32 nInput = 0; nInput < generatedRasters.size(); nInput++)
{
  OdString inputFileName(argv[2 + nInput]);
  lockedObjects.push_back(
    OdRxObjectImpl<RenderDbToImageCaller>::createObject()->
      setup(inputFileName, &generatedRasters[nInput], &renderDbContext)->
        run(mThreadQueue));
}

// Wait threads completion
mThreadQueue.wait();
lockedObjects.clear();

// Output timing for "render database to image" process
perfTimer.getTimer()->stop();
odPrintConsoleString(L"%u files loaded and rendered in %f seconds\n", generatedRasters.size(), perfTimer.getTimer()->countedSec());

マルチスレッド画像処理部分にも同様の変更を加えます。

// Create final raster image generator
OdSmartPtr<GeneratedRasterImage> pGenImage = OdRxObjectImpl<GeneratedRasterImage>::createObject();
pGenImage->configureImage(generatedRasters[0], generatedRasters);
    
// Create container for processed final raster image 
OdSmartPtr<ProcessedRasterImage> pProcImage = OdRxObjectImpl<ProcessedRasterImage>::createObject();
pProcImage->configureImage(pGenImage);

// Start timer for measure raster image processing
perfTimer.getTimer()->start();

// Run threads for raster image processing
const OdUInt32 nScanlinesPerThread = pProcImage->pixelHeight() / 4;
for (OdUInt32 nThread = 0; nThread < 4; nThread++)
{
  OdUInt32 nScanlinesPerThisThread = nScanlinesPerThread;
  if (nThread == 3) // Height can be not dividable by 2, so last thread can have onto one scanline less.
    nScanlinesPerThisThread = pProcImage->pixelHeight() - nScanlinesPerThread * 3;
  lockedObjects.push_back(
    OdRxObjectImpl<ProcessImageCaller>::createObject()->
      setup(pProcImage, nScanlinesPerThread * nThread, nScanlinesPerThisThread)->
        run(mThreadQueue));
}

// Wait threads completion
mThreadQueue.wait();
lockedObjects.clear();

// Output measurement for raster image processing process
perfTimer.getTimer()->stop();
odPrintConsoleString(L"Final raster image processed in %f seconds\n", perfTimer.getTimer()->countedSec());

これらのコード例に関する詳細情報は、以前の記事「Teigha Multithreading High-Level API」および「Teigha Multithreading Low-Level API」で確認できます。

2つのOdApcThread::asyncProcCallメソッドプロトタイプの違い

void asyncProcCall(OdApcEntryPointVoidParam ep, OdApcParamType parameter)
virtual void asyncProcCall( OdApcEntryPointVoidParam ep, OdApcParamType parameter ) = 0;

スレッドのエントリポイント(ep引数として渡すことができます)は次のようになります。

void entryPoint(OdApcParamType pArg);

このプロトタイプは、私たちの例であるRenderDbToImageCallerおよびProcessImageCallerクラスで静的クラスメンバーとして使用されました。

OdApcParamTypeはTeigha APIで次のように定義されています。

typedef ptrdiff_t OdApcParamType;

これは、通常void*引数を期待するスレッド実行関数の典型的なプロトタイプです。ここでptrdiff_t型はvoid*と同じ意味を持つため、32ビットライブラリ構成の場合は32ビット値、64ビットライブラリ構成の場合は64ビット値をスレッドに渡すことができます。

void asyncProcCall(OdApcEntryPointRxObjParam ep, OdRxObject* parameter)
virtual void asyncProcCall( OdApcEntryPointRxObjParam ep, OdRxObject* parameter ) = 0;

スレッドのエントリポイント(ep引数として渡すことができます)は次のようになります。

void entryPoint(OdRxObject* pArg);

このプロトタイプは、OdApcParamType引数を持つasyncProcCallメソッドとは1つの重要な違いがあります。OdRxObjectクラスには参照カウントメカニズムがあるため、OdRxObjectベースのクラスはスレッドジョブの完了まで存続します。

RenderDbToImageCallerおよびProcessImageCallerクラスはすでにOdRxObjectベースであるため、この機能はデモンストレーションの例をもう一度簡素化するために必要なものです。

例の調整

この例では、OdRxObjectの配列(OdRxObjectPtrArray lockedObjects;)を使用してスレッドごとに割り当てられたオブジェクトの問題を解決しようとしています。これは、すべてのマルチスレッド操作が完了するまですべてのスレッドごとのデータをロックし、その後この配列をクリアしてすべてのスレッドごとのデータのロックを解除し、解放します。このタスクは、OdRxObject*引数を持つOdApcThread::asyncProcCallメソッドを使用して解決できます。

まず、SimpleMultiThreadsQueue::runNewThread メソッドを、記述されているスレッド実行関数プロトタイプを使用するように変更します。

void runNewThread(OdApcEntryPointRxObjParam runFcn, OdRxObject *fcnArg, OdUInt32 threadAttributes)

次に、RenderDbToImageCaller および ProcessImageCaller クラスの entryPoint メソッドを変更します。

static void entryPoint(OdRxObject* pArg)

RenderDbToImageCaller::run メソッドで、スレッド関数引数の OdApcParamType 型への明示的なキャストを削除します。

threadQueue.runNewThread(entryPoint, this, 
    ThreadsCounter::kMtLoadingAttributes | ThreadsCounter::kMtRegenAttributes);

ProcessImageCaller::run メソッドでも同様に行います。

threadQueue.runNewThread(entryPoint, this, ThreadsCounter::kNoAttributes);

これで、ロックされたオブジェクト配列の作成とクリアをメインのサンプル関数から削除でき、実行中のスレッドを簡素化できます。

OdRxObjectImpl<RenderDbToImageCaller>::createObject()->
  setup(inputFileName, &generatedRasters[nInput], &renderDbContext)->
    run(mThreadQueue);

マルチスレッド画像処理の実行ループについても同様です。

OdRxObjectImpl<ProcessImageCaller>::createObject()->
  setup(pProcImage, nScanlinesPerThread * nThread, nScanlinesPerThisThread)->
    run(mThreadQueue);

結論

これは Teigha マルチスレッド API に関する最後の記事です。これらの記事シリーズは、Teigha ライブラリを使用した基本的なマルチスレッドプログラミングを開始し、プロジェクトの特性に応じてマルチスレッドソリューションの最適な構成を選択し、優れたパフォーマンスとソースコード設計のための最良のソリューションに到達するための十分な情報を提供します。もちろん、これらの記事はすべての API の機会を強調することはできません。たとえば、ネストされたスレッドからメインアプリケーションスレッド内でタスクの実行を開始する可能性(低レベルおよび高レベル API を使用)については説明しませんでした。これは、一部の高度に専門化されたソリューションで役立つ場合があります。また、スレッドプールサービスモジュール API は、for_each テンプレートのセットなど、マルチスレッドソースコードを簡素化するための多くのヘルパークラスを提供します。この拡張された機能とこれらのクラスを使用するには、高度なプログラミング経験が必要であり、個別に調査することができます。

今すぐ始める

ODAソフトウェアを60日間無料でお試しください。
リスクなし、クレジットカード不要。

無料で試す