2008年08月27日
Hadoop + Luceneで分散インデクシング
Hadoop (0.17系) + Lucene (2.3系) で検索用インデックスを分散インデクシングするコードを公開してみます。HDDに眠らせてるのはちょっともったいない。
いきなりソースコード。
package net.kzk9;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.lucene.index.*;
import org.apache.lucene.document.*;
import org.apache.lucene.analysis.*;
import org.apache.lucene.analysis.cjk.*;
import org.apache.lucene.search.*;
import org.apache.lucene.queryParser.*;
public class Indexer extends Configured implements Tool, Mapper<Text, Text, Text, Text>, Reducer<Text, Text, Text, Writable> {
//
// Map
//
public void map(Text key, Text value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
Text url = key;
Text str = value;
output.collect(url, str);
}
//
// Reduce
//
private static class LuceneDocumentWrapper implements Writable {
private Document d;
public LuceneDocumentWrapper(Document d){
this.d = d;
}
public Document get(){
return d;
}
public void readFields(DataInput in) throws IOException {
// Empty!
}
public void write(DataOutput out) throws IOException {
// Empty!
}
}
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Writable> output,
Reporter reporter) throws IOException {
Text url = key;
Text str = values.next();
Document doc = new Document();
doc.add(new Field("url", url.toString(), Field.Store.YES, Field.Index.NO));
doc.add(new Field("content", str.toString(), Field.Store.YES, Field.Index.TOKENIZED));
output.collect(url, new LuceneDocumentWrapper(doc));
}
public static class OutputFormat extends org.apache.hadoop.mapred.OutputFormatBase<WritableComparable, LuceneDocumentWrapper> {
public RecordWriter<WritableComparable, LuceneDocumentWrapper> getRecordWriter(final FileSystem fs, JobConf job, String name,
final Progressable progress) throws IOException {
final Path perm = new Path(job.getOutputPath(), name);
final Path temp = job.getLocalPath("index/_" + Integer.toString(new Random().nextInt()));
final IndexWriter writer = new IndexWriter(fs.startLocalOutput(perm, temp).toString(), new CJKAnalyzer(), true);
return new RecordWriter<WritableComparable, LuceneDocumentWrapper>() {
public void write(WritableComparable key, LuceneDocumentWrapper value) throws IOException {
Document doc = value.get();
writer.addDocument(doc, new CJKAnalyzer());
}
private boolean closed = false;
public void close(final Reporter reporter) throws IOException {
Thread prog = new Thread() {
public void run() {
while (!closed) {
try {
reporter.setStatus("closing");
Thread.sleep(1000);
} catch (InterruptedException e) { continue; }
catch (Throwable e) { return; }
}
}
};
try {
prog.start();
writer.optimize();
writer.close();
fs.completeLocalOutput(perm, temp); // copy to dfs
} finally {
closed = true;
}
}
};
}
}
//
// Runner
//
static int printUsage() {
System.out.println("wordcount <input> <output>");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), Indexer.class);
conf.setJobName("indexer");
// conf.set("mapred.job.tracker", "local");
// conf.set("fs.default.name", "local");
conf.setOutputKeyClass(Text.class);
//conf.setOutputValueClass(Text.class);
conf.setMapperClass(Indexer.class);
conf.setReducerClass(Indexer.class);
conf.setInputFormat(KeyValueTextInputFormat.class);
conf.setOutputFormat(OutputFormat.class);
if (args.length != 2) {
return printUsage();
}
conf.setInputPath(new Path(args[0]));
conf.setOutputPath(new Path(args[1]));
JobClient.runJob(conf);
return 0;
}
public void configure(JobConf conf) {
setConf(conf);
}
public void close() {}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Indexer(), args);
System.exit(res);
}
}
Nutchのコードを切り出した形になっています。writer.optimize()をかけるために少しトリッキーなコードになっています。適当に生成した数百Gのテキストをインデクシングしてみましたが、特に問題も無し。
ビルドはこんな感じで(antとか覚えた方が良いのかな)。Hadoopのlibディレクトリにlucene関係のjar入れておきます。
#!/bin/sh
HADOOP_HOME=/home/kzk/hadoop
# add libs to CLASSPATH
for f in $HADOOP_HOME/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $HADOOP_HOME/lib/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $HADOOP_HOME/lib/jetty-ext/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
echo $CLASSPATH
rm -fR indexer_classes
mkdir indexer_classes
javac -classpath $CLASSPATH -d indexer_classes Indexer.java
jar -cvf indexer.jar -C indexer_classes/ .
分散インデクサーの実行はこんな感じ。入力には、"#{url}\t#{content}\n"がずらずらと並んだファイルを使用します (KeyValueTextInputFormat)。
$HADOOP_HOME/bin/hadoop jar indexer.jar net.kzk9.Indexer input output
これでoutputというHDFS上のディレクトリに、Luceneのインデックスが作成されます。このディレクトリをローカルにコピーして、以下のようなコードで検索します。
package net.kzk9;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.lucene.index.*;
import org.apache.lucene.document.*;
import org.apache.lucene.analysis.*;
import org.apache.lucene.analysis.cjk.*;
import org.apache.lucene.search.*;
import org.apache.lucene.queryParser.*;
public class Searcher {
public static void main(String[] args) {
try {
IndexSearcher is = new IndexSearcher("output");
QueryParser qp = new QueryParser("content", new CJKAnalyzer());
Query q = qp.parse("Query");
Hits hs = is.search(q);
System.out.println("ret = " + hs.length());
for(int i=0; i<hs.length(); i++){
Document doc = hs.doc(i);
float score = hs.score(i);
System.out.println(score + "\t" + doc.get("url") + "\t" + doc.get("content"));
}
is.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Luceneは分散検索用のクラスも有る(RemoteSearcher)っぽいのですが、そこまでやるのは面倒くさかったので無しで(Solrとか見ればいいのは分かってるんだけど、飽きた)。
またn-gramのCJKAnalyzerは遅くて使い物にならず、形態素ベースのJapaneseAnalyzerを使おうとしたのですが、Lucene 2.3系はサポートされてないようで断念。
簡単なサンプルですが誰かのお役に立てればと思います。
- by
- at 01:07

comments