diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 0b222b106c2..5341d479894 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -98,48 +99,35 @@ public class BufferedMutatorImpl implements BufferedMutator { @Override public synchronized void mutate(Mutation m) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - doMutate(m); + mutate(Arrays.asList(m)); } @Override public synchronized void mutate(List ms) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - for (Mutation m : ms) { - doMutate(m); - } - } - - /** - * Add the put to the buffer. If the buffer is already too large, sends the buffer to the - * cluster. - * - * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster. - * @throws InterruptedIOException if we were interrupted. - */ - private void doMutate(Mutation m) throws InterruptedIOException, - RetriesExhaustedWithDetailsException { if (closed) { throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); } - if (!(m instanceof Put) && !(m instanceof Delete)) { - throw new IllegalArgumentException("Pass a Delete or a Put"); + + for (Mutation m : ms) { + if (m instanceof Put) { + validatePut((Put) m); + } + currentWriteBufferSize += m.heapSize(); } - // This behavior is highly non-intuitive... it does not protect us against - // 94-incompatible behavior, which is a timing issue because hasError, the below code - // and setter of hasError are not synchronized. Perhaps it should be removed. - if (ap.hasError()) { - writeAsyncBuffer.add(m); - backgroundFlushCommits(true); - } + // This behavior is highly non-intuitive... it does not protect us against + // 94-incompatible behavior, which is a timing issue because hasError, the below code + // and setter of hasError are not synchronized. Perhaps it should be removed. + if (ap.hasError()) { + writeAsyncBuffer.addAll(ms); + backgroundFlushCommits(true); + } else { + writeAsyncBuffer.addAll(ms); + } - if (m instanceof Put) { - validatePut((Put) m); - } - - currentWriteBufferSize += m.heapSize(); - writeAsyncBuffer.add(m); + // Now try and queue what needs to be queued. while (currentWriteBufferSize > writeBufferSize) { backgroundFlushCommits(false); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java index e00134a7726..687b6ae21cc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java @@ -111,7 +111,7 @@ public class TestSnapshotFromMaster { conf.setInt("hbase.hregion.memstore.flush.size", 25000); // so make sure we get a compaction when doing a load, but keep around some // files in the store - conf.setInt("hbase.hstore.compaction.min", 3); + conf.setInt("hbase.hstore.compaction.min", 2); conf.setInt("hbase.hstore.compactionThreshold", 5); // block writes if we get to 12 store files conf.setInt("hbase.hstore.blockingStoreFiles", 12); @@ -288,11 +288,12 @@ public class TestSnapshotFromMaster { HTableDescriptor htd = new HTableDescriptor(TABLE_NAME); htd.setCompactionEnabled(false); UTIL.createTable(htd, new byte[][] { TEST_FAM }, null); - // load the table (creates 4 hfiles) - UTIL.loadTable(UTIL.getConnection().getTable(TABLE_NAME), TEST_FAM); - UTIL.flush(TABLE_NAME); - // Put some more data into the table so for sure we get more storefiles. - UTIL.loadTable(UTIL.getConnection().getTable(TABLE_NAME), TEST_FAM); + + // load the table (creates at least 4 hfiles) + for ( int i = 0; i < 5; i++) { + UTIL.loadTable(UTIL.getConnection().getTable(TABLE_NAME), TEST_FAM); + UTIL.flush(TABLE_NAME); + } // disable the table so we can take a snapshot admin.disableTable(TABLE_NAME); @@ -319,7 +320,7 @@ public class TestSnapshotFromMaster { List regions = UTIL.getHBaseCluster().getRegions(TABLE_NAME); for (HRegion region : regions) { region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it. - region.compactStores(); // min is 3 so will compact and archive + region.compactStores(); // min is 2 so will compact and archive } LOG.info("After compaction File-System state"); FSUtils.logFileSystemState(fs, rootDir, LOG);