From 81e1e2f94363e1bf4abb6102a8e379de875fee44 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Fri, 15 Feb 2019 17:07:21 +0800 Subject: [PATCH] HBASE-21910 The nonce implementation is wrong for AsyncTable Signed-off-by: Guanghao Zhang --- .../hbase/client/RawAsyncTableImpl.java | 20 ++-- .../client/TestAsyncTableNoncedRetry.java | 94 +++++++++++-------- 2 files changed, 67 insertions(+), 47 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index be94ca45e54..7562e6fd6b7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -197,12 +197,10 @@ class RawAsyncTableImpl implements AsyncTable { D convert(I info, S src, long nonceGroup, long nonce) throws IOException; } - private CompletableFuture noncedMutate(HBaseRpcController controller, - HRegionLocation loc, ClientService.Interface stub, REQ req, + private CompletableFuture noncedMutate(long nonceGroup, long nonce, + HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, NoncedConverter reqConvert, Converter respConverter) { - long nonceGroup = conn.getNonceGenerator().getNonceGroup(); - long nonce = conn.getNonceGenerator().newNonce(); return mutate(controller, loc, stub, req, (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); } @@ -254,18 +252,24 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture append(Append append) { checkHasFamilies(append); + long nonceGroup = conn.getNonceGenerator().getNonceGroup(); + long nonce = conn.getNonceGenerator().newNonce(); return this. newCaller(append, rpcTimeoutNs) - .action((controller, loc, stub) -> this. noncedMutate(controller, loc, stub, - append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) + .action( + (controller, loc, stub) -> this. noncedMutate(nonceGroup, nonce, controller, + loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) .call(); } @Override public CompletableFuture increment(Increment increment) { checkHasFamilies(increment); + long nonceGroup = conn.getNonceGenerator().getNonceGroup(); + long nonce = conn.getNonceGenerator().newNonce(); return this. newCaller(increment, rpcTimeoutNs) - .action((controller, loc, stub) -> this. noncedMutate(controller, loc, - stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) + .action((controller, loc, stub) -> this. noncedMutate(nonceGroup, nonce, + controller, loc, stub, increment, RequestConverter::buildMutateRequest, + RawAsyncTableImpl::toResult)) .call(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java index 30085616f8a..82a57f24140 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java @@ -21,15 +21,22 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -44,7 +51,7 @@ public class TestAsyncTableNoncedRetry { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncTableNoncedRetry.class); + HBaseClassTestRule.forClass(TestAsyncTableNoncedRetry.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -58,40 +65,50 @@ public class TestAsyncTableNoncedRetry { private static AsyncConnection ASYNC_CONN; - private static long NONCE = 1L; - - private static NonceGenerator NONCE_GENERATOR = new NonceGenerator() { - - @Override - public long newNonce() { - return NONCE; - } - - @Override - public long getNonceGroup() { - return 1L; - } - }; - @Rule public TestName testName = new TestName(); private byte[] row; + private static AtomicInteger CALLED = new AtomicInteger(); + + private static long SLEEP_TIME = 2000; + + public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor { + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public Result postAppend(ObserverContext c, Append append, + Result result) throws IOException { + if (CALLED.getAndIncrement() == 0) { + Threads.sleepWithoutInterrupt(SLEEP_TIME); + } + return RegionObserver.super.postAppend(c, append, result); + } + + @Override + public Result postIncrement(ObserverContext c, + Increment increment, Result result) throws IOException { + if (CALLED.getAndIncrement() == 0) { + Threads.sleepWithoutInterrupt(SLEEP_TIME); + } + return RegionObserver.super.postIncrement(c, increment, result); + } + } + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniCluster(1); - TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.getAdmin() + .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) + .setCoprocessor(SleepOnceCP.class.getName()).build()); TEST_UTIL.waitTableAvailable(TABLE_NAME); - AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); - ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, - registry.getClusterId().get(), User.getCurrent()) { - - @Override - public NonceGenerator getNonceGenerator() { - return NONCE_GENERATOR; - } - }; + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); } @AfterClass @@ -103,28 +120,27 @@ public class TestAsyncTableNoncedRetry { @Before public void setUp() throws IOException, InterruptedException { row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_")); - NONCE++; + CALLED.set(0); } @Test public void testAppend() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + assertEquals(0, CALLED.get()); + AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) + .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build(); Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); - assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); - result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); - // the second call should have no effect as we always generate the same nonce. - assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); - result = table.get(new Get(row)).get(); + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); } @Test public void testIncrement() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + assertEquals(0, CALLED.get()); + AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) + .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build(); assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue()); - // the second call should have no effect as we always generate the same nonce. - assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue()); - Result result = table.get(new Get(row)).get(); - assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER))); + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); } }