HBASE-2212 Refactor out lucene dependencies from HBase
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@909585 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
69bd0cb802
commit
f5e8fc90cc
|
@ -8,6 +8,8 @@ Release 0.21.0 - Unreleased
|
|||
existing KeyValues, not copied in copy constructor)
|
||||
(Dave Latham via Stack)
|
||||
HBASE-1360 move up to Thrift 0.2.0 (Kay Kay and Lars Francke via Stack)
|
||||
HBASE-2212 Refactor out lucene dependencies from HBase
|
||||
(Kay Kay via Stack)
|
||||
|
||||
BUG FIXES
|
||||
HBASE-1791 Timeout in IndexRecordWriter (Bradford Stephens via Andrew
|
||||
|
|
5
ivy.xml
5
ivy.xml
|
@ -53,7 +53,7 @@
|
|||
<!--Private configurations. -->
|
||||
|
||||
<conf name="javadoc" visibility="private" description="artiracts required while performing doc generation"
|
||||
extends="common,mandatory,lucene"/>
|
||||
extends="common,mandatory"/>
|
||||
|
||||
<conf name="releaseaudit" visibility="private"
|
||||
description="Artifacts required for releaseaudit target"/>
|
||||
|
@ -61,7 +61,6 @@
|
|||
<conf name="commons-logging" visibility="private"/>
|
||||
<conf name="httpclient" visibility="private" extends="commons-logging"/>
|
||||
<conf name="log4j" visibility="private"/>
|
||||
<conf name="lucene" visibility="private"/>
|
||||
<conf name="jdiff" visibility="private" extends="log4j,server"/>
|
||||
<conf name="checkstyle" visibility="private"/>
|
||||
|
||||
|
@ -85,8 +84,6 @@
|
|||
rev="${hadoop-mapred.version}" conf="common->default" changing="true" >
|
||||
<exclude conf="test"/>
|
||||
</dependency>
|
||||
<dependency org="org.apache.lucene" name="lucene-core"
|
||||
rev="${lucene.version}" conf="common->default" />
|
||||
<dependency org="log4j" name="log4j"
|
||||
rev="${log4j.version}" conf="common->master">
|
||||
<exclude conf="jmx,mail,jms"/>
|
||||
|
|
|
@ -26,7 +26,6 @@ hadoop-mapred.version=0.21.0-SNAPSHOT
|
|||
zookeeper.version=3.2.2
|
||||
thrift.version=r771587
|
||||
|
||||
lucene.version=3.0.0
|
||||
|
||||
jsr311.version=1.1.1
|
||||
|
||||
|
|
|
@ -1,206 +0,0 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapred;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.mapred.FileOutputFormat;
|
||||
import org.apache.hadoop.mapred.JobClient;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
|
||||
/**
|
||||
* Example table column indexing class. Runs a mapreduce job to index
|
||||
* specified table columns.
|
||||
* <ul><li>Each row is modeled as a Lucene document: row key is indexed in
|
||||
* its untokenized form, column name-value pairs are Lucene field name-value
|
||||
* pairs.</li>
|
||||
* <li>A file passed on command line is used to populate an
|
||||
* {@link IndexConfiguration} which is used to set various Lucene parameters,
|
||||
* specify whether to optimize an index and which columns to index and/or
|
||||
* store, in tokenized or untokenized form, etc. For an example, see the
|
||||
* <code>createIndexConfContent</code> method in TestTableIndex
|
||||
* </li>
|
||||
* <li>The number of reduce tasks decides the number of indexes (partitions).
|
||||
* The index(es) is stored in the output path of job configuration.</li>
|
||||
* <li>The index build process is done in the reduce phase. Users can use
|
||||
* the map phase to join rows from different tables or to pre-parse/analyze
|
||||
* column content, etc.</li>
|
||||
* </ul>
|
||||
*/
|
||||
@Deprecated
|
||||
public class BuildTableIndex {
|
||||
private static final String USAGE = "Usage: BuildTableIndex " +
|
||||
"-m <numMapTasks> -r <numReduceTasks>\n -indexConf <iconfFile> " +
|
||||
"-indexDir <indexDir>\n -table <tableName> -columns <columnName1> " +
|
||||
"[<columnName2> ...]";
|
||||
|
||||
private static void printUsage(String message) {
|
||||
System.err.println(message);
|
||||
System.err.println(USAGE);
|
||||
System.exit(-1);
|
||||
}
|
||||
|
||||
/** default constructor */
|
||||
public BuildTableIndex() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param args
|
||||
* @throws IOException
|
||||
*/
|
||||
public void run(String[] args) throws IOException {
|
||||
if (args.length < 6) {
|
||||
printUsage("Too few arguments");
|
||||
}
|
||||
|
||||
int numMapTasks = 1;
|
||||
int numReduceTasks = 1;
|
||||
String iconfFile = null;
|
||||
String indexDir = null;
|
||||
String tableName = null;
|
||||
StringBuffer columnNames = null;
|
||||
|
||||
// parse args
|
||||
for (int i = 0; i < args.length - 1; i++) {
|
||||
if ("-m".equals(args[i])) {
|
||||
numMapTasks = Integer.parseInt(args[++i]);
|
||||
} else if ("-r".equals(args[i])) {
|
||||
numReduceTasks = Integer.parseInt(args[++i]);
|
||||
} else if ("-indexConf".equals(args[i])) {
|
||||
iconfFile = args[++i];
|
||||
} else if ("-indexDir".equals(args[i])) {
|
||||
indexDir = args[++i];
|
||||
} else if ("-table".equals(args[i])) {
|
||||
tableName = args[++i];
|
||||
} else if ("-columns".equals(args[i])) {
|
||||
columnNames = new StringBuffer(args[++i]);
|
||||
while (i + 1 < args.length && !args[i + 1].startsWith("-")) {
|
||||
columnNames.append(" ");
|
||||
columnNames.append(args[++i]);
|
||||
}
|
||||
} else {
|
||||
printUsage("Unsupported option " + args[i]);
|
||||
}
|
||||
}
|
||||
|
||||
if (indexDir == null || tableName == null || columnNames == null) {
|
||||
printUsage("Index directory, table name and at least one column must " +
|
||||
"be specified");
|
||||
}
|
||||
|
||||
Configuration conf = new HBaseConfiguration();
|
||||
if (iconfFile != null) {
|
||||
// set index configuration content from a file
|
||||
String content = readContent(iconfFile);
|
||||
IndexConfiguration iconf = new IndexConfiguration();
|
||||
// purely to validate, exception will be thrown if not valid
|
||||
iconf.addFromXML(content);
|
||||
conf.set("hbase.index.conf", content);
|
||||
}
|
||||
|
||||
if (columnNames != null) {
|
||||
JobConf jobConf = createJob(conf, numMapTasks, numReduceTasks, indexDir,
|
||||
tableName, columnNames.toString());
|
||||
JobClient.runJob(jobConf);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf
|
||||
* @param numMapTasks
|
||||
* @param numReduceTasks
|
||||
* @param indexDir
|
||||
* @param tableName
|
||||
* @param columnNames
|
||||
* @return JobConf
|
||||
*/
|
||||
public JobConf createJob(Configuration conf, int numMapTasks,
|
||||
int numReduceTasks, String indexDir, String tableName,
|
||||
String columnNames) {
|
||||
JobConf jobConf = new JobConf(conf, BuildTableIndex.class);
|
||||
jobConf.setJobName("build index for table " + tableName);
|
||||
jobConf.setNumMapTasks(numMapTasks);
|
||||
// number of indexes to partition into
|
||||
jobConf.setNumReduceTasks(numReduceTasks);
|
||||
|
||||
// use identity map (a waste, but just as an example)
|
||||
IdentityTableMap.initJob(tableName, columnNames, IdentityTableMap.class,
|
||||
jobConf);
|
||||
|
||||
// use IndexTableReduce to build a Lucene index
|
||||
jobConf.setReducerClass(IndexTableReduce.class);
|
||||
FileOutputFormat.setOutputPath(jobConf, new Path(indexDir));
|
||||
jobConf.setOutputFormat(IndexOutputFormat.class);
|
||||
|
||||
return jobConf;
|
||||
}
|
||||
|
||||
/*
|
||||
* Read xml file of indexing configurations. The xml format is similar to
|
||||
* hbase-default.xml and hadoop-default.xml. For an example configuration,
|
||||
* see the <code>createIndexConfContent</code> method in TestTableIndex
|
||||
* @param fileName File to read.
|
||||
* @return XML configuration read from file
|
||||
* @throws IOException
|
||||
*/
|
||||
private String readContent(String fileName) throws IOException {
|
||||
File file = new File(fileName);
|
||||
int length = (int) file.length();
|
||||
if (length == 0) {
|
||||
printUsage("Index configuration file " + fileName + " does not exist");
|
||||
}
|
||||
|
||||
int bytesRead = 0;
|
||||
byte[] bytes = new byte[length];
|
||||
FileInputStream fis = new FileInputStream(file);
|
||||
|
||||
try {
|
||||
// read entire file into content
|
||||
while (bytesRead < length) {
|
||||
int read = fis.read(bytes, bytesRead, length - bytesRead);
|
||||
if (read > 0) {
|
||||
bytesRead += read;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
fis.close();
|
||||
}
|
||||
|
||||
return new String(bytes, 0, bytesRead, HConstants.UTF8_ENCODING);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param args
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void main(String[] args) throws IOException {
|
||||
BuildTableIndex build = new BuildTableIndex();
|
||||
build.run(args);
|
||||
}
|
||||
}
|
|
@ -1,423 +0,0 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapred;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.StringWriter;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
import javax.xml.transform.Transformer;
|
||||
import javax.xml.transform.TransformerFactory;
|
||||
import javax.xml.transform.dom.DOMSource;
|
||||
import javax.xml.transform.stream.StreamResult;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.w3c.dom.Document;
|
||||
import org.w3c.dom.Element;
|
||||
import org.w3c.dom.Node;
|
||||
import org.w3c.dom.NodeList;
|
||||
import org.w3c.dom.Text;
|
||||
|
||||
/**
|
||||
* Configuration parameters for building a Lucene index
|
||||
*/
|
||||
@Deprecated
|
||||
public class IndexConfiguration extends Configuration {
|
||||
private static final Log LOG = LogFactory.getLog(IndexConfiguration.class);
|
||||
|
||||
static final String HBASE_COLUMN_NAME = "hbase.column.name";
|
||||
static final String HBASE_COLUMN_STORE = "hbase.column.store";
|
||||
static final String HBASE_COLUMN_INDEX = "hbase.column.index";
|
||||
static final String HBASE_COLUMN_TOKENIZE = "hbase.column.tokenize";
|
||||
static final String HBASE_COLUMN_BOOST = "hbase.column.boost";
|
||||
static final String HBASE_COLUMN_OMIT_NORMS = "hbase.column.omit.norms";
|
||||
static final String HBASE_INDEX_ROWKEY_NAME = "hbase.index.rowkey.name";
|
||||
static final String HBASE_INDEX_ANALYZER_NAME = "hbase.index.analyzer.name";
|
||||
static final String HBASE_INDEX_MAX_BUFFERED_DOCS =
|
||||
"hbase.index.max.buffered.docs";
|
||||
static final String HBASE_INDEX_MAX_BUFFERED_DELS =
|
||||
"hbase.index.max.buffered.dels";
|
||||
static final String HBASE_INDEX_MAX_FIELD_LENGTH =
|
||||
"hbase.index.max.field.length";
|
||||
static final String HBASE_INDEX_MAX_MERGE_DOCS =
|
||||
"hbase.index.max.merge.docs";
|
||||
static final String HBASE_INDEX_MERGE_FACTOR = "hbase.index.merge.factor";
|
||||
// double ramBufferSizeMB;
|
||||
static final String HBASE_INDEX_SIMILARITY_NAME =
|
||||
"hbase.index.similarity.name";
|
||||
static final String HBASE_INDEX_USE_COMPOUND_FILE =
|
||||
"hbase.index.use.compound.file";
|
||||
static final String HBASE_INDEX_OPTIMIZE = "hbase.index.optimize";
|
||||
|
||||
public static class ColumnConf extends Properties {
|
||||
|
||||
private static final long serialVersionUID = 7419012290580607821L;
|
||||
|
||||
boolean getBoolean(String name, boolean defaultValue) {
|
||||
String valueString = getProperty(name);
|
||||
if ("true".equals(valueString))
|
||||
return true;
|
||||
else if ("false".equals(valueString))
|
||||
return false;
|
||||
else
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
void setBoolean(String name, boolean value) {
|
||||
setProperty(name, Boolean.toString(value));
|
||||
}
|
||||
|
||||
float getFloat(String name, float defaultValue) {
|
||||
String valueString = getProperty(name);
|
||||
if (valueString == null)
|
||||
return defaultValue;
|
||||
try {
|
||||
return Float.parseFloat(valueString);
|
||||
} catch (NumberFormatException e) {
|
||||
return defaultValue;
|
||||
}
|
||||
}
|
||||
|
||||
void setFloat(String name, float value) {
|
||||
setProperty(name, Float.toString(value));
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, ColumnConf> columnMap =
|
||||
new ConcurrentHashMap<String, ColumnConf>();
|
||||
|
||||
public Iterator<String> columnNameIterator() {
|
||||
return columnMap.keySet().iterator();
|
||||
}
|
||||
|
||||
public boolean isIndex(String columnName) {
|
||||
return getColumn(columnName).getBoolean(HBASE_COLUMN_INDEX, true);
|
||||
}
|
||||
|
||||
public void setIndex(String columnName, boolean index) {
|
||||
getColumn(columnName).setBoolean(HBASE_COLUMN_INDEX, index);
|
||||
}
|
||||
|
||||
public boolean isStore(String columnName) {
|
||||
return getColumn(columnName).getBoolean(HBASE_COLUMN_STORE, false);
|
||||
}
|
||||
|
||||
public void setStore(String columnName, boolean store) {
|
||||
getColumn(columnName).setBoolean(HBASE_COLUMN_STORE, store);
|
||||
}
|
||||
|
||||
public boolean isTokenize(String columnName) {
|
||||
return getColumn(columnName).getBoolean(HBASE_COLUMN_TOKENIZE, true);
|
||||
}
|
||||
|
||||
public void setTokenize(String columnName, boolean tokenize) {
|
||||
getColumn(columnName).setBoolean(HBASE_COLUMN_TOKENIZE, tokenize);
|
||||
}
|
||||
|
||||
public float getBoost(String columnName) {
|
||||
return getColumn(columnName).getFloat(HBASE_COLUMN_BOOST, 1.0f);
|
||||
}
|
||||
|
||||
public void setBoost(String columnName, float boost) {
|
||||
getColumn(columnName).setFloat(HBASE_COLUMN_BOOST, boost);
|
||||
}
|
||||
|
||||
public boolean isOmitNorms(String columnName) {
|
||||
return getColumn(columnName).getBoolean(HBASE_COLUMN_OMIT_NORMS, true);
|
||||
}
|
||||
|
||||
public void setOmitNorms(String columnName, boolean omitNorms) {
|
||||
getColumn(columnName).setBoolean(HBASE_COLUMN_OMIT_NORMS, omitNorms);
|
||||
}
|
||||
|
||||
private ColumnConf getColumn(String columnName) {
|
||||
ColumnConf column = columnMap.get(columnName);
|
||||
if (column == null) {
|
||||
column = new ColumnConf();
|
||||
columnMap.put(columnName, column);
|
||||
}
|
||||
return column;
|
||||
}
|
||||
|
||||
public String getAnalyzerName() {
|
||||
return get(HBASE_INDEX_ANALYZER_NAME,
|
||||
"org.apache.lucene.analysis.standard.StandardAnalyzer");
|
||||
}
|
||||
|
||||
public void setAnalyzerName(String analyzerName) {
|
||||
set(HBASE_INDEX_ANALYZER_NAME, analyzerName);
|
||||
}
|
||||
|
||||
public int getMaxBufferedDeleteTerms() {
|
||||
return getInt(HBASE_INDEX_MAX_BUFFERED_DELS, 1000);
|
||||
}
|
||||
|
||||
public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
|
||||
setInt(HBASE_INDEX_MAX_BUFFERED_DELS, maxBufferedDeleteTerms);
|
||||
}
|
||||
|
||||
public int getMaxBufferedDocs() {
|
||||
return getInt(HBASE_INDEX_MAX_BUFFERED_DOCS, 10);
|
||||
}
|
||||
|
||||
public void setMaxBufferedDocs(int maxBufferedDocs) {
|
||||
setInt(HBASE_INDEX_MAX_BUFFERED_DOCS, maxBufferedDocs);
|
||||
}
|
||||
|
||||
public int getMaxFieldLength() {
|
||||
return getInt(HBASE_INDEX_MAX_FIELD_LENGTH, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
public void setMaxFieldLength(int maxFieldLength) {
|
||||
setInt(HBASE_INDEX_MAX_FIELD_LENGTH, maxFieldLength);
|
||||
}
|
||||
|
||||
public int getMaxMergeDocs() {
|
||||
return getInt(HBASE_INDEX_MAX_MERGE_DOCS, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
public void setMaxMergeDocs(int maxMergeDocs) {
|
||||
setInt(HBASE_INDEX_MAX_MERGE_DOCS, maxMergeDocs);
|
||||
}
|
||||
|
||||
public int getMergeFactor() {
|
||||
return getInt(HBASE_INDEX_MERGE_FACTOR, 10);
|
||||
}
|
||||
|
||||
public void setMergeFactor(int mergeFactor) {
|
||||
setInt(HBASE_INDEX_MERGE_FACTOR, mergeFactor);
|
||||
}
|
||||
|
||||
public String getRowkeyName() {
|
||||
return get(HBASE_INDEX_ROWKEY_NAME, "ROWKEY");
|
||||
}
|
||||
|
||||
public void setRowkeyName(String rowkeyName) {
|
||||
set(HBASE_INDEX_ROWKEY_NAME, rowkeyName);
|
||||
}
|
||||
|
||||
public String getSimilarityName() {
|
||||
return get(HBASE_INDEX_SIMILARITY_NAME, null);
|
||||
}
|
||||
|
||||
public void setSimilarityName(String similarityName) {
|
||||
set(HBASE_INDEX_SIMILARITY_NAME, similarityName);
|
||||
}
|
||||
|
||||
public boolean isUseCompoundFile() {
|
||||
return getBoolean(HBASE_INDEX_USE_COMPOUND_FILE, false);
|
||||
}
|
||||
|
||||
public void setUseCompoundFile(boolean useCompoundFile) {
|
||||
setBoolean(HBASE_INDEX_USE_COMPOUND_FILE, useCompoundFile);
|
||||
}
|
||||
|
||||
public boolean doOptimize() {
|
||||
return getBoolean(HBASE_INDEX_OPTIMIZE, true);
|
||||
}
|
||||
|
||||
public void setDoOptimize(boolean doOptimize) {
|
||||
setBoolean(HBASE_INDEX_OPTIMIZE, doOptimize);
|
||||
}
|
||||
|
||||
public void addFromXML(String content) {
|
||||
try {
|
||||
DocumentBuilder builder = DocumentBuilderFactory.newInstance()
|
||||
.newDocumentBuilder();
|
||||
|
||||
Document doc = builder
|
||||
.parse(new ByteArrayInputStream(content.getBytes()));
|
||||
|
||||
Element root = doc.getDocumentElement();
|
||||
if (!"configuration".equals(root.getTagName())) {
|
||||
LOG.fatal("bad conf file: top-level element not <configuration>");
|
||||
}
|
||||
|
||||
NodeList props = root.getChildNodes();
|
||||
for (int i = 0; i < props.getLength(); i++) {
|
||||
Node propNode = props.item(i);
|
||||
if (!(propNode instanceof Element)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Element prop = (Element) propNode;
|
||||
if ("property".equals(prop.getTagName())) {
|
||||
propertyFromXML(prop, null);
|
||||
} else if ("column".equals(prop.getTagName())) {
|
||||
columnConfFromXML(prop);
|
||||
} else {
|
||||
LOG.warn("bad conf content: element neither <property> nor <column>");
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.fatal("error parsing conf content: " + e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void propertyFromXML(Element prop, Properties properties) {
|
||||
NodeList fields = prop.getChildNodes();
|
||||
String attr = null;
|
||||
String value = null;
|
||||
|
||||
for (int j = 0; j < fields.getLength(); j++) {
|
||||
Node fieldNode = fields.item(j);
|
||||
if (!(fieldNode instanceof Element)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Element field = (Element) fieldNode;
|
||||
if ("name".equals(field.getTagName())) {
|
||||
attr = ((Text) field.getFirstChild()).getData();
|
||||
}
|
||||
if ("value".equals(field.getTagName()) && field.hasChildNodes()) {
|
||||
value = ((Text) field.getFirstChild()).getData();
|
||||
}
|
||||
}
|
||||
|
||||
if (attr != null && value != null) {
|
||||
if (properties == null) {
|
||||
set(attr, value);
|
||||
} else {
|
||||
properties.setProperty(attr, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void columnConfFromXML(Element column) {
|
||||
ColumnConf columnConf = new ColumnConf();
|
||||
NodeList props = column.getChildNodes();
|
||||
for (int i = 0; i < props.getLength(); i++) {
|
||||
Node propNode = props.item(i);
|
||||
if (!(propNode instanceof Element)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Element prop = (Element) propNode;
|
||||
if ("property".equals(prop.getTagName())) {
|
||||
propertyFromXML(prop, columnConf);
|
||||
} else {
|
||||
LOG.warn("bad conf content: element not <property>");
|
||||
}
|
||||
}
|
||||
|
||||
if (columnConf.getProperty(HBASE_COLUMN_NAME) != null) {
|
||||
columnMap.put(columnConf.getProperty(HBASE_COLUMN_NAME), columnConf);
|
||||
} else {
|
||||
LOG.warn("bad column conf: name not specified");
|
||||
}
|
||||
}
|
||||
|
||||
public void write(OutputStream out) {
|
||||
try {
|
||||
Document doc = writeDocument();
|
||||
DOMSource source = new DOMSource(doc);
|
||||
StreamResult result = new StreamResult(out);
|
||||
TransformerFactory transFactory = TransformerFactory.newInstance();
|
||||
Transformer transformer = transFactory.newTransformer();
|
||||
transformer.transform(source, result);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private Document writeDocument() {
|
||||
Iterator<Map.Entry<String, String>> iter = iterator();
|
||||
try {
|
||||
Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
|
||||
.newDocument();
|
||||
Element conf = doc.createElement("configuration");
|
||||
doc.appendChild(conf);
|
||||
conf.appendChild(doc.createTextNode("\n"));
|
||||
|
||||
Map.Entry<String, String> entry;
|
||||
while (iter.hasNext()) {
|
||||
entry = iter.next();
|
||||
String name = entry.getKey();
|
||||
String value = entry.getValue();
|
||||
writeProperty(doc, conf, name, value);
|
||||
}
|
||||
|
||||
Iterator<String> columnIter = columnNameIterator();
|
||||
while (columnIter.hasNext()) {
|
||||
writeColumn(doc, conf, columnIter.next());
|
||||
}
|
||||
|
||||
return doc;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void writeProperty(Document doc, Element parent, String name,
|
||||
String value) {
|
||||
Element propNode = doc.createElement("property");
|
||||
parent.appendChild(propNode);
|
||||
|
||||
Element nameNode = doc.createElement("name");
|
||||
nameNode.appendChild(doc.createTextNode(name));
|
||||
propNode.appendChild(nameNode);
|
||||
|
||||
Element valueNode = doc.createElement("value");
|
||||
valueNode.appendChild(doc.createTextNode(value));
|
||||
propNode.appendChild(valueNode);
|
||||
|
||||
parent.appendChild(doc.createTextNode("\n"));
|
||||
}
|
||||
|
||||
private void writeColumn(Document doc, Element parent, String columnName) {
|
||||
Element column = doc.createElement("column");
|
||||
parent.appendChild(column);
|
||||
column.appendChild(doc.createTextNode("\n"));
|
||||
|
||||
ColumnConf columnConf = getColumn(columnName);
|
||||
for (Map.Entry<Object, Object> entry : columnConf.entrySet()) {
|
||||
if (entry.getKey() instanceof String
|
||||
&& entry.getValue() instanceof String) {
|
||||
writeProperty(doc, column, (String) entry.getKey(), (String) entry
|
||||
.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringWriter writer = new StringWriter();
|
||||
try {
|
||||
Document doc = writeDocument();
|
||||
DOMSource source = new DOMSource(doc);
|
||||
StreamResult result = new StreamResult(writer);
|
||||
TransformerFactory transFactory = TransformerFactory.newInstance();
|
||||
Transformer transformer = transFactory.newTransformer();
|
||||
transformer.transform(source, result);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return writer.toString();
|
||||
}
|
||||
}
|
|
@ -1,166 +0,0 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapred;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.File;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.FileOutputFormat;
|
||||
import org.apache.hadoop.mapred.RecordWriter;
|
||||
import org.apache.hadoop.mapred.Reporter;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.search.Similarity;
|
||||
import org.apache.lucene.store.FSDirectory;
|
||||
|
||||
/**
|
||||
* Create a local index, unwrap Lucene documents created by reduce, add them to
|
||||
* the index, and copy the index to the destination.
|
||||
*/
|
||||
@Deprecated
|
||||
public class IndexOutputFormat extends
|
||||
FileOutputFormat<ImmutableBytesWritable, LuceneDocumentWrapper> {
|
||||
static final Log LOG = LogFactory.getLog(IndexOutputFormat.class);
|
||||
|
||||
private Random random = new Random();
|
||||
|
||||
@Override
|
||||
public RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper>
|
||||
getRecordWriter(final FileSystem fs, JobConf job, String name,
|
||||
final Progressable progress)
|
||||
throws IOException {
|
||||
|
||||
final Path perm = new Path(FileOutputFormat.getOutputPath(job), name);
|
||||
final Path temp = job.getLocalPath("index/_"
|
||||
+ Integer.toString(random.nextInt()));
|
||||
|
||||
LOG.info("To index into " + perm);
|
||||
|
||||
// delete old, if any
|
||||
fs.delete(perm, true);
|
||||
|
||||
final IndexConfiguration indexConf = new IndexConfiguration();
|
||||
String content = job.get("hbase.index.conf");
|
||||
if (content != null) {
|
||||
indexConf.addFromXML(content);
|
||||
}
|
||||
|
||||
String analyzerName = indexConf.getAnalyzerName();
|
||||
Analyzer analyzer;
|
||||
try {
|
||||
Class<?> analyzerClass = Class.forName(analyzerName);
|
||||
analyzer = (Analyzer) analyzerClass.newInstance();
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Error in creating an analyzer object "
|
||||
+ analyzerName);
|
||||
}
|
||||
|
||||
// build locally first
|
||||
final IndexWriter writer = new IndexWriter(FSDirectory.open(new File(fs.startLocalOutput(perm, temp)
|
||||
.toString())), analyzer, true, IndexWriter.MaxFieldLength.LIMITED);
|
||||
|
||||
// no delete, so no need for maxBufferedDeleteTerms
|
||||
writer.setMaxBufferedDocs(indexConf.getMaxBufferedDocs());
|
||||
writer.setMaxFieldLength(indexConf.getMaxFieldLength());
|
||||
writer.setMaxMergeDocs(indexConf.getMaxMergeDocs());
|
||||
writer.setMergeFactor(indexConf.getMergeFactor());
|
||||
String similarityName = indexConf.getSimilarityName();
|
||||
if (similarityName != null) {
|
||||
try {
|
||||
Class<?> similarityClass = Class.forName(similarityName);
|
||||
Similarity similarity = (Similarity) similarityClass.newInstance();
|
||||
writer.setSimilarity(similarity);
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Error in creating a similarty object "
|
||||
+ similarityName);
|
||||
}
|
||||
}
|
||||
writer.setUseCompoundFile(indexConf.isUseCompoundFile());
|
||||
|
||||
return new RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper>() {
|
||||
boolean closed;
|
||||
private long docCount = 0;
|
||||
|
||||
public void write(ImmutableBytesWritable key,
|
||||
LuceneDocumentWrapper value)
|
||||
throws IOException {
|
||||
// unwrap and index doc
|
||||
Document doc = value.get();
|
||||
writer.addDocument(doc);
|
||||
docCount++;
|
||||
progress.progress();
|
||||
}
|
||||
|
||||
public void close(final Reporter reporter) throws IOException {
|
||||
// spawn a thread to give progress heartbeats
|
||||
Thread prog = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (!closed) {
|
||||
try {
|
||||
reporter.setStatus("closing");
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
continue;
|
||||
} catch (Throwable e) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
prog.start();
|
||||
|
||||
// optimize index
|
||||
if (indexConf.doOptimize()) {
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info("Optimizing index.");
|
||||
}
|
||||
writer.optimize();
|
||||
}
|
||||
|
||||
// close index
|
||||
writer.close();
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info("Done indexing " + docCount + " docs.");
|
||||
}
|
||||
|
||||
// copy to perm destination in dfs
|
||||
fs.completeLocalOutput(perm, temp);
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info("Copy done.");
|
||||
}
|
||||
} finally {
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -1,108 +0,0 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapred;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.MapReduceBase;
|
||||
import org.apache.hadoop.mapred.OutputCollector;
|
||||
import org.apache.hadoop.mapred.Reducer;
|
||||
import org.apache.hadoop.mapred.Reporter;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* Construct a Lucene document per row, which is consumed by IndexOutputFormat
|
||||
* to build a Lucene index
|
||||
*/
|
||||
@Deprecated
|
||||
public class IndexTableReduce extends MapReduceBase implements
|
||||
Reducer<ImmutableBytesWritable, Result, ImmutableBytesWritable, LuceneDocumentWrapper> {
|
||||
private static final Log LOG = LogFactory.getLog(IndexTableReduce.class);
|
||||
private IndexConfiguration indexConf;
|
||||
|
||||
@Override
|
||||
public void configure(JobConf job) {
|
||||
super.configure(job);
|
||||
indexConf = new IndexConfiguration();
|
||||
String content = job.get("hbase.index.conf");
|
||||
if (content != null) {
|
||||
indexConf.addFromXML(content);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Index conf: " + indexConf);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
super.close();
|
||||
}
|
||||
|
||||
public void reduce(ImmutableBytesWritable key, Iterator<Result> values,
|
||||
OutputCollector<ImmutableBytesWritable, LuceneDocumentWrapper> output,
|
||||
Reporter reporter)
|
||||
throws IOException {
|
||||
Document doc = null;
|
||||
while(values.hasNext()) {
|
||||
Result r = values.next();
|
||||
if (doc == null) {
|
||||
doc = new Document();
|
||||
// index and store row key, row key already UTF-8 encoded
|
||||
Field keyField = new Field(indexConf.getRowkeyName(),
|
||||
Bytes.toString(key.get(), key.getOffset(), key.getLength()),
|
||||
Field.Store.YES, Field.Index.NOT_ANALYZED);
|
||||
keyField.setOmitNorms(true);
|
||||
doc.add(keyField);
|
||||
}
|
||||
// each column (name-value pair) is a field (name-value pair)
|
||||
for (KeyValue kv: r.list()) {
|
||||
// name is already UTF-8 encoded
|
||||
String column = Bytes.toString(KeyValue.makeColumn(kv.getFamily(),
|
||||
kv.getQualifier()));
|
||||
byte[] columnValue = kv.getValue();
|
||||
Field.Store store = indexConf.isStore(column)?
|
||||
Field.Store.YES: Field.Store.NO;
|
||||
Field.Index index = indexConf.isIndex(column)?
|
||||
(indexConf.isTokenize(column)?
|
||||
Field.Index.ANALYZED: Field.Index.NOT_ANALYZED):
|
||||
Field.Index.NO;
|
||||
|
||||
// UTF-8 encode value
|
||||
Field field = new Field(column, Bytes.toString(columnValue),
|
||||
store, index);
|
||||
field.setBoost(indexConf.getBoost(column));
|
||||
field.setOmitNorms(indexConf.isOmitNorms(column));
|
||||
|
||||
doc.add(field);
|
||||
}
|
||||
}
|
||||
output.collect(key, new LuceneDocumentWrapper(doc));
|
||||
}
|
||||
}
|
|
@ -1,56 +0,0 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapred;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.lucene.document.Document;
|
||||
|
||||
/**
|
||||
* A utility class used to pass a lucene document from reduce to OutputFormat.
|
||||
* It doesn't really serialize/deserialize a lucene document.
|
||||
*/
|
||||
@Deprecated
|
||||
public class LuceneDocumentWrapper implements Writable {
|
||||
protected Document doc;
|
||||
|
||||
/**
|
||||
* @param doc
|
||||
*/
|
||||
public LuceneDocumentWrapper(Document doc) {
|
||||
this.doc = doc;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the document
|
||||
*/
|
||||
public Document get() {
|
||||
return doc;
|
||||
}
|
||||
|
||||
public void readFields(DataInput in) {
|
||||
// intentionally left blank
|
||||
}
|
||||
|
||||
public void write(DataOutput out) {
|
||||
// intentionally left blank
|
||||
}
|
||||
}
|
|
@ -1,202 +0,0 @@
|
|||
/**
|
||||
* Copyright 2007 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapreduce;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.mapreduce.Cluster;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
|
||||
/**
|
||||
* Example table column indexing class. Runs a mapreduce job to index
|
||||
* specified table columns.
|
||||
* <ul><li>Each row is modeled as a Lucene document: row key is indexed in
|
||||
* its untokenized form, column name-value pairs are Lucene field name-value
|
||||
* pairs.</li>
|
||||
* <li>A file passed on command line is used to populate an
|
||||
* {@link IndexConfiguration} which is used to set various Lucene parameters,
|
||||
* specify whether to optimize an index and which columns to index and/or
|
||||
* store, in tokenized or untokenized form, etc. For an example, see the
|
||||
* <code>createIndexConfContent</code> method in TestTableIndex
|
||||
* </li>
|
||||
* <li>The number of reduce tasks decides the number of indexes (partitions).
|
||||
* The index(es) is stored in the output path of job configuration.</li>
|
||||
* <li>The index build process is done in the reduce phase. Users can use
|
||||
* the map phase to join rows from different tables or to pre-parse/analyze
|
||||
* column content, etc.</li>
|
||||
* </ul>
|
||||
*/
|
||||
public class BuildTableIndex {
|
||||
|
||||
private static final String USAGE = "Usage: BuildTableIndex " +
|
||||
"-r <numReduceTasks> -indexConf <iconfFile>\n" +
|
||||
"-indexDir <indexDir> -table <tableName>\n -columns <columnName1> " +
|
||||
"[<columnName2> ...]";
|
||||
|
||||
/**
|
||||
* Prints the usage message and exists the program.
|
||||
*
|
||||
* @param message The message to print first.
|
||||
*/
|
||||
private static void printUsage(String message) {
|
||||
System.err.println(message);
|
||||
System.err.println(USAGE);
|
||||
System.exit(-1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new job.
|
||||
* @param conf
|
||||
*
|
||||
* @param args The command line arguments.
|
||||
* @throws IOException When reading the configuration fails.
|
||||
*/
|
||||
public static Job createSubmittableJob(Configuration conf, String[] args)
|
||||
throws IOException {
|
||||
if (args.length < 6) {
|
||||
printUsage("Too few arguments");
|
||||
}
|
||||
|
||||
int numReduceTasks = 1;
|
||||
String iconfFile = null;
|
||||
String indexDir = null;
|
||||
String tableName = null;
|
||||
StringBuilder columnNames = null;
|
||||
|
||||
// parse args
|
||||
for (int i = 0; i < args.length - 1; i++) {
|
||||
if ("-r".equals(args[i])) {
|
||||
numReduceTasks = Integer.parseInt(args[++i]);
|
||||
} else if ("-indexConf".equals(args[i])) {
|
||||
iconfFile = args[++i];
|
||||
} else if ("-indexDir".equals(args[i])) {
|
||||
indexDir = args[++i];
|
||||
} else if ("-table".equals(args[i])) {
|
||||
tableName = args[++i];
|
||||
} else if ("-columns".equals(args[i])) {
|
||||
columnNames = new StringBuilder(args[++i]);
|
||||
while (i + 1 < args.length && !args[i + 1].startsWith("-")) {
|
||||
columnNames.append(" ");
|
||||
columnNames.append(args[++i]);
|
||||
}
|
||||
} else {
|
||||
printUsage("Unsupported option " + args[i]);
|
||||
}
|
||||
}
|
||||
|
||||
if (indexDir == null || tableName == null || columnNames == null) {
|
||||
printUsage("Index directory, table name and at least one column must " +
|
||||
"be specified");
|
||||
}
|
||||
|
||||
if (iconfFile != null) {
|
||||
// set index configuration content from a file
|
||||
String content = readContent(iconfFile);
|
||||
IndexConfiguration iconf = new IndexConfiguration();
|
||||
// purely to validate, exception will be thrown if not valid
|
||||
iconf.addFromXML(content);
|
||||
conf.set("hbase.index.conf", content);
|
||||
}
|
||||
Cluster mrCluster = new Cluster(conf);
|
||||
Job job = Job.getInstance(mrCluster, conf);
|
||||
job.setJobName("build index for table " + tableName);
|
||||
// number of indexes to partition into
|
||||
job.setNumReduceTasks(numReduceTasks);
|
||||
Scan scan = new Scan();
|
||||
for(String columnName : columnNames.toString().split(" ")) {
|
||||
String [] fields = columnName.split(":");
|
||||
if(fields.length == 1) {
|
||||
scan.addFamily(Bytes.toBytes(fields[0]));
|
||||
} else {
|
||||
scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1]));
|
||||
}
|
||||
}
|
||||
// use identity map (a waste, but just as an example)
|
||||
IdentityTableMapper.initJob(tableName, scan,
|
||||
IdentityTableMapper.class, job);
|
||||
// use IndexTableReduce to build a Lucene index
|
||||
job.setReducerClass(IndexTableReducer.class);
|
||||
FileOutputFormat.setOutputPath(job, new Path(indexDir));
|
||||
job.setOutputFormatClass(IndexOutputFormat.class);
|
||||
return job;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads xml file of indexing configurations. The xml format is similar to
|
||||
* hbase-default.xml and hadoop-default.xml. For an example configuration,
|
||||
* see the <code>createIndexConfContent</code> method in TestTableIndex.
|
||||
*
|
||||
* @param fileName The file to read.
|
||||
* @return XML configuration read from file.
|
||||
* @throws IOException When the XML is broken.
|
||||
*/
|
||||
private static String readContent(String fileName) throws IOException {
|
||||
File file = new File(fileName);
|
||||
int length = (int) file.length();
|
||||
if (length == 0) {
|
||||
printUsage("Index configuration file " + fileName + " does not exist");
|
||||
}
|
||||
|
||||
int bytesRead = 0;
|
||||
byte[] bytes = new byte[length];
|
||||
FileInputStream fis = new FileInputStream(file);
|
||||
|
||||
try {
|
||||
// read entire file into content
|
||||
while (bytesRead < length) {
|
||||
int read = fis.read(bytes, bytesRead, length - bytesRead);
|
||||
if (read > 0) {
|
||||
bytesRead += read;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
fis.close();
|
||||
}
|
||||
|
||||
return new String(bytes, 0, bytesRead, HConstants.UTF8_ENCODING);
|
||||
}
|
||||
|
||||
/**
|
||||
* The main entry point.
|
||||
*
|
||||
* @param args The command line arguments.
|
||||
* @throws Exception When running the job fails.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
String[] otherArgs =
|
||||
new GenericOptionsParser(conf, args).getRemainingArgs();
|
||||
Job job = createSubmittableJob(conf, otherArgs);
|
||||
System.exit(job.waitForCompletion(true) ? 0 : 1);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,452 +0,0 @@
|
|||
/**
|
||||
* Copyright 2007 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapreduce;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.StringWriter;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
import javax.xml.transform.Transformer;
|
||||
import javax.xml.transform.TransformerFactory;
|
||||
import javax.xml.transform.dom.DOMSource;
|
||||
import javax.xml.transform.stream.StreamResult;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.lucene.analysis.standard.StandardAnalyzer;
|
||||
import org.w3c.dom.Document;
|
||||
import org.w3c.dom.Element;
|
||||
import org.w3c.dom.Node;
|
||||
import org.w3c.dom.NodeList;
|
||||
import org.w3c.dom.Text;
|
||||
|
||||
/**
|
||||
* Configuration parameters for building a Lucene index.
|
||||
*/
|
||||
public class IndexConfiguration extends Configuration {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(IndexConfiguration.class);
|
||||
|
||||
static final String HBASE_COLUMN_NAME = "hbase.column.name";
|
||||
static final String HBASE_COLUMN_STORE = "hbase.column.store";
|
||||
static final String HBASE_COLUMN_INDEX = "hbase.column.index";
|
||||
|
||||
/**
|
||||
* Tokenize property terminology is deprecated in lucene / replaced by analyze.
|
||||
* @see #HBASE_COLUMN_ANALYZE
|
||||
* @deprecated
|
||||
*/
|
||||
static final String HBASE_COLUMN_TOKENIZE = "hbase.column.tokenize";
|
||||
static final String HBASE_COLUMN_ANALYZE = "hbase.column.analyze";
|
||||
|
||||
static final String HBASE_COLUMN_BOOST = "hbase.column.boost";
|
||||
static final String HBASE_COLUMN_OMIT_NORMS = "hbase.column.omit.norms";
|
||||
static final String HBASE_INDEX_ROWKEY_NAME = "hbase.index.rowkey.name";
|
||||
static final String HBASE_INDEX_ANALYZER_NAME = "hbase.index.analyzer.name";
|
||||
static final String HBASE_INDEX_MAX_BUFFERED_DOCS =
|
||||
"hbase.index.max.buffered.docs";
|
||||
static final String HBASE_INDEX_MAX_BUFFERED_DELS =
|
||||
"hbase.index.max.buffered.dels";
|
||||
static final String HBASE_INDEX_MAX_FIELD_LENGTH =
|
||||
"hbase.index.max.field.length";
|
||||
static final String HBASE_INDEX_MAX_MERGE_DOCS =
|
||||
"hbase.index.max.merge.docs";
|
||||
static final String HBASE_INDEX_MERGE_FACTOR = "hbase.index.merge.factor";
|
||||
// double ramBufferSizeMB;
|
||||
static final String HBASE_INDEX_SIMILARITY_NAME =
|
||||
"hbase.index.similarity.name";
|
||||
static final String HBASE_INDEX_USE_COMPOUND_FILE =
|
||||
"hbase.index.use.compound.file";
|
||||
static final String HBASE_INDEX_OPTIMIZE = "hbase.index.optimize";
|
||||
|
||||
public static class ColumnConf extends Properties {
|
||||
|
||||
private static final long serialVersionUID = 7419012290580607821L;
|
||||
|
||||
boolean getBoolean(String name, boolean defaultValue) {
|
||||
String valueString = getProperty(name);
|
||||
if ("true".equals(valueString))
|
||||
return true;
|
||||
else if ("false".equals(valueString))
|
||||
return false;
|
||||
else
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
void setBoolean(String name, boolean value) {
|
||||
setProperty(name, Boolean.toString(value));
|
||||
}
|
||||
|
||||
float getFloat(String name, float defaultValue) {
|
||||
String valueString = getProperty(name);
|
||||
if (valueString == null)
|
||||
return defaultValue;
|
||||
try {
|
||||
return Float.parseFloat(valueString);
|
||||
} catch (NumberFormatException e) {
|
||||
return defaultValue;
|
||||
}
|
||||
}
|
||||
|
||||
void setFloat(String name, float value) {
|
||||
setProperty(name, Float.toString(value));
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, ColumnConf> columnMap =
|
||||
new ConcurrentHashMap<String, ColumnConf>();
|
||||
|
||||
public Iterator<String> columnNameIterator() {
|
||||
return columnMap.keySet().iterator();
|
||||
}
|
||||
|
||||
public boolean isIndex(String columnName) {
|
||||
return getColumn(columnName).getBoolean(HBASE_COLUMN_INDEX, true);
|
||||
}
|
||||
|
||||
public void setIndex(String columnName, boolean index) {
|
||||
getColumn(columnName).setBoolean(HBASE_COLUMN_INDEX, index);
|
||||
}
|
||||
|
||||
public boolean isStore(String columnName) {
|
||||
return getColumn(columnName).getBoolean(HBASE_COLUMN_STORE, false);
|
||||
}
|
||||
|
||||
public void setStore(String columnName, boolean store) {
|
||||
getColumn(columnName).setBoolean(HBASE_COLUMN_STORE, store);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
* See {@link #isAnalyze(String)} for replacement.
|
||||
* @param columnName
|
||||
* @return true, if column needs to be tokenized
|
||||
*/
|
||||
public boolean isTokenize(String columnName) {
|
||||
return getColumn(columnName).getBoolean(HBASE_COLUMN_TOKENIZE, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
* See {@link #setAnalyze(String, boolean)} for replacement.
|
||||
* @param columnName
|
||||
* @param tokenize
|
||||
*/
|
||||
public void setTokenize(String columnName, boolean tokenize) {
|
||||
getColumn(columnName).setBoolean(HBASE_COLUMN_TOKENIZE, tokenize);
|
||||
}
|
||||
|
||||
public boolean isAnalyze(String columnName) {
|
||||
return getColumn(columnName).getBoolean(HBASE_COLUMN_ANALYZE, true);
|
||||
}
|
||||
|
||||
public void setAnalyze(String columnName, boolean analyze) {
|
||||
getColumn(columnName).setBoolean(HBASE_COLUMN_ANALYZE, analyze);
|
||||
}
|
||||
|
||||
public float getBoost(String columnName) {
|
||||
return getColumn(columnName).getFloat(HBASE_COLUMN_BOOST, 1.0f);
|
||||
}
|
||||
|
||||
public void setBoost(String columnName, float boost) {
|
||||
getColumn(columnName).setFloat(HBASE_COLUMN_BOOST, boost);
|
||||
}
|
||||
|
||||
public boolean isOmitNorms(String columnName) {
|
||||
return getColumn(columnName).getBoolean(HBASE_COLUMN_OMIT_NORMS, true);
|
||||
}
|
||||
|
||||
public void setOmitNorms(String columnName, boolean omitNorms) {
|
||||
getColumn(columnName).setBoolean(HBASE_COLUMN_OMIT_NORMS, omitNorms);
|
||||
}
|
||||
|
||||
private ColumnConf getColumn(String columnName) {
|
||||
ColumnConf column = columnMap.get(columnName);
|
||||
if (column == null) {
|
||||
column = new ColumnConf();
|
||||
columnMap.put(columnName, column);
|
||||
}
|
||||
return column;
|
||||
}
|
||||
|
||||
public String getAnalyzerName() {
|
||||
return get(HBASE_INDEX_ANALYZER_NAME,
|
||||
StandardAnalyzer.class.getName());
|
||||
}
|
||||
|
||||
public void setAnalyzerName(String analyzerName) {
|
||||
set(HBASE_INDEX_ANALYZER_NAME, analyzerName);
|
||||
}
|
||||
|
||||
public int getMaxBufferedDeleteTerms() {
|
||||
return getInt(HBASE_INDEX_MAX_BUFFERED_DELS, 1000);
|
||||
}
|
||||
|
||||
public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
|
||||
setInt(HBASE_INDEX_MAX_BUFFERED_DELS, maxBufferedDeleteTerms);
|
||||
}
|
||||
|
||||
public int getMaxBufferedDocs() {
|
||||
return getInt(HBASE_INDEX_MAX_BUFFERED_DOCS, 10);
|
||||
}
|
||||
|
||||
public void setMaxBufferedDocs(int maxBufferedDocs) {
|
||||
    setInt(HBASE_INDEX_MAX_BUFFERED_DOCS, maxBufferedDocs);
  }

  public int getMaxFieldLength() {
    return getInt(HBASE_INDEX_MAX_FIELD_LENGTH, Integer.MAX_VALUE);
  }

  public void setMaxFieldLength(int maxFieldLength) {
    setInt(HBASE_INDEX_MAX_FIELD_LENGTH, maxFieldLength);
  }

  public int getMaxMergeDocs() {
    return getInt(HBASE_INDEX_MAX_MERGE_DOCS, Integer.MAX_VALUE);
  }

  public void setMaxMergeDocs(int maxMergeDocs) {
    setInt(HBASE_INDEX_MAX_MERGE_DOCS, maxMergeDocs);
  }

  public int getMergeFactor() {
    return getInt(HBASE_INDEX_MERGE_FACTOR, 10);
  }

  public void setMergeFactor(int mergeFactor) {
    setInt(HBASE_INDEX_MERGE_FACTOR, mergeFactor);
  }

  public String getRowkeyName() {
    return get(HBASE_INDEX_ROWKEY_NAME, "ROWKEY");
  }

  public void setRowkeyName(String rowkeyName) {
    set(HBASE_INDEX_ROWKEY_NAME, rowkeyName);
  }

  public String getSimilarityName() {
    return get(HBASE_INDEX_SIMILARITY_NAME, null);
  }

  public void setSimilarityName(String similarityName) {
    set(HBASE_INDEX_SIMILARITY_NAME, similarityName);
  }

  public boolean isUseCompoundFile() {
    return getBoolean(HBASE_INDEX_USE_COMPOUND_FILE, false);
  }

  public void setUseCompoundFile(boolean useCompoundFile) {
    setBoolean(HBASE_INDEX_USE_COMPOUND_FILE, useCompoundFile);
  }

  public boolean doOptimize() {
    return getBoolean(HBASE_INDEX_OPTIMIZE, true);
  }

  public void setDoOptimize(boolean doOptimize) {
    setBoolean(HBASE_INDEX_OPTIMIZE, doOptimize);
  }

  public void addFromXML(String content) {
    try {
      DocumentBuilder builder = DocumentBuilderFactory.newInstance()
          .newDocumentBuilder();

      Document doc = builder
          .parse(new ByteArrayInputStream(content.getBytes()));

      Element root = doc.getDocumentElement();
      if (!"configuration".equals(root.getTagName())) {
        LOG.fatal("bad conf file: top-level element not <configuration>");
      }

      NodeList props = root.getChildNodes();
      for (int i = 0; i < props.getLength(); i++) {
        Node propNode = props.item(i);
        if (!(propNode instanceof Element)) {
          continue;
        }

        Element prop = (Element) propNode;
        if ("property".equals(prop.getTagName())) {
          propertyFromXML(prop, null);
        } else if ("column".equals(prop.getTagName())) {
          columnConfFromXML(prop);
        } else {
          LOG.warn("bad conf content: element neither <property> nor <column>");
        }
      }
    } catch (Exception e) {
      LOG.fatal("error parsing conf content: " + e);
      throw new RuntimeException(e);
    }
  }

  private void propertyFromXML(Element prop, Properties properties) {
    NodeList fields = prop.getChildNodes();
    String attr = null;
    String value = null;

    for (int j = 0; j < fields.getLength(); j++) {
      Node fieldNode = fields.item(j);
      if (!(fieldNode instanceof Element)) {
        continue;
      }

      Element field = (Element) fieldNode;
      if ("name".equals(field.getTagName())) {
        attr = ((Text) field.getFirstChild()).getData();
      }
      if ("value".equals(field.getTagName()) && field.hasChildNodes()) {
        value = ((Text) field.getFirstChild()).getData();
      }
    }

    if (attr != null && value != null) {
      if (properties == null) {
        set(attr, value);
      } else {
        properties.setProperty(attr, value);
      }
    }
  }

  private void columnConfFromXML(Element column) {
    ColumnConf columnConf = new ColumnConf();
    NodeList props = column.getChildNodes();
    for (int i = 0; i < props.getLength(); i++) {
      Node propNode = props.item(i);
      if (!(propNode instanceof Element)) {
        continue;
      }

      Element prop = (Element) propNode;
      if ("property".equals(prop.getTagName())) {
        propertyFromXML(prop, columnConf);
      } else {
        LOG.warn("bad conf content: element not <property>");
      }
    }

    if (columnConf.getProperty(HBASE_COLUMN_NAME) != null) {
      columnMap.put(columnConf.getProperty(HBASE_COLUMN_NAME), columnConf);
    } else {
      LOG.warn("bad column conf: name not specified");
    }
  }

  public void write(OutputStream out) {
    try {
      Document doc = writeDocument();
      DOMSource source = new DOMSource(doc);
      StreamResult result = new StreamResult(out);
      TransformerFactory transFactory = TransformerFactory.newInstance();
      Transformer transformer = transFactory.newTransformer();
      transformer.transform(source, result);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  private Document writeDocument() {
    Iterator<Map.Entry<String, String>> iter = iterator();
    try {
      Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
          .newDocument();
      Element conf = doc.createElement("configuration");
      doc.appendChild(conf);
      conf.appendChild(doc.createTextNode("\n"));

      Map.Entry<String, String> entry;
      while (iter.hasNext()) {
        entry = iter.next();
        String name = entry.getKey();
        String value = entry.getValue();
        writeProperty(doc, conf, name, value);
      }

      Iterator<String> columnIter = columnNameIterator();
      while (columnIter.hasNext()) {
        writeColumn(doc, conf, columnIter.next());
      }

      return doc;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  private void writeProperty(Document doc, Element parent, String name,
      String value) {
    Element propNode = doc.createElement("property");
    parent.appendChild(propNode);

    Element nameNode = doc.createElement("name");
    nameNode.appendChild(doc.createTextNode(name));
    propNode.appendChild(nameNode);

    Element valueNode = doc.createElement("value");
    valueNode.appendChild(doc.createTextNode(value));
    propNode.appendChild(valueNode);

    parent.appendChild(doc.createTextNode("\n"));
  }

  private void writeColumn(Document doc, Element parent, String columnName) {
    Element column = doc.createElement("column");
    parent.appendChild(column);
    column.appendChild(doc.createTextNode("\n"));

    ColumnConf columnConf = getColumn(columnName);
    for (Map.Entry<Object, Object> entry : columnConf.entrySet()) {
      if (entry.getKey() instanceof String
          && entry.getValue() instanceof String) {
        writeProperty(doc, column, (String) entry.getKey(), (String) entry
            .getValue());
      }
    }
  }

  @Override
  public String toString() {
    StringWriter writer = new StringWriter();
    try {
      Document doc = writeDocument();
      DOMSource source = new DOMSource(doc);
      StreamResult result = new StreamResult(writer);
      TransformerFactory transFactory = TransformerFactory.newInstance();
      Transformer transformer = transFactory.newTransformer();
      transformer.transform(source, result);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return writer.toString();
  }
}
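For reference, a minimal sketch of how this configuration class was round-tripped into a job before the removal: knobs are set programmatically, serialized to the <configuration> XML by toString()/writeDocument(), stored under the "hbase.index.conf" key, and rebuilt on the task side with addFromXML(). The key name and methods follow the code in this diff; the concrete values are only illustrative.

    import org.apache.hadoop.conf.Configuration;

    // Driver side: embed the index settings as XML in the job configuration.
    Configuration conf = new Configuration();
    IndexConfiguration indexConf = new IndexConfiguration();
    indexConf.setRowkeyName("key");
    indexConf.setMergeFactor(10);
    indexConf.setDoOptimize(true);
    conf.set("hbase.index.conf", indexConf.toString());   // toString() emits the XML

    // Task side: rebuild the same settings from the XML payload.
    IndexConfiguration rebuilt = new IndexConfiguration();
    rebuilt.addFromXML(conf.get("hbase.index.conf"));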
@ -1,115 +0,0 @@
/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.File;
import java.io.IOException;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.search.Similarity;
import org.apache.lucene.store.FSDirectory;

/**
 * Create a local index, unwrap Lucene documents created by reduce, add them to
 * the index, and copy the index to the destination.
 */
public class IndexOutputFormat
extends FileOutputFormat<ImmutableBytesWritable, LuceneDocumentWrapper> {

  static final Log LOG = LogFactory.getLog(IndexOutputFormat.class);

  /** Random generator. */
  private Random random = new Random();

  /**
   * Returns the record writer.
   *
   * @param context  The current task context.
   * @return The record writer.
   * @throws IOException When there is an issue with the writer.
   * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper>
  getRecordWriter(TaskAttemptContext context)
  throws IOException {

    final Path perm = new Path(FileOutputFormat.getOutputPath(context),
      FileOutputFormat.getUniqueFile(context, "part", ""));
    // null for "dirsProp" means no predefined directories
    final Path temp = context.getConfiguration().getLocalPath(
      "mapred.local.dir", "index/_" + Integer.toString(random.nextInt()));

    LOG.info("To index into " + perm);
    FileSystem fs = FileSystem.get(context.getConfiguration());
    // delete old, if any
    fs.delete(perm, true);

    final IndexConfiguration indexConf = new IndexConfiguration();
    String content = context.getConfiguration().get("hbase.index.conf");
    if (content != null) {
      indexConf.addFromXML(content);
    }

    String analyzerName = indexConf.getAnalyzerName();
    Analyzer analyzer;
    try {
      Class<?> analyzerClass = Class.forName(analyzerName);
      analyzer = (Analyzer) analyzerClass.newInstance();
    } catch (Exception e) {
      throw new IOException("Error in creating an analyzer object "
          + analyzerName);
    }

    // build locally first
    final IndexWriter writer = new IndexWriter(FSDirectory.open(new File(fs.startLocalOutput(perm, temp)
        .toString())), analyzer, true, MaxFieldLength.LIMITED);

    // no delete, so no need for maxBufferedDeleteTerms
    writer.setMaxBufferedDocs(indexConf.getMaxBufferedDocs());
    writer.setMaxFieldLength(indexConf.getMaxFieldLength());
    writer.setMaxMergeDocs(indexConf.getMaxMergeDocs());
    writer.setMergeFactor(indexConf.getMergeFactor());
    String similarityName = indexConf.getSimilarityName();
    if (similarityName != null) {
      try {
        Similarity similarity = Class.forName(similarityName).asSubclass(Similarity.class).newInstance();
        writer.setSimilarity(similarity);
      } catch (Exception e) {
        throw new IOException("Error in creating a similarity object "
            + similarityName);
      }
    }
    writer.setUseCompoundFile(indexConf.isUseCompoundFile());
    return new IndexRecordWriter(context, fs, writer, indexConf, perm, temp);
  }

}
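A condensed sketch of how this output format and the reducer further down were wired into a Job; it mirrors the disabled test at the end of this diff, with "mytable", the column family, and the output directory as placeholder values.

    // Sketch only: table name, family and output path are illustrative.
    Job job = new Job(conf, "build lucene index");
    job.setNumReduceTasks(1);                            // one index partition per reduce task
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("contents"));
    IdentityTableMapper.initJob("mytable", scan, IdentityTableMapper.class, job);
    job.setReducerClass(IndexTableReducer.class);        // one Lucene Document per row
    job.setOutputFormatClass(IndexOutputFormat.class);   // writes the local index, copies to DFS
    FileOutputFormat.setOutputPath(job, new Path("indexoutput"));
    job.waitForCompletion(true);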
@ -1,161 +0,0 @@
/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;

/**
 * Writes the records into a Lucene index writer.
 */
public class IndexRecordWriter
extends RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper> {

  static final Log LOG = LogFactory.getLog(IndexRecordWriter.class);

  private long docCount = 0;
  private TaskAttemptContext context = null;
  private FileSystem fs = null;
  private IndexWriter writer = null;
  private IndexConfiguration indexConf = null;
  private Path perm = null;
  private Path temp = null;

  /**
   * Creates a new instance.
   *
   * @param context  The task context.
   * @param fs  The file system.
   * @param writer  The index writer.
   * @param indexConf  The index configuration.
   * @param perm  The permanent path in the DFS.
   * @param temp  The temporary local path.
   */
  public IndexRecordWriter(TaskAttemptContext context, FileSystem fs,
      IndexWriter writer, IndexConfiguration indexConf, Path perm, Path temp) {
    this.context = context;
    this.fs = fs;
    this.writer = writer;
    this.indexConf = indexConf;
    this.perm = perm;
    this.temp = temp;
  }

  /**
   * Writes the record into an index.
   *
   * @param key  The current key.
   * @param value  The current value.
   * @throws IOException When the index is faulty.
   * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
   */
  @Override
  public void write(ImmutableBytesWritable key, LuceneDocumentWrapper value)
  throws IOException {
    // unwrap and index doc
    Document doc = value.get();
    writer.addDocument(doc);
    docCount++;
    context.progress();
  }

  /**
   * Closes the writer.
   *
   * @param context  The current context.
   * @throws IOException When closing the writer fails.
   * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public void close(TaskAttemptContext context) throws IOException {
    // spawn a thread to give progress heartbeats
    HeartbeatsThread prog = new HeartbeatsThread();
    try {
      prog.start();

      // optimize index
      if (indexConf.doOptimize()) {
        if (LOG.isInfoEnabled()) {
          LOG.info("Optimizing index.");
        }
        writer.optimize();
      }

      // close index
      writer.close();
      if (LOG.isInfoEnabled()) {
        LOG.info("Done indexing " + docCount + " docs.");
      }

      // copy to perm destination in dfs
      fs.completeLocalOutput(perm, temp);
      if (LOG.isInfoEnabled()) {
        LOG.info("Copy done.");
      }
    } finally {
      prog.setClosed();
    }
  }

  class HeartbeatsThread extends Thread {

    /** Flag to track when to finish. */
    private boolean closed = false;

    /**
     * Runs the thread. Sending heart beats to the framework.
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
      context.setStatus("Closing");
      while (!closed) {
        try {
          context.progress();
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          continue;
        } catch (Throwable e) {
          return;
        }
      }
    }

    /**
     * Switches the flag.
     */
    public void setClosed() {
      closed = true;
    }

  }

}
@ -1,131 +0,0 @@
/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Construct a Lucene document per row, which is consumed by IndexOutputFormat
 * to build a Lucene index
 */
public class IndexTableReducer
extends Reducer<ImmutableBytesWritable, Result,
  ImmutableBytesWritable, LuceneDocumentWrapper>
implements Configurable {

  private static final Log LOG = LogFactory.getLog(IndexTableReducer.class);

  private IndexConfiguration indexConf;
  private Configuration conf = null;

  /**
   * Writes each given record, consisting of the key and the given values, to
   * the index.
   *
   * @param key  The current row key.
   * @param values  The values for the given row.
   * @param context  The context of the reduce.
   * @throws IOException When writing the record fails.
   * @throws InterruptedException When the job gets interrupted.
   */
  @Override
  public void reduce(ImmutableBytesWritable key, Iterable<Result> values,
      Context context)
  throws IOException, InterruptedException {
    Document doc = null;
    for (Result r : values) {
      if (doc == null) {
        doc = new Document();
        // index and store row key, row key already UTF-8 encoded
        Field keyField = new Field(indexConf.getRowkeyName(),
          Bytes.toString(key.get(), key.getOffset(), key.getLength()),
          Field.Store.YES, Field.Index.NOT_ANALYZED);
        keyField.setOmitNorms(true);
        doc.add(keyField);
      }
      // each column (name-value pair) is a field (name-value pair)
      for (KeyValue kv: r.list()) {
        // name is already UTF-8 encoded
        String column = Bytes.toString(KeyValue.makeColumn(kv.getFamily(),
            kv.getQualifier()));
        byte[] columnValue = kv.getValue();
        Field.Store store = indexConf.isStore(column)?
          Field.Store.YES: Field.Store.NO;
        Field.Index index = indexConf.isIndex(column)?
          (indexConf.isTokenize(column)?
            Field.Index.ANALYZED: Field.Index.NOT_ANALYZED):
            Field.Index.NO;

        // UTF-8 encode value
        Field field = new Field(column, Bytes.toString(columnValue),
          store, index);
        field.setBoost(indexConf.getBoost(column));
        field.setOmitNorms(indexConf.isOmitNorms(column));

        doc.add(field);
      }
    }
    context.write(key, new LuceneDocumentWrapper(doc));
  }

  /**
   * Returns the current configuration.
   *
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to set up the index configuration.
   *
   * @param configuration  The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf(
   *   org.apache.hadoop.conf.Configuration)
   */
  @Override
  public void setConf(Configuration configuration) {
    this.conf = configuration;
    indexConf = new IndexConfiguration();
    String content = conf.get("hbase.index.conf");
    if (content != null) {
      indexConf.addFromXML(content);
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Index conf: " + indexConf);
    }
  }

}
@ -1,57 +0,0 @@
/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import org.apache.hadoop.io.Writable;
import org.apache.lucene.document.Document;

/**
 * A utility class used to pass a lucene document from reduce to OutputFormat.
 * It doesn't really serialize/deserialize a lucene document.
 */
public class LuceneDocumentWrapper implements Writable {

  /** The document to add to the index. */
  protected Document doc;

  /**
   * @param doc
   */
  public LuceneDocumentWrapper(Document doc) {
    this.doc = doc;
  }

  /**
   * @return the document
   */
  public Document get() {
    return doc;
  }

  public void readFields(DataInput in) {
    // intentionally left blank
  }

  public void write(DataOutput out) {
    // intentionally left blank
  }
}
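As the class comment notes, the wrapper never serializes the Document; it only hands an in-memory reference from the reducer to the record writer. A tiny illustration of that hand-off:

    // Sketch only: the reducer wraps, the record writer unwraps.
    Document doc = new Document();                        // built by IndexTableReducer
    LuceneDocumentWrapper wrapper = new LuceneDocumentWrapper(doc);
    Document unwrapped = wrapper.get();                   // same object, nothing copied
    // readFields()/write() stay blank because reduce output and
    // IndexRecordWriter run in the same task JVM.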
@ -1,307 +0,0 @@
/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.File;
import java.io.IOException;
import java.util.Random;

import junit.framework.TestSuite;
import junit.textui.TestRunner;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MultiRegionTable;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MultiSearcher;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Searchable;
import org.apache.lucene.search.Searcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.FSDirectory;

/**
 * Test Map/Reduce job to build index over HBase table
 */
public class DisabledBecauseVariableSubstTooLargeExceptionTestTableIndex extends MultiRegionTable {
  private static final Log LOG = LogFactory.getLog(DisabledBecauseVariableSubstTooLargeExceptionTestTableIndex.class);

  static final byte[] TABLE_NAME = Bytes.toBytes("moretest");
  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
  static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
  static final String ROWKEY_NAME = "key";
  static final String INDEX_DIR = "testindex";

  static final Random rand = new Random();

  /** default constructor */
  public DisabledBecauseVariableSubstTooLargeExceptionTestTableIndex() {
    super(Bytes.toString(INPUT_FAMILY));
    desc = new HTableDescriptor(TABLE_NAME);
    desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
    desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
  }

  @Override
  public void tearDown() throws Exception {
    if (conf != null) {
      FileUtil.fullyDelete(new File(conf.get("hadoop.tmp.dir")));
    }
    super.tearDown();
  }

  /**
   * Test HBase map/reduce
   *
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  public void testTableIndex()
  throws IOException, InterruptedException, ClassNotFoundException {
    boolean printResults = false;
    if (printResults) {
      LOG.info("Print table contents before map/reduce");
    }
    scanTable(printResults);

    MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);

    // set configuration parameter for index build
    conf.set("hbase.index.conf", createIndexConfContent());

    try {
      Job job = new Job(conf, "index column contents");
      // number of indexes to partition into
      job.setNumReduceTasks(1);

      Scan scan = new Scan();
      scan.addFamily(INPUT_FAMILY);
      // use identity map (a waste, but just as an example)
      IdentityTableMapper.initJob(Bytes.toString(TABLE_NAME), scan,
        IdentityTableMapper.class, job);
      // use IndexTableReduce to build a Lucene index
      job.setReducerClass(IndexTableReducer.class);
      job.setOutputFormatClass(IndexOutputFormat.class);
      FileOutputFormat.setOutputPath(job, new Path(INDEX_DIR));
      job.waitForCompletion(true);
    } finally {
      mrCluster.shutdown();
    }

    if (printResults) {
      LOG.info("Print table contents after map/reduce");
    }
    scanTable(printResults);

    // verify index results
    verify();
  }

  private String createIndexConfContent() {
    StringBuilder buffer = new StringBuilder();
    buffer.append("<configuration><column><property>" +
      "<name>hbase.column.name</name><value>" + Bytes.toString(INPUT_FAMILY) +
      "</value></property>");
    buffer.append("<property><name>hbase.column.store</name> " +
      "<value>true</value></property>");
    buffer.append("<property><name>hbase.column.index</name>" +
      "<value>true</value></property>");
    buffer.append("<property><name>hbase.column.tokenize</name>" +
      "<value>false</value></property>");
    buffer.append("<property><name>hbase.column.boost</name>" +
      "<value>3</value></property>");
    buffer.append("<property><name>hbase.column.omit.norms</name>" +
      "<value>false</value></property></column>");
    buffer.append("<property><name>hbase.index.rowkey.name</name><value>" +
      ROWKEY_NAME + "</value></property>");
    buffer.append("<property><name>hbase.index.max.buffered.docs</name>" +
      "<value>500</value></property>");
    buffer.append("<property><name>hbase.index.max.field.length</name>" +
      "<value>10000</value></property>");
    buffer.append("<property><name>hbase.index.merge.factor</name>" +
      "<value>10</value></property>");
    buffer.append("<property><name>hbase.index.use.compound.file</name>" +
      "<value>true</value></property>");
    buffer.append("<property><name>hbase.index.optimize</name>" +
      "<value>true</value></property></configuration>");

    IndexConfiguration c = new IndexConfiguration();
    c.addFromXML(buffer.toString());
    return c.toString();
  }

  private void scanTable(boolean printResults)
  throws IOException {
    HTable table = new HTable(conf, TABLE_NAME);
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILY);
    scan.addFamily(OUTPUT_FAMILY);
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result r : scanner) {
        if (printResults) {
          LOG.info("row: " + Bytes.toStringBinary(r.getRow()));
        }
        for (KeyValue kv : r.list()) {
          if (printResults) {
            LOG.info(" column: " + Bytes.toStringBinary(kv.getKey()) + " value: "
              + Bytes.toStringBinary(kv.getValue()));
          }
        }
      }
    } finally {
      scanner.close();
    }
  }

  private void verify() throws IOException {
    // Force a cache flush for every online region to ensure that when the
    // scanner takes its snapshot, all the updates have made it into the cache.
    for (HRegion r : cluster.getRegionThreads().get(0).getRegionServer().
        getOnlineRegions()) {
      HRegionIncommon region = new HRegionIncommon(r);
      region.flushcache();
    }

    Path localDir = new Path(getUnitTestdir(getName()), "index_" +
      Integer.toString(rand.nextInt()));
    this.fs.copyToLocalFile(new Path(INDEX_DIR), localDir);
    FileSystem localfs = FileSystem.getLocal(conf);
    FileStatus [] indexDirs = localfs.listStatus(localDir);
    Searcher searcher = null;
    ResultScanner scanner = null;
    try {
      if (indexDirs.length == 1) {
        searcher = new IndexSearcher(FSDirectory.open(new File(indexDirs[0].getPath().
          toUri())));
      } else if (indexDirs.length > 1) {
        Searchable[] searchers = new Searchable[indexDirs.length];
        for (int i = 0; i < indexDirs.length; i++) {
          searchers[i] = new IndexSearcher(FSDirectory.open(new File(indexDirs[i].getPath().
            toUri())));
        }
        searcher = new MultiSearcher(searchers);
      } else {
        throw new IOException("no index directory found");
      }

      HTable table = new HTable(conf, TABLE_NAME);
      Scan scan = new Scan();
      scan.addFamily(INPUT_FAMILY);
      scan.addFamily(OUTPUT_FAMILY);
      scanner = table.getScanner(scan);

      IndexConfiguration indexConf = new IndexConfiguration();
      String content = conf.get("hbase.index.conf");
      if (content != null) {
        indexConf.addFromXML(content);
      }
      String rowkeyName = indexConf.getRowkeyName();

      int count = 0;
      for (Result r : scanner) {
        String value = Bytes.toString(r.getRow());
        Term term = new Term(rowkeyName, value);
        CountCollector collector = new CountCollector();
        searcher.search(new TermQuery(term), collector);
        int hitCount = collector.getCount();
        assertEquals("check row " + value, 1, hitCount);
        count++;
      }
      LOG.debug("Searcher.maxDoc: " + searcher.maxDoc());
      LOG.debug("IndexReader.numDocs: " + ((IndexSearcher)searcher).getIndexReader().numDocs());
      int maxDoc = ((IndexSearcher)searcher).getIndexReader().numDocs();
      assertEquals("check number of rows", maxDoc, count);
    } finally {
      if (null != searcher)
        searcher.close();
      if (null != scanner)
        scanner.close();
    }
  }

  /**
   * Collector that retrieves the count of the documents.
   *
   * @author Kay Kay
   *
   */
  public static class CountCollector extends Collector {

    private int count;

    public CountCollector() {
      count = 0;
    }

    public int getCount() {
      return this.count;
    }

    @Override
    public boolean acceptsDocsOutOfOrder() {
      // Make this accept docs out of order as some collectors can be efficient that way.
      return true;
    }

    @Override
    public void collect(int doc) throws IOException {
      ++count;
    }

    @Override
    public void setNextReader(IndexReader reader, int docBase)
    throws IOException {
      // Do nothing
    }

    @Override
    public void setScorer(Scorer scorer) throws IOException {
      // Nothing to do with scorer.
    }
  }

  /**
   * @param args unused
   */
  public static void main(String[] args) {
    TestRunner.run(new TestSuite(DisabledBecauseVariableSubstTooLargeExceptionTestTableIndex.class));
  }
}