HADOOP-1913 Build a Lucene index on an HBase table

Files I failed to add/delete on the original commit


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@580399 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2007-09-28 16:12:24 +00:00
parent 7acd7d074a
commit 1236b87f48
7 changed files with 1230 additions and 1 deletion

BuildTableIndex.java (new file)

@@ -0,0 +1,184 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
/**
* Example table column indexing class. Runs a mapreduce job to index
* specified table columns.
* <ul><li>Each row is modeled as a Lucene document: row key is indexed in
* its untokenized form, column name-value pairs are Lucene field name-value
* pairs.</li>
* <li>A file passed on command line is used to populate an
* {@link IndexConfiguration} which is used to set various Lucene parameters,
* specify whether to optimize an index and which columns to index and/or
* store, in tokenized or untokenized form, etc. For an example, see the
* <code>createIndexConfContent</code> method in TestTableIndex.
* </li>
* <li>The number of reduce tasks determines the number of indexes
* (partitions). The indexes are stored in the output path of the job
* configuration.</li>
* <li>The index build process is done in the reduce phase. Users can use
* the map phase to join rows from different tables or to pre-parse/analyze
* column content, etc.</li>
* </ul>
*/
public class BuildTableIndex {
private static final String USAGE = "Usage: BuildTableIndex " +
"-m <numMapTasks> -r <numReduceTasks>\n -indexConf <iconfFile> " +
"-indexDir <indexDir>\n -table <tableName> -columns <columnName1> " +
"[<columnName2> ...]";
private static void printUsage(String message) {
System.err.println(message);
System.err.println(USAGE);
System.exit(-1);
}
public BuildTableIndex() {
super();
}
public void run(String[] args) throws IOException {
if (args.length < 6) {
printUsage("Too few arguments");
}
int numMapTasks = 1;
int numReduceTasks = 1;
String iconfFile = null;
String indexDir = null;
String tableName = null;
StringBuffer columnNames = null;
// parse args
for (int i = 0; i < args.length - 1; i++) {
if ("-m".equals(args[i])) {
numMapTasks = Integer.parseInt(args[++i]);
} else if ("-r".equals(args[i])) {
numReduceTasks = Integer.parseInt(args[++i]);
} else if ("-indexConf".equals(args[i])) {
iconfFile = args[++i];
} else if ("-indexDir".equals(args[i])) {
indexDir = args[++i];
} else if ("-table".equals(args[i])) {
tableName = args[++i];
} else if ("-columns".equals(args[i])) {
columnNames = new StringBuffer(args[++i]);
while (i + 1 < args.length && !args[i + 1].startsWith("-")) {
columnNames.append(" ");
columnNames.append(args[++i]);
}
} else {
printUsage("Unsupported option " + args[i]);
}
}
if (indexDir == null || tableName == null || columnNames == null) {
printUsage("Index directory, table name and at least one column must " +
"be specified");
}
Configuration conf = new HBaseConfiguration();
if (iconfFile != null) {
// set index configuration content from a file
String content = readContent(iconfFile);
IndexConfiguration iconf = new IndexConfiguration();
// purely to validate, exception will be thrown if not valid
iconf.addFromXML(content);
conf.set("hbase.index.conf", content);
}
JobConf jobConf = createJob(conf, numMapTasks, numReduceTasks, indexDir,
tableName, columnNames.toString());
JobClient.runJob(jobConf);
}
public JobConf createJob(Configuration conf, int numMapTasks,
int numReduceTasks, String indexDir, String tableName,
String columnNames) {
JobConf jobConf = new JobConf(conf, BuildTableIndex.class);
jobConf.setJobName("build index for table " + tableName);
jobConf.setNumMapTasks(numMapTasks);
// number of indexes to partition into
jobConf.setNumReduceTasks(numReduceTasks);
// use identity map (a waste, but just as an example)
IdentityTableMap.initJob(tableName, columnNames, IdentityTableMap.class,
jobConf);
// use IndexTableReduce to build a Lucene index
jobConf.setReducerClass(IndexTableReduce.class);
jobConf.setOutputPath(new Path(indexDir));
jobConf.setOutputFormat(IndexOutputFormat.class);
return jobConf;
}
/*
* Read the XML file of indexing configurations. The XML format is similar to
* hbase-default.xml and hadoop-default.xml. For an example configuration,
* see the <code>createIndexConfContent</code> method in TestTableIndex.
* @param fileName File to read.
* @return XML configuration read from file
* @throws IOException
*/
private String readContent(String fileName) throws IOException {
File file = new File(fileName);
int length = (int) file.length();
if (length == 0) {
printUsage("Index configuration file " + fileName + " does not exist");
}
int bytesRead = 0;
byte[] bytes = new byte[length];
FileInputStream fis = new FileInputStream(file);
try {
// read entire file into content
while (bytesRead < length) {
int read = fis.read(bytes, bytesRead, length - bytesRead);
if (read > 0) {
bytesRead += read;
} else {
break;
}
}
} finally {
fis.close();
}
return new String(bytes, 0, bytesRead, HConstants.UTF8_ENCODING);
}
public static void main(String[] args) throws IOException {
BuildTableIndex build = new BuildTableIndex();
build.run(args);
}
}

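For reference, a minimal sketch of driving the class above programmatically rather than via the command line described in its USAGE string. The table name, column family, and paths below are hypothetical, and the -indexConf file must exist at runtime if given:

package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
public class BuildTableIndexExample {
  public static void main(String[] args) throws IOException {
    // Equivalent to the USAGE string above; all values are illustrative only.
    new BuildTableIndex().run(new String[] {
        "-m", "2",                       // map tasks
        "-r", "1",                       // reduce tasks == number of index partitions
        "-indexConf", "index-conf.xml",  // optional Lucene/index settings file
        "-indexDir", "/index",           // job output path holding the index(es)
        "-table", "webtable",
        "-columns", "contents:"});
  }
}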
IndexConfiguration.java (new file)

@@ -0,0 +1,418 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
/**
* Configuration parameters for building a Lucene index
*/
public class IndexConfiguration extends Configuration {
private static final Log LOG = LogFactory.getLog(IndexConfiguration.class);
static final String HBASE_COLUMN_NAME = "hbase.column.name";
static final String HBASE_COLUMN_STORE = "hbase.column.store";
static final String HBASE_COLUMN_INDEX = "hbase.column.index";
static final String HBASE_COLUMN_TOKENIZE = "hbase.column.tokenize";
static final String HBASE_COLUMN_BOOST = "hbase.column.boost";
static final String HBASE_COLUMN_OMIT_NORMS = "hbase.column.omit.norms";
static final String HBASE_INDEX_ROWKEY_NAME = "hbase.index.rowkey.name";
static final String HBASE_INDEX_ANALYZER_NAME = "hbase.index.analyzer.name";
static final String HBASE_INDEX_MAX_BUFFERED_DOCS =
"hbase.index.max.buffered.docs";
static final String HBASE_INDEX_MAX_BUFFERED_DELS =
"hbase.index.max.buffered.dels";
static final String HBASE_INDEX_MAX_FIELD_LENGTH =
"hbase.index.max.field.length";
static final String HBASE_INDEX_MAX_MERGE_DOCS =
"hbase.index.max.merge.docs";
static final String HBASE_INDEX_MERGE_FACTOR = "hbase.index.merge.factor";
// double ramBufferSizeMB;
static final String HBASE_INDEX_SIMILARITY_NAME =
"hbase.index.similarity.name";
static final String HBASE_INDEX_USE_COMPOUND_FILE =
"hbase.index.use.compound.file";
static final String HBASE_INDEX_OPTIMIZE = "hbase.index.optimize";
public static class ColumnConf extends Properties {
boolean getBoolean(String name, boolean defaultValue) {
String valueString = getProperty(name);
if ("true".equals(valueString))
return true;
else if ("false".equals(valueString))
return false;
else
return defaultValue;
}
void setBoolean(String name, boolean value) {
setProperty(name, Boolean.toString(value));
}
float getFloat(String name, float defaultValue) {
String valueString = getProperty(name);
if (valueString == null)
return defaultValue;
try {
return Float.parseFloat(valueString);
} catch (NumberFormatException e) {
return defaultValue;
}
}
void setFloat(String name, float value) {
setProperty(name, Float.toString(value));
}
}
private HashMap<String, ColumnConf> columnMap = new HashMap<String, ColumnConf>();
public Iterator<String> columnNameIterator() {
return columnMap.keySet().iterator();
}
public boolean isIndex(String columnName) {
return getColumn(columnName).getBoolean(HBASE_COLUMN_INDEX, true);
}
public void setIndex(String columnName, boolean index) {
getColumn(columnName).setBoolean(HBASE_COLUMN_INDEX, index);
}
public boolean isStore(String columnName) {
return getColumn(columnName).getBoolean(HBASE_COLUMN_STORE, false);
}
public void setStore(String columnName, boolean store) {
getColumn(columnName).setBoolean(HBASE_COLUMN_STORE, store);
}
public boolean isTokenize(String columnName) {
return getColumn(columnName).getBoolean(HBASE_COLUMN_TOKENIZE, true);
}
public void setTokenize(String columnName, boolean tokenize) {
getColumn(columnName).setBoolean(HBASE_COLUMN_TOKENIZE, tokenize);
}
public float getBoost(String columnName) {
return getColumn(columnName).getFloat(HBASE_COLUMN_BOOST, 1.0f);
}
public void setBoost(String columnName, float boost) {
getColumn(columnName).setFloat(HBASE_COLUMN_BOOST, boost);
}
public boolean isOmitNorms(String columnName) {
return getColumn(columnName).getBoolean(HBASE_COLUMN_OMIT_NORMS, true);
}
public void setOmitNorms(String columnName, boolean omitNorms) {
getColumn(columnName).setBoolean(HBASE_COLUMN_OMIT_NORMS, omitNorms);
}
private ColumnConf getColumn(String columnName) {
ColumnConf column = columnMap.get(columnName);
if (column == null) {
column = new ColumnConf();
columnMap.put(columnName, column);
}
return column;
}
public String getAnalyzerName() {
return get(HBASE_INDEX_ANALYZER_NAME,
"org.apache.lucene.analysis.standard.StandardAnalyzer");
}
public void setAnalyzerName(String analyzerName) {
set(HBASE_INDEX_ANALYZER_NAME, analyzerName);
}
public int getMaxBufferedDeleteTerms() {
return getInt(HBASE_INDEX_MAX_BUFFERED_DELS, 1000);
}
public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
setInt(HBASE_INDEX_MAX_BUFFERED_DELS, maxBufferedDeleteTerms);
}
public int getMaxBufferedDocs() {
return getInt(HBASE_INDEX_MAX_BUFFERED_DOCS, 10);
}
public void setMaxBufferedDocs(int maxBufferedDocs) {
setInt(HBASE_INDEX_MAX_BUFFERED_DOCS, maxBufferedDocs);
}
public int getMaxFieldLength() {
return getInt(HBASE_INDEX_MAX_FIELD_LENGTH, Integer.MAX_VALUE);
}
public void setMaxFieldLength(int maxFieldLength) {
setInt(HBASE_INDEX_MAX_FIELD_LENGTH, maxFieldLength);
}
public int getMaxMergeDocs() {
return getInt(HBASE_INDEX_MAX_MERGE_DOCS, Integer.MAX_VALUE);
}
public void setMaxMergeDocs(int maxMergeDocs) {
setInt(HBASE_INDEX_MAX_MERGE_DOCS, maxMergeDocs);
}
public int getMergeFactor() {
return getInt(HBASE_INDEX_MERGE_FACTOR, 10);
}
public void setMergeFactor(int mergeFactor) {
setInt(HBASE_INDEX_MERGE_FACTOR, mergeFactor);
}
public String getRowkeyName() {
return get(HBASE_INDEX_ROWKEY_NAME, "ROWKEY");
}
public void setRowkeyName(String rowkeyName) {
set(HBASE_INDEX_ROWKEY_NAME, rowkeyName);
}
public String getSimilarityName() {
return get(HBASE_INDEX_SIMILARITY_NAME, null);
}
public void setSimilarityName(String similarityName) {
set(HBASE_INDEX_SIMILARITY_NAME, similarityName);
}
public boolean isUseCompoundFile() {
return getBoolean(HBASE_INDEX_USE_COMPOUND_FILE, false);
}
public void setUseCompoundFile(boolean useCompoundFile) {
setBoolean(HBASE_INDEX_USE_COMPOUND_FILE, useCompoundFile);
}
public boolean doOptimize() {
return getBoolean(HBASE_INDEX_OPTIMIZE, true);
}
public void setDoOptimize(boolean doOptimize) {
setBoolean(HBASE_INDEX_OPTIMIZE, doOptimize);
}
public void addFromXML(String content) {
try {
DocumentBuilder builder = DocumentBuilderFactory.newInstance()
.newDocumentBuilder();
Document doc = builder
.parse(new ByteArrayInputStream(content.getBytes()));
Element root = doc.getDocumentElement();
if (!"configuration".equals(root.getTagName())) {
LOG.fatal("bad conf file: top-level element not <configuration>");
}
NodeList props = root.getChildNodes();
for (int i = 0; i < props.getLength(); i++) {
Node propNode = props.item(i);
if (!(propNode instanceof Element)) {
continue;
}
Element prop = (Element) propNode;
if ("property".equals(prop.getTagName())) {
propertyFromXML(prop, null);
} else if ("column".equals(prop.getTagName())) {
columnConfFromXML(prop);
} else {
LOG.warn("bad conf content: element neither <property> nor <column>");
}
}
} catch (Exception e) {
LOG.fatal("error parsing conf content: " + e);
throw new RuntimeException(e);
}
}
private void propertyFromXML(Element prop, Properties properties) {
NodeList fields = prop.getChildNodes();
String attr = null;
String value = null;
for (int j = 0; j < fields.getLength(); j++) {
Node fieldNode = fields.item(j);
if (!(fieldNode instanceof Element)) {
continue;
}
Element field = (Element) fieldNode;
if ("name".equals(field.getTagName())) {
attr = ((Text) field.getFirstChild()).getData();
}
if ("value".equals(field.getTagName()) && field.hasChildNodes()) {
value = ((Text) field.getFirstChild()).getData();
}
}
if (attr != null && value != null) {
if (properties == null) {
set(attr, value);
} else {
properties.setProperty(attr, value);
}
}
}
private void columnConfFromXML(Element column) {
ColumnConf columnConf = new ColumnConf();
NodeList props = column.getChildNodes();
for (int i = 0; i < props.getLength(); i++) {
Node propNode = props.item(i);
if (!(propNode instanceof Element)) {
continue;
}
Element prop = (Element) propNode;
if ("property".equals(prop.getTagName())) {
propertyFromXML(prop, columnConf);
} else {
LOG.warn("bad conf content: element not <property>");
}
}
if (columnConf.getProperty(HBASE_COLUMN_NAME) != null) {
columnMap.put(columnConf.getProperty(HBASE_COLUMN_NAME), columnConf);
} else {
LOG.warn("bad column conf: name not specified");
}
}
public void write(OutputStream out) throws IOException {
try {
Document doc = writeDocument();
DOMSource source = new DOMSource(doc);
StreamResult result = new StreamResult(out);
TransformerFactory transFactory = TransformerFactory.newInstance();
Transformer transformer = transFactory.newTransformer();
transformer.transform(source, result);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private Document writeDocument() {
Iterator<Map.Entry<String, String>> iter = iterator();
try {
Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
.newDocument();
Element conf = doc.createElement("configuration");
doc.appendChild(conf);
conf.appendChild(doc.createTextNode("\n"));
Map.Entry<String, String> entry;
while (iter.hasNext()) {
entry = iter.next();
String name = entry.getKey();
String value = entry.getValue();
writeProperty(doc, conf, name, value);
}
Iterator<String> columnIter = columnNameIterator();
while (columnIter.hasNext()) {
writeColumn(doc, conf, columnIter.next());
}
return doc;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void writeProperty(Document doc, Element parent, String name,
String value) {
Element propNode = doc.createElement("property");
parent.appendChild(propNode);
Element nameNode = doc.createElement("name");
nameNode.appendChild(doc.createTextNode(name));
propNode.appendChild(nameNode);
Element valueNode = doc.createElement("value");
valueNode.appendChild(doc.createTextNode(value));
propNode.appendChild(valueNode);
parent.appendChild(doc.createTextNode("\n"));
}
private void writeColumn(Document doc, Element parent, String columnName) {
Element column = doc.createElement("column");
parent.appendChild(column);
column.appendChild(doc.createTextNode("\n"));
ColumnConf columnConf = getColumn(columnName);
for (Map.Entry<Object, Object> entry : columnConf.entrySet()) {
if (entry.getKey() instanceof String
&& entry.getValue() instanceof String) {
writeProperty(doc, column, (String) entry.getKey(), (String) entry
.getValue());
}
}
}
public String toString() {
StringWriter writer = new StringWriter();
try {
Document doc = writeDocument();
DOMSource source = new DOMSource(doc);
StreamResult result = new StreamResult(writer);
TransformerFactory transFactory = TransformerFactory.newInstance();
Transformer transformer = transFactory.newTransformer();
transformer.transform(source, result);
} catch (Exception e) {
throw new RuntimeException(e);
}
return writer.toString();
}
}

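As a quick illustration of the XML format addFromXML() accepts (mirroring the createIndexConfContent method in TestTableIndex below), a sketch with a hypothetical column name and row-key field name:

package org.apache.hadoop.hbase.mapred;
public class IndexConfExample {
  public static void main(String[] args) {
    IndexConfiguration iconf = new IndexConfiguration();
    // <column> elements carry per-column properties; hbase.column.name is
    // required. Top-level <property> elements set index-wide parameters.
    iconf.addFromXML(
        "<configuration>" +
        "<column>" +
        "<property><name>hbase.column.name</name><value>contents:</value></property>" +
        "<property><name>hbase.column.tokenize</name><value>false</value></property>" +
        "</column>" +
        "<property><name>hbase.index.rowkey.name</name><value>key</value></property>" +
        "</configuration>");
    System.out.println(iconf.isTokenize("contents:")); // false, per-column override
    System.out.println(iconf.getRowkeyName());         // key
  }
}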
IndexOutputFormat.java (new file)

@@ -0,0 +1,163 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormatBase;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.Similarity;
/**
* Create a local index, unwrap Lucene documents created by reduce, add them to
* the index, and copy the index to the destination.
*/
public class IndexOutputFormat extends
OutputFormatBase<Text, LuceneDocumentWrapper> {
static final Log LOG = LogFactory.getLog(IndexOutputFormat.class);
@Override
public RecordWriter<Text, LuceneDocumentWrapper> getRecordWriter(
final FileSystem fs, JobConf job, String name, final Progressable progress)
throws IOException {
final Path perm = new Path(job.getOutputPath(), name);
final Path temp = job.getLocalPath("index/_"
+ Integer.toString(new Random().nextInt()));
LOG.info("To index into " + perm);
// delete old, if any
fs.delete(perm);
final IndexConfiguration indexConf = new IndexConfiguration();
String content = job.get("hbase.index.conf");
if (content != null) {
indexConf.addFromXML(content);
}
String analyzerName = indexConf.getAnalyzerName();
Analyzer analyzer;
try {
Class analyzerClass = Class.forName(analyzerName);
analyzer = (Analyzer) analyzerClass.newInstance();
} catch (Exception e) {
throw new IOException("Error in creating an analyzer object "
+ analyzerName);
}
// build locally first
final IndexWriter writer = new IndexWriter(fs.startLocalOutput(perm, temp)
.toString(), analyzer, true);
// no delete, so no need for maxBufferedDeleteTerms
writer.setMaxBufferedDocs(indexConf.getMaxBufferedDocs());
writer.setMaxFieldLength(indexConf.getMaxFieldLength());
writer.setMaxMergeDocs(indexConf.getMaxMergeDocs());
writer.setMergeFactor(indexConf.getMergeFactor());
String similarityName = indexConf.getSimilarityName();
if (similarityName != null) {
try {
Class similarityClass = Class.forName(similarityName);
Similarity similarity = (Similarity) similarityClass.newInstance();
writer.setSimilarity(similarity);
} catch (Exception e) {
throw new IOException("Error in creating a similarty object "
+ similarityName);
}
}
writer.setUseCompoundFile(indexConf.isUseCompoundFile());
return new RecordWriter<Text, LuceneDocumentWrapper>() {
private boolean closed;
private long docCount = 0;
public void write(@SuppressWarnings("unused") Text key,
LuceneDocumentWrapper value)
throws IOException {
// unwrap and index doc
Document doc = value.get();
if (LOG.isDebugEnabled()) {
LOG.debug(" Indexing [" + doc + "]");
}
writer.addDocument(doc);
docCount++;
progress.progress();
}
public void close(final Reporter reporter) throws IOException {
// spawn a thread to give progress heartbeats
Thread prog = new Thread() {
public void run() {
while (!closed) {
try {
reporter.setStatus("closing");
Thread.sleep(1000);
} catch (InterruptedException e) {
continue;
} catch (Throwable e) {
return;
}
}
}
};
try {
prog.start();
// optimize index
if (indexConf.doOptimize()) {
if (LOG.isInfoEnabled()) {
LOG.info("Optimizing index.");
}
writer.optimize();
}
// close index
writer.close();
if (LOG.isInfoEnabled()) {
LOG.info("Done indexing " + docCount + " docs.");
}
// copy to perm destination in dfs
fs.completeLocalOutput(perm, temp);
if (LOG.isInfoEnabled()) {
LOG.info("Copy done.");
}
} finally {
closed = true;
}
}
};
}
}

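Each reduce partition leaves a standalone Lucene index under the job output path. A sketch of opening one partition after copying it to the local filesystem, using the Lucene 2.x API as the test below does; the path and row key are hypothetical, and "ROWKEY" is the default row-key field name from IndexConfiguration:

package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Hits;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Searcher;
import org.apache.lucene.search.TermQuery;
public class IndexReadExample {
  public static void main(String[] args) throws IOException {
    // Open one partition of the job output, copied to the local filesystem.
    Searcher searcher = new IndexSearcher("/local/copy/of/part-00000");
    // Row keys are indexed untokenized under the configured row-key field.
    Hits hits = searcher.search(new TermQuery(new Term("ROWKEY", "row0001")));
    System.out.println("hits: " + hits.length());
    searcher.close();
  }
}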
IndexTableReduce.java (new file)

@@ -0,0 +1,107 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
/**
* Construct a Lucene document per row, which is consumed by IndexOutputFormat
* to build a Lucene index
*/
public class IndexTableReduce extends MapReduceBase implements
Reducer<Text, MapWritable, Text, LuceneDocumentWrapper> {
private static final Logger LOG = Logger.getLogger(IndexTableReduce.class);
private IndexConfiguration indexConf;
public void configure(JobConf job) {
super.configure(job);
indexConf = new IndexConfiguration();
String content = job.get("hbase.index.conf");
if (content != null) {
indexConf.addFromXML(content);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Index conf: " + indexConf);
}
}
public void close() throws IOException {
super.close();
}
public void reduce(Text key, Iterator<MapWritable> values,
OutputCollector<Text, LuceneDocumentWrapper> output, Reporter reporter)
throws IOException {
if (!values.hasNext()) {
return;
}
Document doc = new Document();
// index and store row key, row key already UTF-8 encoded
Field keyField = new Field(indexConf.getRowkeyName(), key.toString(),
Field.Store.YES, Field.Index.UN_TOKENIZED);
keyField.setOmitNorms(true);
doc.add(keyField);
while (values.hasNext()) {
MapWritable value = values.next();
// each column (name-value pair) is a field (name-value pair)
for (Map.Entry<Writable, Writable> entry : value.entrySet()) {
// name is already UTF-8 encoded
String column = ((Text) entry.getKey()).toString();
byte[] columnValue = ((ImmutableBytesWritable)entry.getValue()).get();
Field.Store store = indexConf.isStore(column)?
Field.Store.YES: Field.Store.NO;
Field.Index index = indexConf.isIndex(column)?
(indexConf.isTokenize(column)?
Field.Index.TOKENIZED: Field.Index.UN_TOKENIZED):
Field.Index.NO;
// decode the column value from its UTF-8 bytes
Field field = new Field(column, new String(columnValue,
HConstants.UTF8_ENCODING), store, index);
field.setBoost(indexConf.getBoost(column));
field.setOmitNorms(indexConf.isOmitNorms(column));
doc.add(field);
}
}
output.collect(key, new LuceneDocumentWrapper(doc));
}
}

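Concretely, for a hypothetical row "row0001" with a single stored, indexed, untokenized column and the row-key field configured as "key", the reduce above builds the equivalent of:

Document doc = new Document();
Field keyField = new Field("key", "row0001",          // row key, untokenized
    Field.Store.YES, Field.Index.UN_TOKENIZED);
keyField.setOmitNorms(true);
doc.add(keyField);
Field contents = new Field("contents:", "cell text",  // one column, one field
    Field.Store.YES, Field.Index.UN_TOKENIZED);
contents.setBoost(3.0f);
doc.add(contents);
// then: output.collect(key, new LuceneDocumentWrapper(doc));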
LuceneDocumentWrapper.java (new file)

@@ -0,0 +1,51 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.lucene.document.Document;
/**
 * A utility class used to pass a Lucene document from reduce to OutputFormat.
 * It doesn't really serialize/deserialize a Lucene document: the RecordWriter
 * runs in the same JVM as the reducer, so the document is handed over by
 * reference and the Writable methods can stay empty.
 */
class LuceneDocumentWrapper implements Writable {
private Document doc;
public LuceneDocumentWrapper(Document doc) {
this.doc = doc;
}
public Document get() {
return doc;
}
public void readFields(DataInput in) throws IOException {
// intentionally left blank
}
public void write(DataOutput out) throws IOException {
// intentionally left blank
}
}

TestTableIndex.java (new file)

@@ -0,0 +1,297 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import junit.framework.TestSuite;
import junit.textui.TestRunner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HScannerInterface;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.MultiRegionTable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MultiSearcher;
import org.apache.lucene.search.Searchable;
import org.apache.lucene.search.Searcher;
import org.apache.lucene.search.TermQuery;
/**
 * Test the Map/Reduce job that builds a Lucene index over an HBase table
*/
public class TestTableIndex extends HBaseTestCase {
private static final Log LOG = LogFactory.getLog(TestTableIndex.class);
static final String TABLE_NAME = "moretest";
static final String INPUT_COLUMN = "contents:";
static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN);
static final String OUTPUT_COLUMN = "text:";
static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN);
static final String ROWKEY_NAME = "key";
static final String INDEX_DIR = "testindex";
private HTableDescriptor desc;
private MiniDFSCluster dfsCluster = null;
private FileSystem fs;
private Path dir;
private MiniHBaseCluster hCluster = null;
@Override
public void setUp() throws Exception {
super.setUp();
// This size should make it so we always split using the addContent
// below. After adding all data, the first region is 1.3M
conf.setLong("hbase.hregion.max.filesize", 256 * 1024);
desc = new HTableDescriptor(TABLE_NAME);
desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
dfsCluster = new MiniDFSCluster(conf, 1, true, (String[]) null);
try {
fs = dfsCluster.getFileSystem();
dir = new Path("/hbase");
fs.mkdirs(dir);
// Start up HBase cluster
hCluster = new MiniHBaseCluster(conf, 1, dfsCluster);
// Create a table.
HBaseAdmin admin = new HBaseAdmin(conf);
admin.createTable(desc);
// Populate a table into multiple regions
MultiRegionTable.makeMultiRegionTable(conf, hCluster, null, TABLE_NAME,
INPUT_COLUMN);
// Verify table indeed has multiple regions
HTable table = new HTable(conf, new Text(TABLE_NAME));
Text[] startKeys = table.getStartKeys();
assertTrue(startKeys.length > 1);
} catch (Exception e) {
if (dfsCluster != null) {
dfsCluster.shutdown();
dfsCluster = null;
}
throw e;
}
}
@Override
public void tearDown() throws Exception {
super.tearDown();
if (hCluster != null) {
hCluster.shutdown();
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
}
/**
* Test HBase map/reduce
*
* @throws IOException
*/
@SuppressWarnings("static-access")
public void testTableIndex() throws IOException {
long firstK = 32;
LOG.info("Print table contents before map/reduce");
scanTable(conf, firstK);
@SuppressWarnings("deprecation")
MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
// set configuration parameter for index build
conf.set("hbase.index.conf", createIndexConfContent());
try {
JobConf jobConf = new JobConf(conf, TestTableIndex.class);
jobConf.setJobName("index column contents");
jobConf.setNumMapTasks(2);
// number of indexes to partition into
jobConf.setNumReduceTasks(1);
// use identity map (a waste, but just as an example)
IdentityTableMap.initJob(TABLE_NAME, INPUT_COLUMN,
IdentityTableMap.class, jobConf);
// use IndexTableReduce to build a Lucene index
jobConf.setReducerClass(IndexTableReduce.class);
jobConf.setOutputPath(new Path(INDEX_DIR));
jobConf.setOutputFormat(IndexOutputFormat.class);
JobClient.runJob(jobConf);
} finally {
mrCluster.shutdown();
}
LOG.info("Print table contents after map/reduce");
scanTable(conf, firstK);
// verify index results
verify(conf);
}
private String createIndexConfContent() {
StringBuffer buffer = new StringBuffer();
buffer.append("<configuration><column><property>" +
"<name>hbase.column.name</name><value>" + INPUT_COLUMN +
"</value></property>");
buffer.append("<property><name>hbase.column.store</name> " +
"<value>true</value></property>");
buffer.append("<property><name>hbase.column.index</name>" +
"<value>true</value></property>");
buffer.append("<property><name>hbase.column.tokenize</name>" +
"<value>false</value></property>");
buffer.append("<property><name>hbase.column.boost</name>" +
"<value>3</value></property>");
buffer.append("<property><name>hbase.column.omit.norms</name>" +
"<value>false</value></property></column>");
buffer.append("<property><name>hbase.index.rowkey.name</name><value>" +
ROWKEY_NAME + "</value></property>");
buffer.append("<property><name>hbase.index.max.buffered.docs</name>" +
"<value>500</value></property>");
buffer.append("<property><name>hbase.index.max.field.length</name>" +
"<value>10000</value></property>");
buffer.append("<property><name>hbase.index.merge.factor</name>" +
"<value>10</value></property>");
buffer.append("<property><name>hbase.index.use.compound.file</name>" +
"<value>true</value></property>");
buffer.append("<property><name>hbase.index.optimize</name>" +
"<value>true</value></property></configuration>");
IndexConfiguration c = new IndexConfiguration();
c.addFromXML(buffer.toString());
return c.toString();
}
private void scanTable(Configuration c, long firstK) throws IOException {
HTable table = new HTable(c, new Text(TABLE_NAME));
Text[] columns = { TEXT_INPUT_COLUMN, TEXT_OUTPUT_COLUMN };
HScannerInterface scanner = table.obtainScanner(columns,
HConstants.EMPTY_START_ROW);
long count = 0;
try {
HStoreKey key = new HStoreKey();
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
while (scanner.next(key, results)) {
if (count < firstK)
LOG.info("row: " + key.getRow());
for (Map.Entry<Text, byte[]> e : results.entrySet()) {
if (count < firstK)
LOG.info(" column: " + e.getKey() + " value: "
+ new String(e.getValue(), HConstants.UTF8_ENCODING));
}
count++;
}
} finally {
scanner.close();
}
}
private void verify(Configuration c) throws IOException {
Path localDir = new Path(this.testDir, "index_" +
Integer.toString(new Random().nextInt()));
this.fs.copyToLocalFile(new Path(INDEX_DIR), localDir);
Path [] indexDirs = this.localFs.listPaths(new Path [] {localDir});
Searcher searcher = null;
HScannerInterface scanner = null;
try {
if (indexDirs.length == 1) {
searcher = new IndexSearcher((new File(indexDirs[0].
toUri())).getAbsolutePath());
} else if (indexDirs.length > 1) {
Searchable[] searchers = new Searchable[indexDirs.length];
for (int i = 0; i < indexDirs.length; i++) {
searchers[i] = new IndexSearcher((new File(indexDirs[i].
toUri()).getAbsolutePath()));
}
searcher = new MultiSearcher(searchers);
} else {
throw new IOException("no index directory found");
}
HTable table = new HTable(c, new Text(TABLE_NAME));
Text[] columns = { TEXT_INPUT_COLUMN, TEXT_OUTPUT_COLUMN };
scanner = table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
HStoreKey key = new HStoreKey();
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
IndexConfiguration indexConf = new IndexConfiguration();
String content = c.get("hbase.index.conf");
if (content != null) {
indexConf.addFromXML(content);
}
String rowkeyName = indexConf.getRowkeyName();
int count = 0;
while (scanner.next(key, results)) {
String value = key.getRow().toString();
Term term = new Term(rowkeyName, value);
int hitCount = searcher.search(new TermQuery(term)).length();
assertEquals("check row " + value, 1, hitCount);
count++;
}
int maxDoc = searcher.maxDoc();
assertEquals("check number of rows", count, maxDoc);
} finally {
if (null != searcher)
searcher.close();
if (null != scanner)
scanner.close();
}
}
/**
* @param args unused
*/
public static void main(@SuppressWarnings("unused") String[] args) {
TestRunner.run(new TestSuite(TestTableIndex.class));
}
}

Existing test class (moved from package org.apache.hadoop.hbase to org.apache.hadoop.hbase.mapred)

@@ -17,7 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase;
+package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@@ -36,6 +36,15 @@ import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.hbase.HBaseAdmin;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HScannerInterface;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.HTable;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.MultiRegionTable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableOutputCollector;