HBASE-16596 Reduce redundant interfaces in AsyncProcess

This commit is contained in:
chenheng 2016-09-10 11:13:28 +08:00
parent e1e0637200
commit cc2a40a78f
4 changed files with 31 additions and 61 deletions

View File

@ -392,15 +392,7 @@ class AsyncProcess {
} }
throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService"); throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
} }
/**
* See {@link #submit(ExecutorService, TableName, List, boolean, Batch.Callback, boolean)}.
* Uses default ExecutorService for this AP (must have been created with one).
*/
public <CResult> AsyncRequestFuture submit(TableName tableName, final List<? extends Row> rows,
boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
throws InterruptedIOException {
return submit(null, tableName, rows, atLeastOne, callback, needResults);
}
/** /**
* See {@link #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean)}. * See {@link #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean)}.
* Uses default ExecutorService for this AP (must have been created with one). * Uses default ExecutorService for this AP (must have been created with one).
@ -529,7 +521,7 @@ class AsyncProcess {
List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer, List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
ExecutorService pool) { ExecutorService pool) {
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
tableName, retainedActions, nonceGroup, pool, callback, results, needResults); tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, timeout);
// Add location errors if any // Add location errors if any
if (locationErrors != null) { if (locationErrors != null) {
for (int i = 0; i < locationErrors.size(); ++i) { for (int i = 0; i < locationErrors.size(); ++i) {
@ -564,14 +556,6 @@ class AsyncProcess {
multiAction.add(regionName, action); multiAction.add(regionName, action);
} }
/**
* See {@link #submitAll(ExecutorService, TableName, List, Batch.Callback, Object[])}.
* Uses default ExecutorService for this AP (must have been created with one).
*/
public <CResult> AsyncRequestFuture submitAll(TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
return submitAll(null, tableName, rows, callback, results, null, timeout);
}
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) { List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
@ -1785,15 +1769,6 @@ class AsyncProcess {
results, callback, callable, curTimeout); results, callback, callable, curTimeout);
} }
@VisibleForTesting
/** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
return createAsyncRequestFuture(
tableName, actions, nonceGroup, pool, callback, results, needResults, null, timeout);
}
/** /**
* Create a callable. Isolated to be easily overridden in the tests. * Create a callable. Isolated to be easily overridden in the tests.
*/ */

View File

