HBASE-21910 The nonce implementation is wrong for AsyncTable

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
Duo Zhang 2019-02-15 17:07:21 +08:00 committed by zhangduo
parent 4b185664aa
commit 81e1e2f943
2 changed files with 67 additions and 47 deletions

View File

@ -197,12 +197,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
}
private <REQ, RESP> CompletableFuture<RESP> noncedMutate(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub, REQ req,
private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce,
HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return mutate(controller, loc, stub, req,
(info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
}
@ -254,18 +252,24 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Result> append(Append append) {
checkHasFamilies(append);
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return this.<Result> newCaller(append, rpcTimeoutNs)
.action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
.action(
(controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller,
loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
.call();
}
@Override
public CompletableFuture<Result> increment(Increment increment) {
checkHasFamilies(increment);
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return this.<Result> newCaller(increment, rpcTimeoutNs)
.action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
.action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
controller, loc, stub, increment, RequestConverter::buildMutateRequest,
RawAsyncTableImpl::toResult))
.call();
}

View File

@ -21,15 +21,22 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@ -58,40 +65,50 @@ public class TestAsyncTableNoncedRetry {
private static AsyncConnection ASYNC_CONN;
private static long NONCE = 1L;
private static NonceGenerator NONCE_GENERATOR = new NonceGenerator() {
@Override
public long newNonce() {
return NONCE;
}
@Override
public long getNonceGroup() {
return 1L;
}
};
@Rule
public TestName testName = new TestName();
private byte[] row;
private static AtomicInteger CALLED = new AtomicInteger();
private static long SLEEP_TIME = 2000;
public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor {
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public Result postAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append,
Result result) throws IOException {
if (CALLED.getAndIncrement() == 0) {
Threads.sleepWithoutInterrupt(SLEEP_TIME);
}
return RegionObserver.super.postAppend(c, append, result);
}
@Override
public Result postIncrement(ObserverContext<RegionCoprocessorEnvironment> c,
Increment increment, Result result) throws IOException {
if (CALLED.getAndIncrement() == 0) {
Threads.sleepWithoutInterrupt(SLEEP_TIME);
}
return RegionObserver.super.postIncrement(c, increment, result);
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.getAdmin()
.createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
.setCoprocessor(SleepOnceCP.class.getName()).build());
TEST_UTIL.waitTableAvailable(TABLE_NAME);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), User.getCurrent()) {
@Override
public NonceGenerator getNonceGenerator() {
return NONCE_GENERATOR;
}
};
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
@AfterClass
@ -103,28 +120,27 @@ public class TestAsyncTableNoncedRetry {
@Before
public void setUp() throws IOException, InterruptedException {
row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
NONCE++;
CALLED.set(0);
}
@Test
public void testAppend() throws InterruptedException, ExecutionException {
AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
// the second call should have no effect as we always generate the same nonce.
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
result = table.get(new Get(row)).get();
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
}
@Test
public void testIncrement() throws InterruptedException, ExecutionException {
AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());
// the second call should have no effect as we always generate the same nonce.
assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());
Result result = table.get(new Get(row)).get();
assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
}
}