diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 5bb0f583987..c5745e9cf48 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -392,15 +392,7 @@ class AsyncProcess { } throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService"); } - /** - * See {@link #submit(ExecutorService, TableName, List, boolean, Batch.Callback, boolean)}. - * Uses default ExecutorService for this AP (must have been created with one). - */ - public AsyncRequestFuture submit(TableName tableName, final List rows, - boolean atLeastOne, Batch.Callback callback, boolean needResults) - throws InterruptedIOException { - return submit(null, tableName, rows, atLeastOne, callback, needResults); - } + /** * See {@link #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean)}. * Uses default ExecutorService for this AP (must have been created with one). @@ -529,7 +521,7 @@ class AsyncProcess { List locationErrorRows, Map> actionsByServer, ExecutorService pool) { AsyncRequestFutureImpl ars = createAsyncRequestFuture( - tableName, retainedActions, nonceGroup, pool, callback, results, needResults); + tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, timeout); // Add location errors if any if (locationErrors != null) { for (int i = 0; i < locationErrors.size(); ++i) { @@ -564,14 +556,6 @@ class AsyncProcess { multiAction.add(regionName, action); } - /** - * See {@link #submitAll(ExecutorService, TableName, List, Batch.Callback, Object[])}. - * Uses default ExecutorService for this AP (must have been created with one). - */ - public AsyncRequestFuture submitAll(TableName tableName, - List rows, Batch.Callback callback, Object[] results) { - return submitAll(null, tableName, rows, callback, results, null, timeout); - } public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List rows, Batch.Callback callback, Object[] results) { @@ -1785,15 +1769,6 @@ class AsyncProcess { results, callback, callable, curTimeout); } - @VisibleForTesting - /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */ - protected AsyncRequestFutureImpl createAsyncRequestFuture( - TableName tableName, List> actions, long nonceGroup, ExecutorService pool, - Batch.Callback callback, Object[] results, boolean needResults) { - return createAsyncRequestFuture( - tableName, actions, nonceGroup, pool, callback, results, needResults, null, timeout); - } - /** * Create a callable. Isolated to be easily overridden in the tests. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index f8bbfc150aa..492714f6934 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -1218,7 +1218,7 @@ public class HTable implements Table { RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), true, RpcControllerFactory.instantiate(configuration), readRpcTimeout); - AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs, + AsyncRequestFuture future = asyncProcess.submitAll(null, tableName, execs, new Callback() { @Override public void update(byte[] region, byte[] row, diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 656dcfc7670..e7366a90989 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -159,10 +159,11 @@ public class TestAsyncProcess { @Override protected AsyncRequestFutureImpl createAsyncRequestFuture(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, - Batch.Callback callback, Object[] results, boolean needResults) { + Batch.Callback callback, Object[] results, boolean needResults, + CancellableRegionServerCallable callable, int curTimeout) { // Test HTable has tableName of null, so pass DUMMY_TABLE AsyncRequestFutureImpl r = super.createAsyncRequestFuture( - DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults); + DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults, null, rpcTimeout); allReqs.add(r); return r; } @@ -203,13 +204,7 @@ public class TestAsyncProcess { // We use results in tests to check things, so override to always save them. return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); } - @Override - public AsyncRequestFuture submit(TableName tableName, List rows, - boolean atLeastOne, Callback callback, boolean needResults) - throws InterruptedIOException { - // We use results in tests to check things, so override to always save them. - return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); - } + @Override public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List rows, Batch.Callback callback, Object[] results, @@ -671,7 +666,7 @@ public class TestAsyncProcess { List puts = new ArrayList(); puts.add(createPut(1, true)); - ap.submit(DUMMY_TABLE, puts, false, null, false); + ap.submit(null, DUMMY_TABLE, puts, false, null, false); Assert.assertTrue(puts.isEmpty()); } @@ -690,7 +685,7 @@ public class TestAsyncProcess { List puts = new ArrayList(); puts.add(createPut(1, true)); - final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false); + final AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, cb, false); Assert.assertTrue(puts.isEmpty()); ars.waitUntilDone(); Assert.assertEquals(updateCalled.get(), 1); @@ -707,11 +702,11 @@ public class TestAsyncProcess { for (int i = 0; i != ap.maxConcurrentTasksPerRegion; ++i) { ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn); } - ap.submit(DUMMY_TABLE, puts, false, null, false); + ap.submit(null, DUMMY_TABLE, puts, false, null, false); Assert.assertEquals(puts.size(), 1); ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn); - ap.submit(DUMMY_TABLE, puts, false, null, false); + ap.submit(null, DUMMY_TABLE, puts, false, null, false); Assert.assertEquals(0, puts.size()); } @@ -729,11 +724,11 @@ public class TestAsyncProcess { puts.add(createPut(1, true)); // <== this one will make it, the region is already in puts.add(createPut(2, true)); // <== new region, but the rs is ok - ap.submit(DUMMY_TABLE, puts, false, null, false); + ap.submit(null, DUMMY_TABLE, puts, false, null, false); Assert.assertEquals(" puts=" + puts, 1, puts.size()); ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1)); - ap.submit(DUMMY_TABLE, puts, false, null, false); + ap.submit(null, DUMMY_TABLE, puts, false, null, false); Assert.assertTrue(puts.isEmpty()); } @@ -745,7 +740,7 @@ public class TestAsyncProcess { Put p = createPut(1, false); puts.add(p); - AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); + AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); Assert.assertEquals(0, puts.size()); ars.waitUntilDone(); verifyResult(ars, false); @@ -788,12 +783,12 @@ public class TestAsyncProcess { Put p = createPut(1, true); puts.add(p); - ap.submit(DUMMY_TABLE, puts, false, null, false); + ap.submit(null, DUMMY_TABLE, puts, false, null, false); Assert.assertFalse(puts.isEmpty()); t.start(); - ap.submit(DUMMY_TABLE, puts, true, null, false); + ap.submit(null, DUMMY_TABLE, puts, true, null, false); Assert.assertTrue(puts.isEmpty()); checkPoint.set(true); @@ -811,7 +806,7 @@ public class TestAsyncProcess { puts.add(createPut(1, true)); puts.add(createPut(1, true)); - AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); + AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); Assert.assertTrue(puts.isEmpty()); ars.waitUntilDone(); verifyResult(ars, false, true, true); @@ -822,7 +817,7 @@ public class TestAsyncProcess { puts.add(createPut(1, true)); // Wait for AP to be free. While ars might have the result, ap counters are decreased later. ap.waitUntilDone(); - ars = ap.submit(DUMMY_TABLE, puts, false, null, true); + ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); Assert.assertEquals(0, puts.size()); ars.waitUntilDone(); Assert.assertEquals(1, ap.callsCt.get()); @@ -838,7 +833,7 @@ public class TestAsyncProcess { puts.add(createPut(1, true)); puts.add(createPut(1, true)); - AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); + AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); ars.waitUntilDone(); verifyResult(ars, false, true, true); Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); @@ -873,7 +868,7 @@ public class TestAsyncProcess { puts.add(createPut(2, true)); puts.add(createPut(3, true)); } - ap.submit(DUMMY_TABLE, puts, true, null, false); + ap.submit(null, DUMMY_TABLE, puts, true, null, false); ap.waitUntilDone(); // More time to wait if there are incorrect task count. TimeUnit.SECONDS.sleep(1); @@ -910,7 +905,7 @@ public class TestAsyncProcess { t.start(); try { - ap.submit(DUMMY_TABLE, puts, false, null, false); + ap.submit(null, DUMMY_TABLE, puts, false, null, false); Assert.fail("We should have been interrupted."); } catch (InterruptedIOException expected) { } @@ -929,7 +924,7 @@ public class TestAsyncProcess { t2.start(); long start = System.currentTimeMillis(); - ap.submit(DUMMY_TABLE, new ArrayList(), false, null, false); + ap.submit(null, DUMMY_TABLE, new ArrayList(), false, null, false); long end = System.currentTimeMillis(); //Adds 100 to secure us against approximate timing. @@ -1444,7 +1439,7 @@ public class TestAsyncProcess { // One region has no replica, so the main call succeeds for it. MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0); List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); - AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]); + AsyncRequestFuture ars = ap.submitAll(null,DUMMY_TABLE, rows, null, new Object[3]); verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE); Assert.assertEquals(2, ap.getReplicaCallCount()); } @@ -1454,7 +1449,7 @@ public class TestAsyncProcess { // Main call succeeds before replica calls are kicked off. MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0); List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); - AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]); + AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[3]); verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE); Assert.assertEquals(0, ap.getReplicaCallCount()); } @@ -1464,7 +1459,7 @@ public class TestAsyncProcess { // Either main or replica can succeed. MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0); List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); - AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); + AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]); verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE); long replicaCalls = ap.getReplicaCallCount(); Assert.assertTrue(replicaCalls >= 0); @@ -1479,7 +1474,7 @@ public class TestAsyncProcess { MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0); ap.setPrimaryCallDelay(sn2, 2000); List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); - AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); + AsyncRequestFuture ars = ap.submitAll(null ,DUMMY_TABLE, rows, null, new Object[2]); verifyReplicaResult(ars, RR.FALSE, RR.TRUE); Assert.assertEquals(1, ap.getReplicaCallCount()); } @@ -1492,7 +1487,7 @@ public class TestAsyncProcess { MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0); ap.addFailures(hri1, hri2); List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); - AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); + AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]); verifyReplicaResult(ars, RR.FAILED, RR.FAILED); Assert.assertEquals(0, ap.getReplicaCallCount()); } @@ -1504,7 +1499,7 @@ public class TestAsyncProcess { MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0); ap.addFailures(hri1, hri1r2, hri2); List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); - AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); + AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]); verifyReplicaResult(ars, RR.TRUE, RR.TRUE); Assert.assertEquals(2, ap.getReplicaCallCount()); } @@ -1516,7 +1511,7 @@ public class TestAsyncProcess { MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0); ap.addFailures(hri1, hri1r1, hri1r2, hri2r1); List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); - AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); + AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]); verifyReplicaResult(ars, RR.FAILED, RR.FALSE); // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1 Assert.assertEquals(3, ars.getErrors().getNumExceptions()); @@ -1646,7 +1641,7 @@ public class TestAsyncProcess { List puts = new ArrayList(); puts.add(createPut(1, true)); - ap.submit(DUMMY_TABLE, puts, false, null, false); + ap.submit(null, DUMMY_TABLE, puts, false, null, false); Assert.assertTrue(puts.isEmpty()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java index baec37e21fb..f4fd60384da 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java @@ -136,7 +136,7 @@ public class TestClientPushback { final AtomicLong endTime = new AtomicLong(); long startTime = EnvironmentEdgeManager.currentTime(); - ((HTable) table).mutator.ap.submit(tableName, ops, true, new Batch.Callback() { + ((HTable) table).mutator.ap.submit(null, tableName, ops, true, new Batch.Callback() { @Override public void update(byte[] region, byte[] row, Result result) { endTime.set(EnvironmentEdgeManager.currentTime());