@ -1218,7 +1218,7 @@ public class HTable implements Table {
RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
true, RpcControllerFactory.instantiate(configuration), readRpcTimeout); true, RpcControllerFactory.instantiate(configuration), readRpcTimeout);
AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs, AsyncRequestFuture future = asyncProcess.submitAll(null, tableName, execs,
new Callback<ClientProtos.CoprocessorServiceResult>() { new Callback<ClientProtos.CoprocessorServiceResult>() {
@Override @Override
public void update(byte[] region, byte[] row, public void update(byte[] region, byte[] row,

View File

@ -159,10 +159,11 @@ public class TestAsyncProcess {
@Override @Override
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName, protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
List<Action<Row>> actions, long nonceGroup, ExecutorService pool, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<Res> callback, Object[] results, boolean needResults) { Batch.Callback<Res> callback, Object[] results, boolean needResults,
CancellableRegionServerCallable callable, int curTimeout) {
// Test HTable has tableName of null, so pass DUMMY_TABLE // Test HTable has tableName of null, so pass DUMMY_TABLE
AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture( AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture(
DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults); DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults, null, rpcTimeout);
allReqs.add(r); allReqs.add(r);
return r; return r;
} }
@ -203,13 +204,7 @@ public class TestAsyncProcess {
// We use results in tests to check things, so override to always save them. // We use results in tests to check things, so override to always save them.
return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
} }
@Override
public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
boolean atLeastOne, Callback<Res> callback, boolean needResults)
throws InterruptedIOException {
// We use results in tests to check things, so override to always save them.
return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
}
@Override @Override
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results, List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
@ -671,7 +666,7 @@ public class TestAsyncProcess {
List<Put> puts = new ArrayList<Put>(); List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true)); puts.add(createPut(1, true));
ap.submit(DUMMY_TABLE, puts, false, null, false); ap.submit(null, DUMMY_TABLE, puts, false, null, false);
Assert.assertTrue(puts.isEmpty()); Assert.assertTrue(puts.isEmpty());
} }
@ -690,7 +685,7 @@ public class TestAsyncProcess {
List<Put> puts = new ArrayList<Put>(); List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true)); puts.add(createPut(1, true));
final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false); final AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, cb, false);
Assert.assertTrue(puts.isEmpty()); Assert.assertTrue(puts.isEmpty());
ars.waitUntilDone(); ars.waitUntilDone();
Assert.assertEquals(updateCalled.get(), 1); Assert.assertEquals(updateCalled.get(), 1);
@ -707,11 +702,11 @@ public class TestAsyncProcess {
for (int i = 0; i != ap.maxConcurrentTasksPerRegion; ++i) { for (int i = 0; i != ap.maxConcurrentTasksPerRegion; ++i) {
ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn); ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
} }
ap.submit(DUMMY_TABLE, puts, false, null, false); ap.submit(null, DUMMY_TABLE, puts, false, null, false);
Assert.assertEquals(puts.size(), 1); Assert.assertEquals(puts.size(), 1);
ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn); ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
ap.submit(DUMMY_TABLE, puts, false, null, false); ap.submit(null, DUMMY_TABLE, puts, false, null, false);
Assert.assertEquals(0, puts.size()); Assert.assertEquals(0, puts.size());
} }
@ -729,11 +724,11 @@ public class TestAsyncProcess {
puts.add(createPut(1, true)); // <== this one will make it, the region is already in puts.add(createPut(1, true)); // <== this one will make it, the region is already in
puts.add(createPut(2, true)); // <== new region, but the rs is ok puts.add(createPut(2, true)); // <== new region, but the rs is ok
ap.submit(DUMMY_TABLE, puts, false, null, false); ap.submit(null, DUMMY_TABLE, puts, false, null, false);
Assert.assertEquals(" puts=" + puts, 1, puts.size()); Assert.assertEquals(" puts=" + puts, 1, puts.size());
ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1)); ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
ap.submit(DUMMY_TABLE, puts, false, null, false); ap.submit(null, DUMMY_TABLE, puts, false, null, false);
Assert.assertTrue(puts.isEmpty()); Assert.assertTrue(puts.isEmpty());
} }
@ -745,7 +740,7 @@ public class TestAsyncProcess {
Put p = createPut(1, false); Put p = createPut(1, false);
puts.add(p); puts.add(p);
AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
Assert.assertEquals(0, puts.size()); Assert.assertEquals(0, puts.size());
ars.waitUntilDone(); ars.waitUntilDone();
verifyResult(ars, false); verifyResult(ars, false);
@ -788,12 +783,12 @@ public class TestAsyncProcess {
Put p = createPut(1, true); Put p = createPut(1, true);
puts.add(p); puts.add(p);
ap.submit(DUMMY_TABLE, puts, false, null, false); ap.submit(null, DUMMY_TABLE, puts, false, null, false);
Assert.assertFalse(puts.isEmpty()); Assert.assertFalse(puts.isEmpty());
t.start(); t.start();
ap.submit(DUMMY_TABLE, puts, true, null, false); ap.submit(null, DUMMY_TABLE, puts, true, null, false);
Assert.assertTrue(puts.isEmpty()); Assert.assertTrue(puts.isEmpty());
checkPoint.set(true); checkPoint.set(true);
@ -811,7 +806,7 @@ public class TestAsyncProcess {
puts.add(createPut(1, true)); puts.add(createPut(1, true));
puts.add(createPut(1, true)); puts.add(createPut(1, true));
AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
Assert.assertTrue(puts.isEmpty()); Assert.assertTrue(puts.isEmpty());
ars.waitUntilDone(); ars.waitUntilDone();
verifyResult(ars, false, true, true); verifyResult(ars, false, true, true);
@ -822,7 +817,7 @@ public class TestAsyncProcess {
puts.add(createPut(1, true)); puts.add(createPut(1, true));
// Wait for AP to be free. While ars might have the result, ap counters are decreased later. // Wait for AP to be free. While ars might have the result, ap counters are decreased later.
ap.waitUntilDone(); ap.waitUntilDone();
ars = ap.submit(DUMMY_TABLE, puts, false, null, true); ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
Assert.assertEquals(0, puts.size()); Assert.assertEquals(0, puts.size());
ars.waitUntilDone(); ars.waitUntilDone();
Assert.assertEquals(1, ap.callsCt.get()); Assert.assertEquals(1, ap.callsCt.get());
@ -838,7 +833,7 @@ public class TestAsyncProcess {
puts.add(createPut(1, true)); puts.add(createPut(1, true));
puts.add(createPut(1, true)); puts.add(createPut(1, true));
AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
ars.waitUntilDone(); ars.waitUntilDone();
verifyResult(ars, false, true, true); verifyResult(ars, false, true, true);
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
@ -873,7 +868,7 @@ public class TestAsyncProcess {
puts.add(createPut(2, true)); puts.add(createPut(2, true));
puts.add(createPut(3, true)); puts.add(createPut(3, true));
} }
ap.submit(DUMMY_TABLE, puts, true, null, false); ap.submit(null, DUMMY_TABLE, puts, true, null, false);
ap.waitUntilDone(); ap.waitUntilDone();
// More time to wait if there are incorrect task count. // More time to wait if there are incorrect task count.
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1);
@ -910,7 +905,7 @@ public class TestAsyncProcess {
t.start(); t.start();
try { try {
ap.submit(DUMMY_TABLE, puts, false, null, false); ap.submit(null, DUMMY_TABLE, puts, false, null, false);
Assert.fail("We should have been interrupted."); Assert.fail("We should have been interrupted.");
} catch (InterruptedIOException expected) { } catch (InterruptedIOException expected) {
} }
@ -929,7 +924,7 @@ public class TestAsyncProcess {
t2.start(); t2.start();
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
ap.submit(DUMMY_TABLE, new ArrayList<Row>(), false, null, false); ap.submit(null, DUMMY_TABLE, new ArrayList<Row>(), false, null, false);
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
//Adds 100 to secure us against approximate timing. //Adds 100 to secure us against approximate timing.
@ -1444,7 +1439,7 @@ public class TestAsyncProcess {
// One region has no replica, so the main call succeeds for it. // One region has no replica, so the main call succeeds for it.
MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0); MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]); AsyncRequestFuture ars = ap.submitAll(null,DUMMY_TABLE, rows, null, new Object[3]);
verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE); verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
Assert.assertEquals(2, ap.getReplicaCallCount()); Assert.assertEquals(2, ap.getReplicaCallCount());
} }
@ -1454,7 +1449,7 @@ public class TestAsyncProcess {
// Main call succeeds before replica calls are kicked off. // Main call succeeds before replica calls are kicked off.
MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0); MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]); AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[3]);
verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE); verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
Assert.assertEquals(0, ap.getReplicaCallCount()); Assert.assertEquals(0, ap.getReplicaCallCount());
} }
@ -1464,7 +1459,7 @@ public class TestAsyncProcess {
// Either main or replica can succeed. // Either main or replica can succeed.
MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0); MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE); verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
long replicaCalls = ap.getReplicaCallCount(); long replicaCalls = ap.getReplicaCallCount();
Assert.assertTrue(replicaCalls >= 0); Assert.assertTrue(replicaCalls >= 0);
@ -1479,7 +1474,7 @@ public class TestAsyncProcess {
MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0); MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0);
ap.setPrimaryCallDelay(sn2, 2000); ap.setPrimaryCallDelay(sn2, 2000);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); AsyncRequestFuture ars = ap.submitAll(null ,DUMMY_TABLE, rows, null, new Object[2]);
verifyReplicaResult(ars, RR.FALSE, RR.TRUE); verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
Assert.assertEquals(1, ap.getReplicaCallCount()); Assert.assertEquals(1, ap.getReplicaCallCount());
} }
@ -1492,7 +1487,7 @@ public class TestAsyncProcess {
MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0); MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0);
ap.addFailures(hri1, hri2); ap.addFailures(hri1, hri2);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
verifyReplicaResult(ars, RR.FAILED, RR.FAILED); verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
Assert.assertEquals(0, ap.getReplicaCallCount()); Assert.assertEquals(0, ap.getReplicaCallCount());
} }
@ -1504,7 +1499,7 @@ public class TestAsyncProcess {
MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0); MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0);
ap.addFailures(hri1, hri1r2, hri2); ap.addFailures(hri1, hri1r2, hri2);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
verifyReplicaResult(ars, RR.TRUE, RR.TRUE); verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
Assert.assertEquals(2, ap.getReplicaCallCount()); Assert.assertEquals(2, ap.getReplicaCallCount());
} }
@ -1516,7 +1511,7 @@ public class TestAsyncProcess {
MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0); MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0);
ap.addFailures(hri1, hri1r1, hri1r2, hri2r1); ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
verifyReplicaResult(ars, RR.FAILED, RR.FALSE); verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
// We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1 // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1
Assert.assertEquals(3, ars.getErrors().getNumExceptions()); Assert.assertEquals(3, ars.getErrors().getNumExceptions());
@ -1646,7 +1641,7 @@ public class TestAsyncProcess {
List<Put> puts = new ArrayList<Put>(); List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true)); puts.add(createPut(1, true));
ap.submit(DUMMY_TABLE, puts, false, null, false); ap.submit(null, DUMMY_TABLE, puts, false, null, false);
Assert.assertTrue(puts.isEmpty()); Assert.assertTrue(puts.isEmpty());
} }

View File

@ -136,7 +136,7 @@ public class TestClientPushback {
final AtomicLong endTime = new AtomicLong(); final AtomicLong endTime = new AtomicLong();
long startTime = EnvironmentEdgeManager.currentTime(); long startTime = EnvironmentEdgeManager.currentTime();
((HTable) table).mutator.ap.submit(tableName, ops, true, new Batch.Callback<Result>() { ((HTable) table).mutator.ap.submit(null, tableName, ops, true, new Batch.Callback<Result>() {
@Override @Override
public void update(byte[] region, byte[] row, Result result) { public void update(byte[] region, byte[] row, Result result) {
endTime.set(EnvironmentEdgeManager.currentTime()); endTime.set(EnvironmentEdgeManager.currentTime());