HBASE-18962 Support atomic (all or none) BatchOperations through batchMutate()

Signed-off-by: Apekshit Sharma <appy@apache.org>
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Umesh Agashe 2017-10-19 11:05:01 -07:00 committed by Michael Stack
parent 0ec96a5ffe
commit 5bb2a24984
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
3 changed files with 166 additions and 70 deletions

View File

@ -2952,6 +2952,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected final ObservedExceptionsInBatch observedExceptions;
//Durability of the batch (highest durability of all operations)
protected Durability durability;
protected boolean atomic = false;
public BatchOperation(final HRegion region, T[] operations) {
this.operations = operations;
@ -3067,6 +3068,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return getMutation(0).getClusterIds();
}
boolean isAtomic() {
return atomic;
}
/**
* Helper method that checks and prepares only one mutation. This can be used to implement
* {@link #checkAndPrepare()} for entire Batch.
@ -3097,16 +3102,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (tmpDur.ordinal() > durability.ordinal()) {
durability = tmpDur;
}
} catch (NoSuchColumnFamilyException nscf) {
} catch (NoSuchColumnFamilyException nscfe) {
final String msg = "No such column family in batch mutation. ";
if (observedExceptions.hasSeenNoSuchFamily()) {
LOG.warn(msg + nscf.getMessage());
LOG.warn(msg + nscfe.getMessage());
} else {
LOG.warn(msg, nscf);
LOG.warn(msg, nscfe);
observedExceptions.sawNoSuchFamily();
}
retCodeDetails[index] = new OperationStatus(
OperationStatusCode.BAD_FAMILY, nscf.getMessage());
OperationStatusCode.BAD_FAMILY, nscfe.getMessage());
if (isAtomic()) { // fail, atomic means all or none
throw nscfe;
}
} catch (FailedSanityCheckException fsce) {
final String msg = "Batch Mutation did not pass sanity check. ";
if (observedExceptions.hasSeenFailedSanityCheck()) {
@ -3117,6 +3125,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
retCodeDetails[index] = new OperationStatus(
OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
if (isAtomic()) {
throw fsce;
}
} catch (WrongRegionException we) {
final String msg = "Batch mutation had a row that does not belong to this region. ";
if (observedExceptions.hasSeenWrongRegion()) {
@ -3127,6 +3138,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
retCodeDetails[index] = new OperationStatus(
OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
if (isAtomic()) {
throw we;
}
}
}
@ -3150,15 +3164,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If we haven't got any rows in our batch, we should block to get the next one.
RowLock rowLock = null;
try {
rowLock = region.getRowLockInternal(mutation.getRow(), true);
// if atomic then get exclusive lock, else shared lock
rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic());
} catch (TimeoutIOException e) {
// We will retry when other exceptions, but we should stop if we timeout .
throw e;
} catch (IOException ioe) {
LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
if (isAtomic()) { // fail, atomic means all or none
throw ioe;
}
}
if (rowLock == null) {
// We failed to grab another lock
if (isAtomic()) {
throw new IOException("Can't apply all operations atomically!");
}
break; // Stop acquiring more rows for this batch
} else {
acquiredRowLocks.add(rowLock);
@ -3279,12 +3300,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most
* of the logic is same.
*/
private static class MutationBatchOperation extends BatchOperation<Mutation> {
static class MutationBatchOperation extends BatchOperation<Mutation> {
private long nonceGroup;
private long nonce;
public MutationBatchOperation(final HRegion region, Mutation[] operations, long nonceGroup,
long nonce) {
public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic,
long nonceGroup, long nonce) {
super(region, operations);
this.atomic = atomic;
this.nonceGroup = nonceGroup;
this.nonce = nonce;
}
@ -3522,11 +3544,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
retCodeDetails[index] = OperationStatus.SUCCESS;
}
} else {
String msg = "Put/Delete mutations only supported in a batch";
// In case of passing Append mutations along with the Puts and Deletes in batchMutate
// mark the operation return code as failure so that it will not be considered in
// the doMiniBatchMutation
retCodeDetails[index] = new OperationStatus(OperationStatusCode.FAILURE,
"Put/Delete mutations only supported in batchMutate() now");
retCodeDetails[index] = new OperationStatus(OperationStatusCode.FAILURE, msg);
if (isAtomic()) { // fail, atomic means all or none
throw new IOException(msg);
}
}
}
@ -3582,7 +3608,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Batch of mutations for replay. Base class is shared with {@link MutationBatchOperation} as most
* of the logic is same.
*/
private static class ReplayBatchOperation extends BatchOperation<MutationReplay> {
static class ReplayBatchOperation extends BatchOperation<MutationReplay> {
private long origLogSeqNum = 0;
public ReplayBatchOperation(final HRegion region, MutationReplay[] operations,
long origLogSeqNum) {
@ -3695,11 +3721,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
throws IOException {
return batchMutate(mutations, false, nonceGroup, nonce);
}
public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
long nonce) throws IOException {
// As it stands, this is used for 3 things
// * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
// * coprocessor calls (see ex. BulkDeleteEndpoint).
// So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
return batchMutate(new MutationBatchOperation(this, mutations, nonceGroup, nonce));
return batchMutate(new MutationBatchOperation(this, mutations, atomic, nonceGroup, nonce));
}
public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -604,9 +604,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
/**
* Mutate a list of rows atomically.
*
* @param cellScanner if non-null, the mutation data -- the Cell content.
* @param comparator @throws IOException
*/
private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
@ -757,10 +755,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
/**
* Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
* done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
* @param region
* @param actions
* @param cellScanner
* @param builder
* @param cellsToReturn Could be null. May be allocated in this method. This is what this
* method returns as a 'result'.
* @param closeCallBack the callback to be used with multigets
@ -864,7 +858,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
!mutations.isEmpty()) {
// Flush out any Puts or Deletes already collected.
doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false);
mutations.clear();
}
switch (type) {
@ -925,7 +919,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
// Finish up any outstanding mutations
if (mutations != null && !mutations.isEmpty()) {
doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
try {
doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false);
} catch (IOException ioe) {
rpcServer.getMetrics().exception(ioe);
NameBytesPair pair = ResponseConverter.buildException(ioe);
resultOrExceptionBuilder.setException(pair);
context.incrementResponseExceptionSize(pair.getSerializedSize());
builder.addResultOrException(resultOrExceptionBuilder.build());
}
}
return cellsToReturn;
}
@ -955,7 +957,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
final OperationQuota quota, final List<ClientProtos.Action> mutations,
final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
throws IOException {
Mutation[] mArray = new Mutation[mutations.size()];
long before = EnvironmentEdgeManager.currentTime();
boolean batchContainsPuts = false, batchContainsDelete = false;
@ -967,7 +970,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* is the mutation belong to. We can't sort ClientProtos.Action array, since they
* are bonded to cellscanners.
*/
Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<Mutation, ClientProtos.Action>();
Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
int i = 0;
for (ClientProtos.Action action: mutations) {
MutationProto m = action.getMutation();
@ -995,7 +998,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// sort to improve lock efficiency
Arrays.sort(mArray);
OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE,
OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE,
HConstants.NO_NONCE);
for (i = 0; i < codes.length; i++) {
Mutation currentMutation = mArray[i];
@ -1025,6 +1028,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
} catch (IOException ie) {
if (atomic) {
throw ie;
}
for (int i = 0; i < mutations.size(); i++) {
builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
}
@ -1130,7 +1136,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
// Exposed for testing
static interface LogDelegate {
interface LogDelegate {
void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold);
}
@ -3229,7 +3235,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} catch (IOException e) {
addScannerLeaseBack(lease);
throw new ServiceException(e);
};
}
try {
checkScanNextCallSeq(request, rsh);
} catch (OutOfOrderScannerNextException e) {

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -130,6 +130,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
@ -165,6 +166,7 @@ import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;
import org.mockito.ArgumentCaptor;
@ -200,6 +202,7 @@ public class TestHRegion {
@ClassRule
public static final TestRule timeout =
CategoryBasedTimeout.forClass(TestHRegion.class);
@Rule public final ExpectedException thrown = ExpectedException.none();
private static final String COLUMN_FAMILY = "MyCF";
private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
@ -215,9 +218,11 @@ public class TestHRegion {
// Test names
protected TableName tableName;
protected String method;
protected final byte[] qual = Bytes.toBytes("qual");
protected final byte[] qual1 = Bytes.toBytes("qual1");
protected final byte[] qual2 = Bytes.toBytes("qual2");
protected final byte[] qual3 = Bytes.toBytes("qual3");
protected final byte[] value = Bytes.toBytes("value");
protected final byte[] value1 = Bytes.toBytes("value1");
protected final byte[] value2 = Bytes.toBytes("value2");
protected final byte[] row = Bytes.toBytes("rowA");
@ -1522,21 +1527,10 @@ public class TestHRegion {
@Test
public void testBatchPut_whileNoRowLocksHeld() throws IOException {
byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
byte[] qual = Bytes.toBytes("qual");
byte[] val = Bytes.toBytes("val");
this.region = initHRegion(tableName, method, CONF, cf);
final Put[] puts = new Put[10];
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
try {
long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
LOG.info("First a batch put with all valid puts");
final Put[] puts = new Put[10];
for (int i = 0; i < 10; i++) {
puts[i] = new Put(Bytes.toBytes("row_" + i));
puts[i].addColumn(cf, qual, val);
}
long syncs = prepareRegionForBachPut(puts, source, false);
OperationStatus[] codes = this.region.batchMutate(puts);
assertEquals(10, codes.length);
@ -1546,7 +1540,7 @@ public class TestHRegion {
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
LOG.info("Next a batch put with one invalid family");
puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, val);
puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
codes = this.region.batchMutate(puts);
assertEquals(10, codes.length);
for (int i = 0; i < 10; i++) {
@ -1563,21 +1557,12 @@ public class TestHRegion {
@Test
public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
byte[] qual = Bytes.toBytes("qual");
byte[] val = Bytes.toBytes("val");
this.region = initHRegion(tableName, method, CONF, cf);
final Put[] puts = new Put[10];
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
try {
long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
long syncs = prepareRegionForBachPut(puts, source, false);
final Put[] puts = new Put[10];
for (int i = 0; i < 10; i++) {
puts[i] = new Put(Bytes.toBytes("row_" + i));
puts[i].addColumn(cf, qual, val);
}
puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, val);
puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
LOG.info("batchPut will have to break into four batches to avoid row locks");
RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
@ -1585,7 +1570,6 @@ public class TestHRegion {
RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3"));
RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true);
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<>();
final CountDownLatch startingPuts = new CountDownLatch(1);
@ -1658,31 +1642,89 @@ public class TestHRegion {
Thread.sleep(100);
if (System.currentTimeMillis() - startWait > 10000) {
fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName,
expectedCount, currentCount));
expectedCount, currentCount));
}
}
}
@Test
public void testBatchPutWithTsSlop() throws Exception {
byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
byte[] qual = Bytes.toBytes("qual");
byte[] val = Bytes.toBytes("val");
public void testAtomicBatchPut() throws IOException {
final Put[] puts = new Put[10];
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
try {
long syncs = prepareRegionForBachPut(puts, source, false);
// 1. Straight forward case, should succeed
MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true,
HConstants.NO_NONCE, HConstants.NO_NONCE);
OperationStatus[] codes = this.region.batchMutate(batchOp);
assertEquals(10, codes.length);
for (int i = 0; i < 10; i++) {
assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
}
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
// 2. Failed to get lock
RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3));
// Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this
// thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
final AtomicReference<IOException> retFromThread = new AtomicReference<>();
final CountDownLatch finishedPuts = new CountDownLatch(1);
final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true,
HConstants
.NO_NONCE,
HConstants.NO_NONCE);
TestThread putter = new TestThread(ctx) {
@Override
public void doWork() throws IOException {
try {
region.batchMutate(finalBatchOp);
} catch (IOException ioe) {
LOG.error("test failed!", ioe);
retFromThread.set(ioe);
}
finishedPuts.countDown();
}
};
LOG.info("...starting put thread while holding locks");
ctx.addThread(putter);
ctx.startThreads();
LOG.info("...waiting for batch puts while holding locks");
try {
finishedPuts.await();
} catch (InterruptedException e) {
LOG.error("Interrupted!", e);
} finally {
if (lock != null) {
lock.release();
}
}
assertNotNull(retFromThread.get());
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
// 3. Exception thrown in validation
LOG.info("Next a batch put with one invalid family");
puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE,
HConstants.NO_NONCE);
thrown.expect(NoSuchColumnFamilyException.class);
this.region.batchMutate(batchOp);
} finally {
HBaseTestingUtility.closeRegionAndWAL(this.region);
this.region = null;
}
}
@Test
public void testBatchPutWithTsSlop() throws Exception {
// add data with a timestamp that is too recent for range. Ensure assert
CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
this.region = initHRegion(tableName, method, CONF, cf);
final Put[] puts = new Put[10];
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
try {
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
final Put[] puts = new Put[10];
for (int i = 0; i < 10; i++) {
puts[i] = new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100);
puts[i].addColumn(cf, qual, val);
}
long syncs = prepareRegionForBachPut(puts, source, true);
OperationStatus[] codes = this.region.batchMutate(puts);
assertEquals(10, codes.length);
@ -1690,12 +1732,29 @@ public class TestHRegion {
assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
}
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
} finally {
HBaseTestingUtility.closeRegionAndWAL(this.region);
this.region = null;
}
}
/**
* @return syncs initial syncTimeNumOps
*/
private long prepareRegionForBachPut(final Put[] puts, final MetricsWALSource source,
boolean slop) throws IOException {
this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
LOG.info("First a batch put with all valid puts");
for (int i = 0; i < puts.length; i++) {
puts[i] = slop ? new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100) :
new Put(Bytes.toBytes("row_" + i));
puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value);
}
long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
return syncs;
}
// ////////////////////////////////////////////////////////////////////////////