diff --git a/CHANGES.txt b/CHANGES.txt index 99ce05964e5..f98f19b7b4c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -43,3 +43,4 @@ Trunk (unreleased changes) 25. HADOOP-1537. Catch exceptions in testCleanRegionServerExit so we can see what is failing. 26. HADOOP-1543 [hbase] Add HClient.tableExists + 27. HADOOP-1519 [hbase] map/reduce interface for HBase diff --git a/bin/hbase b/bin/hbase index e664abd6533..e92697b6cdb 100755 --- a/bin/hbase +++ b/bin/hbase @@ -82,10 +82,10 @@ fi # CLASSPATH initially contains $HBASE_CONF_DIR # Add HADOOP_CONF_DIR if its been defined. -CLASSPATH="${HBASE_CONF_DIR}" if [ ! "$HADOOP_CONF_DIR" = "" ]; then CLASSPATH="${CLASSPATH}:${HADOOP_CONF_DIR}" fi +CLASSPATH="${CLASSPATH}:${HBASE_CONF_DIR}" CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar # for developers, add hbase and hadoop classes to CLASSPATH @@ -112,13 +112,13 @@ IFS= for f in "$HBASE_HOME/hadoop-hbase-*.jar"; do CLASSPATH=${CLASSPATH}:$f; done -for f in "$HADOOP_HOME/build/contrib/hbase/hadoop-hbase-*.jar"; do - CLASSPATH=${CLASSPATH}:$f; -done +if [ -f "$HADOOP_HOME/contrib/hadoop-hbase.jar" ]; then + CLASSPATH=${CLASSPATH}:$HADOOP_HOME/contrib/hadoop-hbase.jar +fi if [ -d "$HADOOP_HOME/webapps" ]; then CLASSPATH=${CLASSPATH}:$HADOOP_HOME fi -for f in $HADOOP_HOME/hadoop-*-core.jar; do +for f in $HADOOP_HOME/hadoop-*.jar; do CLASSPATH=${CLASSPATH}:$f; done diff --git a/src/java/org/apache/hadoop/hbase/HClient.java b/src/java/org/apache/hadoop/hbase/HClient.java index c683bab9b04..8b142c2da94 100644 --- a/src/java/org/apache/hadoop/hbase/HClient.java +++ b/src/java/org/apache/hadoop/hbase/HClient.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.KeyedData; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryPolicies; @@ -78,10 +79,16 @@ public class HClient implements HConstants { this.regionInfo; } + /** + * @return HRegionInfo + */ public HRegionInfo getRegionInfo(){ return regionInfo; } + /** + * @return HServerAddress + */ public HServerAddress getServerAddress(){ return serverAddress; } @@ -588,6 +595,23 @@ public class HClient implements HConstants { this.tableServers = getTableServers(tableName); } + /** + * Gets the starting row key for every region in the currently open table + * @return Array of region starting row keys + */ + public synchronized Text[] getStartKeys() { + if(this.tableServers == null) { + throw new IllegalStateException("Must open table first"); + } + + Text[] keys = new Text[tableServers.size()]; + int i = 0; + for(Text key: tableServers.keySet()){ + keys[i++] = key; + } + return keys; + } + /** * Gets the servers of the given table. 
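The new HClient.getStartKeys() accessor is what lets a map/reduce job build one input split per region. A minimal caller sketch, assuming a Configuration named conf and an existing table named test (both illustrative):

    HClient client = new HClient(conf);
    client.openTable(new Text("test"));        // required first; getStartKeys() throws IllegalStateException otherwise
    Text[] startKeys = client.getStartKeys();  // one entry per region, in row-key order
    for (int i = 0; i < startKeys.length; i++) {
      // region i covers [startKeys[i], startKeys[i + 1]); the last region runs to the end of the table
    }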
* @@ -1360,6 +1384,7 @@ public class HClient implements HConstants { private Text startRow; private boolean closed; private RegionLocation[] regions; + @SuppressWarnings("hiding") private int currentRegion; private HRegionInterface server; private long scannerId; diff --git a/src/java/org/apache/hadoop/hbase/HMaster.java b/src/java/org/apache/hadoop/hbase/HMaster.java index 69a93d94790..affb969737a 100644 --- a/src/java/org/apache/hadoop/hbase/HMaster.java +++ b/src/java/org/apache/hadoop/hbase/HMaster.java @@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.KeyedData; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; @@ -1854,7 +1855,6 @@ public class HMaster implements HConstants, HMasterInterface, try { DataInputBuffer inbuf = new DataInputBuffer(); - byte[] bytes; while(true) { HRegionInfo info = new HRegionInfo(); String serverName = null; @@ -1978,8 +1978,7 @@ public class HMaster implements HConstants, HMasterInterface, @Override protected void processScanItem(String serverName, long startCode, - HRegionInfo info) - throws IOException { + HRegionInfo info) { if (isBeingServed(serverName, startCode)) { TreeSet regions = servedRegions.get(serverName); if (regions == null) { @@ -2260,6 +2259,7 @@ public class HMaster implements HConstants, HMasterInterface, /** Instantiated to monitor the health of a region server */ private class ServerExpirer implements LeaseListener { + @SuppressWarnings("hiding") private String server; ServerExpirer(String server) { diff --git a/src/java/org/apache/hadoop/hbase/HRegionInterface.java b/src/java/org/apache/hadoop/hbase/HRegionInterface.java index b4a709ad595..3b47896d765 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionInterface.java +++ b/src/java/org/apache/hadoop/hbase/HRegionInterface.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase; import java.io.IOException; +import org.apache.hadoop.hbase.io.KeyedData; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.VersionedProtocol; diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java index f1c79a0f45e..0241b4fb5fc 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java @@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.KeyedData; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.ipc.RPC; diff --git a/src/java/org/apache/hadoop/hbase/KeyedData.java b/src/java/org/apache/hadoop/hbase/io/KeyedData.java similarity index 91% rename from src/java/org/apache/hadoop/hbase/KeyedData.java rename to src/java/org/apache/hadoop/hbase/io/KeyedData.java index 558da62f785..379347d7e3f 100644 --- a/src/java/org/apache/hadoop/hbase/KeyedData.java +++ b/src/java/org/apache/hadoop/hbase/io/KeyedData.java @@ -13,13 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hbase; +package org.apache.hadoop.hbase.io; +import org.apache.hadoop.hbase.HStoreKey; import org.apache.hadoop.io.*; import java.io.*; /******************************************************************************* - * LabelledData is just a data pair. + * KeyedData is just a data pair. * It includes an HStoreKey and some associated data. ******************************************************************************/ public class KeyedData implements Writable { @@ -60,7 +61,7 @@ public class KeyedData implements Writable { */ public void write(DataOutput out) throws IOException { key.write(out); - out.writeShort(this.data.length); + out.writeInt(this.data.length); out.write(this.data); } @@ -69,7 +70,7 @@ public class KeyedData implements Writable { */ public void readFields(DataInput in) throws IOException { key.readFields(in); - this.data = new byte[in.readShort()]; + this.data = new byte[in.readInt()]; in.readFully(this.data); } -} \ No newline at end of file +} diff --git a/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java b/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java new file mode 100644 index 00000000000..f13c17bc4b7 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java @@ -0,0 +1,80 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * Wraps an array of KeyedData items as a Writable. The array elements + * may be null. 
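The writeShort-to-writeInt change above is a wire-format fix, not a cleanup: a short length field caps a value at 32767 bytes and reads back negative for anything longer. A round-trip sketch of the new format using org.apache.hadoop.io.DataOutputBuffer and DataInputBuffer (storeKey and data are illustrative placeholders):

    KeyedData kd = new KeyedData(storeKey, data); // data may now exceed 32767 bytes
    DataOutputBuffer out = new DataOutputBuffer();
    kd.write(out);                                // HStoreKey, then an int length, then the bytes
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    KeyedData copy = new KeyedData();
    copy.readFields(in);                          // readInt recovers the full length

The flip side is that bytes serialized with the old short-length format can no longer be read back.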
+ */ +public class KeyedDataArrayWritable implements Writable { + + private final static KeyedData NULL_KEYEDDATA = new KeyedData(); + + private KeyedData[] m_data; + + /** + * Make a record of length 0 + */ + public KeyedDataArrayWritable() { + m_data = new KeyedData[0]; + } + + /** @return the array of KeyedData */ + public KeyedData[] get() { + return m_data; + } + + /** + * Sets the KeyedData array + * + * @param data array of KeyedData + */ + public void set(KeyedData[] data) { + if(data == null) { + throw new NullPointerException("KeyedData[] cannot be null"); + } + m_data = data; + } + + // Writable + + public void readFields(DataInput in) throws IOException { + int len = in.readInt(); + m_data = new KeyedData[len]; + for(int i = 0; i < len; i++) { + m_data[i] = new KeyedData(); + m_data[i].readFields(in); + } + } + + public void write(DataOutput out) throws IOException { + int len = m_data.length; + out.writeInt(len); + for(int i = 0; i < len; i++) { + if(m_data[i] != null) { + m_data[i].write(out); + } else { + NULL_KEYEDDATA.write(out); + } + } + } +} diff --git a/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java b/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java new file mode 100644 index 00000000000..c5b1c3664b2 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java @@ -0,0 +1,152 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.hbase.io.KeyedData; +import org.apache.hadoop.hbase.io.KeyedDataArrayWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; + + +/** + * Extract grouping columns from input record + */ +public class GroupingTableMap extends TableMap { + + /** + * JobConf parameter to specify the columns used to produce the key passed to + * collect from the map phase + */ + public static final String GROUP_COLUMNS = + "hbase.mapred.groupingtablemap.columns"; + + private Text[] m_columns; + + /** default constructor */ + public GroupingTableMap() { + super(); + } + + /** + * Use this before submitting a TableMap job. It will appropriately set up the + * JobConf. 
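One subtlety of the wrapper just defined: write() substitutes the empty NULL_KEYEDDATA for null elements, so after a readFields() round trip those slots come back as empty KeyedData objects rather than null. A sketch (cell and out are illustrative placeholders):

    KeyedDataArrayWritable row = new KeyedDataArrayWritable();
    row.set(new KeyedData[] { cell, null }); // a null array is rejected, but null elements are allowed
    row.write(out);                          // the null element goes out as NULL_KEYEDDATA
    // after readFields() on the receiving side, element 1 is an empty KeyedData,
    // not null, so consumers should test data length rather than nullness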
* + * @param table table to be processed + * @param columns space separated list of columns to fetch + * @param groupColumns space separated list of columns used to form the key used in collect + * @param mapper map class + * @param job job configuration object + */ + public static void initJob(String table, String columns, String groupColumns, + Class mapper, JobConf job) { + + initJob(table, columns, mapper, job); + job.set(GROUP_COLUMNS, groupColumns); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.mapred.TableMap#configure(org.apache.hadoop.mapred.JobConf) + */ + @Override + public void configure(JobConf job) { + super.configure(job); + String[] cols = job.get(GROUP_COLUMNS, "").split(" "); + m_columns = new Text[cols.length]; + for(int i = 0; i < cols.length; i++) { + m_columns[i] = new Text(cols[i]); + } + } + + /** + * Extract the grouping columns from value to construct a new key. + * + * Pass the new key and value to reduce. + * If any of the grouping columns are not found in the value, the record is skipped. + * + * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter) + */ + @Override + public void map(@SuppressWarnings("unused") HStoreKey key, + KeyedDataArrayWritable value, TableOutputCollector output, + @SuppressWarnings("unused") Reporter reporter) throws IOException { + + byte[][] keyVals = extractKeyValues(value); + if(keyVals != null) { + Text tKey = createGroupKey(keyVals); + output.collect(tKey, value); + } + } + + /** + * Extract column values from the current record. This method returns + * null if any of the columns are not found. + * + * Override this method if you want to deal with nulls differently. + * + * @param r + * @return array of byte values + */ + protected byte[][] extractKeyValues(KeyedDataArrayWritable r) { + byte[][] keyVals = null; + ArrayList foundList = new ArrayList(); + int numCols = m_columns.length; + if(numCols > 0) { + KeyedData[] recVals = r.get(); + boolean found = true; + for(int i = 0; i < numCols && found; i++) { + found = false; + for(int j = 0; j < recVals.length; j++) { + if(recVals[j].getKey().getColumn().equals(m_columns[i])) { + found = true; + byte[] val = recVals[j].getData(); + foundList.add(val); + break; + } + } + } + if(foundList.size() == numCols) { + keyVals = foundList.toArray(new byte[numCols][]); + } + } + return keyVals; + } + + /** + * Create a key by concatenating multiple column values. + * Override this function in order to produce different types of keys. + * + * @param vals + * @return key generated by concatenating multiple column values + */ + protected Text createGroupKey(byte[][] vals) { + if(vals == null) { + return null; + } + StringBuilder sb = new StringBuilder(); + for(int i = 0; i < vals.length; i++) { + if(i > 0) { + sb.append(" "); + } + sb.append(new String(vals[i])); + } + return new Text(sb.toString()); + } +} diff --git a/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java b/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java new file mode 100644 index 00000000000..0ec5323f3e8 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java @@ -0,0 +1,49 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
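A minimal job setup for the grouping mapper defined above; the table, columns, and driver class are illustrative:

    JobConf job = new JobConf(conf, MyDriver.class); // MyDriver is a stand-in driver class
    GroupingTableMap.initJob("test", "contents: anchor:", "anchor:",
        GroupingTableMap.class, job);
    // map() keys each row by its grouping-column values joined with spaces;
    // rows missing any grouping column are skipped entirely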
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.hbase.io.KeyedDataArrayWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Reporter; + + +/** + * Pass the given key and record as-is to reduce + */ +public class IdentityTableMap extends TableMap { + + /** constructor */ + public IdentityTableMap() { + super(); + } + + /** + * Pass the key, value to reduce + * + * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter) + */ + @Override + public void map(HStoreKey key, KeyedDataArrayWritable value, + TableOutputCollector output, + @SuppressWarnings("unused") Reporter reporter) throws IOException { + + Text tKey = key.getRow(); + output.collect(tKey, value); + } +} diff --git a/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java b/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java new file mode 100644 index 00000000000..b8d215512e9 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java @@ -0,0 +1,51 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.hbase.io.KeyedDataArrayWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Reporter; + + +/** + * Write each (key, record) pair to the table + */ +public class IdentityTableReduce extends TableReduce { + + /** constructor */ + public IdentityTableReduce() { + super(); + } + + /** + * No aggregation, output pairs of (key, record) + * + * @see org.apache.hadoop.hbase.mapred.TableReduce#reduce(org.apache.hadoop.io.Text, java.util.Iterator, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter) + */ + @Override + public void reduce(Text key, @SuppressWarnings("unchecked") Iterator values, + TableOutputCollector output, + @SuppressWarnings("unused") Reporter reporter) throws IOException { + + while(values.hasNext()) { + KeyedDataArrayWritable r = (KeyedDataArrayWritable)values.next(); + output.collect(key, r); + } + } +} diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java b/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java new file mode 100644 index 00000000000..a00102baf8a --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java @@ -0,0 +1,239 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; +import java.util.Map; +import java.util.TreeMap; +import java.util.ArrayList; + +import org.apache.hadoop.fs.Path; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.Text; + +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +import org.apache.hadoop.hbase.HClient; +import org.apache.hadoop.hbase.HScannerInterface; +import org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.hbase.io.KeyedData; +import org.apache.hadoop.hbase.io.KeyedDataArrayWritable; + +import org.apache.log4j.Logger; + +/** + * Convert HBase tabular data into a format that is consumable by Map/Reduce + */ +public class TableInputFormat implements InputFormat, JobConfigurable { + static final Logger LOG = Logger.getLogger(TableInputFormat.class.getName()); + + /** + * space delimited list of columns + * @see org.apache.hadoop.hbase.HAbstractScanner for column name wildcards + */ + public static final String COLUMN_LIST = "hbase.mapred.tablecolumns"; + + private Text m_tableName; + Text[] m_cols; + HClient m_client; + + /** + * Iterate over an HBase table's data, returning (HStoreKey, KeyedDataArrayWritable) pairs + */ + class TableRecordReader implements RecordReader { + private HScannerInterface m_scanner; + private TreeMap m_row; // current buffer + private Text m_endRow; + + /** + * Constructor + * @param startRow (inclusive) + * @param endRow (exclusive) + * @throws IOException + */ + public TableRecordReader(Text startRow, Text endRow) throws IOException { + LOG.debug("start construct"); + m_row = new TreeMap(); + m_scanner = m_client.obtainScanner(m_cols, startRow); + m_endRow = endRow; + LOG.debug("end construct"); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.RecordReader#close() + */ + public void close() throws IOException { + LOG.debug("start close"); + m_scanner.close(); + LOG.debug("end close"); + } + + /** + * @return HStoreKey + * + * @see org.apache.hadoop.mapred.RecordReader#createKey() + */ + public WritableComparable createKey() { + return new HStoreKey(); + } + + /** + * @return KeyedDataArrayWritable of KeyedData + * + * @see org.apache.hadoop.mapred.RecordReader#createValue() + */ + public Writable createValue() { + return new KeyedDataArrayWritable(); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.RecordReader#getPos() + */ + public long getPos() { + // This should be the ordinal tuple in the range; + // not clear how to calculate... + return 0; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.RecordReader#getProgress() + */ + public float getProgress() { + // Depends on the total number of tuples and getPos + return 0; + } + + /** + * @param key HStoreKey as input key.
* @param value KeyedDataArrayWritable as input value + * + * Converts HScannerInterface.next(HStoreKey, TreeMap(Text, byte[])) to + * (HStoreKey, KeyedDataArrayWritable) + * @return true if there was more data + * @throws IOException + */ + public boolean next(Writable key, Writable value) throws IOException { + LOG.debug("start next"); + m_row.clear(); + HStoreKey tKey = (HStoreKey)key; + boolean hasMore = m_scanner.next(tKey, m_row); + + if(hasMore) { + if(m_endRow.getLength() > 0 && (tKey.getRow().compareTo(m_endRow) >= 0)) { + hasMore = false; + } else { + KeyedDataArrayWritable rowVal = (KeyedDataArrayWritable) value; + ArrayList columns = new ArrayList(); + + for(Map.Entry e: m_row.entrySet()) { + HStoreKey keyCol = new HStoreKey(tKey); + keyCol.setColumn(e.getKey()); + columns.add(new KeyedData(keyCol, e.getValue())); + } + + // set the output + rowVal.set(columns.toArray(new KeyedData[columns.size()])); + } + } + LOG.debug("end next"); + return hasMore; + } + + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(org.apache.hadoop.mapred.InputSplit, org.apache.hadoop.mapred.JobConf, org.apache.hadoop.mapred.Reporter) + */ + public RecordReader getRecordReader(InputSplit split, + @SuppressWarnings("unused") JobConf job, + @SuppressWarnings("unused") Reporter reporter) throws IOException { + + TableSplit tSplit = (TableSplit)split; + return new TableRecordReader(tSplit.getStartRow(), tSplit.getEndRow()); + } + + /** + * A split will be created for each HRegion of the input table + * + * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int) + */ + @SuppressWarnings("unused") + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + LOG.debug("start getSplits"); + + Text[] startKeys = m_client.getStartKeys(); + if(startKeys == null || startKeys.length == 0) { + throw new IOException("Expecting at least one region"); + } + InputSplit[] splits = new InputSplit[startKeys.length]; + for(int i = 0; i < startKeys.length; i++) { + splits[i] = new TableSplit(m_tableName, startKeys[i], + ((i + 1) < startKeys.length) ?
startKeys[i + 1] : new Text()); + LOG.debug("split: " + i + "->" + splits[i]); + } + LOG.debug("end splits"); + return splits; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf) + */ + public void configure(JobConf job) { + LOG.debug("start configure"); + Path[] tableNames = job.getInputPaths(); + m_tableName = new Text(tableNames[0].getName()); + String colArg = job.get(COLUMN_LIST); + String[] colNames = colArg.split(" "); + m_cols = new Text[colNames.length]; + for(int i = 0; i < m_cols.length; i++) { + m_cols[i] = new Text(colNames[i]); + } + m_client = new HClient(job); + try { + m_client.openTable(m_tableName); + } catch(Exception e) { + LOG.error(e); + } + LOG.debug("end configure"); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.InputFormat#validateInput(org.apache.hadoop.mapred.JobConf) + */ + public void validateInput(JobConf job) throws IOException { + + // expecting exactly one path + + Path[] tableNames = job.getInputPaths(); + if(tableNames == null || tableNames.length > 1) { + throw new IOException("expecting one table name"); + } + + // expecting at least one column + + String colArg = job.get(COLUMN_LIST); + if(colArg == null || colArg.length() == 0) { + throw new IOException("expecting at least one column"); + } + } +} diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableMap.java b/src/java/org/apache/hadoop/hbase/mapred/TableMap.java new file mode 100644 index 00000000000..5fe3db3626c --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/TableMap.java @@ -0,0 +1,112 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.hbase.io.KeyedDataArrayWritable; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +/** + * Scan an HBase table, calling a user-defined map function on each row. + * Rows are presented to the mapper as (HStoreKey, KeyedDataArrayWritable) pairs. + * + */ +public abstract class TableMap extends MapReduceBase implements Mapper { + + private static final Logger LOG = Logger.getLogger(TableMap.class.getName()); + + private TableOutputCollector m_collector; + + /** constructor */ + public TableMap() { + m_collector = new TableOutputCollector(); + } + + /** + * Use this before submitting a TableMap job. It will + * appropriately set up the JobConf.
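To recap the contract the input format just established: the table name travels as the job's single input path, and the scanned columns travel in hbase.mapred.tablecolumns. TableMap.initJob below sets both; wired by hand it would look roughly like this (table and column names illustrative):

    JobConf job = new JobConf(conf);
    job.setInputFormat(TableInputFormat.class);
    job.setInputPath(new Path("test"));                         // interpreted as a table name, not a file path
    job.set(TableInputFormat.COLUMN_LIST, "contents: anchor:"); // space-delimited column list
    // getSplits() then returns one TableSplit per region of the table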
* + * @param table table name + * @param columns columns to scan + * @param mapper mapper class + * @param job job configuration + */ + public static void initJob(String table, String columns, + Class mapper, JobConf job) { + + job.setInputFormat(TableInputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(KeyedDataArrayWritable.class); + job.setMapperClass(mapper); + job.setInputPath(new Path(table)); + job.set(TableInputFormat.COLUMN_LIST, columns); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf) + */ + @Override + public void configure(JobConf job) { + super.configure(job); + } + + /** + * Input: + * @param key is of type HStoreKey + * @param value is of type KeyedDataArrayWritable + * @param output output collector + * @param reporter object to use for status updates + * @throws IOException + * + * Output: + * The key is a Text, typically the row key or a grouping key produced by the map + * The value is of type KeyedDataArrayWritable + */ + public void map(WritableComparable key, Writable value, + OutputCollector output, Reporter reporter) throws IOException { + + LOG.debug("start map"); + if(m_collector.collector == null) { + m_collector.collector = output; + } + map((HStoreKey)key, (KeyedDataArrayWritable)value, m_collector, reporter); + LOG.debug("end map"); + } + + /** + * Call a user defined function on a single HBase record, represented + * by a key and its associated record value. + * + * @param key + * @param value + * @param output + * @param reporter + * @throws IOException + */ + public abstract void map(HStoreKey key, KeyedDataArrayWritable value, + TableOutputCollector output, Reporter reporter) throws IOException; +} diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java b/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java new file mode 100644 index 00000000000..63e3c321829 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java @@ -0,0 +1,43 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.OutputCollector; + +import org.apache.hadoop.hbase.io.KeyedDataArrayWritable; + +/** + * Refine the types that can be collected from Table Map/Reduce jobs. + */ +public class TableOutputCollector { + /** The collector object */ + public OutputCollector collector; + + /** + * Restrict Table Map/Reduce's output to be a Text key and a record.
+ * + * @param key + * @param value + * @throws IOException + */ + public void collect(Text key, KeyedDataArrayWritable value) + throws IOException { + collector.collect(key, value); + } +} diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java new file mode 100644 index 00000000000..dbd6656378e --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java @@ -0,0 +1,137 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.FileAlreadyExistsException; +import org.apache.hadoop.mapred.InvalidJobConfException; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormatBase; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; + +import org.apache.hadoop.hbase.HClient; +import org.apache.hadoop.hbase.io.KeyedData; +import org.apache.hadoop.hbase.io.KeyedDataArrayWritable; + +import org.apache.log4j.Logger; + +/** + * Convert Map/Reduce output and write it to an HBase table + */ +public class TableOutputFormat extends OutputFormatBase { + + /** JobConf parameter that specifies the output table */ + public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; + + static final Logger LOG = Logger.getLogger(TableOutputFormat.class.getName()); + + /** constructor */ + public TableOutputFormat() {} + + /** + * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable) + * and write to an HBase table + */ + protected class TableRecordWriter implements RecordWriter { + private HClient m_client; + + /** + * Instantiate a TableRecordWriter with the HBase HClient for writing. 
* + * @param client + */ + public TableRecordWriter(HClient client) { + m_client = client; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.RecordWriter#close(org.apache.hadoop.mapred.Reporter) + */ + public void close(@SuppressWarnings("unused") Reporter reporter) {} + + /** + * Expect key to be of type Text + * Expect value to be of type KeyedDataArrayWritable + * + * @see org.apache.hadoop.mapred.RecordWriter#write(org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable) + */ + public void write(WritableComparable key, Writable value) throws IOException { + LOG.debug("start write"); + Text tKey = (Text)key; + KeyedDataArrayWritable tValue = (KeyedDataArrayWritable) value; + KeyedData[] columns = tValue.get(); + + // start transaction + + long xid = m_client.startUpdate(tKey); + + for(int i = 0; i < columns.length; i++) { + KeyedData column = columns[i]; + m_client.put(xid, column.getKey().getColumn(), column.getData()); + } + + // end transaction + + m_client.commit(xid); + + LOG.debug("end write"); + } + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.OutputFormatBase#getRecordWriter(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf, java.lang.String, org.apache.hadoop.util.Progressable) + */ + @Override + @SuppressWarnings("unused") + public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, + String name, Progressable progress) throws IOException { + + // the output table name is taken from the job configuration + + LOG.debug("start get writer"); + Text tableName = new Text(job.get(OUTPUT_TABLE)); + HClient client = null; + try { + client = new HClient(job); + client.openTable(tableName); + } catch(Exception e) { + LOG.error(e); + } + LOG.debug("end get writer"); + return new TableRecordWriter(client); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.OutputFormatBase#checkOutputSpecs(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf) + */ + @Override + @SuppressWarnings("unused") + public void checkOutputSpecs(FileSystem ignored, JobConf job) + throws FileAlreadyExistsException, InvalidJobConfException, IOException { + + String tableName = job.get(OUTPUT_TABLE); + if(tableName == null) { + throw new IOException("Must specify table name"); + } + } +} diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java b/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java new file mode 100644 index 00000000000..3349d8a5995 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java @@ -0,0 +1,89 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
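Given the write() above, every value a reducer collects must carry its own column names inside its KeyedData entries. A sketch of assembling one output row (row, bytes, and the text: column are illustrative, and an HStoreKey(Text) row constructor is assumed):

    HStoreKey k = new HStoreKey(row);  // assumed row-only constructor
    k.setColumn(new Text("text:"));    // the column this cell is written to
    KeyedDataArrayWritable outValue = new KeyedDataArrayWritable();
    outValue.set(new KeyedData[] { new KeyedData(k, bytes) });
    output.collect(row, outValue);     // becomes one startUpdate/put/commit cycle in the writer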
*/ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +/** + * Write a table, sorting by the input key + */ +public abstract class TableReduce extends MapReduceBase implements Reducer { + private static final Logger LOG = + Logger.getLogger(TableReduce.class.getName()); + + TableOutputCollector m_collector; + + /** Constructor */ + public TableReduce() { + m_collector = new TableOutputCollector(); + } + + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table + * @param reducer + * @param job + */ + public static void initJob(String table, Class reducer, + JobConf job) { + + job.setOutputFormat(TableOutputFormat.class); + job.setReducerClass(reducer); + job.set(TableOutputFormat.OUTPUT_TABLE, table); + } + + /** + * Create a unique key for table insertion by appending a local + * counter to the given key. + * + * @see org.apache.hadoop.mapred.Reducer#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter) + */ + @SuppressWarnings("unchecked") + public void reduce(WritableComparable key, Iterator values, + OutputCollector output, Reporter reporter) throws IOException { + LOG.debug("start reduce"); + if(m_collector.collector == null) { + m_collector.collector = output; + } + reduce((Text)key, values, m_collector, reporter); + LOG.debug("end reduce"); + } + + /** + * + * @param key + * @param values + * @param output + * @param reporter + * @throws IOException + */ + @SuppressWarnings("unchecked") + public abstract void reduce(Text key, Iterator values, + TableOutputCollector output, Reporter reporter) throws IOException; + +} diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java b/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java new file mode 100644 index 00000000000..1545c0eb064 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java @@ -0,0 +1,106 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/ +package org.apache.hadoop.hbase.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.InputSplit; + +/** + * A table split corresponds to a key range [low, high) + */ +public class TableSplit implements InputSplit { + private Text m_tableName; + private Text m_startRow; + private Text m_endRow; + + /** default constructor */ + public TableSplit() { + m_tableName = new Text(); + m_startRow = new Text(); + m_endRow = new Text(); + } + + /** + * Constructor + * @param tableName + * @param startRow + * @param endRow + */ + public TableSplit(Text tableName, Text startRow, Text endRow) { + this(); + m_tableName.set(tableName); + m_startRow.set(startRow); + m_endRow.set(endRow); + } + + /** @return table name */ + public Text getTableName() { + return m_tableName; + } + + /** @return starting row key */ + public Text getStartRow() { + return m_startRow; + } + + /** @return end row key */ + public Text getEndRow() { + return m_endRow; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.InputSplit#getLength() + */ + public long getLength() { + // Not clear how to obtain this... seems to be used only for sorting splits + return 0; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.InputSplit#getLocations() + */ + public String[] getLocations() { + // Data locality is not yet implemented; report no preferred locations for now + return new String[] { }; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) + */ + public void readFields(DataInput in) throws IOException { + m_tableName.readFields(in); + m_startRow.readFields(in); + m_endRow.readFields(in); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) + */ + public void write(DataOutput out) throws IOException { + m_tableName.write(out); + m_startRow.write(out); + m_endRow.write(out); + } + + @Override + public String toString() { + return m_tableName +"," + m_startRow + "," + m_endRow; + } +} diff --git a/src/test/hbase-site.xml b/src/test/hbase-site.xml index 173debf0e13..282ff678c4d 100644 --- a/src/test/hbase-site.xml +++ b/src/test/hbase-site.xml @@ -11,13 +11,13 @@ hbase.client.pause - 3000 + 5000 General client pause value. Used mostly as value to wait before running a retry of a failed get, region lookup, etc. hbase.client.retries.number - 2 + 5 Maximum retries. Used as maximum for all retryable operations such as fetching of the root region from root region server, getting a cell's value, starting a row update, etc.
diff --git a/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java b/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java index dcaa8b9db35..b92706661ff 100644 --- a/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java +++ b/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Random; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.dfs.MiniDFSCluster; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -141,17 +140,4 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase { return region; } - private HRegion createNewHRegion(FileSystem fs, Path dir, - Configuration conf, HTableDescriptor desc, long regionId, Text startKey, - Text endKey) throws IOException { - - HRegionInfo info = new HRegionInfo(regionId, desc, startKey, endKey); - Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName); - fs.mkdirs(regionDir); - - return new HRegion(dir, - new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf), - fs, conf, info, null); - } - } diff --git a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java index bcdb004c937..48c6ed96c6b 100644 --- a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java @@ -15,10 +15,14 @@ */ package org.apache.hadoop.hbase; +import java.io.IOException; + import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; /** * Abstract base class for test cases. Performs all static initialization @@ -43,4 +47,18 @@ public abstract class HBaseTestCase extends TestCase { protected Path getUnitTestdir(String testName) { return new Path(StaticTestEnvironment.TEST_DIRECTORY_KEY, testName); } + + protected HRegion createNewHRegion(FileSystem fs, Path dir, + Configuration conf, HTableDescriptor desc, long regionId, Text startKey, + Text endKey) throws IOException { + + HRegionInfo info = new HRegionInfo(regionId, desc, startKey, endKey); + Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName); + fs.mkdirs(regionDir); + + return new HRegion(dir, + new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf), + fs, conf, info, null); + } + } diff --git a/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java b/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java index 11b6355cab5..18e3945d20d 100644 --- a/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -41,7 +41,8 @@ public class MiniHBaseCluster implements HConstants { private Thread masterThread; List regionServers; List regionThreads; - + private boolean deleteOnExit = true; + /** * Starts a MiniHBaseCluster on top of a new MiniDFSCluster * @@ -51,9 +52,23 @@ public class MiniHBaseCluster implements HConstants { */ public MiniHBaseCluster(Configuration conf, int nRegionNodes) throws IOException { - this(conf, nRegionNodes, true); + this(conf, nRegionNodes, true, true, true); } - + + /** + * Start a MiniHBaseCluster. Use the native file system unless + * miniHdfsFilesystem is set to true. 
* + * @param conf + * @param nRegionNodes + * @param miniHdfsFilesystem + * @throws IOException + */ + public MiniHBaseCluster(Configuration conf, int nRegionNodes, + final boolean miniHdfsFilesystem) throws IOException { + this(conf, nRegionNodes, miniHdfsFilesystem, true, true); + } + /** * Starts a MiniHBaseCluster on top of an existing HDFSCluster * @@ -70,7 +85,7 @@ public class MiniHBaseCluster implements HConstants { this.cluster = dfsCluster; init(nRegionNodes); } - + /** * Constructor. * @param conf @@ -78,16 +93,20 @@ public class MiniHBaseCluster implements HConstants { * @param miniHdfsFilesystem If true, set the hbase mini * cluster atop a mini hdfs cluster. Otherwise, use the * filesystem configured in conf. + * @param format if true, format the mini hdfs cluster + * @param deleteOnExit if true, delete the mini hdfs files on shutdown * @throws IOException */ public MiniHBaseCluster(Configuration conf, int nRegionNodes, - final boolean miniHdfsFilesystem) + final boolean miniHdfsFilesystem, boolean format, boolean deleteOnExit) throws IOException { this.conf = conf; - + this.deleteOnExit = deleteOnExit; + if (miniHdfsFilesystem) { try { - this.cluster = new MiniDFSCluster(this.conf, 2, true, (String[])null); + this.cluster = new MiniDFSCluster(this.conf, 2, format, (String[])null); + } catch(Throwable t) { LOG.error("Failed setup of mini dfs cluster", t); t.printStackTrace(); @@ -112,7 +131,7 @@ public class MiniHBaseCluster implements HConstants { if(this.conf.get(MASTER_ADDRESS) == null) { this.conf.set(MASTER_ADDRESS, "localhost:0"); } - + // Create the master this.master = new HMaster(conf); this.masterThread = new Thread(this.master, "HMaster"); @@ -120,7 +139,7 @@ public class MiniHBaseCluster implements HConstants { // Start up the master LOG.info("Starting HMaster"); masterThread.start(); - + // Set the master's port for the HRegionServers String address = master.getMasterAddress().toString(); this.conf.set(MASTER_ADDRESS, address); @@ -137,15 +156,24 @@ public class MiniHBaseCluster implements HConstants { } } + /** + * Get the mini DFS cluster on which this HBase cluster is running + * + * @return MiniDFSCluster + */ + public MiniDFSCluster getDFSCluster() { + return cluster; + } + private void startRegionServers(final int nRegionNodes) - throws IOException { + throws IOException { this.regionServers = new ArrayList(nRegionNodes); this.regionThreads = new ArrayList(nRegionNodes); for(int i = 0; i < nRegionNodes; i++) { startRegionServer(); } } - + void startRegionServer() throws IOException { HRegionServer hsr = new HRegionServer(this.conf); this.regionServers.add(hsr); @@ -153,7 +181,7 @@ public class MiniHBaseCluster implements HConstants { t.start(); this.regionThreads.add(t); } - + /** * @return Returns the rpc address actually used by the master server, because * the supplied port is not necessarily the actual port used.
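The new five-argument constructor is what lets a test reuse mini-DFS state across runs; a sketch (node counts illustrative):

    // mini HDFS, but do not reformat it on startup and keep its files on shutdown
    MiniHBaseCluster cluster = new MiniHBaseCluster(conf, 1, true, false, false);
    MiniDFSCluster dfs = cluster.getDFSCluster(); // the accessor added above
    // ... exercise the cluster ...
    cluster.shutdown();                           // leaves the DFS directories in place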
@@ -161,7 +189,7 @@ public class MiniHBaseCluster implements HConstants { public HServerAddress getHMasterAddress() { return master.getMasterAddress(); } - + /** * Shut down the specified region server cleanly * @@ -170,15 +198,20 @@ public class MiniHBaseCluster implements HConstants { public void stopRegionServer(int serverNumber) { if (serverNumber >= regionServers.size()) { throw new ArrayIndexOutOfBoundsException( - "serverNumber > number of region servers"); + "serverNumber > number of region servers"); } this.regionServers.get(serverNumber).stop(); } - + + /** + * Wait for the specified region server to stop + * + * @param serverNumber + */ public void waitOnRegionServer(int serverNumber) { if (serverNumber >= regionServers.size()) { throw new ArrayIndexOutOfBoundsException( - "serverNumber > number of region servers"); + "serverNumber > number of region servers"); } try { this.regionThreads.get(serverNumber).join(); @@ -186,7 +219,7 @@ public class MiniHBaseCluster implements HConstants { e.printStackTrace(); } } - + /** * Cause a region server to exit without cleaning up * @@ -195,11 +228,11 @@ public class MiniHBaseCluster implements HConstants { public void abortRegionServer(int serverNumber) { if(serverNumber >= this.regionServers.size()) { throw new ArrayIndexOutOfBoundsException( - "serverNumber > number of region servers"); + "serverNumber > number of region servers"); } this.regionServers.get(serverNumber).abort(); } - + /** Shut down the HBase cluster */ public void shutdown() { LOG.info("Shutting down the HBase Cluster"); @@ -218,6 +251,7 @@ public class MiniHBaseCluster implements HConstants { } try { masterThread.join(); + } catch(InterruptedException e) { // continue } @@ -227,12 +261,14 @@ public class MiniHBaseCluster implements HConstants { LOG.info("Shutting down Mini DFS cluster"); cluster.shutdown(); } - + // Delete all DFS files - deleteFile(new File(System.getProperty( - StaticTestEnvironment.TEST_DIRECTORY_KEY), "dfs")); + if(deleteOnExit) { + deleteFile(new File(System.getProperty( + StaticTestEnvironment.TEST_DIRECTORY_KEY), "dfs")); + } } - + private void deleteFile(File f) { if(f.isDirectory()) { File[] children = f.listFiles(); @@ -242,4 +278,4 @@ public class MiniHBaseCluster implements HConstants { } f.delete(); } -} \ No newline at end of file +} diff --git a/src/test/org/apache/hadoop/hbase/TestScanner2.java b/src/test/org/apache/hadoop/hbase/TestScanner2.java index 6400dcfa77c..593c86ab926 100644 --- a/src/test/org/apache/hadoop/hbase/TestScanner2.java +++ b/src/test/org/apache/hadoop/hbase/TestScanner2.java @@ -23,6 +23,7 @@ import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.KeyedData; import org.apache.hadoop.io.Text; /** diff --git a/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java b/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java new file mode 100644 index 00000000000..49b18858859 --- /dev/null +++ b/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java @@ -0,0 +1,239 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.dfs.MiniDFSCluster; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.hadoop.io.Text; + +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapred.Reporter; + +import org.apache.hadoop.hbase.io.KeyedData; +import org.apache.hadoop.hbase.io.KeyedDataArrayWritable; + +import org.apache.hadoop.hbase.mapred.TableMap; +import org.apache.hadoop.hbase.mapred.TableOutputCollector; +import org.apache.hadoop.hbase.mapred.IdentityTableReduce; + +/** + * Test Map/Reduce job over HBase tables + */ +public class TestTableMapReduce extends HBaseTestCase { + static final String TABLE_NAME = "test"; + static final String INPUT_COLUMN = "contents:"; + static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN); + static final String OUTPUT_COLUMN = "text:"; + static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN); + + private Random rand; + private HTableDescriptor desc; + + private MiniDFSCluster dfsCluster = null; + private FileSystem fs; + private Path dir; + private MiniHBaseCluster hCluster = null; + + private byte[][] values = { + "0123".getBytes(), + "abcd".getBytes(), + "wxyz".getBytes(), + "6789".getBytes() + }; + + @Override + public void setUp() throws Exception { + super.setUp(); + rand = new Random(); + desc = new HTableDescriptor("test"); + desc.addFamily(new HColumnDescriptor(INPUT_COLUMN)); + desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN)); + + dfsCluster = new MiniDFSCluster(conf, 1, true, (String[])null); + fs = dfsCluster.getFileSystem(); + dir = new Path("/hbase"); + fs.mkdirs(dir); + + // create the root and meta regions and insert the data region into the meta + + HRegion root = createNewHRegion(fs, dir, conf, HGlobals.rootTableDesc, 0L, null, null); + HRegion meta = createNewHRegion(fs, dir, conf, HGlobals.metaTableDesc, 1L, null, null); + HRegion.addRegionToMETA(root, meta); + + HRegion region = createNewHRegion(fs, dir, conf, desc, rand.nextLong(), null, null); + HRegion.addRegionToMETA(meta, region); + + // insert some data into the test table + + for(int i = 0; i < values.length; i++) { + long lockid = region.startUpdate(new Text("row_" + + String.format("%1$05d", i))); + + region.put(lockid, TEXT_INPUT_COLUMN, values[i]); + region.commit(lockid); + } + + region.close(); + region.getLog().closeAndDelete(); + meta.close(); + meta.getLog().closeAndDelete(); + root.close(); + root.getLog().closeAndDelete(); + + // Start up HBase cluster + + hCluster = new MiniHBaseCluster(conf, 1, dfsCluster); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + + if(hCluster != null) { + hCluster.shutdown(); + } + + } + + /** + * Pass the given key and processed record to reduce + */ + public static class ProcessContentsMapper extends TableMap { + + /** constructor */
public ProcessContentsMapper() { + super(); + } + + /** + * Pass the key, and reversed value to reduce + * + * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter) + */ + @Override + public void map(HStoreKey key, KeyedDataArrayWritable value, + TableOutputCollector output, + @SuppressWarnings("unused") Reporter reporter) throws IOException { + + Text tKey = key.getRow(); + KeyedData[] columns = value.get(); + + if(columns.length != 1) { + throw new IOException("There should only be one input column"); + } + + if(!columns[0].getKey().getColumn().equals(TEXT_INPUT_COLUMN)) { + throw new IOException("Wrong input column. Expected: " + INPUT_COLUMN + + " but got: " + columns[0].getKey().getColumn()); + } + + // Get the input column key and change it to the output column key + + HStoreKey column = columns[0].getKey(); + column.setColumn(TEXT_OUTPUT_COLUMN); + + // Get the original value and reverse it + + String originalValue = new String(columns[0].getData()); + StringBuilder newValue = new StringBuilder(); + for(int i = originalValue.length() - 1; i >= 0; i--) { + newValue.append(originalValue.charAt(i)); + } + + // Now set the value to be collected + + columns[0] = new KeyedData(column, newValue.toString().getBytes()); + value.set(columns); + + output.collect(tKey, value); + } + } + + /** + * Test HBase map/reduce + * @throws IOException + */ + @SuppressWarnings("static-access") + public void testTableMapReduce() throws IOException { + System.out.println("Print table contents before map/reduce"); + scanTable(conf); + + @SuppressWarnings("deprecation") + MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getName(), 1); + + try { + JobConf jobConf = new JobConf(conf, TestTableMapReduce.class); + jobConf.setJobName("process column contents"); + jobConf.setNumMapTasks(1); + jobConf.setNumReduceTasks(1); + + ProcessContentsMapper.initJob(TABLE_NAME, INPUT_COLUMN, + ProcessContentsMapper.class, jobConf); + + IdentityTableReduce.initJob(TABLE_NAME, IdentityTableReduce.class, jobConf); + + JobClient.runJob(jobConf); + + } finally { + mrCluster.shutdown(); + } + + System.out.println("Print table contents after map/reduce"); + scanTable(conf); + } + + private void scanTable(Configuration conf) throws IOException { + HClient client = new HClient(conf); + client.openTable(new Text(TABLE_NAME)); + + Text[] columns = { + TEXT_INPUT_COLUMN, + TEXT_OUTPUT_COLUMN + }; + HScannerInterface scanner = + client.obtainScanner(columns, HClient.EMPTY_START_ROW); + + try { + HStoreKey key = new HStoreKey(); + TreeMap results = new TreeMap(); + + while(scanner.next(key, results)) { + System.out.print("row: " + key.getRow()); + + for(Map.Entry e: results.entrySet()) { + System.out.print(" column: " + e.getKey() + " value: " + + new String(e.getValue())); + } + System.out.println(); + } + + } finally { + scanner.close(); + } + } +}