Home > Hadoop > Hadoop + Luceneで分散インデクシング

Hadoop + Luceneで分散インデクシング

  • 2008-08-27 (Wed) 1:07
  • Hadoop
  • hatena button
  • hatena count
  • save this page del.icio.us

Hadoop (0.17系) + Lucene (2.3系) で検索用インデックスを分散インデクシングするコードを公開してみます。HDDに眠らせてるのはちょっともったいない。

いきなりソースコード。


package net.kzk9;

import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

import org.apache.lucene.index.*;
import org.apache.lucene.document.*;
import org.apache.lucene.analysis.*;
import org.apache.lucene.analysis.cjk.*;
import org.apache.lucene.search.*;
import org.apache.lucene.queryParser.*;

public class Indexer extends Configured implements Tool, Mapper<Text, Text, Text, Text>, Reducer<Text, Text, Text, Writable> {
    //
    // Map
    //
    public void map(Text key, Text value,
                    OutputCollector<Text, Text> output,
                    Reporter reporter) throws IOException {
        Text url = key;
        Text str = value;
        output.collect(url, str);
    }

    //
    // Reduce
    //
    private static class LuceneDocumentWrapper implements Writable {
        private Document d;
        public LuceneDocumentWrapper(Document d){
            this.d = d;
        }
        public Document get(){
            return d;
        }
        public void readFields(DataInput in) throws IOException {
            // Empty!
        }
        public void write(DataOutput out) throws IOException {
            // Empty!
        }
    }

    public void reduce(Text key, Iterator<Text> values,
                       OutputCollector<Text, Writable> output,
                       Reporter reporter) throws IOException {
        Text url = key;
        Text str = values.next();

        Document doc = new Document();
        doc.add(new Field("url", url.toString(), Field.Store.YES, Field.Index.NO));
        doc.add(new Field("content", str.toString(), Field.Store.YES, Field.Index.TOKENIZED));
        output.collect(url, new LuceneDocumentWrapper(doc));
    }

    public static class OutputFormat extends org.apache.hadoop.mapred.OutputFormatBase<WritableComparable, LuceneDocumentWrapper> {
        public RecordWriter<WritableComparable, LuceneDocumentWrapper> getRecordWriter(final FileSystem fs, JobConf job, String name,
                                                                                       final Progressable progress) throws IOException {
            final Path perm = new Path(job.getOutputPath(), name);
            final Path temp = job.getLocalPath("index/_" + Integer.toString(new Random().nextInt()));

            final IndexWriter writer = new IndexWriter(fs.startLocalOutput(perm, temp).toString(), new CJKAnalyzer(), true);

            return new RecordWriter<WritableComparable, LuceneDocumentWrapper>() {
                public void write(WritableComparable key, LuceneDocumentWrapper value) throws IOException {
                    Document doc = value.get();
                    writer.addDocument(doc, new CJKAnalyzer());
                }
                private boolean closed = false;
                public void close(final Reporter reporter) throws IOException {
                    Thread prog = new Thread() {
                            public void run() {
                                while (!closed) {
                                    try {
                                        reporter.setStatus("closing");
                                        Thread.sleep(1000);
                                    } catch (InterruptedException e) { continue; }
                                    catch (Throwable e) { return; }
                                }
                            }
                        };
                    try {
                        prog.start();

                        writer.optimize();
                        writer.close();
                        fs.completeLocalOutput(perm, temp); // copy to dfs
                    } finally {
                        closed = true;
                    }
                }
            };
        }
    }

    //
    // Runner
    //
    static int printUsage() {
        System.out.println("wordcount <input> <output>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), Indexer.class);
        conf.setJobName("indexer");
        // conf.set("mapred.job.tracker", "local");
        // conf.set("fs.default.name", "local");

        conf.setOutputKeyClass(Text.class);
        //conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Indexer.class);
        conf.setReducerClass(Indexer.class);

        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setOutputFormat(OutputFormat.class);

        if (args.length != 2) {
            return printUsage();
        }
        conf.setInputPath(new Path(args[0]));
        conf.setOutputPath(new Path(args[1]));

        JobClient.runJob(conf);
        return 0;
    }

    public void configure(JobConf conf) {
        setConf(conf);
    }
    public void close() {}

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Indexer(), args);
        System.exit(res);
    }
}

