From ce3e6ccdff56430cedf05c250c3c02117df9b3b4 Mon Sep 17 00:00:00 2001
From: Jean-Daniel Cryans
Date: Fri, 24 Oct 2008 23:24:09 +0000
Subject: [PATCH] HBASE-940 Make the TableOutputFormat batching-aware

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@707784 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                            |  1 +
 src/java/org/apache/hadoop/hbase/io/BatchUpdate.java   | 11 +++++++++++
 .../hadoop/hbase/mapred/IdentityTableReduce.java       |  3 ++-
 .../apache/hadoop/hbase/mapred/TableOutputFormat.java  | 10 ++++++++--
 .../hadoop/hbase/mapred/TestTableMapReduce.java        |  2 +-
 5 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c1ccd3f4433..5312124bc27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -65,6 +65,7 @@ Release 0.19.0 - Unreleased
               SubString operator (Clint Morgan via Stack)
    HBASE-937  Thrift getRow does not support specifying columns
               (Doğacan Güney via Stack)
+   HBASE-940  Make the TableOutputFormat batching-aware
 
   NEW FEATURES
    HBASE-875  Use MurmurHash instead of JenkinsHash [in bloomfilters]
diff --git a/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java b/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
index 1c33ce750f6..998760752d9 100644
--- a/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
+++ b/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
@@ -89,6 +89,17 @@ public class BatchUpdate implements WritableComparable,
   public BatchUpdate(final String row, long timestamp){
     this(Bytes.toBytes(row), timestamp);
   }
+
+  /**
+   * Copy constructor
+   * @param buToCopy BatchUpdate to copy
+   */
+  public BatchUpdate(BatchUpdate buToCopy) {
+    this(buToCopy.getRow(), buToCopy.getTimestamp());
+    for(BatchOperation bo : buToCopy) {
+      this.put(bo.getColumn(), bo.getValue());
+    }
+  }
 
   /**
    * Initialize a BatchUpdate operation on a row with a specific timestamp.
diff --git a/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java b/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
index 3a3401c3f30..8a1d4ab0e2b 100644
--- a/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
+++ b/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
@@ -24,7 +24,8 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.io.BatchUpdate;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
index 0d74bfed84b..0aa52c428a1 100644
--- a/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
+++ b/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
@@ -64,12 +64,17 @@ FileOutputFormat {
     }
 
     public void close(@SuppressWarnings("unused") Reporter reporter) {
-      // Nothing to do.
+      try {
+        m_table.flushCommits();
+      }
+      catch(IOException ioe) {
+        LOG.error(ioe);
+      }
     }
 
     public void write(@SuppressWarnings("unused") ImmutableBytesWritable key,
         BatchUpdate value) throws IOException {
-      m_table.commit(value);
+      m_table.commit(new BatchUpdate(value));
     }
   }
 
@@ -91,6 +96,7 @@ FileOutputFormat {
       LOG.error(e);
      throw e;
     }
+    table.setAutoFlush(false);
     return new TableRecordWriter(table);
   }
 
diff --git a/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java b/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
index de8b4db70d1..375818d96cf 100644
--- a/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
+++ b/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
@@ -135,7 +135,7 @@ public class TestTableMapReduce extends MultiRegionTable {
     TableMapReduceUtil.initTableReduceJob(Bytes.toString(table.getTableName()),
       IdentityTableReduce.class, jobConf);
 
-    LOG.info("Started " + table.getTableName());
+    LOG.info("Started " + Bytes.toString(table.getTableName()));
     JobClient.runJob(jobConf);
     LOG.info("After map/reduce completion");
 
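
How the pieces fit together: getRecordWriter() now calls table.setAutoFlush(false),
so each commit() issued by write() only adds the edit to HTable's client-side
write buffer instead of costing one round trip per row, and close() calls
flushCommits() so whatever is still buffered when the task finishes actually
reaches the table. write() also commits a copy of the incoming BatchUpdate,
made with the new copy constructor, because MapReduce reuses the value object
it hands to the RecordWriter; buffering the original reference would let later
records overwrite edits that have not yet been flushed.

Below is a minimal sketch of the same batching pattern written directly
against the 0.19-era client API. The table name, row keys, and column are
invented for illustration, and the HTable constructor taking an
HBaseConfiguration and a table-name String is assumed:

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchingSketch {
  public static void main(String[] args) throws IOException {
    // "mytable" is a placeholder; assumes the table already exists.
    HTable table = new HTable(new HBaseConfiguration(), "mytable");

    // Same setting getRecordWriter() now applies: commits accumulate in
    // the client-side write buffer instead of going out one RPC per row.
    table.setAutoFlush(false);
    try {
      for (int i = 0; i < 10000; i++) {
        BatchUpdate bu = new BatchUpdate("row" + i);
        bu.put(Bytes.toBytes("info:data"), Bytes.toBytes("value" + i));
        table.commit(bu);  // buffered; nothing sent to the region server yet
      }
    } finally {
      // Same call TableRecordWriter.close() now makes: without it, edits
      // still sitting in the buffer when the task ends would be lost.
      table.flushCommits();
    }
  }
}

The copy in write() trades a little memory for correctness; the buffer itself
is drained either when it fills or at close(), which is why skipping
flushCommits() silently drops the tail of the output.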