HBASE-14683 Fix Batching in buffered mutator is awful when adding lists of mutations.

Summary: Send the list of mutations to AsyncProcess only after done adding the list otherwise there's a lot of contention

Test Plan: UnitTests.

Differential Revision: https://reviews.facebook.net/D49251
This commit is contained in:
Elliott Clark 2015-10-22 20:24:48 -07:00
parent f9efeaad1d
commit caae3b2e5f
2 changed files with 26 additions and 37 deletions

View File

@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -98,48 +99,35 @@ public class BufferedMutatorImpl implements BufferedMutator {
@Override @Override
public synchronized void mutate(Mutation m) throws InterruptedIOException, public synchronized void mutate(Mutation m) throws InterruptedIOException,
RetriesExhaustedWithDetailsException { RetriesExhaustedWithDetailsException {
doMutate(m); mutate(Arrays.asList(m));
} }
@Override @Override
public synchronized void mutate(List<? extends Mutation> ms) throws InterruptedIOException, public synchronized void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
RetriesExhaustedWithDetailsException { RetriesExhaustedWithDetailsException {
for (Mutation m : ms) {
doMutate(m);
}
}
/**
* Add the put to the buffer. If the buffer is already too large, sends the buffer to the
* cluster.
*
* @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
* @throws InterruptedIOException if we were interrupted.
*/
private void doMutate(Mutation m) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
if (closed) { if (closed) {
throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
} }
if (!(m instanceof Put) && !(m instanceof Delete)) {
throw new IllegalArgumentException("Pass a Delete or a Put"); for (Mutation m : ms) {
if (m instanceof Put) {
validatePut((Put) m);
}
currentWriteBufferSize += m.heapSize();
} }
// This behavior is highly non-intuitive... it does not protect us against // This behavior is highly non-intuitive... it does not protect us against
// 94-incompatible behavior, which is a timing issue because hasError, the below code // 94-incompatible behavior, which is a timing issue because hasError, the below code
// and setter of hasError are not synchronized. Perhaps it should be removed. // and setter of hasError are not synchronized. Perhaps it should be removed.
if (ap.hasError()) { if (ap.hasError()) {
writeAsyncBuffer.add(m); writeAsyncBuffer.addAll(ms);
backgroundFlushCommits(true); backgroundFlushCommits(true);
} else {
writeAsyncBuffer.addAll(ms);
} }
if (m instanceof Put) {
validatePut((Put) m);
}
currentWriteBufferSize += m.heapSize();
writeAsyncBuffer.add(m);
// Now try and queue what needs to be queued.
while (currentWriteBufferSize > writeBufferSize) { while (currentWriteBufferSize > writeBufferSize) {
backgroundFlushCommits(false); backgroundFlushCommits(false);
} }

View File

@ -111,7 +111,7 @@ public class TestSnapshotFromMaster {
conf.setInt("hbase.hregion.memstore.flush.size", 25000); conf.setInt("hbase.hregion.memstore.flush.size", 25000);
// so make sure we get a compaction when doing a load, but keep around some // so make sure we get a compaction when doing a load, but keep around some
// files in the store // files in the store
conf.setInt("hbase.hstore.compaction.min", 3); conf.setInt("hbase.hstore.compaction.min", 2);
conf.setInt("hbase.hstore.compactionThreshold", 5); conf.setInt("hbase.hstore.compactionThreshold", 5);
// block writes if we get to 12 store files // block writes if we get to 12 store files
conf.setInt("hbase.hstore.blockingStoreFiles", 12); conf.setInt("hbase.hstore.blockingStoreFiles", 12);
@ -288,11 +288,12 @@ public class TestSnapshotFromMaster {
HTableDescriptor htd = new HTableDescriptor(TABLE_NAME); HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
htd.setCompactionEnabled(false); htd.setCompactionEnabled(false);
UTIL.createTable(htd, new byte[][] { TEST_FAM }, null); UTIL.createTable(htd, new byte[][] { TEST_FAM }, null);
// load the table (creates 4 hfiles)
// load the table (creates at least 4 hfiles)
for ( int i = 0; i < 5; i++) {
UTIL.loadTable(UTIL.getConnection().getTable(TABLE_NAME), TEST_FAM); UTIL.loadTable(UTIL.getConnection().getTable(TABLE_NAME), TEST_FAM);
UTIL.flush(TABLE_NAME); UTIL.flush(TABLE_NAME);
// Put some more data into the table so for sure we get more storefiles. }
UTIL.loadTable(UTIL.getConnection().getTable(TABLE_NAME), TEST_FAM);
// disable the table so we can take a snapshot // disable the table so we can take a snapshot
admin.disableTable(TABLE_NAME); admin.disableTable(TABLE_NAME);
@ -319,7 +320,7 @@ public class TestSnapshotFromMaster {
List<HRegion> regions = UTIL.getHBaseCluster().getRegions(TABLE_NAME); List<HRegion> regions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
for (HRegion region : regions) { for (HRegion region : regions) {
region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it. region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it.
region.compactStores(); // min is 3 so will compact and archive region.compactStores(); // min is 2 so will compact and archive
} }
LOG.info("After compaction File-System state"); LOG.info("After compaction File-System state");
FSUtils.logFileSystemState(fs, rootDir, LOG); FSUtils.logFileSystemState(fs, rootDir, LOG);