2014年10月18日土曜日

Javaの軽量スレッドプール的なもの

普通ThreadPoolはRunnable とThreadの関係が切り離されている。そこがオーバーヘッドになるんじゃないかと思って、ThreadとRunnableが1対1に対応していて、複数のスレッドが、それぞれのRunnableをバリア同期を取りながら繰り返し実行する物を書いてみた。
使い方はこんな感じ。tick() とやると、スレッドが一気に動いて、全部終了したところで戻ってくる。
ThreadBundle tb = new ThreadBundle();
 for (int i = 0; i < numThreads; i++) 
  tb.addRunnable(new Runnable() {
     public void run() {
         System.out.print("-");
         System.out.flush();
        try {Thread.sleep(1000);} catch (InterruptedException e) {     }
     }
  });
tb.start();
for (int i = 0; i < 100; i++) 
     tb.tick();
tb.shutdown();
実装は、ReadWriteLockを2つ使って実現している。ワーカスレッドはreadlock を取得しようとしていて、ブロックしている。マスタスレッドがtick でwriteLockをリリースすると、一斉に走る。
一方、ワーカスレッドは裏のreadlockを取得したまま実行しており、これを実行終了後にリリースする。全てのワーカスレッドが裏のreadlockをリリースすると、マスタスレッドがwriteLockを取得できる。これをもって、バリア同期が実現される。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ThreadBundle {
 List<Runnable> runnables = new ArrayList<Runnable>();
 boolean started =false;
 Thread [] threads;
 ReadWriteLock [] rwlocks = new ReadWriteLock [] {new ReentrantReadWriteLock(), new ReentrantReadWriteLock()};
 CountDownLatch latch;

 class WrapperRunnable implements Runnable {
  Runnable inner;
  int ic = 0;

  WrapperRunnable(Runnable inner) {
   this.inner = inner;
  }

  @Override
  public void run() {
   rwlocks[(ic + 1 )% 2 ].readLock().lock();
   latch.countDown();
   while (true) {
    //System.out.println("try readLock " + (ic %2));
    rwlocks[ic % 2].readLock().lock();
    //System.out.print("-");
    //System.out.flush();
    inner.run();
    rwlocks[(ic+1) % 2].readLock().unlock();
    ic++;
   }
  }
 }

 public void ThreadBundle() {
 }

 public void addRunnable(Runnable r) {
  if (started)
   throw new RuntimeException("already started");
  runnables.add(r);
 }

 public void start() throws InterruptedException {
  started = true;
  threads = new Thread[runnables.size()];
  latch = new CountDownLatch(runnables.size());   
  for (int i = 0; i < runnables.size(); i++)
   threads[i] = new Thread(new WrapperRunnable (runnables.get(i))); 

  rwlocks[0].writeLock().lock();
  for (int i = 0; i < runnables.size(); i++)
   threads[i].start();
  latch.await();
 }

 int counter = 0;

 public void tick() {
  rwlocks[counter % 2].writeLock().unlock();
  rwlocks[(counter + 1) % 2].writeLock().lock();   
  counter++;
 }

 public void shutdown() {
   // 手抜き  
 }
}
結構凝った仕掛けなのに、あんまり速くない?残念。

2014年10月12日日曜日

CUDA

CUDAがちゃんとインストールできてるのかチェックするために、 ググって見つけた
こちらをコピペしてみたところ、コンパイルできない。。。
調べてみると、CUDAのバージョンが違うのが原因。というか厳密にはサンプルについてくる、Wrapperの仕様が変わったせいらしい。まあ、そういうこともあるだろう。
  • cutilSafeCall -> checkCudaErrors
  • CUTIL_SAFE_CALL -> checkCudaErrors
  • タイマー周りはAPIそのものが変わってる?
    cudaEvent_t を使う。
  • include するファイルも変更
#include <stdio.h>
#include <stdlib.h>
#include <helper_cuda.h>
#include <helper_functions.h>

#define MATRIX_SIZE 1024/*行列1辺の数*/
#define BLOCK_SIZE 16

