HADOOP-1519 map/reduce interface for HBase

AbstractMergeTestBase, HBaseTestCase: move createNewHRegion to HBaseTestCase
MiniHBaseCluster: add deleteOnExit, getDFSCluster, fix Javadoc
TestScanner2: moved KeyedData to org.apache.hadoop.hbase.io
TestTableMapReduce: new test case to test map/reduce interface to HBase
hbase-site.xml: change hbase.client.pause from 3 to 5 seconds, hbase.client.retries.number to 5 so that tests will not time out or run out of retries
HClient: moved KeyedData to org.apache.hadoop.hbase.io, fix javadoc, add method getStartKeys
HMaster: moved KeyedData to org.apache.hadoop.hbase.io, remove unused variables, remove extraneous throws clause, 
HRegionInterface, HRegionServer: moved KeyedData to org.apache.hadoop.hbase.io
KeyedData: moved KeyedData to org.apache.hadoop.hbase.io
KeyedDataArrayWritable: new class to support HBase map/reduce
org.apache.hadoop.hbase.mapred: new classes for map/reduce
- GroupingTableMap
- IdentityTableMap
- IdentityTableReduce
- TableInputFormat
- TableMap
- TableOutputCollector
- TableOutputFormat
- TableReduce
- TableSplit
hbase/bin/hbase: changes for map/reduce


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@552127 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2007-06-30 11:11:32 +00:00
parent 96c64712e6
commit 60a2e629d1
23 changed files with 1420 additions and 53 deletions

View File

@ -43,3 +43,4 @@ Trunk (unreleased changes)
25. HADOOP-1537. Catch exceptions in testCleanRegionServerExit so we can see
what is failing.
26. HADOOP-1543 [hbase] Add HClient.tableExists
27. HADOOP-1519 [hbase] map/reduce interface for HBase

View File

