自己紹介

太田一樹。
東京の大学の情報科学科に通う大学生。moratorium満喫中。

お勧め書籍 [全部見る]

飾り

Search


Category Archives

Recent Entries

  1. 論文
  2. JJUG CCCでプレゼンします
  3. kzk's bookshelf
  4. En Google by Gulfweed
  5. PNUTS
  6. コメントスパム対策
  7. Hadoop + Luceneで分散インデクシング
  8. Hadoopの解析資料
  9. Cluster 2008
  10. SWoPP 2008

2008年08月27日

Hadoop + Luceneで分散インデクシング

Hadoop (0.17系) + Lucene (2.3系) で検索用インデックスを分散インデクシングするコードを公開してみます。HDDに眠らせてるのはちょっともったいない。

いきなりソースコード。

package net.kzk9;

import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

import org.apache.lucene.index.*;
import org.apache.lucene.document.*;
import org.apache.lucene.analysis.*;
import org.apache.lucene.analysis.cjk.*;
import org.apache.lucene.search.*;
import org.apache.lucene.queryParser.*;

public class Indexer extends Configured implements Tool, Mapper<Text, Text, Text, Text>, Reducer<Text, Text, Text, Writable> {
    //
    // Map
    //
    public void map(Text key, Text value,
                    OutputCollector<Text, Text> output,
                    Reporter reporter) throws IOException {
        Text url = key;
        Text str = value;
        output.collect(url, str);
    }

    //
    // Reduce
    //
    private static class LuceneDocumentWrapper implements Writable {
        private Document d;
        public LuceneDocumentWrapper(Document d){
            this.d = d;
        }
        public Document get(){
            return d;
        }
        public void readFields(DataInput in) throws IOException {
            // Empty!
        }
        public void write(DataOutput out) throws IOException {
            // Empty!
        }
    }

    public void reduce(Text key, Iterator<Text> values,
                       OutputCollector<Text, Writable> output,
                       Reporter reporter) throws IOException {
        Text url = key;
        Text str = values.next();
        
        Document doc = new Document();
        doc.add(new Field("url", url.toString(), Field.Store.YES, Field.Index.NO));
        doc.add(new Field("content", str.toString(), Field.Store.YES, Field.Index.TOKENIZED));
        output.collect(url, new LuceneDocumentWrapper(doc));
    }

    public static class OutputFormat extends org.apache.hadoop.mapred.OutputFormatBase<WritableComparable, LuceneDocumentWrapper> {
        public RecordWriter<WritableComparable, LuceneDocumentWrapper> getRecordWriter(final FileSystem fs, JobConf job, String name,
                                                                                       final Progressable progress) throws IOException {
            final Path perm = new Path(job.getOutputPath(), name);
            final Path temp = job.getLocalPath("index/_" + Integer.toString(new Random().nextInt()));

            final IndexWriter writer = new IndexWriter(fs.startLocalOutput(perm, temp).toString(), new CJKAnalyzer(), true);
            
            return new RecordWriter<WritableComparable, LuceneDocumentWrapper>() {
                public void write(WritableComparable key, LuceneDocumentWrapper value) throws IOException {
                    Document doc = value.get();
                    writer.addDocument(doc, new CJKAnalyzer());
                }
                private boolean closed = false;
                public void close(final Reporter reporter) throws IOException {
                    Thread prog = new Thread() {
                            public void run() {
                                while (!closed) {
                                    try {
                                        reporter.setStatus("closing");
                                        Thread.sleep(1000);
                                    } catch (InterruptedException e) { continue; }
                                    catch (Throwable e) { return; }
                                }
                            }
                        };
                    try {
                        prog.start();

                        writer.optimize();
                        writer.close();
                        fs.completeLocalOutput(perm, temp); // copy to dfs
                    } finally {
                        closed = true;
                    }                        
                }
            };
        }
    }

    //
    // Runner
    //
    static int printUsage() {
        System.out.println("wordcount <input> <output>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), Indexer.class);
        conf.setJobName("indexer");
        // conf.set("mapred.job.tracker", "local");
        // conf.set("fs.default.name", "local");

        conf.setOutputKeyClass(Text.class);
        //conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Indexer.class);
        conf.setReducerClass(Indexer.class);

        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setOutputFormat(OutputFormat.class);

        if (args.length != 2) {
            return printUsage();
        }
        conf.setInputPath(new Path(args[0]));
        conf.setOutputPath(new Path(args[1]));

        JobClient.runJob(conf);
        return 0;
    }

    public void configure(JobConf conf) {
        setConf(conf);
    }
    public void close() {}

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Indexer(), args);
        System.exit(res);
    }
}

Nutchのコードを切り出した形になっています。writer.optimize()をかけるために少しトリッキーなコードになっています。適当に生成した数百Gのテキストをインデクシングしてみましたが、特に問題も無し。

ビルドはこんな感じで(antとか覚えた方が良いのかな)。Hadoopのlibディレクトリにlucene関係のjar入れておきます。

#!/bin/sh
HADOOP_HOME=/home/kzk/hadoop

# add libs to CLASSPATH
for f in $HADOOP_HOME/*.jar; do
  CLASSPATH=${CLASSPATH}:$f;
done

for f in $HADOOP_HOME/lib/*.jar; do
  CLASSPATH=${CLASSPATH}:$f;
done

for f in $HADOOP_HOME/lib/jetty-ext/*.jar; do
  CLASSPATH=${CLASSPATH}:$f;
done

echo $CLASSPATH
rm -fR indexer_classes
mkdir indexer_classes
javac -classpath $CLASSPATH -d indexer_classes Indexer.java
jar -cvf indexer.jar -C indexer_classes/ .

分散インデクサーの実行はこんな感じ。入力には、"#{url}\t#{content}\n"がずらずらと並んだファイルを使用します (KeyValueTextInputFormat)。

$HADOOP_HOME/bin/hadoop jar indexer.jar net.kzk9.Indexer input output

これでoutputというHDFS上のディレクトリに、Luceneのインデックスが作成されます。このディレクトリをローカルにコピーして、以下のようなコードで検索します。

package net.kzk9;

import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

import org.apache.lucene.index.*;
import org.apache.lucene.document.*;
import org.apache.lucene.analysis.*;
import org.apache.lucene.analysis.cjk.*;
import org.apache.lucene.search.*;
import org.apache.lucene.queryParser.*;

public class Searcher {
    public static void main(String[] args) {
        try {
            IndexSearcher is = new IndexSearcher("output");
            QueryParser qp = new QueryParser("content", new CJKAnalyzer());
            Query q = qp.parse("Query");
            Hits hs = is.search(q);
            System.out.println("ret = " + hs.length());
            for(int i=0; i<hs.length(); i++){
                Document doc = hs.doc(i);
                float score = hs.score(i);
                System.out.println(score + "\t" + doc.get("url") + "\t" + doc.get("content"));
            }
            is.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Luceneは分散検索用のクラスも有る(RemoteSearcher)っぽいのですが、そこまでやるのは面倒くさかったので無しで(Solrとか見ればいいのは分かってるんだけど、飽きた)。

またn-gramのCJKAnalyzerは遅くて使い物にならず、形態素ベースのJapaneseAnalyzerを使おうとしたのですが、Lucene 2.3系はサポートされてないようで断念。

簡単なサンプルですが誰かのお役に立てればと思います。


trackbacks

trackbackURL:

comments

comment form
comment form