HADOOP-1709 Make HRegionInterface more like that of HTable

HADOOP-1725 Client find of table regions should not include offlined, split parents

Changes:

New class MapWritable replaces KeyedData and KeyedDataArrayWritable

HBaseAdmin, HConnectionManager, HMaster, HRegionInterface,
HRegionServer, HTable, TestScanner2:
- getRow returns MapWritable instead of array of KeyedData
- next returns MapWritable instead of array of KeyedData

GroupingTableMap, IdentityTableMap, IdentityTableReduce,
TableInputFormat, TableMap, TableOutputCollector, TableOutputFormat,
TestTableMapReduce:
- use MapWritable instead of KeyedData and KeyedDataArrayWritable
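The shape of the conversion is the same at every call site: code that indexed
into a KeyedData[] now iterates the entry set of a MapWritable keyed by
HStoreKey, unwrapping each value from an ImmutableBytesWritable. A minimal
sketch of that pattern, distilled from the loops repeated in the diffs below
(the class and helper names here are ours, not part of the patch):

    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;

    import org.apache.hadoop.hbase.HStoreKey;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.io.MapWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;

    public class RowMaps {
      /** Flatten one getRow()/next() result into a column -> value map. */
      static SortedMap<Text, byte[]> toRowMap(final MapWritable values) {
        SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
        if (values != null) {
          for (Map.Entry<WritableComparable, Writable> e : values.entrySet()) {
            HStoreKey key = (HStoreKey) e.getKey();
            results.put(key.getColumn(),
                ((ImmutableBytesWritable) e.getValue()).get());
          }
        }
        return results;
      }
    }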


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@566878 13f79535-47bb-0310-9956-ffa450edef68
Jim Kellerman 2007-08-16 22:51:03 +00:00
parent e220809017
commit b7f01dce98
25 changed files with 660 additions and 423 deletions


@ -79,7 +79,7 @@ Trunk (unreleased changes)
10 concurrent clients
50. HADOOP-1468 Add HBase batch update to reduce RPC overhead (restrict batches
to a single row at a time)
51. HADOOP-1528 HClient for multiple tables (phase 1)
51. HADOOP-1528 HClient for multiple tables (phase 1) (James Kennedy & JimK)
52. HADOOP-1528 HClient for multiple tables (phase 2) all HBase client side code
(except TestHClient and HBaseShell) have been converted to use the new client
side objects (HTable/HBaseAdmin/HConnection) instead of HClient.
@ -98,3 +98,6 @@ Trunk (unreleased changes)
60. HADOOP-1644 Compactions should not block updates
60. HADOOP-1672 HBase Shell should use new client classes
(Edward Yoon via Stack).
61. HADOOP-1709 Make HRegionInterface more like that of HTable
HADOOP-1725 Client find of table regions should not include offlined, split parents


@ -21,16 +21,20 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Map;
import java.util.SortedMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.hbase.io.MapWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Writables;
/**
@ -178,15 +182,17 @@ public class HBaseAdmin implements HConstants {
scannerId =
server.openScanner(firstMetaServer.getRegionInfo().getRegionName(),
COL_REGIONINFO_ARRAY, tableName, System.currentTimeMillis(), null);
KeyedData[] values = server.next(scannerId);
if (values == null || values.length == 0) {
MapWritable values = server.next(scannerId);
if (values == null || values.size() == 0) {
break;
}
boolean found = false;
for (int j = 0; j < values.length; j++) {
if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
info =
(HRegionInfo) Writables.getWritable(values[j].getData(), info);
for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
HStoreKey key = (HStoreKey) e.getKey();
if (key.getColumn().equals(COL_REGIONINFO)) {
info = (HRegionInfo) Writables.getWritable(
((ImmutableBytesWritable) e.getValue()).get(), info);
if (info.tableDesc.getName().equals(tableName)) {
found = true;
}
@ -260,8 +266,8 @@ public class HBaseAdmin implements HConstants {
boolean isenabled = false;
while (true) {
KeyedData[] values = server.next(scannerId);
if (values == null || values.length == 0) {
MapWritable values = server.next(scannerId);
if (values == null || values.size() == 0) {
if (valuesfound == 0) {
throw new NoSuchElementException(
"table " + tableName + " not found");
@ -269,10 +275,12 @@ public class HBaseAdmin implements HConstants {
break;
}
valuesfound += 1;
for (int j = 0; j < values.length; j++) {
if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
info =
(HRegionInfo) Writables.getWritable(values[j].getData(), info);
for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
HStoreKey key = (HStoreKey) e.getKey();
if (key.getColumn().equals(COL_REGIONINFO)) {
info = (HRegionInfo) Writables.getWritable(
((ImmutableBytesWritable) e.getValue()).get(), info);
isenabled = !info.offLine;
break;
}
@ -359,18 +367,20 @@ public class HBaseAdmin implements HConstants {
boolean disabled = false;
while (true) {
KeyedData[] values = server.next(scannerId);
if (values == null || values.length == 0) {
MapWritable values = server.next(scannerId);
if (values == null || values.size() == 0) {
if (valuesfound == 0) {
throw new NoSuchElementException("table " + tableName + " not found");
}
break;
}
valuesfound += 1;
for (int j = 0; j < values.length; j++) {
if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
info =
(HRegionInfo) Writables.getWritable(values[j].getData(), info);
for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
HStoreKey key = (HStoreKey) e.getKey();
if (key.getColumn().equals(COL_REGIONINFO)) {
info = (HRegionInfo) Writables.getWritable(
((ImmutableBytesWritable) e.getValue()).get(), info);
disabled = info.offLine;
break;
}


@ -27,7 +27,6 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,8 +34,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.MapWritable;
import org.apache.hadoop.hbase.util.Writables;
/**
@ -228,7 +230,7 @@ public class HConnectionManager implements HConstants {
/** {@inheritDoc} */
public HTableDescriptor[] listTables() throws IOException {
TreeSet<HTableDescriptor> uniqueTables = new TreeSet<HTableDescriptor>();
HashSet<HTableDescriptor> uniqueTables = new HashSet<HTableDescriptor>();
SortedMap<Text, HRegionLocation> metaTables =
getTableServers(META_TABLE_NAME);
@ -241,16 +243,17 @@ public class HConnectionManager implements HConstants {
COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, System.currentTimeMillis(),
null);
HRegionInfo info = new HRegionInfo();
while (true) {
KeyedData[] values = server.next(scannerId);
if (values.length == 0) {
MapWritable values = server.next(scannerId);
if (values == null || values.size() == 0) {
break;
}
for (int i = 0; i < values.length; i++) {
if (values[i].getKey().getColumn().equals(COL_REGIONINFO)) {
HRegionInfo info =
(HRegionInfo) Writables.getWritable(values[i].getData(),
new HRegionInfo());
for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
HStoreKey key = (HStoreKey) e.getKey();
if (key.getColumn().equals(COL_REGIONINFO)) {
info = (HRegionInfo) Writables.getWritable(
((ImmutableBytesWritable) e.getValue()).get(), info);
// Only examine the rows where the startKey is zero length
if (info.startKey.getLength() == 0) {
@ -272,9 +275,9 @@ public class HConnectionManager implements HConstants {
}
/** {@inheritDoc} */
public SortedMap<Text, HRegionLocation>
getTableServers(Text tableName)
throws IOException {
public SortedMap<Text, HRegionLocation> getTableServers(Text tableName)
throws IOException {
if (tableName == null || tableName.getLength() == 0) {
throw new IllegalArgumentException(
"table name cannot be null or zero length");
@ -542,7 +545,7 @@ public class HConnectionManager implements HConstants {
* @return map of first row to TableInfo for all meta regions
* @throws IOException
*/
private TreeMap<Text, HRegionLocation> loadMetaFromRoot()
private SortedMap<Text, HRegionLocation> loadMetaFromRoot()
throws IOException {
SortedMap<Text, HRegionLocation> rootRegion =
@ -646,7 +649,7 @@ public class HConnectionManager implements HConstants {
* @throws NoServerForRegionException - if table can not be found after retrying
* @throws IOException
*/
private TreeMap<Text, HRegionLocation> scanOneMetaRegion(
private SortedMap<Text, HRegionLocation> scanOneMetaRegion(
final HRegionLocation t, final Text tableName) throws IOException {
HRegionInterface server = getHRegionConnection(t.getServerAddress());
@ -660,8 +663,8 @@ public class HConnectionManager implements HConstants {
COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null);
while (true) {
KeyedData[] values = server.next(scannerId);
if (values.length == 0) {
MapWritable values = server.next(scannerId);
if (values == null || values.size() == 0) {
if (servers.size() == 0) {
// If we didn't find any servers then the table does not exist
throw new TableNotFoundException("table '" + tableName +
@ -676,9 +679,11 @@ public class HConnectionManager implements HConstants {
break;
}
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
for (int i = 0; i < values.length; i++) {
results.put(values[i].getKey().getColumn(), values[i].getData());
SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
HStoreKey key = (HStoreKey) e.getKey();
results.put(key.getColumn(),
((ImmutableBytesWritable) e.getValue()).get());
}
byte[] bytes = results.get(COL_REGIONINFO);
@ -704,8 +709,13 @@ public class HConnectionManager implements HConstants {
}
break;
}
if (regionInfo.isSplit()) {
// Region is a split parent. Skip it.
continue;
}
if (regionInfo.isOffline() && !regionInfo.isSplit()) {
if (regionInfo.isOffline()) {
throw new IllegalStateException("table offline: " + tableName);
}
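This hunk is the substance of HADOOP-1725: a parent region that has split
stays in META, offline, until its daughters no longer reference it, so a
client scanning META for a table's regions must skip such rows rather than
report the table as offline. A hedged sketch of the new filter as a
standalone predicate (the helper name is ours; isSplit() and isOffline() are
the HRegionInfo accessors used above, and imports follow the file above):

    // Returns true if this META row describes a region a client may use.
    // Split parents are bookkeeping-only rows; any other offline region
    // means the table itself is offline, which is an error to the caller.
    static boolean clientUsable(final HRegionInfo info, final Text tableName) {
      if (info.isSplit()) {
        return false;                 // skip: daughters now hold the data
      }
      if (info.isOffline()) {
        throw new IllegalStateException("table offline: " + tableName);
      }
      return true;
    }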


@ -45,9 +45,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.MapWritable;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
@ -187,8 +190,8 @@ HMasterRegionInterface, Runnable {
// Array to hold list of split parents found. Scan adds to list. After
// scan we go check if parents can be removed.
Map<HRegionInfo, TreeMap<Text, byte[]>> splitParents =
new HashMap<HRegionInfo, TreeMap<Text, byte[]>>();
Map<HRegionInfo, SortedMap<Text, byte[]>> splitParents =
new HashMap<HRegionInfo, SortedMap<Text, byte[]>>();
try {
regionServer = connection.getHRegionConnection(region.server);
scannerId =
@ -197,14 +200,16 @@ HMasterRegionInterface, Runnable {
int numberOfRegionsFound = 0;
while (true) {
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
KeyedData[] values = regionServer.next(scannerId);
if (values.length == 0) {
SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
MapWritable values = regionServer.next(scannerId);
if (values == null || values.size() == 0) {
break;
}
for (int i = 0; i < values.length; i++) {
results.put(values[i].getKey().getColumn(), values[i].getData());
for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
HStoreKey key = (HStoreKey) e.getKey();
results.put(key.getColumn(),
((ImmutableBytesWritable) e.getValue()).get());
}
HRegionInfo info = (HRegionInfo) Writables.getWritable(
@ -260,10 +265,10 @@ HMasterRegionInterface, Runnable {
// Scan is finished. Take a look at split parents to see if any we can clean up.
if (splitParents.size() > 0) {
for (Map.Entry<HRegionInfo, TreeMap<Text, byte[]>> e:
for (Map.Entry<HRegionInfo, SortedMap<Text, byte[]>> e:
splitParents.entrySet()) {
TreeMap<Text, byte[]> results = e.getValue();
SortedMap<Text, byte[]> results = e.getValue();
cleanupSplits(region.regionName, regionServer, e.getKey(),
(HRegionInfo) Writables.getWritable(results.get(COL_SPLITA),
new HRegionInfo()),
@ -1643,7 +1648,7 @@ HMasterRegionInterface, Runnable {
try {
while (true) {
KeyedData[] values = null;
MapWritable values = null;
try {
values = server.next(scannerId);
@ -1658,23 +1663,25 @@ HMasterRegionInterface, Runnable {
break;
}
if (values == null || values.length == 0) {
if (values == null || values.size() == 0) {
break;
}
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
Text row = null;
for (int i = 0; i < values.length; i++) {
if(row == null) {
row = values[i].getKey().getRow();
for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
HStoreKey key = (HStoreKey) e.getKey();
Text thisRow = key.getRow();
if (row == null) {
row = thisRow;
} else {
if (!row.equals(values[i].getKey().getRow())) {
if (!row.equals(thisRow)) {
LOG.error("Multiple rows in same scanner result set. firstRow="
+ row + ", currentRow=" + values[i].getKey().getRow());
+ row + ", currentRow=" + thisRow);
}
}
results.put(values[i].getKey().getColumn(), values[i].getData());
results.put(key.getColumn(),
((ImmutableBytesWritable) e.getValue()).get());
}
if (LOG.isDebugEnabled() && row != null) {
@ -2317,19 +2324,22 @@ HMasterRegionInterface, Runnable {
long scannerid = server.openScanner(metaRegionName, COL_REGIONINFO_ARRAY,
tableName, System.currentTimeMillis(), null);
try {
KeyedData[] data = server.next(scannerid);
MapWritable data = server.next(scannerid);
// Test data and that the row for the data is for our table. If table
// does not exist, scanner will return row after where our table would
// be inserted if it exists so look for exact match on table name.
if (data != null && data.length > 0 &&
HRegionInfo.getTableNameFromRegionName(
data[0].getKey().getRow()).equals(tableName)) {
// Then a region for this table already exists. Ergo table exists.
throw new TableExistsException(tableName.toString());
if (data != null && data.size() > 0) {
for (WritableComparable k: data.keySet()) {
if (HRegionInfo.getTableNameFromRegionName(
((HStoreKey) k).getRow()).equals(tableName)) {
// Then a region for this table already exists. Ergo table exists.
throw new TableExistsException(tableName.toString());
}
}
}
} finally {
@ -2462,35 +2472,38 @@ HMasterRegionInterface, Runnable {
String serverName = null;
long startCode = -1L;
KeyedData[] values = server.next(scannerId);
if(values == null || values.length == 0) {
MapWritable values = server.next(scannerId);
if(values == null || values.size() == 0) {
break;
}
boolean haveRegionInfo = false;
for (int i = 0; i < values.length; i++) {
if (values[i].getData().length == 0) {
for (Map.Entry<WritableComparable, Writable> e:
values.entrySet()) {
byte[] value = ((ImmutableBytesWritable) e.getValue()).get();
if (value == null || value.length == 0) {
break;
}
Text column = values[i].getKey().getColumn();
HStoreKey key = (HStoreKey) e.getKey();
Text column = key.getColumn();
if (column.equals(COL_REGIONINFO)) {
haveRegionInfo = true;
info = (HRegionInfo) Writables.getWritable(
values[i].getData(), info);
info = (HRegionInfo) Writables.getWritable(value, info);
} else if (column.equals(COL_SERVER)) {
try {
serverName =
Writables.bytesToString(values[i].getData());
Writables.bytesToString(value);
} catch (UnsupportedEncodingException e) {
} catch (UnsupportedEncodingException ex) {
assert(false);
}
} else if (column.equals(COL_STARTCODE)) {
try {
startCode = Writables.bytesToLong(values[i].getData());
startCode = Writables.bytesToLong(value);
} catch (UnsupportedEncodingException e) {
} catch (UnsupportedEncodingException ex) {
assert(false);
}
}


@ -870,15 +870,6 @@ public class HRegion implements HConstants {
}
}
private Vector<HStoreFile> getAllStoreFiles() {
Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
for(HStore hstore: stores.values()) {
Vector<HStoreFile> hstoreFiles = hstore.getAllStoreFiles();
allHStoreFiles.addAll(0, hstoreFiles);
}
return allHStoreFiles;
}
//////////////////////////////////////////////////////////////////////////////
// get() methods for client use.
//////////////////////////////////////////////////////////////////////////////


@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.hbase.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.VersionedProtocol;
@ -93,11 +93,11 @@ public interface HRegionInterface extends VersionedProtocol {
*
* @param regionName region name
* @param row row key
* @return array of values
* @return map of values
* @throws IOException
*/
public KeyedData[] getRow(final Text regionName, final Text row)
throws IOException; //TODO
public MapWritable getRow(final Text regionName, final Text row)
throws IOException;
//////////////////////////////////////////////////////////////////////////////
// Start an atomic row insertion/update. No changes are committed until the
@ -244,10 +244,10 @@ public interface HRegionInterface extends VersionedProtocol {
* Get the next set of values
*
* @param scannerId clientId passed to openScanner
* @return array of values
* @return map of values
* @throws IOException
*/
public KeyedData[] next(long scannerId) throws IOException; //TODO
public MapWritable next(long scannerId) throws IOException;
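With getRow() and next() now returning MapWritable, callers of this interface
consume a scanner the way HBaseAdmin and HConnectionManager do in the hunks
above. A sketch of the idiom (openScanner arguments as used elsewhere in this
patch; the final close(scannerId) call is an assumption based on the "Close a
scanner" method documented just below; server, regionName, columns, and
startRow are elided setup):

    long scannerId = server.openScanner(regionName, columns, startRow,
        System.currentTimeMillis(), null);
    try {
      while (true) {
        MapWritable values = server.next(scannerId);
        if (values == null || values.size() == 0) {
          break;                                  // scanner exhausted
        }
        for (Map.Entry<WritableComparable, Writable> e : values.entrySet()) {
          HStoreKey key = (HStoreKey) e.getKey();
          byte[] data = ((ImmutableBytesWritable) e.getValue()).get();
          // ... key.getRow(), key.getColumn(), data for one cell ...
        }
      }
    } finally {
      server.close(scannerId);
    }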
/**
* Close a scanner


@ -40,18 +40,22 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.MapWritable;
import org.apache.hadoop.hbase.util.Writables;
/*******************************************************************************
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
@ -1021,22 +1025,24 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
/** {@inheritDoc} */
public KeyedData[] getRow(final Text regionName, final Text row)
public MapWritable getRow(final Text regionName, final Text row)
throws IOException {
requestCount.incrementAndGet();
HRegion region = getRegion(regionName);
MapWritable result = new MapWritable(HStoreKey.class,
ImmutableBytesWritable.class,
new TreeMap<WritableComparable, Writable>());
TreeMap<Text, byte[]> map = region.getFull(row);
KeyedData result[] = new KeyedData[map.size()];
int counter = 0;
for (Map.Entry<Text, byte []> es: map.entrySet()) {
result[counter++] =
new KeyedData(new HStoreKey(row, es.getKey()), es.getValue());
result.put(new HStoreKey(row, es.getKey()),
new ImmutableBytesWritable(es.getValue()));
}
return result;
}
/** {@inheritDoc} */
public KeyedData[] next(final long scannerId)
public MapWritable next(final long scannerId)
throws IOException {
requestCount.incrementAndGet();
String scannerName = String.valueOf(scannerId);
@ -1048,13 +1054,14 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// Collect values to be returned here
ArrayList<KeyedData> values = new ArrayList<KeyedData>();
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
MapWritable values = new MapWritable(HStoreKey.class,
ImmutableBytesWritable.class,
new TreeMap<WritableComparable, Writable>());
// Keep getting rows until we find one that has at least one non-deleted column value
HStoreKey key = new HStoreKey();
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
while (s.next(key, results)) {
for(Map.Entry<Text, byte []> e: results.entrySet()) {
HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
@ -1063,8 +1070,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// Column value is deleted. Don't return it.
continue;
}
values.add(new KeyedData(k, val));
values.put(k, new ImmutableBytesWritable(val));
}
if(values.size() > 0) {
// Row has something in it. Return the value.
break;
@ -1074,7 +1082,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
results.clear();
}
return values.toArray(new KeyedData[values.size()]);
return values;
}
/*


@ -572,6 +572,7 @@ class HStore implements HConstants {
*/
void compactHelper(final boolean deleteSequenceInfo, long maxSeenSeqID)
throws IOException {
long maxId = maxSeenSeqID;
synchronized(compactLock) {
Path curCompactStore =
HStoreFile.getHStoreDir(compactdir, regionName, familyName);
@ -607,12 +608,12 @@ class HStore implements HConstants {
// Compute the max-sequenceID seen in any of the to-be-compacted
// TreeMaps if it hasn't been passed in to us.
if (maxSeenSeqID == -1) {
if (maxId == -1) {
for (HStoreFile hsf: toCompactFiles) {
long seqid = hsf.loadInfo(fs);
if(seqid > 0) {
if(seqid > maxSeenSeqID) {
maxSeenSeqID = seqid;
if(seqid > maxId) {
maxId = seqid;
}
}
}
@ -629,8 +630,8 @@ class HStore implements HConstants {
}
// Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
if((! deleteSequenceInfo) && maxSeenSeqID >= 0) {
compactedOutputFile.writeInfo(fs, maxSeenSeqID);
if((! deleteSequenceInfo) && maxId >= 0) {
compactedOutputFile.writeInfo(fs, maxId);
} else {
compactedOutputFile.writeInfo(fs, -1);
}
@ -710,14 +711,35 @@ class HStore implements HConstants {
}
}
}
/** Interface for generic reader for compactions */
interface CompactionReader {
/**
* Closes the reader
* @throws IOException
*/
public void close() throws IOException;
/**
* Get the next key/value pair
*
* @param key
* @param val
* @return true if more data was returned
* @throws IOException
*/
public boolean next(WritableComparable key, Writable val)
throws IOException;
throws IOException;
/**
* Resets the reader
* @throws IOException
*/
public void reset() throws IOException;
}
/** A compaction reader for MapFile */
class MapFileCompactionReader implements CompactionReader {
final MapFile.Reader reader;
@ -725,15 +747,18 @@ class HStore implements HConstants {
this.reader = r;
}
/** {@inheritDoc} */
public void close() throws IOException {
this.reader.close();
}
/** {@inheritDoc} */
public boolean next(WritableComparable key, Writable val)
throws IOException {
return this.reader.next(key, val);
}
/** {@inheritDoc} */
public void reset() throws IOException {
this.reader.reset();
}
@ -1217,6 +1242,7 @@ class HStore implements HConstants {
return new HStoreScanner(timestamp, targetCols, firstRow);
}
/** {@inheritDoc} */
@Override
public String toString() {
return this.storeName;


@ -22,6 +22,7 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;
@ -32,8 +33,11 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.ipc.RemoteException;
/**
@ -183,12 +187,15 @@ public class HTable implements HConstants {
break;
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
if (tries == numRetries - 1) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
}
if (LOG.isDebugEnabled()) {
LOG.debug("reloading table servers because: " + e.getMessage());
}
tableServers = connection.reloadTableServers(tableName);
}
try {
@ -225,13 +232,16 @@ public class HTable implements HConstants {
break;
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
if (tries == numRetries - 1) {
// No more tries
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
}
if (LOG.isDebugEnabled()) {
LOG.debug("reloading table servers because: " + e.getMessage());
}
tableServers = connection.reloadTableServers(tableName);
}
try {
@ -279,13 +289,16 @@ public class HTable implements HConstants {
break;
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
if (tries == numRetries - 1) {
// No more tries
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
}
if (LOG.isDebugEnabled()) {
LOG.debug("reloading table servers because: " + e.getMessage());
}
tableServers = connection.reloadTableServers(tableName);
}
try {
@ -315,7 +328,7 @@ public class HTable implements HConstants {
*/
public SortedMap<Text, byte[]> getRow(Text row) throws IOException {
checkClosed();
KeyedData[] value = null;
MapWritable value = null;
for (int tries = 0; tries < numRetries; tries++) {
HRegionLocation r = getRegionLocation(row);
HRegionInterface server =
@ -326,13 +339,16 @@ public class HTable implements HConstants {
break;
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
if (tries == numRetries - 1) {
// No more tries
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
}
if (LOG.isDebugEnabled()) {
LOG.debug("reloading table servers because: " + e.getMessage());
}
tableServers = connection.reloadTableServers(tableName);
}
try {
@ -342,10 +358,12 @@ public class HTable implements HConstants {
// continue
}
}
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
if (value != null && value.length != 0) {
for (int i = 0; i < value.length; i++) {
results.put(value[i].getKey().getColumn(), value[i].getData());
SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
if (value != null && value.size() != 0) {
for (Map.Entry<WritableComparable, Writable> e: value.entrySet()) {
HStoreKey key = (HStoreKey) e.getKey();
results.put(key.getColumn(),
((ImmutableBytesWritable) e.getValue()).get());
}
}
return results;
@ -574,14 +592,17 @@ public class HTable implements HConstants {
break;
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) e);
}
if (tries < numRetries -1) {
if (LOG.isDebugEnabled()) {
LOG.debug("reloading table servers because: " + e.getMessage());
}
tableServers = connection.reloadTableServers(tableName);
} else {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) e);
}
throw e;
}
}
@ -589,6 +610,7 @@ public class HTable implements HConstants {
Thread.sleep(pause);
} catch (InterruptedException e) {
// continue
}
}
} finally {
@ -702,13 +724,17 @@ public class HTable implements HConstants {
break;
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) e);
}
if (tries == numRetries - 1) {
// No more tries
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
}
if (LOG.isDebugEnabled()) {
LOG.debug("reloading table servers because: " + e.getMessage());
}
tableServers = connection.reloadTableServers(tableName);
loadRegions();
}
@ -732,20 +758,22 @@ public class HTable implements HConstants {
if (this.closed) {
return false;
}
KeyedData[] values = null;
MapWritable values = null;
do {
values = this.server.next(this.scannerId);
} while (values != null && values.length == 0 && nextScanner());
} while (values != null && values.size() == 0 && nextScanner());
if (values != null && values.length != 0) {
for (int i = 0; i < values.length; i++) {
key.setRow(values[i].getKey().getRow());
key.setVersion(values[i].getKey().getTimestamp());
if (values != null && values.size() != 0) {
for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
HStoreKey k = (HStoreKey) e.getKey();
key.setRow(k.getRow());
key.setVersion(k.getTimestamp());
key.setColumn(EMPTY_COLUMN);
results.put(values[i].getKey().getColumn(), values[i].getData());
results.put(k.getColumn(),
((ImmutableBytesWritable) e.getValue()).get());
}
}
return values == null ? false : values.length != 0;
return values == null ? false : values.size() != 0;
}
/**


@ -1,74 +0,0 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.io.*;
import java.io.*;
/*******************************************************************************
* KeyedData is just a data pair.
* It includes an HStoreKey and some associated data.
******************************************************************************/
public class KeyedData implements Writable {
HStoreKey key;
byte [] data;
/** Default constructor. Used by Writable interface */
public KeyedData() {
this.key = new HStoreKey();
}
/**
* Create a KeyedData object specifying the parts
* @param key HStoreKey
* @param data
*/
public KeyedData(HStoreKey key, byte [] data) {
this.key = key;
this.data = data;
}
/** @return returns the key */
public HStoreKey getKey() {
return key;
}
/** @return - returns the value */
public byte [] getData() {
return data;
}
// Writable
/** {@inheritDoc} */
public void write(DataOutput out) throws IOException {
key.write(out);
out.writeInt(this.data.length);
out.write(this.data);
}
/** {@inheritDoc} */
public void readFields(DataInput in) throws IOException {
key.readFields(in);
this.data = new byte[in.readInt()];
in.readFully(this.data);
}
}


@ -1,86 +0,0 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
* Wraps an array of KeyedData items as a Writable. The array elements
* may be null.
*/
public class KeyedDataArrayWritable implements Writable {
private final static KeyedData NULL_KEYEDDATA = new KeyedData();
private KeyedData[] m_data;
/**
* Make a record of length 0
*/
public KeyedDataArrayWritable() {
m_data = new KeyedData[0];
}
/** @return the array of KeyedData */
public KeyedData[] get() {
return m_data;
}
/**
* Sets the KeyedData array
*
* @param data array of KeyedData
*/
public void set(KeyedData[] data) {
if(data == null) {
throw new NullPointerException("KeyedData[] cannot be null");
}
m_data = data;
}
// Writable
/** {@inheritDoc} */
public void readFields(DataInput in) throws IOException {
int len = in.readInt();
m_data = new KeyedData[len];
for(int i = 0; i < len; i++) {
m_data[i] = new KeyedData();
m_data[i].readFields(in);
}
}
/** {@inheritDoc} */
public void write(DataOutput out) throws IOException {
int len = m_data.length;
out.writeInt(len);
for(int i = 0; i < len; i++) {
if(m_data[i] != null) {
m_data[i].write(out);
} else {
NULL_KEYEDDATA.write(out);
}
}
}
}


@ -0,0 +1,303 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.HConstants;
/**
 * A Writable implementation of java.util.Map. The concrete classes of the
 * keys, the values, and the backing map are recorded alongside the entries
 * so that readFields() can reconstruct the map from the wire.
 */
public class MapWritable implements Writable, Map<WritableComparable, Writable> {
private String keyClass = null;
private String valueClass = null;
private String mapClass = null;
private Map<WritableComparable, Writable> instance = null;
/**
* Default constructor used by writable
*/
public MapWritable() {}
/**
* @param keyClass the class of the keys
* @param valueClass the class of the values
* @param instance the Map to be wrapped in this Writable
*/
@SuppressWarnings("unchecked")
public MapWritable(Class keyClass, Class valueClass,
Map<WritableComparable, Writable> instance) {
this.keyClass = keyClass.getName();
this.valueClass = valueClass.getName();
this.instance = instance;
this.mapClass = instance.getClass().getName();
}
private void checkInitialized() {
if (keyClass == null ||
valueClass == null ||
mapClass == null ||
instance == null) {
throw new IllegalStateException("object has not been properly initialized");
}
}
/** {@inheritDoc} */
public void clear() {
checkInitialized();
instance.clear();
}
/** {@inheritDoc} */
public boolean containsKey(Object key) {
checkInitialized();
return instance.containsKey(key);
}
/** {@inheritDoc} */
public boolean containsValue(Object value) {
checkInitialized();
return instance.containsValue(value);
}
/** {@inheritDoc} */
public Set<Map.Entry<WritableComparable, Writable>> entrySet() {
checkInitialized();
return instance.entrySet();
}
/** {@inheritDoc} */
public Writable get(Object key) {
checkInitialized();
return instance.get(key);
}
/**
* Returns the value to which this map maps the specified key
* @param key
* @return value associated with specified key
*/
public Writable get(WritableComparable key) {
checkInitialized();
return instance.get(key);
}
/** {@inheritDoc} */
public boolean isEmpty() {
checkInitialized();
return instance.isEmpty();
}
/** {@inheritDoc} */
public Set<WritableComparable> keySet() {
checkInitialized();
return instance.keySet();
}
/** {@inheritDoc} */
public Writable put(WritableComparable key, Writable value) {
checkInitialized();
return instance.put(key, value);
}
/** {@inheritDoc} */
public void putAll(Map<? extends WritableComparable,? extends Writable> t) {
checkInitialized();
instance.putAll(t);
}
/** {@inheritDoc} */
public Writable remove(Object key) {
checkInitialized();
return instance.remove(key);
}
/**
* Removes the mapping for this key from this map if it is present
* @param key
* @return value corresponding to key
*/
public Writable remove(WritableComparable key) {
checkInitialized();
return instance.remove(key);
}
/** {@inheritDoc} */
public int size() {
checkInitialized();
return instance.size();
}
/** {@inheritDoc} */
public Collection<Writable> values() {
checkInitialized();
return instance.values();
}
// Writable
/** {@inheritDoc} */
public void write(DataOutput out) throws IOException {
checkInitialized();
out.writeUTF(mapClass);
out.writeUTF(keyClass);
out.writeUTF(valueClass);
out.writeInt(instance.size());
for (Map.Entry<WritableComparable, Writable> e: instance.entrySet()) {
e.getKey().write(out);
e.getValue().write(out);
}
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
mapClass = in.readUTF();
keyClass = in.readUTF();
valueClass = in.readUTF();
instance = (Map<WritableComparable, Writable>) objectFactory(mapClass);
int entries = in.readInt();
for (int i = 0; i < entries; i++) {
WritableComparable key = (WritableComparable) objectFactory(keyClass);
key.readFields(in);
Writable value = (Writable) objectFactory(valueClass);
value.readFields(in);
instance.put(key, value);
}
}
private Object objectFactory(String className) throws IOException {
Object o = null;
String exceptionMessage = null;
try {
o = Class.forName(className).newInstance();
} catch (ClassNotFoundException e) {
exceptionMessage = e.getMessage();
} catch (InstantiationException e) {
exceptionMessage = e.getMessage();
} catch (IllegalAccessException e) {
exceptionMessage = e.getMessage();
} finally {
if (exceptionMessage != null) {
throw new IOException("error instantiating " + className + " because " +
exceptionMessage);
}
}
return o;
}
/**
* A simple main program to test this class.
*
* @param args not used
* @throws IOException
*/
@SuppressWarnings("unchecked")
public static void main(@SuppressWarnings("unused") String[] args)
throws IOException {
HStoreKey[] keys = {
new HStoreKey(new Text("row1"), HConstants.COL_REGIONINFO),
new HStoreKey(new Text("row2"), HConstants.COL_SERVER),
new HStoreKey(new Text("row3"), HConstants.COL_STARTCODE)
};
ImmutableBytesWritable[] values = {
new ImmutableBytesWritable("value1".getBytes()),
new ImmutableBytesWritable("value2".getBytes()),
new ImmutableBytesWritable("value3".getBytes())
};
@SuppressWarnings("unchecked")
MapWritable inMap = new MapWritable(HStoreKey.class,
ImmutableBytesWritable.class,
(Map) new TreeMap<HStoreKey, ImmutableBytesWritable>());
for (int i = 0; i < keys.length; i++) {
inMap.put(keys[i], values[i]);
}
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(bytes);
try {
inMap.write(out);
} catch (IOException e) {
e.printStackTrace();
throw e;
}
MapWritable outMap = new MapWritable();
DataInput in =
new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
try {
outMap.readFields(in);
} catch (IOException e) {
e.printStackTrace();
throw e;
}
if (outMap.size() != inMap.size()) {
System.err.println("outMap.size()=" + outMap.size() + " != " +
"inMap.size()=" + inMap.size());
}
for (Map.Entry<WritableComparable, Writable> e: inMap.entrySet()) {
if (!outMap.containsKey(e.getKey())) {
System.err.println("outMap does not contain key " + e.getKey().toString());
continue;
}
if (((WritableComparable) outMap.get(e.getKey())).compareTo(
e.getValue()) != 0) {
System.err.println("output value for " + e.getKey().toString() + " != input value");
}
}
System.out.println("it worked!");
}
}
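The wire format written above is: map, key, and value class names as UTF
strings, then the entry count, then each key and value serialized in the
backing map's iteration order. Two consequences worth noting: all keys must
share one concrete Writable class (likewise all values), and those classes
need public no-arg constructors for objectFactory() to instantiate on read.
A round-trip sketch, abbreviated from the main() above:

    MapWritable in = new MapWritable(HStoreKey.class,
        ImmutableBytesWritable.class,
        new TreeMap<WritableComparable, Writable>());
    in.put(new HStoreKey(new Text("row1"), HConstants.COL_REGIONINFO),
        new ImmutableBytesWritable("value1".getBytes()));

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    in.write(new DataOutputStream(bytes));

    MapWritable out = new MapWritable();   // classes come from the stream
    out.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));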


@ -21,11 +21,15 @@ package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
@ -55,7 +59,8 @@ public class GroupingTableMap extends TableMap {
*
* @param table table to be processed
* @param columns space separated list of columns to fetch
* @param groupColumns space separated list of columns used to form the key used in collect
* @param groupColumns space separated list of columns used to form the key
* used in collect
* @param mapper map class
* @param job job configuration object
*/
@ -83,11 +88,11 @@ public class GroupingTableMap extends TableMap {
* Pass the new key and value to reduce.
* If any of the grouping columns are not found in the value, the record is skipped.
*
* @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
* @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@Override
public void map(@SuppressWarnings("unused") HStoreKey key,
KeyedDataArrayWritable value, TableOutputCollector output,
MapWritable value, TableOutputCollector output,
@SuppressWarnings("unused") Reporter reporter) throws IOException {
byte[][] keyVals = extractKeyValues(value);
@ -106,20 +111,16 @@ public class GroupingTableMap extends TableMap {
* @param r
* @return array of byte values
*/
protected byte[][] extractKeyValues(KeyedDataArrayWritable r) {
protected byte[][] extractKeyValues(MapWritable r) {
byte[][] keyVals = null;
ArrayList<byte[]> foundList = new ArrayList<byte[]>();
int numCols = m_columns.length;
if(numCols > 0) {
KeyedData[] recVals = r.get();
boolean found = true;
for(int i = 0; i < numCols && found; i++) {
found = false;
for(int j = 0; j < recVals.length; j++) {
if(recVals[j].getKey().getColumn().equals(m_columns[i])) {
found = true;
byte[] val = recVals[j].getData();
foundList.add(val);
for (Map.Entry<WritableComparable, Writable> e: r.entrySet()) {
Text column = (Text) e.getKey();
for (int i = 0; i < numCols; i++) {
if (column.equals(m_columns[i])) {
foundList.add(((ImmutableBytesWritable) e.getValue()).get());
break;
}
}


@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
import org.apache.hadoop.hbase.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Reporter;
@ -40,10 +40,10 @@ public class IdentityTableMap extends TableMap {
/**
* Pass the key, value to reduce
*
* @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
* @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@Override
public void map(HStoreKey key, KeyedDataArrayWritable value,
public void map(HStoreKey key, MapWritable value,
TableOutputCollector output,
@SuppressWarnings("unused") Reporter reporter) throws IOException {


@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
import org.apache.hadoop.hbase.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Reporter;
@ -48,7 +48,7 @@ public class IdentityTableReduce extends TableReduce {
@SuppressWarnings("unused") Reporter reporter) throws IOException {
while(values.hasNext()) {
KeyedDataArrayWritable r = (KeyedDataArrayWritable)values.next();
MapWritable r = (MapWritable)values.next();
output.collect(key, r);
}
}


@ -21,13 +21,11 @@ package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.ArrayList;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
@ -40,8 +38,8 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HScannerInterface;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.MapWritable;
import org.apache.log4j.Logger;
@ -49,7 +47,7 @@ import org.apache.log4j.Logger;
* Convert HBase tabular data into a format that is consumable by Map/Reduce
*/
public class TableInputFormat
implements InputFormat<HStoreKey, KeyedDataArrayWritable>, JobConfigurable {
implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
static final Logger LOG = Logger.getLogger(TableInputFormat.class.getName());
@ -64,11 +62,12 @@ public class TableInputFormat
HTable m_table;
/**
* Iterate over an HBase table data, return (HStoreKey, KeyedDataArrayWritable) pairs
* Iterate over an HBase table data,
* return (HStoreKey, MapWritable<Text, ImmutableBytesWritable>) pairs
*/
class TableRecordReader implements RecordReader<HStoreKey, KeyedDataArrayWritable> {
class TableRecordReader implements RecordReader<HStoreKey, MapWritable> {
private HScannerInterface m_scanner;
private TreeMap<Text, byte[]> m_row; // current buffer
private SortedMap<Text, byte[]> m_row; // current buffer
private Text m_endRow;
/**
@ -102,12 +101,15 @@ public class TableInputFormat
}
/**
* @return KeyedDataArrayWritable of KeyedData
* @return MapWritable
*
* @see org.apache.hadoop.mapred.RecordReader#createValue()
*/
public KeyedDataArrayWritable createValue() {
return new KeyedDataArrayWritable();
@SuppressWarnings("unchecked")
public MapWritable createValue() {
return new MapWritable((Class) Text.class,
(Class) ImmutableBytesWritable.class,
(Map) new TreeMap<Text, ImmutableBytesWritable>());
}
/** {@inheritDoc} */
@ -125,34 +127,31 @@ public class TableInputFormat
/**
* @param key HStoreKey as input key.
* @param value KeyedDataArrayWritable as input value
* @param value MapWritable as input value
*
* Converts HScannerInterface.next(HStoreKey, SortedMap<Text, byte[]>) to
* HStoreKey, MapWritable<Text, ImmutableBytesWritable>
*
* Converts HScannerInterface.next(HStoreKey, TreeMap(Text, byte[])) to
* (HStoreKey, KeyedDataArrayWritable)
* @return true if there was more data
* @throws IOException
*/
public boolean next(HStoreKey key, KeyedDataArrayWritable value) throws IOException {
@SuppressWarnings("unchecked")
public boolean next(HStoreKey key, MapWritable value) throws IOException {
LOG.debug("start next");
m_row.clear();
HStoreKey tKey = key;
boolean hasMore = m_scanner.next(tKey, m_row);
if(hasMore) {
if(m_endRow.getLength() > 0 && (tKey.getRow().compareTo(m_endRow) < 0)) {
if(m_endRow.getLength() > 0 &&
(tKey.getRow().compareTo(m_endRow) < 0)) {
hasMore = false;
} else {
KeyedDataArrayWritable rowVal = value;
ArrayList<KeyedData> columns = new ArrayList<KeyedData>();
for(Map.Entry<Text, byte[]> e: m_row.entrySet()) {
HStoreKey keyCol = new HStoreKey(tKey);
keyCol.setColumn(e.getKey());
columns.add(new KeyedData(keyCol, e.getValue()));
value.put(e.getKey(), new ImmutableBytesWritable(e.getValue()));
}
// set the output
rowVal.set(columns.toArray(new KeyedData[columns.size()]));
}
}
LOG.debug("end next");
@ -161,7 +160,8 @@ public class TableInputFormat
}
public RecordReader<HStoreKey, KeyedDataArrayWritable> getRecordReader(
/** {@inheritDoc} */
public RecordReader<HStoreKey, MapWritable> getRecordReader(
InputSplit split,
@SuppressWarnings("unused") JobConf job,
@SuppressWarnings("unused") Reporter reporter) throws IOException {


@ -28,7 +28,7 @@ import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
import org.apache.hadoop.hbase.io.MapWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.io.Text;
@ -39,6 +39,7 @@ import org.apache.log4j.Logger;
* If the column does not exist, the record is not passed to Reduce.
*
*/
@SuppressWarnings("unchecked")
public abstract class TableMap extends MapReduceBase implements Mapper {
private static final Logger LOG = Logger.getLogger(TableMap.class.getName());
@ -64,7 +65,7 @@ public abstract class TableMap extends MapReduceBase implements Mapper {
job.setInputFormat(TableInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(KeyedDataArrayWritable.class);
job.setOutputValueClass(MapWritable.class);
job.setMapperClass(mapper);
job.setInputPath(new Path(table));
job.set(TableInputFormat.COLUMN_LIST, columns);
@ -95,7 +96,7 @@ public abstract class TableMap extends MapReduceBase implements Mapper {
if(m_collector.collector == null) {
m_collector.collector = output;
}
map((HStoreKey)key, (KeyedDataArrayWritable)value, m_collector, reporter);
map((HStoreKey)key, (MapWritable)value, m_collector, reporter);
LOG.debug("end map");
}
@ -109,6 +110,6 @@ public abstract class TableMap extends MapReduceBase implements Mapper {
* @param reporter
* @throws IOException
*/
public abstract void map(HStoreKey key, KeyedDataArrayWritable value,
public abstract void map(HStoreKey key, MapWritable value,
TableOutputCollector output, Reporter reporter) throws IOException;
}
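In the mapred layer the MapWritable handed to map() is keyed by column name
(Text), not by HStoreKey: TableInputFormat builds one map per row and carries
the row itself in the HStoreKey argument. A hedged sketch of a subclass (the
class name is ours; imports follow the files above):

    public class PrintingTableMap extends TableMap {
      @Override
      public void map(HStoreKey key, MapWritable value,
          TableOutputCollector output,
          @SuppressWarnings("unused") Reporter reporter) throws IOException {
        for (Map.Entry<WritableComparable, Writable> e : value.entrySet()) {
          Text column = (Text) e.getKey();
          byte[] cell = ((ImmutableBytesWritable) e.getValue()).get();
          // ... process one column of the row key.getRow() ...
        }
        output.collect(key.getRow(), value);
      }
    }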


@ -24,13 +24,14 @@ import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
import org.apache.hadoop.hbase.io.MapWritable;
/**
* Refine the types that can be collected from a Table Map/Reduce jobs.
*/
public class TableOutputCollector {
/** The collector object */
@SuppressWarnings("unchecked")
public OutputCollector collector;
/**
@ -40,8 +41,8 @@ public class TableOutputCollector {
* @param value
* @throws IOException
*/
public void collect(Text key, KeyedDataArrayWritable value)
throws IOException {
@SuppressWarnings("unchecked")
public void collect(Text key, MapWritable value) throws IOException {
collector.collect(key, value);
}
}


@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
@ -34,8 +35,8 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.MapWritable;
import org.apache.log4j.Logger;
@ -43,7 +44,7 @@ import org.apache.log4j.Logger;
* Convert Map/Reduce output and write it to an HBase table
*/
public class TableOutputFormat
extends OutputFormatBase<Text, KeyedDataArrayWritable> {
extends OutputFormatBase<Text, MapWritable> {
/** JobConf parameter that specifies the output table */
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
@ -58,8 +59,7 @@ public class TableOutputFormat
* and write to an HBase table
*/
protected class TableRecordWriter
implements RecordWriter<Text, KeyedDataArrayWritable> {
implements RecordWriter<Text, MapWritable> {
private HTable m_table;
/**
@ -74,25 +74,17 @@ public class TableOutputFormat
/** {@inheritDoc} */
public void close(@SuppressWarnings("unused") Reporter reporter) {}
/**
* Expect key to be of type Text
* Expect value to be of type KeyedDataArrayWritable
*
* @see org.apache.hadoop.mapred.RecordWriter#write(org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
*/
public void write(Text key, KeyedDataArrayWritable value) throws IOException {
/** {@inheritDoc} */
public void write(Text key, MapWritable value) throws IOException {
LOG.debug("start write");
Text tKey = key;
KeyedDataArrayWritable tValue = value;
KeyedData[] columns = tValue.get();
// start transaction
long xid = m_table.startUpdate(tKey);
for(int i = 0; i < columns.length; i++) {
KeyedData column = columns[i];
m_table.put(xid, column.getKey().getColumn(), column.getData());
long xid = m_table.startUpdate(key);
for (Map.Entry<WritableComparable, Writable> e: value.entrySet()) {
m_table.put(xid, (Text)e.getKey(),
((ImmutableBytesWritable)e.getValue()).get());
}
// end transaction
@ -103,14 +95,14 @@ public class TableOutputFormat
}
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.OutputFormatBase#getRecordWriter(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf, java.lang.String, org.apache.hadoop.util.Progressable)
*/
/** {@inheritDoc} */
@Override
@SuppressWarnings("unused")
public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress) throws IOException {
@SuppressWarnings("unchecked")
public RecordWriter getRecordWriter(
@SuppressWarnings("unused") FileSystem ignored,
JobConf job,
@SuppressWarnings("unused") String name,
@SuppressWarnings("unused") Progressable progress) throws IOException {
// expecting exactly one path
@ -119,8 +111,9 @@ public class TableOutputFormat
HTable table = null;
try {
table = new HTable(job, tableName);
} catch(Exception e) {
} catch(IOException e) {
LOG.error(e);
throw e;
}
LOG.debug("end get writer");
return new TableRecordWriter(table);


@ -34,6 +34,7 @@ import org.apache.log4j.Logger;
/**
* Write a table, sorting by the input key
*/
@SuppressWarnings("unchecked")
public abstract class TableReduce extends MapReduceBase implements Reducer {
private static final Logger LOG =
Logger.getLogger(TableReduce.class.getName());


@ -32,13 +32,13 @@ import org.apache.hadoop.io.Text;
* Abstract base class for test cases. Performs all static initialization
*/
public abstract class HBaseTestCase extends TestCase {
public final static String COLFAMILY_NAME1 = "colfamily1:";
public final static String COLFAMILY_NAME2 = "colfamily2:";
public final static String COLFAMILY_NAME3 = "colfamily3:";
protected final static String COLFAMILY_NAME1 = "colfamily1:";
protected final static String COLFAMILY_NAME2 = "colfamily2:";
protected final static String COLFAMILY_NAME3 = "colfamily3:";
protected Path testDir = null;
protected FileSystem localFs = null;
public static final char FIRST_CHAR = 'a';
public static final char LAST_CHAR = 'z';
protected static final char FIRST_CHAR = 'a';
protected static final char LAST_CHAR = 'z';
static {
StaticTestEnvironment.initialize();


@ -149,6 +149,7 @@ public class PerformanceEvaluation implements HConstants {
/**
* MapReduce job that runs a performance evaluation client in each map task.
*/
@SuppressWarnings("unchecked")
public static class EvaluationMapTask extends MapReduceBase
implements Mapper {
/** configuration parameter name that contains the command */


@ -30,10 +30,12 @@ import org.apache.commons.logging.LogFactory;
public class TestCompaction extends HBaseTestCase {
static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
@Override
protected void setUp() throws Exception {
super.setUp();
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
}


@ -32,12 +32,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.hbase.filter.RegExpRowFilter;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.filter.RowFilterSet;
import org.apache.hadoop.hbase.filter.StopRowFilter;
import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.MapWritable;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
@ -211,13 +214,15 @@ public class TestScanner2 extends HBaseClusterTestCase {
System.currentTimeMillis(), null);
while (true) {
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
KeyedData[] values = regionServer.next(scannerId);
if (values.length == 0) {
MapWritable values = regionServer.next(scannerId);
if (values == null || values.size() == 0) {
break;
}
for (int i = 0; i < values.length; i++) {
results.put(values[i].getKey().getColumn(), values[i].getData());
for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
HStoreKey k = (HStoreKey) e.getKey();
results.put(k.getColumn(),
((ImmutableBytesWritable) e.getValue()).get());
}
HRegionInfo info = (HRegionInfo) Writables.getWritable(


@ -38,8 +38,8 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.hbase.io.KeyedData;
import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.MapWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableOutputCollector;
@ -150,44 +150,44 @@ public class TestTableMapReduce extends HBaseTestCase {
/**
* Pass the key, and reversed value to reduce
*
* @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
* @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@SuppressWarnings("unchecked")
@Override
public void map(HStoreKey key, KeyedDataArrayWritable value,
public void map(HStoreKey key, MapWritable value,
TableOutputCollector output,
@SuppressWarnings("unused") Reporter reporter) throws IOException {
Text tKey = key.getRow();
KeyedData[] columns = value.get();
if(columns.length != 1) {
if(value.size() != 1) {
throw new IOException("There should only be one input column");
}
if(!columns[0].getKey().getColumn().equals(TEXT_INPUT_COLUMN)) {
Text[] keys = value.keySet().toArray(new Text[value.size()]);
if(!keys[0].equals(TEXT_INPUT_COLUMN)) {
throw new IOException("Wrong input column. Expected: " + INPUT_COLUMN
+ " but got: " + columns[0].getKey().getColumn());
+ " but got: " + keys[0]);
}
// Get the input column key and change it to the output column key
HStoreKey column = columns[0].getKey();
column.setColumn(TEXT_OUTPUT_COLUMN);
// Get the original value and reverse it
String originalValue = new String(columns[0].getData());
String originalValue =
new String(((ImmutableBytesWritable)value.get(keys[0])).get());
StringBuilder newValue = new StringBuilder();
for(int i = originalValue.length() - 1; i >= 0; i--) {
newValue.append(originalValue.charAt(i));
}
// Now set the value to be collected
MapWritable outval = new MapWritable((Class) Text.class,
(Class) ImmutableBytesWritable.class,
(Map) new TreeMap<Text, ImmutableBytesWritable>());
outval.put(TEXT_OUTPUT_COLUMN,
new ImmutableBytesWritable(newValue.toString().getBytes()));
columns[0] = new KeyedData(column, newValue.toString().getBytes());
value.set(columns);
output.collect(tKey, value);
output.collect(tKey, outval);
}
}