HBASE-1385 Revamp TableInputFormat, needs updating to match hadoop 0.20.x AND remove bit where we can make < maps than regions
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@789847 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
94165dbc02
commit
1fdd85b577
|
@ -0,0 +1,179 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2007 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configurable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract grouping columns from input record.
|
||||||
|
*/
|
||||||
|
public class GroupingTableMapper
|
||||||
|
extends TableMapper<ImmutableBytesWritable,Result> implements Configurable {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* JobConf parameter to specify the columns used to produce the key passed to
|
||||||
|
* collect from the map phase.
|
||||||
|
*/
|
||||||
|
public static final String GROUP_COLUMNS =
|
||||||
|
"hbase.mapred.groupingtablemap.columns";
|
||||||
|
|
||||||
|
/** The grouping columns. */
|
||||||
|
protected byte [][] columns;
|
||||||
|
/** The current configuration. */
|
||||||
|
private Configuration conf = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Use this before submitting a TableMap job. It will appropriately set up
|
||||||
|
* the job.
|
||||||
|
*
|
||||||
|
* @param table The table to be processed.
|
||||||
|
* @param scan The scan with the columns etc.
|
||||||
|
* @param groupColumns A space separated list of columns used to form the
|
||||||
|
* key used in collect.
|
||||||
|
* @param mapper The mapper class.
|
||||||
|
* @param job The current job.
|
||||||
|
* @throws IOException When setting up the job fails.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static void initJob(String table, Scan scan, String groupColumns,
|
||||||
|
Class<? extends TableMapper> mapper, Job job) throws IOException {
|
||||||
|
TableMapReduceUtil.initTableMapperJob(table, scan, mapper,
|
||||||
|
ImmutableBytesWritable.class, Result.class, job);
|
||||||
|
job.getConfiguration().set(GROUP_COLUMNS, groupColumns);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract the grouping columns from value to construct a new key. Pass the
|
||||||
|
* new key and value to reduce. If any of the grouping columns are not found
|
||||||
|
* in the value, the record is skipped.
|
||||||
|
*
|
||||||
|
* @param key The current key.
|
||||||
|
* @param value The current value.
|
||||||
|
* @param context The current context.
|
||||||
|
* @throws IOException When writing the record fails.
|
||||||
|
* @throws InterruptedException When the job is aborted.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void map(ImmutableBytesWritable key, Result value, Context context)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
byte[][] keyVals = extractKeyValues(value);
|
||||||
|
if(keyVals != null) {
|
||||||
|
ImmutableBytesWritable tKey = createGroupKey(keyVals);
|
||||||
|
context.write(tKey, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract columns values from the current record. This method returns
|
||||||
|
* null if any of the columns are not found.
|
||||||
|
* <p>
|
||||||
|
* Override this method if you want to deal with nulls differently.
|
||||||
|
*
|
||||||
|
* @param r The current values.
|
||||||
|
* @return Array of byte values.
|
||||||
|
*/
|
||||||
|
protected byte[][] extractKeyValues(Result r) {
|
||||||
|
byte[][] keyVals = null;
|
||||||
|
ArrayList<byte[]> foundList = new ArrayList<byte[]>();
|
||||||
|
int numCols = columns.length;
|
||||||
|
if (numCols > 0) {
|
||||||
|
for (KeyValue value: r.list()) {
|
||||||
|
byte [] column = value.getColumn();
|
||||||
|
for (int i = 0; i < numCols; i++) {
|
||||||
|
if (Bytes.equals(column, columns[i])) {
|
||||||
|
foundList.add(value.getValue());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(foundList.size() == numCols) {
|
||||||
|
keyVals = foundList.toArray(new byte[numCols][]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return keyVals;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a key by concatenating multiple column values.
|
||||||
|
* <p>
|
||||||
|
* Override this function in order to produce different types of keys.
|
||||||
|
*
|
||||||
|
* @param vals The current key/values.
|
||||||
|
* @return A key generated by concatenating multiple column values.
|
||||||
|
*/
|
||||||
|
protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
|
||||||
|
if(vals == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for(int i = 0; i < vals.length; i++) {
|
||||||
|
if(i > 0) {
|
||||||
|
sb.append(" ");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
sb.append(new String(vals[i], HConstants.UTF8_ENCODING));
|
||||||
|
} catch (UnsupportedEncodingException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new ImmutableBytesWritable(Bytes.toBytes(sb.toString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the current configuration.
|
||||||
|
*
|
||||||
|
* @return The current configuration.
|
||||||
|
* @see org.apache.hadoop.conf.Configurable#getConf()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Configuration getConf() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the configuration. This is used to set up the grouping details.
|
||||||
|
*
|
||||||
|
* @param configuration The configuration to set.
|
||||||
|
* @see org.apache.hadoop.conf.Configurable#setConf(
|
||||||
|
* org.apache.hadoop.conf.Configuration)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void setConf(Configuration configuration) {
|
||||||
|
this.conf = configuration;
|
||||||
|
String[] cols = conf.get(GROUP_COLUMNS, "").split(" ");
|
||||||
|
columns = new byte[cols.length][];
|
||||||
|
for(int i = 0; i < cols.length; i++) {
|
||||||
|
columns[i] = Bytes.toBytes(cols[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,66 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2007 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pass the given key and record as-is to the reduce phase.
|
||||||
|
*/
|
||||||
|
public class IdentityTableMapper
|
||||||
|
extends TableMapper<ImmutableBytesWritable, Result> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Use this before submitting a TableMap job. It will appropriately set up
|
||||||
|
* the job.
|
||||||
|
*
|
||||||
|
* @param table The table name.
|
||||||
|
* @param scan The scan with the columns to scan.
|
||||||
|
* @param mapper The mapper class.
|
||||||
|
* @param job The job configuration.
|
||||||
|
* @throws IOException When setting up the job fails.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static void initJob(String table, Scan scan,
|
||||||
|
Class<? extends TableMapper> mapper, Job job) throws IOException {
|
||||||
|
TableMapReduceUtil.initTableMapperJob(table, scan, mapper,
|
||||||
|
ImmutableBytesWritable.class, Result.class, job);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pass the key, value to reduce.
|
||||||
|
*
|
||||||
|
* @param key The current key.
|
||||||
|
* @param value The current value.
|
||||||
|
* @param context The current context.
|
||||||
|
* @throws IOException When writing the record fails.
|
||||||
|
* @throws InterruptedException When the job is aborted.
|
||||||
|
*/
|
||||||
|
public void map(ImmutableBytesWritable key, Result value, Context context)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
context.write(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,57 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2007 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convenience class that simply writes each key, record pair to the configured
|
||||||
|
* HBase table.
|
||||||
|
*/
|
||||||
|
public class IdentityTableReducer
|
||||||
|
extends TableReducer<ImmutableBytesWritable, Put> {
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
private static final Log LOG = LogFactory.getLog(IdentityTableReducer.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes each given record, consisting of the key and the given values, to
|
||||||
|
* the HBase table.
|
||||||
|
*
|
||||||
|
* @param key The current row key.
|
||||||
|
* @param values The values for the given row.
|
||||||
|
* @param context The context of the reduce.
|
||||||
|
* @throws IOException When writing the record fails.
|
||||||
|
* @throws InterruptedException When the job gets interrupted.
|
||||||
|
*/
|
||||||
|
public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
|
||||||
|
Context context) throws IOException, InterruptedException {
|
||||||
|
while(values.hasNext()) {
|
||||||
|
context.write(key, values.next());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,160 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2007 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.mapreduce.RecordWriter;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
|
import org.apache.lucene.document.Document;
|
||||||
|
import org.apache.lucene.index.IndexWriter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes the records into a Lucene index writer.
|
||||||
|
*/
|
||||||
|
public class IndexRecordWriter
|
||||||
|
extends RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper> {
|
||||||
|
|
||||||
|
static final Log LOG = LogFactory.getLog(IndexRecordWriter.class);
|
||||||
|
|
||||||
|
private long docCount = 0;
|
||||||
|
private TaskAttemptContext context = null;
|
||||||
|
private FileSystem fs = null;
|
||||||
|
private IndexWriter writer = null;
|
||||||
|
private IndexConfiguration indexConf = null;
|
||||||
|
private Path perm = null;
|
||||||
|
private Path temp = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new instance.
|
||||||
|
*
|
||||||
|
* @param context The task context.
|
||||||
|
* @param fs The file system.
|
||||||
|
* @param writer The index writer.
|
||||||
|
* @param indexConf The index configuration.
|
||||||
|
* @param perm The permanent path in the DFS.
|
||||||
|
* @param temp The temporary local path.
|
||||||
|
*/
|
||||||
|
public IndexRecordWriter(TaskAttemptContext context, FileSystem fs,
|
||||||
|
IndexWriter writer, IndexConfiguration indexConf, Path perm, Path temp) {
|
||||||
|
this.context = context;
|
||||||
|
this.fs = fs;
|
||||||
|
this.writer = writer;
|
||||||
|
this.indexConf = indexConf;
|
||||||
|
this.perm = perm;
|
||||||
|
this.temp = temp;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes the record into an index.
|
||||||
|
*
|
||||||
|
* @param key The current key.
|
||||||
|
* @param value The current value.
|
||||||
|
* @throws IOException When the index is faulty.
|
||||||
|
* @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void write(ImmutableBytesWritable key, LuceneDocumentWrapper value)
|
||||||
|
throws IOException {
|
||||||
|
// unwrap and index doc
|
||||||
|
Document doc = value.get();
|
||||||
|
writer.addDocument(doc);
|
||||||
|
docCount++;
|
||||||
|
context.progress();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Closes the writer.
|
||||||
|
*
|
||||||
|
* @param context The current context.
|
||||||
|
* @throws IOException When closing the writer fails.
|
||||||
|
* @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void close(TaskAttemptContext context) throws IOException {
|
||||||
|
// spawn a thread to give progress heartbeats
|
||||||
|
HeartbeatsThread prog = new HeartbeatsThread();
|
||||||
|
try {
|
||||||
|
prog.start();
|
||||||
|
|
||||||
|
// optimize index
|
||||||
|
if (indexConf.doOptimize()) {
|
||||||
|
if (LOG.isInfoEnabled()) {
|
||||||
|
LOG.info("Optimizing index.");
|
||||||
|
}
|
||||||
|
writer.optimize();
|
||||||
|
}
|
||||||
|
|
||||||
|
// close index
|
||||||
|
writer.close();
|
||||||
|
if (LOG.isInfoEnabled()) {
|
||||||
|
LOG.info("Done indexing " + docCount + " docs.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// copy to perm destination in dfs
|
||||||
|
fs.completeLocalOutput(perm, temp);
|
||||||
|
if (LOG.isInfoEnabled()) {
|
||||||
|
LOG.info("Copy done.");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
prog.setClosed();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class HeartbeatsThread extends Thread {
|
||||||
|
|
||||||
|
/** Flag to track when to finish. */
|
||||||
|
private boolean closed = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs the thread. Sending heart beats to the framework.
|
||||||
|
*
|
||||||
|
* @see java.lang.Runnable#run()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (!closed) {
|
||||||
|
try {
|
||||||
|
context.setStatus("closing");
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
continue;
|
||||||
|
} catch (Throwable e) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Switches the flag.
|
||||||
|
*/
|
||||||
|
public void setClosed() {
|
||||||
|
closed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,130 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2007 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configurable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.mapreduce.Reducer;
|
||||||
|
import org.apache.lucene.document.Document;
|
||||||
|
import org.apache.lucene.document.Field;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a Lucene document per row, which is consumed by IndexOutputFormat
|
||||||
|
* to build a Lucene index
|
||||||
|
*/
|
||||||
|
public class IndexTableReducer
|
||||||
|
extends Reducer<ImmutableBytesWritable, Result,
|
||||||
|
ImmutableBytesWritable, LuceneDocumentWrapper>
|
||||||
|
implements Configurable {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(IndexTableReducer.class);
|
||||||
|
|
||||||
|
private IndexConfiguration indexConf;
|
||||||
|
private Configuration conf = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes each given record, consisting of the key and the given values, to
|
||||||
|
* the index.
|
||||||
|
*
|
||||||
|
* @param key The current row key.
|
||||||
|
* @param values The values for the given row.
|
||||||
|
* @param context The context of the reduce.
|
||||||
|
* @throws IOException When writing the record fails.
|
||||||
|
* @throws InterruptedException When the job gets interrupted.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void reduce(ImmutableBytesWritable key, Iterable<Result> values,
|
||||||
|
Context context)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
Document doc = null;
|
||||||
|
for (Result r : values) {
|
||||||
|
if (doc == null) {
|
||||||
|
doc = new Document();
|
||||||
|
// index and store row key, row key already UTF-8 encoded
|
||||||
|
Field keyField = new Field(indexConf.getRowkeyName(),
|
||||||
|
Bytes.toString(key.get(), key.getOffset(), key.getLength()),
|
||||||
|
Field.Store.YES, Field.Index.UN_TOKENIZED);
|
||||||
|
keyField.setOmitNorms(true);
|
||||||
|
doc.add(keyField);
|
||||||
|
}
|
||||||
|
// each column (name-value pair) is a field (name-value pair)
|
||||||
|
for (KeyValue kv: r.list()) {
|
||||||
|
// name is already UTF-8 encoded
|
||||||
|
String column = Bytes.toString(kv.getColumn());
|
||||||
|
byte[] columnValue = kv.getValue();
|
||||||
|
Field.Store store = indexConf.isStore(column)?
|
||||||
|
Field.Store.YES: Field.Store.NO;
|
||||||
|
Field.Index index = indexConf.isIndex(column)?
|
||||||
|
(indexConf.isTokenize(column)?
|
||||||
|
Field.Index.TOKENIZED: Field.Index.UN_TOKENIZED):
|
||||||
|
Field.Index.NO;
|
||||||
|
|
||||||
|
// UTF-8 encode value
|
||||||
|
Field field = new Field(column, Bytes.toString(columnValue),
|
||||||
|
store, index);
|
||||||
|
field.setBoost(indexConf.getBoost(column));
|
||||||
|
field.setOmitNorms(indexConf.isOmitNorms(column));
|
||||||
|
|
||||||
|
doc.add(field);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
context.write(key, new LuceneDocumentWrapper(doc));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the current configuration.
|
||||||
|
*
|
||||||
|
* @return The current configuration.
|
||||||
|
* @see org.apache.hadoop.conf.Configurable#getConf()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Configuration getConf() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the configuration. This is used to set up the index configuration.
|
||||||
|
*
|
||||||
|
* @param configuration The configuration to set.
|
||||||
|
* @see org.apache.hadoop.conf.Configurable#setConf(
|
||||||
|
* org.apache.hadoop.conf.Configuration)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void setConf(Configuration configuration) {
|
||||||
|
this.conf = configuration;
|
||||||
|
indexConf = new IndexConfiguration();
|
||||||
|
String content = conf.get("hbase.index.conf");
|
||||||
|
if (content != null) {
|
||||||
|
indexConf.addFromXML(content);
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Index conf: " + indexConf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2007 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.mapreduce.Mapper;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extends the base <code>Mapper</code> class to add the required input key
|
||||||
|
* and value classes.
|
||||||
|
*
|
||||||
|
* @param <KEYOUT> The type of the key.
|
||||||
|
* @param <VALUEOUT> The type of the value.
|
||||||
|
* @see org.apache.hadoop.mapreduce.Mapper
|
||||||
|
*/
|
||||||
|
public abstract class TableMapper<KEYOUT, VALUEOUT>
|
||||||
|
extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2007 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.mapreduce.Reducer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extends the basic <code>Reducer</code> class to add the required key and
|
||||||
|
* value output classes.
|
||||||
|
*
|
||||||
|
* @param <KEYIN> The type of the key.
|
||||||
|
* @param <VALUEIN> The type of the value.
|
||||||
|
* @see org.apache.hadoop.mapreduce.Reducer
|
||||||
|
*/
|
||||||
|
public abstract class TableReducer<KEYIN, VALUEIN>
|
||||||
|
extends Reducer<KEYIN, VALUEIN, ImmutableBytesWritable, Put> {
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,260 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2007 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import junit.framework.TestSuite;
|
||||||
|
import junit.textui.TestRunner;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.MultiRegionTable;
|
||||||
|
import org.apache.hadoop.mapred.MiniMRCluster;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.lucene.index.Term;
|
||||||
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
|
import org.apache.lucene.search.MultiSearcher;
|
||||||
|
import org.apache.lucene.search.Searchable;
|
||||||
|
import org.apache.lucene.search.Searcher;
|
||||||
|
import org.apache.lucene.search.TermQuery;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test Map/Reduce job to build index over HBase table
|
||||||
|
*/
|
||||||
|
public class TestTableIndex extends MultiRegionTable {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestTableIndex.class);
|
||||||
|
|
||||||
|
static final byte[] TABLE_NAME = Bytes.toBytes("moretest");
|
||||||
|
static final byte[] INPUT_COLUMN = Bytes.toBytes("contents:");
|
||||||
|
static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
|
||||||
|
static final byte[] OUTPUT_COLUMN = Bytes.toBytes("text:");
|
||||||
|
static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
|
||||||
|
static final String ROWKEY_NAME = "key";
|
||||||
|
static final String INDEX_DIR = "testindex";
|
||||||
|
private static final byte [][] columns = new byte [][] {
|
||||||
|
INPUT_COLUMN,
|
||||||
|
OUTPUT_COLUMN
|
||||||
|
};
|
||||||
|
|
||||||
|
/** default constructor */
|
||||||
|
public TestTableIndex() {
|
||||||
|
super(Bytes.toString(INPUT_COLUMN));
|
||||||
|
desc = new HTableDescriptor(TABLE_NAME);
|
||||||
|
desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
|
||||||
|
desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (conf != null) {
|
||||||
|
FileUtil.fullyDelete(new File(conf.get("hadoop.tmp.dir")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test HBase map/reduce
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* @throws ClassNotFoundException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
public void testTableIndex()
|
||||||
|
throws IOException, InterruptedException, ClassNotFoundException {
|
||||||
|
boolean printResults = false;
|
||||||
|
if (printResults) {
|
||||||
|
LOG.info("Print table contents before map/reduce");
|
||||||
|
}
|
||||||
|
scanTable(printResults);
|
||||||
|
|
||||||
|
MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
|
||||||
|
|
||||||
|
// set configuration parameter for index build
|
||||||
|
conf.set("hbase.index.conf", createIndexConfContent());
|
||||||
|
|
||||||
|
try {
|
||||||
|
Job job = new Job(conf, "index column contents");
|
||||||
|
// number of indexes to partition into
|
||||||
|
job.setNumReduceTasks(1);
|
||||||
|
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.addFamily(INPUT_FAMILY);
|
||||||
|
// use identity map (a waste, but just as an example)
|
||||||
|
IdentityTableMapper.initJob(Bytes.toString(TABLE_NAME), scan,
|
||||||
|
IdentityTableMapper.class, job);
|
||||||
|
// use IndexTableReduce to build a Lucene index
|
||||||
|
job.setReducerClass(IndexTableReducer.class);
|
||||||
|
job.setOutputFormatClass(IndexOutputFormat.class);
|
||||||
|
FileOutputFormat.setOutputPath(job, new Path(INDEX_DIR));
|
||||||
|
job.waitForCompletion(true);
|
||||||
|
} finally {
|
||||||
|
mrCluster.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (printResults) {
|
||||||
|
LOG.info("Print table contents after map/reduce");
|
||||||
|
}
|
||||||
|
scanTable(printResults);
|
||||||
|
|
||||||
|
// verify index results
|
||||||
|
verify();
|
||||||
|
}
|
||||||
|
|
||||||
|
private String createIndexConfContent() {
|
||||||
|
StringBuffer buffer = new StringBuffer();
|
||||||
|
buffer.append("<configuration><column><property>" +
|
||||||
|
"<name>hbase.column.name</name><value>" + INPUT_COLUMN +
|
||||||
|
"</value></property>");
|
||||||
|
buffer.append("<property><name>hbase.column.store</name> " +
|
||||||
|
"<value>true</value></property>");
|
||||||
|
buffer.append("<property><name>hbase.column.index</name>" +
|
||||||
|
"<value>true</value></property>");
|
||||||
|
buffer.append("<property><name>hbase.column.tokenize</name>" +
|
||||||
|
"<value>false</value></property>");
|
||||||
|
buffer.append("<property><name>hbase.column.boost</name>" +
|
||||||
|
"<value>3</value></property>");
|
||||||
|
buffer.append("<property><name>hbase.column.omit.norms</name>" +
|
||||||
|
"<value>false</value></property></column>");
|
||||||
|
buffer.append("<property><name>hbase.index.rowkey.name</name><value>" +
|
||||||
|
ROWKEY_NAME + "</value></property>");
|
||||||
|
buffer.append("<property><name>hbase.index.max.buffered.docs</name>" +
|
||||||
|
"<value>500</value></property>");
|
||||||
|
buffer.append("<property><name>hbase.index.max.field.length</name>" +
|
||||||
|
"<value>10000</value></property>");
|
||||||
|
buffer.append("<property><name>hbase.index.merge.factor</name>" +
|
||||||
|
"<value>10</value></property>");
|
||||||
|
buffer.append("<property><name>hbase.index.use.compound.file</name>" +
|
||||||
|
"<value>true</value></property>");
|
||||||
|
buffer.append("<property><name>hbase.index.optimize</name>" +
|
||||||
|
"<value>true</value></property></configuration>");
|
||||||
|
|
||||||
|
IndexConfiguration c = new IndexConfiguration();
|
||||||
|
c.addFromXML(buffer.toString());
|
||||||
|
return c.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void scanTable(boolean printResults)
|
||||||
|
throws IOException {
|
||||||
|
HTable table = new HTable(conf, TABLE_NAME);
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.addColumns(columns);
|
||||||
|
ResultScanner scanner = table.getScanner(scan);
|
||||||
|
try {
|
||||||
|
for (Result r : scanner) {
|
||||||
|
if (printResults) {
|
||||||
|
LOG.info("row: " + Bytes.toStringBinary(r.getRow()));
|
||||||
|
}
|
||||||
|
for (KeyValue kv : r.list()) {
|
||||||
|
if (printResults) {
|
||||||
|
LOG.info(" column: " + Bytes.toStringBinary(kv.getKey()) + " value: "
|
||||||
|
+ Bytes.toStringBinary(kv.getValue()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verify() throws IOException {
|
||||||
|
// Force a cache flush for every online region to ensure that when the
|
||||||
|
// scanner takes its snapshot, all the updates have made it into the cache.
|
||||||
|
for (HRegion r : cluster.getRegionThreads().get(0).getRegionServer().
|
||||||
|
getOnlineRegions()) {
|
||||||
|
HRegionIncommon region = new HRegionIncommon(r);
|
||||||
|
region.flushcache();
|
||||||
|
}
|
||||||
|
|
||||||
|
Path localDir = new Path(getUnitTestdir(getName()), "index_" +
|
||||||
|
Integer.toString(new Random().nextInt()));
|
||||||
|
this.fs.copyToLocalFile(new Path(INDEX_DIR), localDir);
|
||||||
|
FileSystem localfs = FileSystem.getLocal(conf);
|
||||||
|
FileStatus [] indexDirs = localfs.listStatus(localDir);
|
||||||
|
Searcher searcher = null;
|
||||||
|
ResultScanner scanner = null;
|
||||||
|
try {
|
||||||
|
if (indexDirs.length == 1) {
|
||||||
|
searcher = new IndexSearcher((new File(indexDirs[0].getPath().
|
||||||
|
toUri())).getAbsolutePath());
|
||||||
|
} else if (indexDirs.length > 1) {
|
||||||
|
Searchable[] searchers = new Searchable[indexDirs.length];
|
||||||
|
for (int i = 0; i < indexDirs.length; i++) {
|
||||||
|
searchers[i] = new IndexSearcher((new File(indexDirs[i].getPath().
|
||||||
|
toUri()).getAbsolutePath()));
|
||||||
|
}
|
||||||
|
searcher = new MultiSearcher(searchers);
|
||||||
|
} else {
|
||||||
|
throw new IOException("no index directory found");
|
||||||
|
}
|
||||||
|
|
||||||
|
HTable table = new HTable(conf, TABLE_NAME);
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.addColumns(columns);
|
||||||
|
scanner = table.getScanner(scan);
|
||||||
|
|
||||||
|
IndexConfiguration indexConf = new IndexConfiguration();
|
||||||
|
String content = conf.get("hbase.index.conf");
|
||||||
|
if (content != null) {
|
||||||
|
indexConf.addFromXML(content);
|
||||||
|
}
|
||||||
|
String rowkeyName = indexConf.getRowkeyName();
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
for (Result r : scanner) {
|
||||||
|
String value = Bytes.toString(r.getRow());
|
||||||
|
Term term = new Term(rowkeyName, value);
|
||||||
|
int hitCount = searcher.search(new TermQuery(term)).length();
|
||||||
|
assertEquals("check row " + value, 1, hitCount);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
LOG.debug("Searcher.maxDoc: " + searcher.maxDoc());
|
||||||
|
LOG.debug("IndexReader.numDocs: " + ((IndexSearcher)searcher).getIndexReader().numDocs());
|
||||||
|
int maxDoc = ((IndexSearcher)searcher).getIndexReader().numDocs();
|
||||||
|
assertEquals("check number of rows", maxDoc, count);
|
||||||
|
} finally {
|
||||||
|
if (null != searcher)
|
||||||
|
searcher.close();
|
||||||
|
if (null != scanner)
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* @param args unused
|
||||||
|
*/
|
||||||
|
public static void main(String[] args) {
|
||||||
|
TestRunner.run(new TestSuite(TestTableIndex.class));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,250 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2007 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.NavigableMap;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.MultiRegionTable;
|
||||||
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.mapred.MiniMRCluster;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test Map/Reduce job over HBase tables. The map/reduce process we're testing
|
||||||
|
* on our tables is simple - take every row in the table, reverse the value of
|
||||||
|
* a particular cell, and write it back to the table.
|
||||||
|
*/
|
||||||
|
public class TestTableMapReduce extends MultiRegionTable {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
|
||||||
|
|
||||||
|
static final String MULTI_REGION_TABLE_NAME = "mrtest";
|
||||||
|
static final byte[] INPUT_COLUMN = Bytes.toBytes("contents:");
|
||||||
|
static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
|
||||||
|
static final byte[] OUTPUT_COLUMN = Bytes.toBytes("text:");
|
||||||
|
static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
|
||||||
|
|
||||||
|
private static final byte [][] columns = new byte [][] {
|
||||||
|
INPUT_COLUMN, OUTPUT_COLUMN
|
||||||
|
};
|
||||||
|
|
||||||
|
/** constructor */
|
||||||
|
public TestTableMapReduce() {
|
||||||
|
super(Bytes.toString(INPUT_COLUMN));
|
||||||
|
desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
|
||||||
|
desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
|
||||||
|
desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pass the given key and processed record reduce
|
||||||
|
*/
|
||||||
|
public static class ProcessContentsMapper
|
||||||
|
extends TableMapper<ImmutableBytesWritable, Put> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pass the key, and reversed value to reduce
|
||||||
|
*
|
||||||
|
* @param key
|
||||||
|
* @param value
|
||||||
|
* @param context
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void map(ImmutableBytesWritable key, Result value,
|
||||||
|
Context context)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
if (value.size() != 1) {
|
||||||
|
throw new IOException("There should only be one input column");
|
||||||
|
}
|
||||||
|
Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
|
||||||
|
cf = value.getMap();
|
||||||
|
if(!cf.containsKey(INPUT_FAMILY)) {
|
||||||
|
throw new IOException("Wrong input columns. Missing: '" +
|
||||||
|
Bytes.toString(INPUT_FAMILY) + "'.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the original value and reverse it
|
||||||
|
String originalValue = new String(value.getValue(INPUT_FAMILY, null),
|
||||||
|
HConstants.UTF8_ENCODING);
|
||||||
|
StringBuilder newValue = new StringBuilder(originalValue);
|
||||||
|
newValue.reverse();
|
||||||
|
// Now set the value to be collected
|
||||||
|
Put outval = new Put(key.get());
|
||||||
|
outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
|
||||||
|
context.write(key, outval);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test a map/reduce against a multi-region table
|
||||||
|
* @throws IOException
|
||||||
|
* @throws ClassNotFoundException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
public void testMultiRegionTable()
|
||||||
|
throws IOException, InterruptedException, ClassNotFoundException {
|
||||||
|
runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runTestOnTable(HTable table)
|
||||||
|
throws IOException, InterruptedException, ClassNotFoundException {
|
||||||
|
MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
|
||||||
|
|
||||||
|
Job job = null;
|
||||||
|
try {
|
||||||
|
LOG.info("Before map/reduce startup");
|
||||||
|
job = new Job(conf, "process column contents");
|
||||||
|
job.setNumReduceTasks(1);
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.addFamily(INPUT_FAMILY);
|
||||||
|
TableMapReduceUtil.initTableMapperJob(
|
||||||
|
Bytes.toString(table.getTableName()), scan,
|
||||||
|
ProcessContentsMapper.class, ImmutableBytesWritable.class,
|
||||||
|
Put.class, job);
|
||||||
|
TableMapReduceUtil.initTableReducerJob(
|
||||||
|
Bytes.toString(table.getTableName()),
|
||||||
|
IdentityTableReducer.class, job);
|
||||||
|
FileOutputFormat.setOutputPath(job, new Path("test"));
|
||||||
|
LOG.info("Started " + Bytes.toString(table.getTableName()));
|
||||||
|
job.waitForCompletion(true);
|
||||||
|
LOG.info("After map/reduce completion");
|
||||||
|
|
||||||
|
// verify map-reduce results
|
||||||
|
verify(Bytes.toString(table.getTableName()));
|
||||||
|
} finally {
|
||||||
|
mrCluster.shutdown();
|
||||||
|
if (job != null) {
|
||||||
|
FileUtil.fullyDelete(
|
||||||
|
new File(job.getConfiguration().get("hadoop.tmp.dir")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verify(String tableName) throws IOException {
|
||||||
|
HTable table = new HTable(conf, tableName);
|
||||||
|
boolean verified = false;
|
||||||
|
long pause = conf.getLong("hbase.client.pause", 5 * 1000);
|
||||||
|
int numRetries = conf.getInt("hbase.client.retries.number", 5);
|
||||||
|
for (int i = 0; i < numRetries; i++) {
|
||||||
|
try {
|
||||||
|
LOG.info("Verification attempt #" + i);
|
||||||
|
verifyAttempt(table);
|
||||||
|
verified = true;
|
||||||
|
break;
|
||||||
|
} catch (NullPointerException e) {
|
||||||
|
// If here, a cell was empty. Presume its because updates came in
|
||||||
|
// after the scanner had been opened. Wait a while and retry.
|
||||||
|
LOG.debug("Verification attempt failed: " + e.getMessage());
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(pause);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue(verified);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Looks at every value of the mapreduce output and verifies that indeed
|
||||||
|
* the values have been reversed.
|
||||||
|
*
|
||||||
|
* @param table Table to scan.
|
||||||
|
* @throws IOException
|
||||||
|
* @throws NullPointerException if we failed to find a cell value
|
||||||
|
*/
|
||||||
|
private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.addColumns(columns);
|
||||||
|
ResultScanner scanner = table.getScanner(scan);
|
||||||
|
try {
|
||||||
|
for (Result r : scanner) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
if (r.size() > 2 ) {
|
||||||
|
throw new IOException("Too many results, expected 2 got " +
|
||||||
|
r.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
byte[] firstValue = null;
|
||||||
|
byte[] secondValue = null;
|
||||||
|
int count = 0;
|
||||||
|
for(KeyValue kv : r.list()) {
|
||||||
|
if (count == 0) {
|
||||||
|
firstValue = kv.getValue();
|
||||||
|
}
|
||||||
|
if (count == 1) {
|
||||||
|
secondValue = kv.getValue();
|
||||||
|
}
|
||||||
|
count++;
|
||||||
|
if (count == 2) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
String first = "";
|
||||||
|
if (firstValue == null) {
|
||||||
|
throw new NullPointerException(Bytes.toString(r.getRow()) +
|
||||||
|
": first value is null");
|
||||||
|
}
|
||||||
|
first = new String(firstValue, HConstants.UTF8_ENCODING);
|
||||||
|
|
||||||
|
String second = "";
|
||||||
|
if (secondValue == null) {
|
||||||
|
throw new NullPointerException(Bytes.toString(r.getRow()) +
|
||||||
|
": second value is null");
|
||||||
|
}
|
||||||
|
byte[] secondReversed = new byte[secondValue.length];
|
||||||
|
for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
|
||||||
|
secondReversed[i] = secondValue[j];
|
||||||
|
}
|
||||||
|
second = new String(secondReversed, HConstants.UTF8_ENCODING);
|
||||||
|
|
||||||
|
if (first.compareTo(second) != 0) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("second key is not the reverse of first. row=" +
|
||||||
|
r.getRow() + ", first value=" + first + ", second value=" +
|
||||||
|
second);
|
||||||
|
}
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,198 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2007 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.NavigableMap;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configurable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClusterTestCase;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||||
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.io.MapWritable;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapred.MiniMRCluster;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
||||||
|
|
||||||
|
public class TestTimeRangeMapRed extends HBaseClusterTestCase {
|
||||||
|
|
||||||
|
private final static Log log = LogFactory.getLog(TestTimeRangeMapRed.class);
|
||||||
|
|
||||||
|
private static final byte [] KEY = Bytes.toBytes("row1");
|
||||||
|
private static final NavigableMap<Long, Boolean> TIMESTAMP =
|
||||||
|
new TreeMap<Long, Boolean>();
|
||||||
|
static {
|
||||||
|
TIMESTAMP.put((long)1245620000, false);
|
||||||
|
TIMESTAMP.put((long)1245620005, true); // include
|
||||||
|
TIMESTAMP.put((long)1245620010, true); // include
|
||||||
|
TIMESTAMP.put((long)1245620055, true); // include
|
||||||
|
TIMESTAMP.put((long)1245620100, true); // include
|
||||||
|
TIMESTAMP.put((long)1245620150, false);
|
||||||
|
TIMESTAMP.put((long)1245620250, false);
|
||||||
|
}
|
||||||
|
static final long MINSTAMP = 1245620005;
|
||||||
|
static final long MAXSTAMP = 1245620100 + 1; // maxStamp itself is excluded. so increment it.
|
||||||
|
|
||||||
|
static final byte[] TABLE_NAME = Bytes.toBytes("table123");
|
||||||
|
static final byte[] FAMILY_NAME = Bytes.toBytes("text");
|
||||||
|
static final byte[] COLUMN_NAME = Bytes.toBytes("input");
|
||||||
|
|
||||||
|
protected HTableDescriptor desc;
|
||||||
|
protected HTable table;
|
||||||
|
|
||||||
|
public TestTimeRangeMapRed() {
|
||||||
|
super();
|
||||||
|
System.setProperty("hadoop.log.dir", conf.get("hadoop.log.dir"));
|
||||||
|
conf.set("mapred.output.dir", conf.get("hadoop.tmp.dir"));
|
||||||
|
this.setOpenMetaTable(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
desc = new HTableDescriptor(TABLE_NAME);
|
||||||
|
HColumnDescriptor col = new HColumnDescriptor(FAMILY_NAME);
|
||||||
|
col.setMaxVersions(Integer.MAX_VALUE);
|
||||||
|
desc.addFamily(col);
|
||||||
|
HBaseAdmin admin = new HBaseAdmin(conf);
|
||||||
|
admin.createTable(desc);
|
||||||
|
table = new HTable(conf, desc.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class ProcessTimeRangeMapper
|
||||||
|
extends TableMapper<ImmutableBytesWritable, MapWritable>
|
||||||
|
implements Configurable {
|
||||||
|
|
||||||
|
private Configuration conf = null;
|
||||||
|
private HTable table = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void map(ImmutableBytesWritable key, Result result,
|
||||||
|
Context context)
|
||||||
|
throws IOException {
|
||||||
|
List<Long> tsList = new ArrayList<Long>();
|
||||||
|
for (KeyValue kv : result.sorted()) {
|
||||||
|
tsList.add(kv.getTimestamp());
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Long ts : tsList) {
|
||||||
|
Put put = new Put(key.get());
|
||||||
|
put.add(FAMILY_NAME, COLUMN_NAME, ts, Bytes.toBytes(true));
|
||||||
|
table.put(put);
|
||||||
|
}
|
||||||
|
table.flushCommits();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration getConf() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setConf(Configuration configuration) {
|
||||||
|
this.conf = configuration;
|
||||||
|
try {
|
||||||
|
table = new HTable(new HBaseConfiguration(conf), TABLE_NAME);
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTimeRangeMapRed()
|
||||||
|
throws IOException, InterruptedException, ClassNotFoundException {
|
||||||
|
prepareTest();
|
||||||
|
runTestOnTable();
|
||||||
|
verify();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void prepareTest() throws IOException {
|
||||||
|
for (Map.Entry<Long, Boolean> entry : TIMESTAMP.entrySet()) {
|
||||||
|
Put put = new Put(KEY);
|
||||||
|
put.add(FAMILY_NAME, COLUMN_NAME, entry.getKey(), Bytes.toBytes(false));
|
||||||
|
table.put(put);
|
||||||
|
}
|
||||||
|
table.flushCommits();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runTestOnTable()
|
||||||
|
throws IOException, InterruptedException, ClassNotFoundException {
|
||||||
|
MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
|
||||||
|
Job job = null;
|
||||||
|
try {
|
||||||
|
job = new Job(conf, "test123");
|
||||||
|
job.setOutputFormatClass(NullOutputFormat.class);
|
||||||
|
job.setNumReduceTasks(0);
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.addColumn(FAMILY_NAME, COLUMN_NAME);
|
||||||
|
scan.setTimeRange(MINSTAMP, MAXSTAMP);
|
||||||
|
scan.setMaxVersions();
|
||||||
|
TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME),
|
||||||
|
scan, ProcessTimeRangeMapper.class, Text.class, Text.class, job);
|
||||||
|
job.waitForCompletion(true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
mrCluster.shutdown();
|
||||||
|
if (job != null) {
|
||||||
|
FileUtil.fullyDelete(
|
||||||
|
new File(job.getConfiguration().get("hadoop.tmp.dir")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verify() throws IOException {
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.addColumn(FAMILY_NAME, COLUMN_NAME);
|
||||||
|
scan.setMaxVersions();
|
||||||
|
ResultScanner scanner = table.getScanner(scan);
|
||||||
|
for (Result r: scanner) {
|
||||||
|
for (KeyValue kv : r.sorted()) {
|
||||||
|
assertEquals(TIMESTAMP.get(kv.getTimestamp()), (Boolean)Bytes.toBoolean(kv.getValue()));
|
||||||
|
log.debug(Bytes.toString(r.getRow()) + "\t" + Bytes.toString(kv.getColumn())
|
||||||
|
+ "\t" + kv.getTimestamp() + "\t" + Bytes.toBoolean(kv.getValue()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue