From bf176b63f3ec725ec73f995b6051a689a5de8a9b Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Fri, 30 Nov 2007 00:01:25 +0000 Subject: [PATCH] HADOOP-2234 TableInputFormat erroneously aggregates map values git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@599643 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../hadoop/hbase/mapred/TableInputFormat.java | 52 +++++++------------ 2 files changed, 21 insertions(+), 32 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 476a99522c1..92f0981f20a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -40,6 +40,7 @@ Trunk (unreleased changes) HADOOP-2253 getRow can return HBASE::DELETEVAL cells (Bryan Duxbury via Stack) HADOOP-2295 Fix assigning a region to multiple servers + HADOOP-2234 TableInputFormat erroneously aggregates map values IMPROVEMENTS HADOOP-2401 Add convenience put method that takes writable diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java b/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java index 24d016b9a8f..1fe59f89074 100644 --- a/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java +++ b/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java @@ -48,8 +48,7 @@ import org.apache.log4j.Logger; * Convert HBase tabular data into a format that is consumable by Map/Reduce */ public class TableInputFormat -implements InputFormat, JobConfigurable { - +implements InputFormat, JobConfigurable { static final Logger LOG = Logger.getLogger(TableInputFormat.class.getName()); /** @@ -67,9 +66,9 @@ implements InputFormat, JobConfigurable { * return (HStoreKey, MapWritable) pairs */ class TableRecordReader implements RecordReader { - private HScannerInterface m_scanner; - private SortedMap m_row; // current buffer - private Text m_endRow; + private final HScannerInterface m_scanner; + // current buffer + private final SortedMap m_row = new TreeMap(); /** * Constructor @@ -78,14 +77,15 @@ implements InputFormat, JobConfigurable { * @throws IOException */ public TableRecordReader(Text startRow, Text endRow) throws IOException { - m_row = new TreeMap(); - m_scanner = m_table.obtainScanner(m_cols, startRow); - m_endRow = endRow; + if (endRow != null && endRow.getLength() > 0) { + this.m_scanner = m_table.obtainScanner(m_cols, startRow, endRow); + } else { + this.m_scanner = m_table.obtainScanner(m_cols, startRow); + } } - /** {@inheritDoc} */ public void close() throws IOException { - m_scanner.close(); + this.m_scanner.close(); } /** @@ -132,20 +132,14 @@ implements InputFormat, JobConfigurable { */ @SuppressWarnings("unchecked") public boolean next(HStoreKey key, MapWritable value) throws IOException { - m_row.clear(); + this.m_row.clear(); HStoreKey tKey = key; - boolean hasMore = m_scanner.next(tKey, m_row); - - if(hasMore) { - if(m_endRow.getLength() > 0 && - (tKey.getRow().compareTo(m_endRow) >= 0)) { - - hasMore = false; - - } else { - for(Map.Entry e: m_row.entrySet()) { - value.put(e.getKey(), new ImmutableBytesWritable(e.getValue())); - } + boolean hasMore = this.m_scanner.next(tKey, this.m_row); + if (hasMore) { + // Clear value to remove content added by previous call to next. + value.clear(); + for (Map.Entry e: this.m_row.entrySet()) { + value.put(e.getKey(), new ImmutableBytesWritable(e.getValue())); } } return hasMore; @@ -153,12 +147,11 @@ implements InputFormat, JobConfigurable { } - /** {@inheritDoc} */ public RecordReader getRecordReader( InputSplit split, @SuppressWarnings("unused") JobConf job, - @SuppressWarnings("unused") Reporter reporter) throws IOException { - + @SuppressWarnings("unused") Reporter reporter) + throws IOException { TableSplit tSplit = (TableSplit)split; return new TableRecordReader(tSplit.getStartRow(), tSplit.getEndRow()); } @@ -185,7 +178,6 @@ implements InputFormat, JobConfigurable { return splits; } - /** {@inheritDoc} */ public void configure(JobConf job) { Path[] tableNames = job.getInputPaths(); m_tableName = new Text(tableNames[0].getName()); @@ -202,21 +194,17 @@ implements InputFormat, JobConfigurable { } } - /** {@inheritDoc} */ public void validateInput(JobConf job) throws IOException { - // expecting exactly one path - Path[] tableNames = job.getInputPaths(); if(tableNames == null || tableNames.length > 1) { throw new IOException("expecting one table name"); } // expecting at least one column - String colArg = job.get(COLUMN_LIST); if(colArg == null || colArg.length() == 0) { throw new IOException("expecting at least one column"); } } -} +} \ No newline at end of file