From 85227d6a726ab8237ed63c9f4e56fc5ad033d7f6 Mon Sep 17 00:00:00 2001 From: Umesh Agashe Date: Thu, 19 Oct 2017 11:05:01 -0700 Subject: [PATCH] HBASE-18962 Support atomic (all or none) BatchOperations through batchMutate() Signed-off-by: Apekshit Sharma Signed-off-by: Michael Stack --- .../hadoop/hbase/regionserver/HRegion.java | 55 +++++-- .../hbase/regionserver/RSRpcServices.java | 34 ++-- .../hbase/regionserver/TestHRegion.java | 147 ++++++++++++------ 3 files changed, 166 insertions(+), 70 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index f6890af44f7..492325e79ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2952,6 +2952,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected final ObservedExceptionsInBatch observedExceptions; //Durability of the batch (highest durability of all operations) protected Durability durability; + protected boolean atomic = false; public BatchOperation(final HRegion region, T[] operations) { this.operations = operations; @@ -3067,6 +3068,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return getMutation(0).getClusterIds(); } + boolean isAtomic() { + return atomic; + } + /** * Helper method that checks and prepares only one mutation. This can be used to implement * {@link #checkAndPrepare()} for entire Batch. @@ -3097,16 +3102,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (tmpDur.ordinal() > durability.ordinal()) { durability = tmpDur; } - } catch (NoSuchColumnFamilyException nscf) { + } catch (NoSuchColumnFamilyException nscfe) { final String msg = "No such column family in batch mutation. "; if (observedExceptions.hasSeenNoSuchFamily()) { - LOG.warn(msg + nscf.getMessage()); + LOG.warn(msg + nscfe.getMessage()); } else { - LOG.warn(msg, nscf); + LOG.warn(msg, nscfe); observedExceptions.sawNoSuchFamily(); } retCodeDetails[index] = new OperationStatus( - OperationStatusCode.BAD_FAMILY, nscf.getMessage()); + OperationStatusCode.BAD_FAMILY, nscfe.getMessage()); + if (isAtomic()) { // fail, atomic means all or none + throw nscfe; + } } catch (FailedSanityCheckException fsce) { final String msg = "Batch Mutation did not pass sanity check. "; if (observedExceptions.hasSeenFailedSanityCheck()) { @@ -3117,6 +3125,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } retCodeDetails[index] = new OperationStatus( OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); + if (isAtomic()) { + throw fsce; + } } catch (WrongRegionException we) { final String msg = "Batch mutation had a row that does not belong to this region. "; if (observedExceptions.hasSeenWrongRegion()) { @@ -3127,6 +3138,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } retCodeDetails[index] = new OperationStatus( OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); + if (isAtomic()) { + throw we; + } } } @@ -3150,15 +3164,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // If we haven't got any rows in our batch, we should block to get the next one. RowLock rowLock = null; try { - rowLock = region.getRowLockInternal(mutation.getRow(), true); + // if atomic then get exclusive lock, else shared lock + rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic()); } catch (TimeoutIOException e) { // We will retry when other exceptions, but we should stop if we timeout . throw e; } catch (IOException ioe) { LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); + if (isAtomic()) { // fail, atomic means all or none + throw ioe; + } } if (rowLock == null) { // We failed to grab another lock + if (isAtomic()) { + throw new IOException("Can't apply all operations atomically!"); + } break; // Stop acquiring more rows for this batch } else { acquiredRowLocks.add(rowLock); @@ -3279,12 +3300,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most * of the logic is same. */ - private static class MutationBatchOperation extends BatchOperation { + static class MutationBatchOperation extends BatchOperation { private long nonceGroup; private long nonce; - public MutationBatchOperation(final HRegion region, Mutation[] operations, long nonceGroup, - long nonce) { + public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic, + long nonceGroup, long nonce) { super(region, operations); + this.atomic = atomic; this.nonceGroup = nonceGroup; this.nonce = nonce; } @@ -3522,11 +3544,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi retCodeDetails[index] = OperationStatus.SUCCESS; } } else { + String msg = "Put/Delete mutations only supported in a batch"; // In case of passing Append mutations along with the Puts and Deletes in batchMutate // mark the operation return code as failure so that it will not be considered in // the doMiniBatchMutation - retCodeDetails[index] = new OperationStatus(OperationStatusCode.FAILURE, - "Put/Delete mutations only supported in batchMutate() now"); + retCodeDetails[index] = new OperationStatus(OperationStatusCode.FAILURE, msg); + + if (isAtomic()) { // fail, atomic means all or none + throw new IOException(msg); + } } } @@ -3582,7 +3608,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Batch of mutations for replay. Base class is shared with {@link MutationBatchOperation} as most * of the logic is same. */ - private static class ReplayBatchOperation extends BatchOperation { + static class ReplayBatchOperation extends BatchOperation { private long origLogSeqNum = 0; public ReplayBatchOperation(final HRegion region, MutationReplay[] operations, long origLogSeqNum) { @@ -3695,11 +3721,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce) throws IOException { + return batchMutate(mutations, false, nonceGroup, nonce); + } + + public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup, + long nonce) throws IOException { // As it stands, this is used for 3 things // * batchMutate with single mutation - put/delete, separate or from checkAndMutate. // * coprocessor calls (see ex. BulkDeleteEndpoint). // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd... - return batchMutate(new MutationBatchOperation(this, mutations, nonceGroup, nonce)); + return batchMutate(new MutationBatchOperation(this, mutations, atomic, nonceGroup, nonce)); } public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index e1049367ddd..4b3fa505e34 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -604,9 +604,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, /** * Mutate a list of rows atomically. - * * @param cellScanner if non-null, the mutation data -- the Cell content. - * @param comparator @throws IOException */ private boolean checkAndRowMutate(final HRegion region, final List actions, final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, @@ -757,10 +755,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, /** * Run through the regionMutation rm and per Mutation, do the work, and then when * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. - * @param region - * @param actions - * @param cellScanner - * @param builder * @param cellsToReturn Could be null. May be allocated in this method. This is what this * method returns as a 'result'. * @param closeCallBack the callback to be used with multigets @@ -864,7 +858,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && !mutations.isEmpty()) { // Flush out any Puts or Deletes already collected. - doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); + doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); mutations.clear(); } switch (type) { @@ -925,7 +919,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // Finish up any outstanding mutations if (mutations != null && !mutations.isEmpty()) { - doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); + try { + doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); + } catch (IOException ioe) { + rpcServer.getMetrics().exception(ioe); + NameBytesPair pair = ResponseConverter.buildException(ioe); + resultOrExceptionBuilder.setException(pair); + context.incrementResponseExceptionSize(pair.getSerializedSize()); + builder.addResultOrException(resultOrExceptionBuilder.build()); + } } return cellsToReturn; } @@ -955,7 +957,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region, final OperationQuota quota, final List mutations, - final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) { + final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic) + throws IOException { Mutation[] mArray = new Mutation[mutations.size()]; long before = EnvironmentEdgeManager.currentTime(); boolean batchContainsPuts = false, batchContainsDelete = false; @@ -967,7 +970,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * is the mutation belong to. We can't sort ClientProtos.Action array, since they * are bonded to cellscanners. */ - Map mutationActionMap = new HashMap(); + Map mutationActionMap = new HashMap<>(); int i = 0; for (ClientProtos.Action action: mutations) { MutationProto m = action.getMutation(); @@ -995,7 +998,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // sort to improve lock efficiency Arrays.sort(mArray); - OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE, + OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE); for (i = 0; i < codes.length; i++) { Mutation currentMutation = mArray[i]; @@ -1025,6 +1028,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } } catch (IOException ie) { + if (atomic) { + throw ie; + } for (int i = 0; i < mutations.size(); i++) { builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex())); } @@ -1130,7 +1136,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // Exposed for testing - static interface LogDelegate { + interface LogDelegate { void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold); } @@ -3229,7 +3235,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } catch (IOException e) { addScannerLeaseBack(lease); throw new ServiceException(e); - }; + } try { checkScanNextCallSeq(request, rsh); } catch (OutOfOrderScannerNextException e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 127f9498ba9..d538b15bbec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -130,6 +130,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem; @@ -165,6 +166,7 @@ import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; import org.junit.rules.TestName; import org.junit.rules.TestRule; import org.mockito.ArgumentCaptor; @@ -200,6 +202,7 @@ public class TestHRegion { @ClassRule public static final TestRule timeout = CategoryBasedTimeout.forClass(TestHRegion.class); + @Rule public final ExpectedException thrown = ExpectedException.none(); private static final String COLUMN_FAMILY = "MyCF"; private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY); @@ -215,9 +218,11 @@ public class TestHRegion { // Test names protected TableName tableName; protected String method; + protected final byte[] qual = Bytes.toBytes("qual"); protected final byte[] qual1 = Bytes.toBytes("qual1"); protected final byte[] qual2 = Bytes.toBytes("qual2"); protected final byte[] qual3 = Bytes.toBytes("qual3"); + protected final byte[] value = Bytes.toBytes("value"); protected final byte[] value1 = Bytes.toBytes("value1"); protected final byte[] value2 = Bytes.toBytes("value2"); protected final byte[] row = Bytes.toBytes("rowA"); @@ -1522,21 +1527,10 @@ public class TestHRegion { @Test public void testBatchPut_whileNoRowLocksHeld() throws IOException { - byte[] cf = Bytes.toBytes(COLUMN_FAMILY); - byte[] qual = Bytes.toBytes("qual"); - byte[] val = Bytes.toBytes("val"); - this.region = initHRegion(tableName, method, CONF, cf); + final Put[] puts = new Put[10]; MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); try { - long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source); - metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source); - - LOG.info("First a batch put with all valid puts"); - final Put[] puts = new Put[10]; - for (int i = 0; i < 10; i++) { - puts[i] = new Put(Bytes.toBytes("row_" + i)); - puts[i].addColumn(cf, qual, val); - } + long syncs = prepareRegionForBachPut(puts, source, false); OperationStatus[] codes = this.region.batchMutate(puts); assertEquals(10, codes.length); @@ -1546,7 +1540,7 @@ public class TestHRegion { metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); LOG.info("Next a batch put with one invalid family"); - puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, val); + puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value); codes = this.region.batchMutate(puts); assertEquals(10, codes.length); for (int i = 0; i < 10; i++) { @@ -1563,21 +1557,12 @@ public class TestHRegion { @Test public void testBatchPut_whileMultipleRowLocksHeld() throws Exception { - byte[] cf = Bytes.toBytes(COLUMN_FAMILY); - byte[] qual = Bytes.toBytes("qual"); - byte[] val = Bytes.toBytes("val"); - this.region = initHRegion(tableName, method, CONF, cf); + final Put[] puts = new Put[10]; MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); try { - long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source); - metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source); + long syncs = prepareRegionForBachPut(puts, source, false); - final Put[] puts = new Put[10]; - for (int i = 0; i < 10; i++) { - puts[i] = new Put(Bytes.toBytes("row_" + i)); - puts[i].addColumn(cf, qual, val); - } - puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, val); + puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value); LOG.info("batchPut will have to break into four batches to avoid row locks"); RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2")); @@ -1585,7 +1570,6 @@ public class TestHRegion { RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3")); RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true); - MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF); final AtomicReference retFromThread = new AtomicReference<>(); final CountDownLatch startingPuts = new CountDownLatch(1); @@ -1658,31 +1642,89 @@ public class TestHRegion { Thread.sleep(100); if (System.currentTimeMillis() - startWait > 10000) { fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName, - expectedCount, currentCount)); + expectedCount, currentCount)); } } } @Test - public void testBatchPutWithTsSlop() throws Exception { - byte[] cf = Bytes.toBytes(COLUMN_FAMILY); - byte[] qual = Bytes.toBytes("qual"); - byte[] val = Bytes.toBytes("val"); + public void testAtomicBatchPut() throws IOException { + final Put[] puts = new Put[10]; + MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); + try { + long syncs = prepareRegionForBachPut(puts, source, false); + // 1. Straight forward case, should succeed + MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true, + HConstants.NO_NONCE, HConstants.NO_NONCE); + OperationStatus[] codes = this.region.batchMutate(batchOp); + assertEquals(10, codes.length); + for (int i = 0; i < 10; i++) { + assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); + } + metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); + + // 2. Failed to get lock + RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3)); + // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this + // thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread + MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF); + final AtomicReference retFromThread = new AtomicReference<>(); + final CountDownLatch finishedPuts = new CountDownLatch(1); + final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true, + HConstants + .NO_NONCE, + HConstants.NO_NONCE); + TestThread putter = new TestThread(ctx) { + @Override + public void doWork() throws IOException { + try { + region.batchMutate(finalBatchOp); + } catch (IOException ioe) { + LOG.error("test failed!", ioe); + retFromThread.set(ioe); + } + finishedPuts.countDown(); + } + }; + LOG.info("...starting put thread while holding locks"); + ctx.addThread(putter); + ctx.startThreads(); + LOG.info("...waiting for batch puts while holding locks"); + try { + finishedPuts.await(); + } catch (InterruptedException e) { + LOG.error("Interrupted!", e); + } finally { + if (lock != null) { + lock.release(); + } + } + assertNotNull(retFromThread.get()); + metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); + + // 3. Exception thrown in validation + LOG.info("Next a batch put with one invalid family"); + puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value); + batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE, + HConstants.NO_NONCE); + thrown.expect(NoSuchColumnFamilyException.class); + this.region.batchMutate(batchOp); + } finally { + HBaseTestingUtility.closeRegionAndWAL(this.region); + this.region = null; + } + } + + @Test + public void testBatchPutWithTsSlop() throws Exception { // add data with a timestamp that is too recent for range. Ensure assert CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000); - this.region = initHRegion(tableName, method, CONF, cf); + final Put[] puts = new Put[10]; + MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); try { - MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); - long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source); - metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source); - - final Put[] puts = new Put[10]; - for (int i = 0; i < 10; i++) { - puts[i] = new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100); - puts[i].addColumn(cf, qual, val); - } + long syncs = prepareRegionForBachPut(puts, source, true); OperationStatus[] codes = this.region.batchMutate(puts); assertEquals(10, codes.length); @@ -1690,12 +1732,29 @@ public class TestHRegion { assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode()); } metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source); - } finally { HBaseTestingUtility.closeRegionAndWAL(this.region); this.region = null; } + } + /** + * @return syncs initial syncTimeNumOps + */ + private long prepareRegionForBachPut(final Put[] puts, final MetricsWALSource source, + boolean slop) throws IOException { + this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES); + + LOG.info("First a batch put with all valid puts"); + for (int i = 0; i < puts.length; i++) { + puts[i] = slop ? new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100) : + new Put(Bytes.toBytes("row_" + i)); + puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value); + } + + long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source); + metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source); + return syncs; } // ////////////////////////////////////////////////////////////////////////////