@ -82,10 +82,10 @@ fi
# CLASSPATH initially contains $HBASE_CONF_DIR
# Add HADOOP_CONF_DIR if its been defined.
CLASSPATH="${HBASE_CONF_DIR}"
if [ ! "$HADOOP_CONF_DIR" = "" ]; then
CLASSPATH="${CLASSPATH}:${HADOOP_CONF_DIR}"
fi
CLASSPATH="${CLASSPATH}:${HBASE_CONF_DIR}"
CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar
# for developers, add hbase and hadoop classes to CLASSPATH
@ -112,13 +112,13 @@ IFS=
for f in "$HBASE_HOME/hadoop-hbase-*.jar"; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in "$HADOOP_HOME/build/contrib/hbase/hadoop-hbase-*.jar"; do
CLASSPATH=${CLASSPATH}:$f;
done
if [ -f "$HADOOP_HOME/contrib/hadoop-hbase.jar" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HOME/contrib/hadoop-hbase.jar
fi
if [ -d "$HADOOP_HOME/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HOME
fi
for f in $HADOOP_HOME/hadoop-*-core.jar; do
for f in $HADOOP_HOME/hadoop-*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done

View File

@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
@ -78,10 +79,16 @@ public class HClient implements HConstants {
this.regionInfo;
}
/**
* @return HRegionInfo
*/
public HRegionInfo getRegionInfo(){
return regionInfo;
}
/**
* @return HServerAddress
*/
public HServerAddress getServerAddress(){
return serverAddress;
}
@ -588,6 +595,23 @@ public class HClient implements HConstants {
this.tableServers = getTableServers(tableName);
}
/**
* Gets the starting row key for every region in the currently open table
* @return Array of region starting row keys
*/
public synchronized Text[] getStartKeys() {
if(this.tableServers == null) {
throw new IllegalStateException("Must open table first");
}
Text[] keys = new Text[tableServers.size()];
int i = 0;
for(Text key: tableServers.keySet()){
keys[i++] = key;
}
return keys;
}
/**
* Gets the servers of the given table.
*
@ -1360,6 +1384,7 @@ public class HClient implements HConstants {
private Text startRow;
private boolean closed;
private RegionLocation[] regions;
@SuppressWarnings("hiding")
private int currentRegion;
private HRegionInterface server;
private long scannerId;

View File

@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
@ -1854,7 +1855,6 @@ public class HMaster implements HConstants, HMasterInterface,
try {
DataInputBuffer inbuf = new DataInputBuffer();
byte[] bytes;
while(true) {
HRegionInfo info = new HRegionInfo();
String serverName = null;
@ -1978,8 +1978,7 @@ public class HMaster implements HConstants, HMasterInterface,
@Override
protected void processScanItem(String serverName, long startCode,
HRegionInfo info)
throws IOException {
HRegionInfo info) {
if (isBeingServed(serverName, startCode)) {
TreeSet<HRegionInfo> regions = servedRegions.get(serverName);
if (regions == null) {
@ -2260,6 +2259,7 @@ public class HMaster implements HConstants, HMasterInterface,
/** Instantiated to monitor the health of a region server */
private class ServerExpirer implements LeaseListener {
@SuppressWarnings("hiding")
private String server;
ServerExpirer(String server) {

View File

@ -17,6 +17,7 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.VersionedProtocol;

View File

@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.RPC;

View File

@ -13,13 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
package org.apache.hadoop.hbase.io;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.io.*;
import java.io.*;
/*******************************************************************************
* LabelledData is just a data pair.
* KeyedData is just a data pair.
* It includes an HStoreKey and some associated data.
******************************************************************************/
public class KeyedData implements Writable {
@ -60,7 +61,7 @@ public class KeyedData implements Writable {
*/
public void write(DataOutput out) throws IOException {
key.write(out);
out.writeShort(this.data.length);
out.writeInt(this.data.length);
out.write(this.data);
}
@ -69,7 +70,7 @@ public class KeyedData implements Writable {
*/
public void readFields(DataInput in) throws IOException {
key.readFields(in);
this.data = new byte[in.readShort()];
this.data = new byte[in.readInt()];
in.readFully(this.data);
}
}
}

View File

@ -0,0 +1,80 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
* Wraps an array of KeyedData items as a Writable. The array elements
* may be null.
*/
public class KeyedDataArrayWritable implements Writable {
private final static KeyedData NULL_KEYEDDATA = new KeyedData();
private KeyedData[] m_data;
/**
* Make a record of length 0
*/
public KeyedDataArrayWritable() {
m_data = new KeyedData[0];
}
/** @return the array of KeyedData */
public KeyedData[] get() {
return m_data;
}
/**
* Sets the KeyedData array
*
* @param data array of KeyedData
*/
public void set(KeyedData[] data) {
if(data == null) {
throw new NullPointerException("KeyedData[] cannot be null");
}
m_data = data;
}
// Writable
public void readFields(DataInput in) throws IOException {
int len = in.readInt();
m_data = new KeyedData[len];
for(int i = 0; i < len; i++) {
m_data[i] = new KeyedData();
m_data[i].readFields(in);
}
}
public void write(DataOutput out) throws IOException {
int len = m_data.length;
out.writeInt(len);
for(int i = 0; i < len; i++) {
if(m_data[i] != null) {
m_data[i].write(out);
} else {
NULL_KEYEDDATA.write(out);
}
}
}
}

View File

@ -0,0 +1,152 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
/**
* Extract grouping columns from input record
*/
public class GroupingTableMap extends TableMap {
/**
* JobConf parameter to specify the columns used to produce the key passed to
* collect from the map phase
*/
public static final String GROUP_COLUMNS =
"hbase.mapred.groupingtablemap.columns";
private Text[] m_columns;
/** default constructor */
public GroupingTableMap() {
super();
}
/**
* Use this before submitting a TableMap job. It will appropriately set up the
* JobConf.
*
* @param table table to be processed
* @param columns space separated list of columns to fetch
* @param groupColumns space separated list of columns used to form the key used in collect
* @param mapper map class
* @param job job configuration object
*/
public static void initJob(String table, String columns, String groupColumns,
Class<? extends TableMap> mapper, JobConf job) {
initJob(table, columns, mapper, job);
job.set(GROUP_COLUMNS, groupColumns);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.mapred.TableMap#configure(org.apache.hadoop.mapred.JobConf)
*/
@Override
public void configure(JobConf job) {
super.configure(job);
String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
m_columns = new Text[cols.length];
for(int i = 0; i < cols.length; i++) {
m_columns[i] = new Text(cols[i]);
}
}
/**
* Extract the grouping columns from value to construct a new key.
*
* Pass the new key and value to reduce.
* If any of the grouping columns are not found in the value, the record is skipped.
*
* @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@Override
public void map(@SuppressWarnings("unused") HStoreKey key,
KeyedDataArrayWritable value, TableOutputCollector output,
@SuppressWarnings("unused") Reporter reporter) throws IOException {
byte[][] keyVals = extractKeyValues(value);
if(keyVals != null) {
Text tKey = createGroupKey(keyVals);
output.collect(tKey, value);
}
}
/**
* Extract columns values from the current record. This method returns
* null if any of the columns are not found.
*
* Override this method if you want to deal with nulls differently.
*
* @param r
* @return array of byte values
*/
protected byte[][] extractKeyValues(KeyedDataArrayWritable r) {
byte[][] keyVals = null;
ArrayList<byte[]> foundList = new ArrayList<byte[]>();
int numCols = m_columns.length;
if(numCols > 0) {
KeyedData[] recVals = r.get();
boolean found = true;
for(int i = 0; i < numCols && found; i++) {
found = false;
for(int j = 0; j < recVals.length; j++) {
if(recVals[j].getKey().getColumn().equals(m_columns[i])) {
found = true;
byte[] val = recVals[j].getData();
foundList.add(val);
break;
}
}
}
if(foundList.size() == numCols) {
keyVals = foundList.toArray(new byte[numCols][]);
}
}
return keyVals;
}
/**
* Create a key by concatenating multiple column values.
* Override this function in order to produce different types of keys.
*
* @param vals
* @return key generated by concatenating multiple column values
*/
protected Text createGroupKey(byte[][] vals) {
if(vals == null) {
return null;
}
StringBuilder sb = new StringBuilder();
for(int i = 0; i < vals.length; i++) {
if(i > 0) {
sb.append(" ");
}
sb.append(new String(vals[i]));
}
return new Text(sb.toString());
}
}

View File

@ -0,0 +1,49 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Reporter;
/**
* Pass the given key and record as-is to reduce
*/
public class IdentityTableMap extends TableMap {
/** constructor */
public IdentityTableMap() {
super();
}
/**
* Pass the key, value to reduce
*
* @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@Override
public void map(HStoreKey key, KeyedDataArrayWritable value,
TableOutputCollector output,
@SuppressWarnings("unused") Reporter reporter) throws IOException {
Text tKey = key.getRow();
output.collect(tKey, value);
}
}

View File

@ -0,0 +1,51 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Reporter;
/**
* Write to table each key, record pair
*/
public class IdentityTableReduce extends TableReduce {
/** constructor */
public IdentityTableReduce() {
super();
}
/**
* No aggregation, output pairs of (key, record)
*
* @see org.apache.hadoop.hbase.mapred.TableReduce#reduce(org.apache.hadoop.io.Text, java.util.Iterator, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@Override
public void reduce(Text key, @SuppressWarnings("unchecked") Iterator values,
TableOutputCollector output,
@SuppressWarnings("unused") Reporter reporter) throws IOException {
while(values.hasNext()) {
KeyedDataArrayWritable r = (KeyedDataArrayWritable)values.next();
output.collect(key, r);
}
}
}

View File

@ -0,0 +1,239 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import java.util.ArrayList;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.hbase.HClient;
import org.apache.hadoop.hbase.HScannerInterface;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
import org.apache.log4j.Logger;
/**
* Convert HBase tabular data into a format that is consumable by Map/Reduce
*/
public class TableInputFormat implements InputFormat, JobConfigurable {
static final Logger LOG = Logger.getLogger(TableInputFormat.class.getName());
/**
* space delimited list of columns
* @see org.apache.hadoop.hbase.HAbstractScanner for column name wildcards
*/
public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
private Text m_tableName;
Text[] m_cols;
HClient m_client;
/**
* Iterate over an HBase table data, return (HStoreKey, KeyedDataArrayWritable) pairs
*/
class TableRecordReader implements RecordReader {
private HScannerInterface m_scanner;
private TreeMap<Text, byte[]> m_row; // current buffer
private Text m_endRow;
/**
* Constructor
* @param startRow (inclusive)
* @param endRow (exclusive)
* @throws IOException
*/
public TableRecordReader(Text startRow, Text endRow) throws IOException {
LOG.debug("start construct");
m_row = new TreeMap<Text, byte[]>();
m_scanner = m_client.obtainScanner(m_cols, startRow);
m_endRow = endRow;
LOG.debug("end construct");
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.RecordReader#close()
*/
public void close() throws IOException {
LOG.debug("start close");
m_scanner.close();
LOG.debug("end close");
}
/**
* @return HStoreKey
*
* @see org.apache.hadoop.mapred.RecordReader#createKey()
*/
public WritableComparable createKey() {
return new HStoreKey();
}
/**
* @return KeyedDataArrayWritable of KeyedData
*
* @see org.apache.hadoop.mapred.RecordReader#createValue()
*/
public Writable createValue() {
return new KeyedDataArrayWritable();
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.RecordReader#getPos()
*/
public long getPos() {
// This should be the ordinal tuple in the range;
// not clear how to calculate...
return 0;
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.RecordReader#getProgress()
*/
public float getProgress() {
// Depends on the total number of tuples and getPos
return 0;
}
/**
* @param key HStoreKey as input key.
* @param value KeyedDataArrayWritable as input value
*
* Converts HScannerInterface.next(HStoreKey, TreeMap(Text, byte[])) to
* (HStoreKey, KeyedDataArrayWritable)
* @return true if there was more data
* @throws IOException
*/
public boolean next(Writable key, Writable value) throws IOException {
LOG.debug("start next");
m_row.clear();
HStoreKey tKey = (HStoreKey)key;
boolean hasMore = m_scanner.next(tKey, m_row);
if(hasMore) {
if(m_endRow.getLength() > 0 && (tKey.getRow().compareTo(m_endRow) < 0)) {
hasMore = false;
} else {
KeyedDataArrayWritable rowVal = (KeyedDataArrayWritable) value;
ArrayList<KeyedData> columns = new ArrayList<KeyedData>();
for(Map.Entry<Text, byte[]> e: m_row.entrySet()) {
HStoreKey keyCol = new HStoreKey(tKey);
keyCol.setColumn(e.getKey());
columns.add(new KeyedData(keyCol, e.getValue()));
}
// set the output
rowVal.set(columns.toArray(new KeyedData[columns.size()]));
}
}
LOG.debug("end next");
return hasMore;
}
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.InputFormat#getRecordReader(org.apache.hadoop.mapred.InputSplit, org.apache.hadoop.mapred.JobConf, org.apache.hadoop.mapred.Reporter)
*/
public RecordReader getRecordReader(InputSplit split,
@SuppressWarnings("unused") JobConf job,
@SuppressWarnings("unused") Reporter reporter) throws IOException {
TableSplit tSplit = (TableSplit)split;
return new TableRecordReader(tSplit.getStartRow(), tSplit.getEndRow());
}
/**
* A split will be created for each HRegion of the input table
*
* @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
*/
@SuppressWarnings("unused")
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
LOG.debug("start getSplits");
Text[] startKeys = m_client.getStartKeys();
if(startKeys == null || startKeys.length == 0) {
throw new IOException("Expecting at least one region");
}
InputSplit[] splits = new InputSplit[startKeys.length];
for(int i = 0; i < startKeys.length; i++) {
splits[i] = new TableSplit(m_tableName, startKeys[i],
((i + 1) < startKeys.length) ? startKeys[i + 1] : new Text());
LOG.debug("split: " + i + "->" + splits[i]);
}
LOG.debug("end splits");
return splits;
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf)
*/
public void configure(JobConf job) {
LOG.debug("start configure");
Path[] tableNames = job.getInputPaths();
m_tableName = new Text(tableNames[0].getName());
String colArg = job.get(COLUMN_LIST);
String[] colNames = colArg.split(" ");
m_cols = new Text[colNames.length];
for(int i = 0; i < m_cols.length; i++) {
m_cols[i] = new Text(colNames[i]);
}
m_client = new HClient(job);
try {
m_client.openTable(m_tableName);
} catch(Exception e) {
LOG.error(e);
}
LOG.debug("end configure");
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.InputFormat#validateInput(org.apache.hadoop.mapred.JobConf)
*/
public void validateInput(JobConf job) throws IOException {
// expecting exactly one path
Path[] tableNames = job.getInputPaths();
if(tableNames == null || tableNames.length > 1) {
throw new IOException("expecting one table name");
}
// expecting at least one column
String colArg = job.get(COLUMN_LIST);
if(colArg == null || colArg.length() == 0) {
throw new IOException("expecting at least one column");
}
}
}

View File

@ -0,0 +1,112 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
/**
* Scan an HBase table to sort by a specified sort column.
* If the column does not exist, the record is not passed to Reduce.
*
*/
public abstract class TableMap extends MapReduceBase implements Mapper {
private static final Logger LOG = Logger.getLogger(TableMap.class.getName());
private TableOutputCollector m_collector;
/** constructor*/
public TableMap() {
m_collector = new TableOutputCollector();
}
/**
* Use this before submitting a TableMap job. It will
* appropriately set up the JobConf.
*
* @param table table name
* @param columns columns to scan
* @param mapper mapper class
* @param job job configuration
*/
public static void initJob(String table, String columns,
Class<? extends TableMap> mapper, JobConf job) {
job.setInputFormat(TableInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(KeyedDataArrayWritable.class);
job.setMapperClass(mapper);
job.setInputPath(new Path(table));
job.set(TableInputFormat.COLUMN_LIST, columns);
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
*/
@Override
public void configure(JobConf job) {
super.configure(job);
}
/**
* Input:
* @param key is of type HStoreKey
* @param value is of type KeyedDataArrayWritable
* @param output output collector
* @param reporter object to use for status updates
* @throws IOException
*
* Output:
* The key is a specific column, including the input key or any value
* The value is of type LabeledData
*/
public void map(WritableComparable key, Writable value,
OutputCollector output, Reporter reporter) throws IOException {
LOG.debug("start map");
if(m_collector.collector == null) {
m_collector.collector = output;
}
map((HStoreKey)key, (KeyedDataArrayWritable)value, m_collector, reporter);
LOG.debug("end map");
}
/**
* Call a user defined function on a single HBase record, represented
* by a key and its associated record value.
*
* @param key
* @param value
* @param output
* @param reporter
* @throws IOException
*/
public abstract void map(HStoreKey key, KeyedDataArrayWritable value,
TableOutputCollector output, Reporter reporter) throws IOException;
}

View File

@ -0,0 +1,43 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
/**
* Refine the types that can be collected from a Table Map/Reduce jobs.
*/
public class TableOutputCollector {
/** The collector object */
public OutputCollector collector;
/**
* Restrict Table Map/Reduce's output to be a Text key and a record.
*
* @param key
* @param value
* @throws IOException
*/
public void collect(Text key, KeyedDataArrayWritable value)
throws IOException {
collector.collect(key, value);
}
}

View File

@ -0,0 +1,137 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormatBase;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.hbase.HClient;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
import org.apache.log4j.Logger;
/**
* Convert Map/Reduce output and write it to an HBase table
*/
public class TableOutputFormat extends OutputFormatBase {
/** JobConf parameter that specifies the output table */
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
static final Logger LOG = Logger.getLogger(TableOutputFormat.class.getName());
/** constructor */
public TableOutputFormat() {}
/**
* Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable)
* and write to an HBase table
*/
protected class TableRecordWriter implements RecordWriter {
private HClient m_client;
/**
* Instantiate a TableRecordWriter with the HBase HClient for writing.
*
* @param client
*/
public TableRecordWriter(HClient client) {
m_client = client;
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.RecordWriter#close(org.apache.hadoop.mapred.Reporter)
*/
public void close(@SuppressWarnings("unused") Reporter reporter) {}
/**
* Expect key to be of type Text
* Expect value to be of type KeyedDataArrayWritable
*
* @see org.apache.hadoop.mapred.RecordWriter#write(org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
*/
public void write(WritableComparable key, Writable value) throws IOException {
LOG.debug("start write");
Text tKey = (Text)key;
KeyedDataArrayWritable tValue = (KeyedDataArrayWritable) value;
KeyedData[] columns = tValue.get();
// start transaction
long xid = m_client.startUpdate(tKey);
for(int i = 0; i < columns.length; i++) {
KeyedData column = columns[i];
m_client.put(xid, column.getKey().getColumn(), column.getData());
}
// end transaction
m_client.commit(xid);
LOG.debug("end write");
}
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.OutputFormatBase#getRecordWriter(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf, java.lang.String, org.apache.hadoop.util.Progressable)
*/
@Override
@SuppressWarnings("unused")
public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress) throws IOException {
// expecting exactly one path
LOG.debug("start get writer");
Text tableName = new Text(job.get(OUTPUT_TABLE));
HClient client = null;
try {
client = new HClient(job);
client.openTable(tableName);
} catch(Exception e) {
LOG.error(e);
}
LOG.debug("end get writer");
return new TableRecordWriter(client);
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.OutputFormatBase#checkOutputSpecs(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf)
*/
@Override
@SuppressWarnings("unused")
public void checkOutputSpecs(FileSystem ignored, JobConf job)
throws FileAlreadyExistsException, InvalidJobConfException, IOException {
String tableName = job.get(OUTPUT_TABLE);
if(tableName == null) {
throw new IOException("Must specify table name");
}
}
}

View File

@ -0,0 +1,89 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
/**
* Write a table, sorting by the input key
*/
public abstract class TableReduce extends MapReduceBase implements Reducer {
private static final Logger LOG =
Logger.getLogger(TableReduce.class.getName());
TableOutputCollector m_collector;
/** Constructor */
public TableReduce() {
m_collector = new TableOutputCollector();
}
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
*
* @param table
* @param reducer
* @param job
*/
public static void initJob(String table, Class<? extends TableReduce> reducer,
JobConf job) {
job.setOutputFormat(TableOutputFormat.class);
job.setReducerClass(reducer);
job.set(TableOutputFormat.OUTPUT_TABLE, table);
}
/**
* Create a unique key for table insertion by appending a local
* counter the given key.
*
* @see org.apache.hadoop.mapred.Reducer#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@SuppressWarnings("unchecked")
public void reduce(WritableComparable key, Iterator values,
OutputCollector output, Reporter reporter) throws IOException {
LOG.debug("start reduce");
if(m_collector.collector == null) {
m_collector.collector = output;
}
reduce((Text)key, values, m_collector, reporter);
LOG.debug("end reduce");
}
/**
*
* @param key
* @param values
* @param output
* @param reporter
* @throws IOException
*/
@SuppressWarnings("unchecked")
public abstract void reduce(Text key, Iterator values,
TableOutputCollector output, Reporter reporter) throws IOException;
}

View File

@ -0,0 +1,106 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
/**
* A table split corresponds to a key range [low, high)
*/
public class TableSplit implements InputSplit {
private Text m_tableName;
private Text m_startRow;
private Text m_endRow;
/** default constructor */
public TableSplit() {
m_tableName = new Text();
m_startRow = new Text();
m_endRow = new Text();
}
/**
* Constructor
* @param tableName
* @param startRow
* @param endRow
*/
public TableSplit(Text tableName, Text startRow, Text endRow) {
this();
m_tableName.set(tableName);
m_startRow.set(startRow);
m_endRow.set(endRow);
}
/** @return table name */
public Text getTableName() {
return m_tableName;
}
/** @return starting row key */
public Text getStartRow() {
return m_startRow;
}
/** @return end row key */
public Text getEndRow() {
return m_endRow;
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.InputSplit#getLength()
*/
public long getLength() {
// Not clear how to obtain this... seems to be used only for sorting splits
return 0;
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.InputSplit#getLocations()
*/
public String[] getLocations() {
// Return a random node from the cluster for now
return new String[] { };
}
/* (non-Javadoc)
* @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
*/
public void readFields(DataInput in) throws IOException {
m_tableName.readFields(in);
m_startRow.readFields(in);
m_endRow.readFields(in);
}
/* (non-Javadoc)
* @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
*/
public void write(DataOutput out) throws IOException {
m_tableName.write(out);
m_startRow.write(out);
m_endRow.write(out);
}
@Override
public String toString() {
return m_tableName +"," + m_startRow + "," + m_endRow;
}
}

View File

@ -11,13 +11,13 @@
</property>
<property>
<name>hbase.client.pause</name>
<value>3000</value>
<value>5000</value>
<description>General client pause value. Used mostly as value to wait
before running a retry of a failed get, region lookup, etc.</description>
</property>
<property>
<name>hbase.client.retries.number</name>
<value>2</value>
<value>5</value>
<description>Maximum retries. Used as maximum for all retryable
operations such as fetching of the root region from root region
server, getting a cell's value, starting a row update, etc.

View File

@ -19,7 +19,6 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -141,17 +140,4 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
return region;
}
private HRegion createNewHRegion(FileSystem fs, Path dir,
Configuration conf, HTableDescriptor desc, long regionId, Text startKey,
Text endKey) throws IOException {
HRegionInfo info = new HRegionInfo(regionId, desc, startKey, endKey);
Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
fs.mkdirs(regionDir);
return new HRegion(dir,
new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf),
fs, conf, info, null);
}
}

View File

@ -15,10 +15,14 @@
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
/**
* Abstract base class for test cases. Performs all static initialization
@ -43,4 +47,18 @@ public abstract class HBaseTestCase extends TestCase {
protected Path getUnitTestdir(String testName) {
return new Path(StaticTestEnvironment.TEST_DIRECTORY_KEY, testName);
}
protected HRegion createNewHRegion(FileSystem fs, Path dir,
Configuration conf, HTableDescriptor desc, long regionId, Text startKey,
Text endKey) throws IOException {
HRegionInfo info = new HRegionInfo(regionId, desc, startKey, endKey);
Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
fs.mkdirs(regionDir);
return new HRegion(dir,
new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf),
fs, conf, info, null);
}
}

View File

@ -41,7 +41,8 @@ public class MiniHBaseCluster implements HConstants {
private Thread masterThread;
List<HRegionServer> regionServers;
List<Thread> regionThreads;
private boolean deleteOnExit = true;
/**
* Starts a MiniHBaseCluster on top of a new MiniDFSCluster
*
@ -51,9 +52,23 @@ public class MiniHBaseCluster implements HConstants {
*/
public MiniHBaseCluster(Configuration conf, int nRegionNodes)
throws IOException {
this(conf, nRegionNodes, true);
this(conf, nRegionNodes, true, true, true);
}
/**
* Start a MiniHBaseCluster. Use the native file system unless
* miniHdfsFilesystem is set to true.
*
* @param conf
* @param nRegionNodes
* @param miniHdfsFilesystem
* @throws IOException
*/
public MiniHBaseCluster(Configuration conf, int nRegionNodes,
final boolean miniHdfsFilesystem) throws IOException {
this(conf, nRegionNodes, miniHdfsFilesystem, true, true);
}
/**
* Starts a MiniHBaseCluster on top of an existing HDFSCluster
*
@ -70,7 +85,7 @@ public class MiniHBaseCluster implements HConstants {
this.cluster = dfsCluster;
init(nRegionNodes);
}
/**
* Constructor.
* @param conf
@ -78,16 +93,20 @@ public class MiniHBaseCluster implements HConstants {
* @param miniHdfsFilesystem If true, set the hbase mini
* cluster atop a mini hdfs cluster. Otherwise, use the
* filesystem configured in <code>conf</code>.
* @param format the mini hdfs cluster
* @param deleteOnExit clean up mini hdfs files
* @throws IOException
*/
public MiniHBaseCluster(Configuration conf, int nRegionNodes,
final boolean miniHdfsFilesystem)
final boolean miniHdfsFilesystem, boolean format, boolean deleteOnExit)
throws IOException {
this.conf = conf;
this.deleteOnExit = deleteOnExit;
if (miniHdfsFilesystem) {
try {
this.cluster = new MiniDFSCluster(this.conf, 2, true, (String[])null);
this.cluster = new MiniDFSCluster(this.conf, 2, format, (String[])null);
} catch(Throwable t) {
LOG.error("Failed setup of mini dfs cluster", t);
t.printStackTrace();
@ -112,7 +131,7 @@ public class MiniHBaseCluster implements HConstants {
if(this.conf.get(MASTER_ADDRESS) == null) {
this.conf.set(MASTER_ADDRESS, "localhost:0");
}
// Create the master
this.master = new HMaster(conf);
this.masterThread = new Thread(this.master, "HMaster");
@ -120,7 +139,7 @@ public class MiniHBaseCluster implements HConstants {
// Start up the master
LOG.info("Starting HMaster");
masterThread.start();
// Set the master's port for the HRegionServers
String address = master.getMasterAddress().toString();
this.conf.set(MASTER_ADDRESS, address);
@ -137,15 +156,24 @@ public class MiniHBaseCluster implements HConstants {
}
}
/**
* Get the cluster on which this HBase cluster is running
*
* @return MiniDFSCluster
*/
public MiniDFSCluster getDFSCluster() {
return cluster;
}
private void startRegionServers(final int nRegionNodes)
throws IOException {
throws IOException {
this.regionServers = new ArrayList<HRegionServer>(nRegionNodes);
this.regionThreads = new ArrayList<Thread>(nRegionNodes);
for(int i = 0; i < nRegionNodes; i++) {
startRegionServer();
}
}
void startRegionServer() throws IOException {
HRegionServer hsr = new HRegionServer(this.conf);
this.regionServers.add(hsr);
@ -153,7 +181,7 @@ public class MiniHBaseCluster implements HConstants {
t.start();
this.regionThreads.add(t);
}
/**
* @return Returns the rpc address actually used by the master server, because
* the supplied port is not necessarily the actual port used.
@ -161,7 +189,7 @@ public class MiniHBaseCluster implements HConstants {
public HServerAddress getHMasterAddress() {
return master.getMasterAddress();
}
/**
* Shut down the specified region server cleanly
*
@ -170,15 +198,20 @@ public class MiniHBaseCluster implements HConstants {
public void stopRegionServer(int serverNumber) {
if (serverNumber >= regionServers.size()) {
throw new ArrayIndexOutOfBoundsException(
"serverNumber > number of region servers");
"serverNumber > number of region servers");
}
this.regionServers.get(serverNumber).stop();
}
/**
* Wait for the specified region server to stop
*
* @param serverNumber
*/
public void waitOnRegionServer(int serverNumber) {
if (serverNumber >= regionServers.size()) {
throw new ArrayIndexOutOfBoundsException(
"serverNumber > number of region servers");
"serverNumber > number of region servers");
}
try {
this.regionThreads.get(serverNumber).join();
@ -186,7 +219,7 @@ public class MiniHBaseCluster implements HConstants {
e.printStackTrace();
}
}
/**
* Cause a region server to exit without cleaning up
*
@ -195,11 +228,11 @@ public class MiniHBaseCluster implements HConstants {
public void abortRegionServer(int serverNumber) {
if(serverNumber >= this.regionServers.size()) {
throw new ArrayIndexOutOfBoundsException(
"serverNumber > number of region servers");
"serverNumber > number of region servers");
}
this.regionServers.get(serverNumber).abort();
}
/** Shut down the HBase cluster */
public void shutdown() {
LOG.info("Shutting down the HBase Cluster");
@ -218,6 +251,7 @@ public class MiniHBaseCluster implements HConstants {
}
try {
masterThread.join();
} catch(InterruptedException e) {
// continue
}
@ -227,12 +261,14 @@ public class MiniHBaseCluster implements HConstants {
LOG.info("Shutting down Mini DFS cluster");
cluster.shutdown();
}
// Delete all DFS files
deleteFile(new File(System.getProperty(
StaticTestEnvironment.TEST_DIRECTORY_KEY), "dfs"));
if(deleteOnExit) {
deleteFile(new File(System.getProperty(
StaticTestEnvironment.TEST_DIRECTORY_KEY), "dfs"));
}
}
private void deleteFile(File f) {
if(f.isDirectory()) {
File[] children = f.listFiles();
@ -242,4 +278,4 @@ public class MiniHBaseCluster implements HConstants {
}
f.delete();
}
}
}

View File

@ -23,6 +23,7 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.io.Text;
/**

View File

@ -0,0 +1,239 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableOutputCollector;
import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
/**
* Test Map/Reduce job over HBase tables
*/
public class TestTableMapReduce extends HBaseTestCase {
static final String TABLE_NAME = "test";
static final String INPUT_COLUMN = "contents:";
static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN);
static final String OUTPUT_COLUMN = "text:";
static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN);
private Random rand;
private HTableDescriptor desc;
private MiniDFSCluster dfsCluster = null;
private FileSystem fs;
private Path dir;
private MiniHBaseCluster hCluster = null;
private byte[][] values = {
"0123".getBytes(),
"abcd".getBytes(),
"wxyz".getBytes(),
"6789".getBytes()
};
@Override
public void setUp() throws Exception {
super.setUp();
rand = new Random();
desc = new HTableDescriptor("test");
desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
dfsCluster = new MiniDFSCluster(conf, 1, true, (String[])null);
fs = dfsCluster.getFileSystem();
dir = new Path("/hbase");
fs.mkdirs(dir);
// create the root and meta regions and insert the data region into the meta
HRegion root = createNewHRegion(fs, dir, conf, HGlobals.rootTableDesc, 0L, null, null);
HRegion meta = createNewHRegion(fs, dir, conf, HGlobals.metaTableDesc, 1L, null, null);
HRegion.addRegionToMETA(root, meta);
HRegion region = createNewHRegion(fs, dir, conf, desc, rand.nextLong(), null, null);
HRegion.addRegionToMETA(meta, region);
// insert some data into the test table
for(int i = 0; i < values.length; i++) {
long lockid = region.startUpdate(new Text("row_"
+ String.format("%1$05d", i)));
region.put(lockid, TEXT_INPUT_COLUMN, values[i]);
region.commit(lockid);
}
region.close();
region.getLog().closeAndDelete();
meta.close();
meta.getLog().closeAndDelete();
root.close();
root.getLog().closeAndDelete();
// Start up HBase cluster
hCluster = new MiniHBaseCluster(conf, 1, dfsCluster);
}
@Override
public void tearDown() throws Exception {
super.tearDown();
if(hCluster != null) {
hCluster.shutdown();
}
}
/**
* Pass the given key and processed record reduce
*/
public static class ProcessContentsMapper extends TableMap {
/** constructor */
public ProcessContentsMapper() {
super();
}
/**
* Pass the key, and reversed value to reduce
*
* @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@Override
public void map(HStoreKey key, KeyedDataArrayWritable value,
TableOutputCollector output,
@SuppressWarnings("unused") Reporter reporter) throws IOException {
Text tKey = key.getRow();
KeyedData[] columns = value.get();
if(columns.length != 1) {
throw new IOException("There should only be one input column");
}
if(!columns[0].getKey().getColumn().equals(TEXT_INPUT_COLUMN)) {
throw new IOException("Wrong input column. Expected: " + INPUT_COLUMN
+ " but got: " + columns[0].getKey().getColumn());
}
// Get the input column key and change it to the output column key
HStoreKey column = columns[0].getKey();
column.setColumn(TEXT_OUTPUT_COLUMN);
// Get the original value and reverse it
String originalValue = new String(columns[0].getData());
StringBuilder newValue = new StringBuilder();
for(int i = originalValue.length() - 1; i >= 0; i--) {
newValue.append(originalValue.charAt(i));
}
// Now set the value to be collected
columns[0] = new KeyedData(column, newValue.toString().getBytes());
value.set(columns);
output.collect(tKey, value);
}
}
/**
* Test HBase map/reduce
* @throws IOException
*/
@SuppressWarnings("static-access")
public void testTableMapReduce() throws IOException {
System.out.println("Print table contents before map/reduce");
scanTable(conf);
@SuppressWarnings("deprecation")
MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getName(), 1);
try {
JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
jobConf.setJobName("process column contents");
jobConf.setNumMapTasks(1);
jobConf.setNumReduceTasks(1);
ProcessContentsMapper.initJob(TABLE_NAME, INPUT_COLUMN,
ProcessContentsMapper.class, jobConf);
IdentityTableReduce.initJob(TABLE_NAME, IdentityTableReduce.class, jobConf);
JobClient.runJob(jobConf);
} finally {
mrCluster.shutdown();
}
System.out.println("Print table contents after map/reduce");
scanTable(conf);
}
private void scanTable(Configuration conf) throws IOException {
HClient client = new HClient(conf);
client.openTable(new Text(TABLE_NAME));
Text[] columns = {
TEXT_INPUT_COLUMN,
TEXT_OUTPUT_COLUMN
};
HScannerInterface scanner =
client.obtainScanner(columns, HClient.EMPTY_START_ROW);
try {
HStoreKey key = new HStoreKey();
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
while(scanner.next(key, results)) {
System.out.print("row: " + key.getRow());
for(Map.Entry<Text, byte[]> e: results.entrySet()) {
System.out.print(" column: " + e.getKey() + " value: "
+ new String(e.getValue()));
}
System.out.println();
}
} finally {
scanner.close();
}
}
}