2011年3月16日水曜日

Hadoop のHDFS上のファイルを読む

サイドファイルにObjectOutputStreamで書いたものはObjectInputStreamで読めばいい。 dirname 以下のfilterにひっかかるファイルからオブジェクトを読み出すのはこうする。
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/"), conf);
  for (FileStatus status: fs.listStatus(new Path(dirname), filter)) {
   InputStream is = fs.open(status.getPath());
   ObjectInputStream ois = new ObjectInputStream(is); 
   Object o = ois.readObject();
   System.out.println(o);
  }
filter はこんな風に取得。filePrefix に指定した文字を含むファイルを選ぶ。この実装だとprefix になってないけど。
 static PathFilter getPathFilter(final String filePrefix){
  PathFilter filter = new PathFilter() {
   public boolean accept(Path path) {
    return path.getName().contains(filePrefix);
   }
  };
  return filter;
 }

普通に書き出したファイルの場合

サイドデータじゃなくて、普通にcontext.writeした場合は読み方が変わってくる。SequenceFile.Readerで読む。
  Configuration conf = new Configuration();

  FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/"), conf);
  for (FileStatus status: fs.listStatus(new Path(dirname), filter)) {
   SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
   Text key = new Text();
                        Text value = new Text();
   while (reader.next(key, value){
                               ....
   }
  }
この読み方は、ValueがWritableの時にしか使えない。Serializable の場合はつぎのようにする。 nextでkeyだけ読んで、getCurrentValueでvalueを読む。このときにconfにJavaSerializationを追加しておかないと、エラーになるので注意。
  Configuration conf = new Configuration();
  conf.set("io.serializations", 
    JavaSerialization.class.getName() + "," +
    WritableSerialization.class.getName());

  FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/"), conf);
  for (FileStatus status: fs.listStatus(new Path(dirname), filter)) {
   SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
   Text key = new Text();
   while (reader.next(key)){
    System.out.println(key.toString());
    ChlacTest.Result res = new ChlacTest.Result();
    res = (Result) reader.getCurrentValue(res);
    System.out.println(String.format("%d %d %f", res.rx, res.time_frame, res.alpha));
   }
  }

0 件のコメント: