2014年10月18日土曜日

Javaの軽量スレッドプール的なもの

普通ThreadPoolはRunnable とThreadの関係が切り離されている。そこがオーバーヘッドになるんじゃないかと思って、ThreadとRunnableが1対1に対応していて、複数のスレッドが、それぞれのRunnableをバリア同期を取りながら繰り返し実行する物を書いてみた。
使い方はこんな感じ。tick() とやると、スレッドが一気に動いて、全部終了したところで戻ってくる。
ThreadBundle tb = new ThreadBundle();
 for (int i = 0; i < numThreads; i++) 
  tb.addRunnable(new Runnable() {
     public void run() {
         System.out.print("-");
         System.out.flush();
        try {Thread.sleep(1000);} catch (InterruptedException e) {     }
     }
  });
tb.start();
for (int i = 0; i < 100; i++) 
     tb.tick();
tb.shutdown();
実装は、ReadWriteLockを2つ使って実現している。ワーカスレッドはreadlock を取得しようとしていて、ブロックしている。マスタスレッドがtick でwriteLockをリリースすると、一斉に走る。
一方、ワーカスレッドは裏のreadlockを取得したまま実行しており、これを実行終了後にリリースする。全てのワーカスレッドが裏のreadlockをリリースすると、マスタスレッドがwriteLockを取得できる。これをもって、バリア同期が実現される。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ThreadBundle {
 List<Runnable> runnables = new ArrayList<Runnable>();
 boolean started =false;
 Thread [] threads;
 ReadWriteLock [] rwlocks = new ReadWriteLock [] {new ReentrantReadWriteLock(), new ReentrantReadWriteLock()};
 CountDownLatch latch;

 class WrapperRunnable implements Runnable {
  Runnable inner;
  int ic = 0;

  WrapperRunnable(Runnable inner) {
   this.inner = inner;
  }

  @Override
  public void run() {
   rwlocks[(ic + 1 )% 2 ].readLock().lock();
   latch.countDown();
   while (true) {
    //System.out.println("try readLock " + (ic %2));
    rwlocks[ic % 2].readLock().lock();
    //System.out.print("-");
    //System.out.flush();
    inner.run();
    rwlocks[(ic+1) % 2].readLock().unlock();
    ic++;
   }
  }
 }

 public void ThreadBundle() {
 }

 public void addRunnable(Runnable r) {
  if (started)
   throw new RuntimeException("already started");
  runnables.add(r);
 }

 public void start() throws InterruptedException {
  started = true;
  threads = new Thread[runnables.size()];
  latch = new CountDownLatch(runnables.size());   
  for (int i = 0; i < runnables.size(); i++)
   threads[i] = new Thread(new WrapperRunnable (runnables.get(i))); 

  rwlocks[0].writeLock().lock();
  for (int i = 0; i < runnables.size(); i++)
   threads[i].start();
  latch.await();
 }

 int counter = 0;

 public void tick() {
  rwlocks[counter % 2].writeLock().unlock();
  rwlocks[(counter + 1) % 2].writeLock().lock();   
  counter++;
 }

 public void shutdown() {
   // 手抜き  
 }
}
結構凝った仕掛けなのに、あんまり速くない?残念。

0 件のコメント: