自己紹介

太田一樹。
東京の大学の情報科学科に通う大学生。moratorium満喫中。

お勧め書籍 [全部見る]

飾り

Search


Category Archives

Recent Entries

  1. 論文
  2. JJUG CCCでプレゼンします
  3. kzk's bookshelf
  4. En Google by Gulfweed
  5. PNUTS
  6. コメントスパム対策
  7. Hadoop + Luceneで分散インデクシング
  8. Hadoopの解析資料
  9. Cluster 2008
  10. SWoPP 2008

2007年01月09日

Thread Base MapReduce

並列計算フレームワークを作っている人を見てたら自分もなんか作りたくなって来たので、スレッドベースでGoogleのMapReduceを真似て見ました。1マシン用のMapReduceといった所ですかね。

以下にソースコードが有ります。適当に煮るなり焼くなりしてください。

ソースコード

ワードカウントが以下のようなコードで記述できます。

class WordCounter : public Mapper {
public:
  virtual void Map(const MapInput& input) {
    string text = input.value();
    istringstream iss(text);
    string word;
    while (iss >> word) {
      Emit(word, "1");
    }
  }
};

class Adder : public Reducer {
public:
  virtual void Reduce(ReduceInput *input) {
    // Iterate over all entries with the same key and add the values
    string key = input->get_key();
    unsigned long long int value = 0;
    while (!input->done()) {
      value += atoi(input->value().c_str());
      input->NextValue();
    }

    Emit(key, toString(value));
  }
};

int main(void)
{
  MapReduceSpecification spec;

  MapReduceInput *input;
  input = spec.add_input();
  input->set_input_file("main.cpp");
  input->set_mapper_class(new WordCounter);
  input = spec.add_input();
  input->set_input_file("mapreduce.h");
  input->set_mapper_class(new WordCounter);
  input = spec.add_input();
  input->set_input_file("semqueue.h");
  input->set_mapper_class(new WordCounter);

  MapReduceOutput *output = spec.output();
  output->set_reducer_class(new Adder);

  spec.set_threads(20);

  if (!MapReduce(spec))
    cerr << "MapReduce failed" << endl;
}


肝心の処理はmapreduce.cppにあります。do_map, do_reduce関数でそれぞれmap, reduceの処理を行います。双方共にタスクシステムの形になっています。まずQueueにタスクを挿入します。次にスレッドを立て、各スレッドはそのQueueからタスクを取得し、Queueが空になるまで処理をし続けるという流れになっています。


static void*
map_thread(void *arg)
{
  SemQueue<MapReduceInput*> *inputQueue
    = reinterpret_cast<SemQueue<MapReduceInput*> *>(arg);

  // Consume MapReduceInput Buffer
  while (!inputQueue->empty()) {
    MapReduceInput *input = inputQueue->front();
    inputQueue->pop();

    string input_file = input->get_input_file();
    string file_content = read_file(input_file);

    MapInput map_input(file_content);
    Mapper *mapper = input->get_mapper_class();
    mapper->Map(map_input);
  }

  return NULL;
}


MapperでEmitされたkey, valueのpairは一旦KeyMapというグローバルなstd::mapに突っ込まれます。こうする事でキー毎にソートされた状態でvalueを格納できます。KeyMapは言うなれば中間結果で、この結果をReducerに渡します。


static void
do_reduce(const MapReduceSpecification& spec, KeyMap *keyMap)
{
  SemQueue<ReduceInput*> *reduceInputQueue = new SemQueue<ReduceInput*>();

  const map<string, ReduceInput*>& m = keyMap->get_raw_map();
  map<string, ReduceInput*>::const_iterator it  = m.begin();
  map<string, ReduceInput*>::const_iterator end = m.end();
  for (; it != end; ++it)
    reduceInputQueue->push((*it).second);

  ReduceInfo *info = new ReduceInfo();
  info->inputQueue = reduceInputQueue;
  info->output = spec.output();

  pthread_t* ths = new pthread_t[spec.get_num_threads()];
  for (unsigned int i = 0; i < spec.get_num_threads(); i++)
    pthread_create(&ths[i], NULL, reduce_thread, info);
  for (unsigned int i = 0; i < spec.get_num_threads(); i++)
    pthread_join(ths[i], NULL);
  delete[] ths;

  delete info;
  delete reduceInputQueue;
}


今回はキーのソートをstd::mapに頼りましたが、本家MapReduceではキーのソートをする際に要素数が膨大になってしまい1台では処理が追いつかない & メモリに載らないので、分散ソートをしてるのでしょう。

ReducerにはReduceInput*を渡して、同じキー毎に処理を行わせます。


static void*
reduce_thread(void *arg)
{
  ReduceInfo *info = reinterpret_cast<ReduceInfo*>(arg);
  SemQueue<ReduceInput*> *inputQueue = info->inputQueue;
  MapReduceOutput *output = info->output;

  // Consume MapInput Buffer
  while (!inputQueue->empty()) {
    ReduceInput *input = inputQueue->front();
    inputQueue->pop();

    Reducer *reducer = output->get_reducer_class();
    reducer->Reduce(input);

    delete input;
  }

  return NULL;
}


仕組み的にはこんな感じです。実際に使ってみると1台だとIOがボトルネックになってしまう(個々のスレッドが同じディスクをアクセスしに行く)ので、やはり複数台+GFSのようなIOを分散させる仕組みでやる方が圧倒的に威力を発揮するモデルだと思いますね。

後はこのオモチャMapReduceだとMapが完全に終わりきってからReduceを始めていますが、全Mapperが終了する前にReducerを走らせるだとか、遅いMapperを補助するだとか色々スケジューリングのしどころが有るのでその辺も本家はやってるんでしょうな。

tanakh氏によるとGHCにはControl.Parallel.Strategiesというモジュールが有り、これと似た様な事が出来るそうです。コンパイル時にCPU数を指定するとスレッドが立てられて処理を分散してくれるみたいです。さらにpvmを使っているので複数台にも分散できるらしい。言語にこういう機能が組み込まれているのは良いですな。

まぁ一発ネタでした。メニーコアCPU + クラスタを使いこなす技術をなんとかして編み出したい。というか研究でもその辺をやりたいと思っている今日この頃。(どこの研究室行けばいいのかなー)。


trackbacks

trackbackURL:

comments

comment form
comment form