普通ThreadPoolはRunnable とThreadの関係が切り離されている。そこがオーバーヘッドになるんじゃないかと思って、ThreadとRunnableが1対1に対応していて、複数のスレッドが、それぞれのRunnableをバリア同期を取りながら繰り返し実行する物を書いてみた。
使い方はこんな感じ。tick() とやると、スレッドが一気に動いて、全部終了したところで戻ってくる。
ThreadBundle tb = new ThreadBundle();
for (int i = 0; i < numThreads; i++)
tb.addRunnable(new Runnable() {
public void run() {
System.out.print("-");
System.out.flush();
try {Thread.sleep(1000);} catch (InterruptedException e) { }
}
});
tb.start();
for (int i = 0; i < 100; i++)
tb.tick();
tb.shutdown();
実装は、ReadWriteLockを2つ使って実現している。ワーカスレッドはreadlock を取得しようとしていて、ブロックしている。マスタスレッドがtick でwriteLockをリリースすると、一斉に走る。
一方、ワーカスレッドは裏のreadlockを取得したまま実行しており、これを実行終了後にリリースする。全てのワーカスレッドが裏のreadlockをリリースすると、マスタスレッドがwriteLockを取得できる。これをもって、バリア同期が実現される。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ThreadBundle {
List<Runnable> runnables = new ArrayList<Runnable>();
boolean started =false;
Thread [] threads;
ReadWriteLock [] rwlocks = new ReadWriteLock [] {new ReentrantReadWriteLock(), new ReentrantReadWriteLock()};
CountDownLatch latch;
class WrapperRunnable implements Runnable {
Runnable inner;
int ic = 0;
WrapperRunnable(Runnable inner) {
this.inner = inner;
}
@Override
public void run() {
rwlocks[(ic + 1 )% 2 ].readLock().lock();
latch.countDown();
while (true) {
//System.out.println("try readLock " + (ic %2));
rwlocks[ic % 2].readLock().lock();
//System.out.print("-");
//System.out.flush();
inner.run();
rwlocks[(ic+1) % 2].readLock().unlock();
ic++;
}
}
}
public void ThreadBundle() {
}
public void addRunnable(Runnable r) {
if (started)
throw new RuntimeException("already started");
runnables.add(r);
}
public void start() throws InterruptedException {
started = true;
threads = new Thread[runnables.size()];
latch = new CountDownLatch(runnables.size());
for (int i = 0; i < runnables.size(); i++)
threads[i] = new Thread(new WrapperRunnable (runnables.get(i)));
rwlocks[0].writeLock().lock();
for (int i = 0; i < runnables.size(); i++)
threads[i].start();
latch.await();
}
int counter = 0;
public void tick() {
rwlocks[counter % 2].writeLock().unlock();
rwlocks[(counter + 1) % 2].writeLock().lock();
counter++;
}
public void shutdown() {
// 手抜き
}
}
結構凝った仕掛けなのに、あんまり速くない?残念。
0 件のコメント:
コメントを投稿