Nutchのコードを切り出した形になっています。writer.optimize()をかけるために少しトリッキーなコードになっています。適当に生成した数百Gのテキストをインデクシングしてみましたが、特に問題も無し。

ビルドはこんな感じで(antとか覚えた方が良いのかな)。Hadoopのlibディレクトリにlucene関係のjar入れておきます。


#!/bin/sh
HADOOP_HOME=/home/kzk/hadoop
# add libs to CLASSPATH
for f in $HADOOP_HOME/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $HADOOP_HOME/lib/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $HADOOP_HOME/lib/jetty-ext/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
echo $CLASSPATH
rm -fR indexer_classes
mkdir indexer_classes
javac -classpath $CLASSPATH -d indexer_classes Indexer.java
jar -cvf indexer.jar -C indexer_classes/ .

分散インデクサーの実行はこんな感じ。入力には、”#{url}\t#{content}\n”がずらずらと並んだファイルを使用します (KeyValueTextInputFormat)。


$HADOOP_HOME/bin/hadoop jar indexer.jar net.kzk9.Indexer input output

これでoutputというHDFS上のディレクトリに、Luceneのインデックスが作成されます。このディレクトリをローカルにコピーして、以下のようなコードで検索します。


package net.kzk9;

import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

import org.apache.lucene.index.*;
import org.apache.lucene.document.*;
import org.apache.lucene.analysis.*;
import org.apache.lucene.analysis.cjk.*;
import org.apache.lucene.search.*;
import org.apache.lucene.queryParser.*;

public class Searcher {
    public static void main(String[] args) {
        try {
            IndexSearcher is = new IndexSearcher("output");
            QueryParser qp = new QueryParser("content", new CJKAnalyzer());
            Query q = qp.parse("Query");
            Hits hs = is.search(q);
            System.out.println("ret = " + hs.length());
            for(int i=0; i<hs.length(); i++){
                Document doc = hs.doc(i);
                float score = hs.score(i);
                System.out.println(score + "\t" + doc.get("url") + "\t" + doc.get("content"));
            }
            is.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Luceneは分散検索用のクラスも有る(RemoteSearcher)っぽいのですが、そこまでやるのは面倒くさかったので無しで(Solrとか見ればいいのは分かってるんだけど、飽きた)。

またn-gramのCJKAnalyzerは遅くて使い物にならず、形態素ベースのJapaneseAnalyzerを使おうとしたのですが、Lucene 2.3系はサポートされてないようで断念。

簡単なサンプルですが誰かのお役に立てればと思います。

Similar Posts:

Comments:0

Comment Form
Remember personal info

Trackbacks:2

Trackback URL for this entry
http://kzk9.net/blog/2008/08/hadoop_lucene.html/trackback
Listed below are links to weblogs that reference
Hadoop + Luceneで分散インデクシング from moratorium
pingback from JJUG CCCでプレゼンします - moratorium 09-03-29 (Sun) 7:31

[...] 「100行で書く分散検索エンジン」という題です。Hadoop+Luceneで分散検索エンジンを作る話です。基本的にはこれの解説と続きです。 ちなみに最初に覚えたプログラミング言語はJavaですがいまいち好きでは有りませんw でもScalaには興味が有るので水島さんのセッションも楽しみ。 ちなみにポロリは無いよ! [...]

pingback from 日曜研究室 [技術的な日常:あなたの幸せはここにある] - 雑記 09-04-21 (Tue) 11:40

[...] 年間通して使ってるインスタンスの台数が多いとかなりいいですね。 Hadoop + Luceneで分散インデクシング - moratorium [...]

Home > Hadoop > Hadoop + Luceneで分散インデクシング

お薦め本
広告
Archives
Categories

Return to page top