__global__ void
matrixMul(int* inMatrixA, int* inMatrixB, int* inMatrixC);

int main(int argc, char** argv){
unsigned int matrixSize = sizeof(unsigned int) * MATRIX_SIZE * MATRIX_SIZE;

  int* hMatrixA;
  int* hMatrixB;
  int* hMatrixC;
  hMatrixA = (int*)malloc(matrixSize);
  hMatrixB = (int*)malloc(matrixSize);

/*初期値設定*/
  unsigned int col_idx, row_idx;
  for (col_idx = 0; col_idx < MATRIX_SIZE; col_idx++){
      for (row_idx = 0; row_idx < MATRIX_SIZE; row_idx++){
          hMatrixA[col_idx * MATRIX_SIZE + row_idx] = rand() % (1024*1024);
          hMatrixB[col_idx * MATRIX_SIZE + row_idx] = rand() % (1024*1024);
      }
  }

/*デバイス側の変数設定*/
  int* dMatrixA;
  int* dMatrixB;
  int* dMatrixC;

/*デバイスメモリ領域の確保*/
  checkCudaErrors(cudaMalloc((void**)&dMatrixA, matrixSize));
  checkCudaErrors(cudaMemcpy(dMatrixA, hMatrixA, matrixSize, cudaMemcpyHostToDevice));
  checkCudaErrors(cudaMalloc((void**)&dMatrixB, matrixSize));
  checkCudaErrors(cudaMemcpy(dMatrixB, hMatrixB, matrixSize, cudaMemcpyHostToDevice));
  checkCudaErrors(cudaMalloc((void**)&dMatrixC, matrixSize));

/*ブロックサイズとグリッドサイズの設定*/
  dim3 block(BLOCK_SIZE, BLOCK_SIZE);
  dim3 grid(MATRIX_SIZE/BLOCK_SIZE, MATRIX_SIZE/BLOCK_SIZE);

/*タイマーを作成して計測開始*/
  cudaevent_t start;
  cudaEvent_t stop;
  checkCudaErrors(cudaEventCreate(&start));
  checkCudaErrors(cudaEventCreate(&stop));

  checkCudaErrors(cudaEventRecord(start, NULL)); // スタート

/*カーネルの起動*/
  matrixMul<<<grid, block>>>(dMatrixA, dMatrixB, dMatrixC);
  cudaThreadSynchronize();

/*結果の領域確保とデバイス側からのメモリ転送*/
  hMatrixC = (int*)malloc(matrixSize);
  checkCudaErrors(cudaMemcpy(hMatrixC, dMatrixC, matrixSize, cudaMemcpyDeviceToHost));

/*タイマーを停止しかかった時間を表示*/

  checkCudaErrors(cudaEventRecord(stop, NULL));
  checkCudaErrors(cudaEventSynchronize(stop));

  float msecTotal = 0.0f;
  checkCudaErrors(cudaEventElapsedTime(&msecTotal, start, stop));

  printf("Processing time: %f (msec)\n", msecTotal);

/*ホスト・デバイスメモリの開放*/
  free(hMatrixA);
  free(hMatrixB);
  free(hMatrixC);
  checkCudaErrors(cudaFree(dMatrixA));
  checkCudaErrors(cudaFree(dMatrixB));
  checkCudaErrors(cudaFree(dMatrixC));

/*終了処理*/
  cudaThreadExit();
  exit(1);
}

__global__ void
matrixMul(int* inMatrixA, int* inMatrixB, int* inMatrixC){
  unsigned int col_idx = blockIdx.x * blockDim.x + threadIdx.x;
  unsigned int row_idx = blockIdx.y * blockDim.y + threadIdx.y;
  unsigned int scan_idx;
  unsigned int target = 0;

/*行列の演算を行う*/
 for (scan_idx = 0; scan_idx < MATRIX_SIZE; scan_idx++) {
   target +=inMatrixA[col_idx * MATRIX_SIZE + scan_idx] * inMatrixB[scan_idx * MATRIX_SIZE + row_idx];
   __syncthreads();
 }
 inMatrixC[col_idx * MATRIX_SIZE + row_idx] = target;
}