HADOOP-2234 TableInputFormat erroneously aggregates map values

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@599643 13f79535-47bb-0310-9956-ffa450edef68
commit bf176b63f3
parent b8291d673e
Author: Michael Stack
Date:   2007-11-30 00:01:25 +00:00

2 changed files with 21 additions and 32 deletions
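The bug, in brief: the MapReduce framework reuses a single MapWritable value object across calls to RecordReader.next(), and the old TableRecordReader only ever put entries into it, so columns from previously returned rows leaked into every later row's value. A minimal sketch of the failure mode follows; the driver loop, the reader variable, and process() are illustrative only and not part of this commit.

// Driver loop in the style of the 0.15-era Hadoop RecordReader contract:
// the key and value objects are allocated once and reused on every call.
HStoreKey key = new HStoreKey();
MapWritable value = new MapWritable();
while (reader.next(key, value)) {
  // Before this fix, next() only called value.put(...) for the current
  // row's columns. Entries put on earlier iterations were never removed,
  // so 'value' erroneously aggregated the cells of every row seen so far.
  // The fix calls value.clear() before repopulating the map.
  process(key, value);
}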

CHANGES.txt

@@ -40,6 +40,7 @@ Trunk (unreleased changes)
    HADOOP-2253 getRow can return HBASE::DELETEVAL cells
                (Bryan Duxbury via Stack)
    HADOOP-2295 Fix assigning a region to multiple servers
+   HADOOP-2234 TableInputFormat erroneously aggregates map values
 
  IMPROVEMENTS
    HADOOP-2401 Add convenience put method that takes writable

TableInputFormat.java

@@ -48,8 +48,7 @@ import org.apache.log4j.Logger;
  * Convert HBase tabular data into a format that is consumable by Map/Reduce
  */
 public class TableInputFormat
-    implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
-
+implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
   static final Logger LOG = Logger.getLogger(TableInputFormat.class.getName());
 
   /**
@@ -67,9 +66,9 @@ implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
    * return (HStoreKey, MapWritable<Text, ImmutableBytesWritable>) pairs
    */
   class TableRecordReader implements RecordReader<HStoreKey, MapWritable> {
-    private HScannerInterface m_scanner;
-    private SortedMap<Text, byte[]> m_row; // current buffer
-    private Text m_endRow;
+    private final HScannerInterface m_scanner;
+    // current buffer
+    private final SortedMap<Text, byte[]> m_row = new TreeMap<Text, byte[]>();
 
     /**
      * Constructor
@@ -78,14 +77,15 @@ implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
      * @throws IOException
      */
     public TableRecordReader(Text startRow, Text endRow) throws IOException {
-      m_row = new TreeMap<Text, byte[]>();
-      m_scanner = m_table.obtainScanner(m_cols, startRow);
-      m_endRow = endRow;
+      if (endRow != null && endRow.getLength() > 0) {
+        this.m_scanner = m_table.obtainScanner(m_cols, startRow, endRow);
+      } else {
+        this.m_scanner = m_table.obtainScanner(m_cols, startRow);
+      }
     }
-
     /** {@inheritDoc} */
     public void close() throws IOException {
-      m_scanner.close();
+      this.m_scanner.close();
     }
 
     /**
@@ -132,20 +132,14 @@ implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
      */
     @SuppressWarnings("unchecked")
     public boolean next(HStoreKey key, MapWritable value) throws IOException {
-      m_row.clear();
+      this.m_row.clear();
       HStoreKey tKey = key;
-      boolean hasMore = m_scanner.next(tKey, m_row);
-      if(hasMore) {
-        if(m_endRow.getLength() > 0 &&
-            (tKey.getRow().compareTo(m_endRow) >= 0)) {
-          hasMore = false;
-        } else {
-          for(Map.Entry<Text, byte[]> e: m_row.entrySet()) {
-            value.put(e.getKey(), new ImmutableBytesWritable(e.getValue()));
-          }
-        }
+      boolean hasMore = this.m_scanner.next(tKey, this.m_row);
+      if (hasMore) {
+        // Clear value to remove content added by previous call to next.
+        value.clear();
+        for (Map.Entry<Text, byte[]> e: this.m_row.entrySet()) {
+          value.put(e.getKey(), new ImmutableBytesWritable(e.getValue()));
+        }
       }
       return hasMore;
@@ -153,12 +147,11 @@ implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
   }
 
-
   /** {@inheritDoc} */
   public RecordReader<HStoreKey, MapWritable> getRecordReader(
       InputSplit split,
       @SuppressWarnings("unused") JobConf job,
-      @SuppressWarnings("unused") Reporter reporter) throws IOException {
+      @SuppressWarnings("unused") Reporter reporter)
+  throws IOException {
     TableSplit tSplit = (TableSplit)split;
     return new TableRecordReader(tSplit.getStartRow(), tSplit.getEndRow());
   }
@@ -185,7 +178,6 @@ implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
     return splits;
   }
-
   /** {@inheritDoc} */
   public void configure(JobConf job) {
     Path[] tableNames = job.getInputPaths();
     m_tableName = new Text(tableNames[0].getName());
@@ -202,21 +194,17 @@ implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
     }
   }
-
   /** {@inheritDoc} */
   public void validateInput(JobConf job) throws IOException {
     // expecting exactly one path
     Path[] tableNames = job.getInputPaths();
     if(tableNames == null || tableNames.length > 1) {
       throw new IOException("expecting one table name");
     }
 
     // expecting at least one column
     String colArg = job.get(COLUMN_LIST);
     if(colArg == null || colArg.length() == 0) {
       throw new IOException("expecting at least one column");
     }
   }
 }
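A note on the constructor change: instead of opening an unbounded scanner at startRow and having next() stop by hand once the current row reached m_endRow, the reader now asks for a scanner that is bounded at creation time. A sketch of the two shapes, using only names that appear in the diff (m_table, m_cols, obtainScanner, HScannerInterface) plus illustrative local variables:

// Before: unbounded scan; next() compared each row against m_endRow and
// special-cased an empty end row. A null endRow meant a NullPointerException
// on the first call to next().
HScannerInterface unbounded = m_table.obtainScanner(m_cols, startRow);

// After: the end row is handed to obtainScanner, so the scanner itself stops
// before endRow and next() needs no comparison at all. This also lets the
// m_endRow field be deleted and the map-building loop run unconditionally.
HScannerInterface bounded = (endRow != null && endRow.getLength() > 0)
    ? m_table.obtainScanner(m_cols, startRow, endRow)
    : m_table.obtainScanner(m_cols, startRow);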