Home > Uncategorized > Thread Base MapReduce

Thread Base MapReduce

並列計算フレームワークを作っている人を見てたら自分もなんか作りたくなって来たので、スレッドベースでGoogleのMapReduceを真似て見ました。1マシン用のMapReduceといった所ですかね。

以下にソースコードが有ります。適当に煮るなり焼くなりしてください。
ソースコード
ワードカウントが以下のようなコードで記述できます。
[code]
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
string text = input.value();
istringstream iss(text);
string word;
while (iss >> word) {
Emit(word, “1″);
}
}
};
class Adder : public Reducer {
public:
virtual void Reduce(ReduceInput *input) {
// Iterate over all entries with the same key and add the values
string key = input->get_key();
unsigned long long int value = 0;
while (!input->done()) {
value += atoi(input->value().c_str());
input->NextValue();
}
Emit(key, toString(value));
}
};
int main(void)
{
MapReduceSpecification spec;
MapReduceInput *input;
input = spec.add_input();
input->set_input_file(”main.cpp”);
input->set_mapper_class(new WordCounter);
input = spec.add_input();
input->set_input_file(”mapreduce.h”);
input->set_mapper_class(new WordCounter);
input = spec.add_input();
input->set_input_file(”semqueue.h”);
input->set_mapper_class(new WordCounter);
MapReduceOutput *output = spec.output();
output->set_reducer_class(new Adder);
spec.set_threads(20);
if (!MapReduce(spec))
cerr << "MapReduce failed" << endl;
}
[/code]
肝心の処理はmapreduce.cppにあります。do_map, do_reduce関数でそれぞれmap, reduceの処理を行います。双方共にタスクシステムの形になっています。まずQueueにタスクを挿入します。次にスレッドを立て、各スレッドはそのQueueからタスクを取得し、Queueが空になるまで処理をし続けるという流れになっています。
[code]
static void*
map_thread(void *arg)
{
SemQueue *inputQueue
= reinterpret_cast *>(arg);
// Consume MapReduceInput Buffer
while (!inputQueue->empty()) {
MapReduceInput *input = inputQueue->front();
inputQueue->pop();
string input_file = input->get_input_file();
string file_content = read_file(input_file);
MapInput map_input(file_content);
Mapper *mapper = input->get_mapper_class();
mapper->Map(map_input);
}
return NULL;
}
[/code]
MapperでEmitされたkey, valueのpairは一旦KeyMapというグローバルなstd::mapに突っ込まれます。こうする事でキー毎にソートされた状態でvalueを格納できます。KeyMapは言うなれば中間結果で、この結果をReducerに渡します。
[code]
static void
do_reduce(const MapReduceSpecification& spec, KeyMap *keyMap)
{
SemQueue *reduceInputQueue = new SemQueue();
const map& m = keyMap->get_raw_map();
map::const_iterator it = m.begin();
map::const_iterator end = m.end();
for (; it != end; ++it)
reduceInputQueue->push((*it).second);
ReduceInfo *info = new ReduceInfo();
info->inputQueue = reduceInputQueue;
info->output = spec.output();
pthread_t* ths = new pthread_t[spec.get_num_threads()];
for (unsigned int i = 0; i < spec.get_num_threads(); i++)
pthread_create(&ths[i], NULL, reduce_thread, info);
for (unsigned int i = 0; i < spec.get_num_threads(); i++)
pthread_join(ths[i], NULL);
delete[] ths;
delete info;
delete reduceInputQueue;
}
[/code]
今回はキーのソートをstd::mapに頼りましたが、本家MapReduceではキーのソートをする際に要素数が膨大になってしまい1台では処理が追いつかない & メモリに載らないので、分散ソートをしてるのでしょう。
ReducerにはReduceInput*を渡して、同じキー毎に処理を行わせます。
[code]
static void*
reduce_thread(void *arg)
{
ReduceInfo *info = reinterpret_cast(arg);
SemQueue *inputQueue = info->inputQueue;
MapReduceOutput *output = info->output;
// Consume MapInput Buffer
while (!inputQueue->empty()) {
ReduceInput *input = inputQueue->front();
inputQueue->pop();
Reducer *reducer = output->get_reducer_class();
reducer->Reduce(input);
delete input;
}
return NULL;
}
[/code]
仕組み的にはこんな感じです。実際に使ってみると1台だとIOがボトルネックになってしまう(個々のスレッドが同じディスクをアクセスしに行く)ので、やはり複数台+GFSのようなIOを分散させる仕組みでやる方が圧倒的に威力を発揮するモデルだと思いますね。
後はこのオモチャMapReduceだとMapが完全に終わりきってからReduceを始めていますが、全Mapperが終了する前にReducerを走らせるだとか、遅いMapperを補助するだとか色々スケジューリングのしどころが有るのでその辺も本家はやってるんでしょうな。
tanakh氏によるとGHCにはControl.Parallel.Strategiesというモジュールが有り、これと似た様な事が出来るそうです。コンパイル時にCPU数を指定するとスレッドが立てられて処理を分散してくれるみたいです。さらにpvmを使っているので複数台にも分散できるらしい。言語にこういう機能が組み込まれているのは良いですな。
まぁ一発ネタでした。メニーコアCPU + クラスタを使いこなす技術をなんとかして編み出したい。というか研究でもその辺をやりたいと思っている今日この頃。(どこの研究室行けばいいのかなー)。

Similar Posts:

Comments:2

blErnellecove 10-03-10 (Wed) 11:38

pureacai.us
buying acai berry online could be risky. If you wishto purchase wise, try pure acai’s products

buy pure acai

Meedayodott 10-03-11 (Thu) 8:10

Plantfoodkingdom.co.uk
sup dude! I’m Cathy do you know where to buy crystal mkat

buy mephedrone uk

Comment Form
Remember personal info

Trackbacks:0

Trackback URL for this entry
http://kzk9.net/blog/2007/01/thread_base_mapreduce.html/trackback
Listed below are links to weblogs that reference
Thread Base MapReduce from moratorium

Home > Uncategorized > Thread Base MapReduce

お薦め本
広告
Archives
Categories

Return to page top