diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cancellable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cancellable.java new file mode 100644 index 00000000000..43011e920fc --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cancellable.java @@ -0,0 +1,31 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * This should be implemented by the Get/Scan implementations that + * talk to replica regions. When an RPC response is received from one + * of the replicas, the RPCs to the other replicas are cancelled. + */ +@InterfaceAudience.Private +interface Cancellable { + public void cancel(); + public boolean isCancelled(); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 05202c8f77f..3f1ba84f91b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.Bytes; +import com.google.common.annotations.VisibleForTesting; + /** * Implements the scanner interface for the HBase client. * If there are multiple regions in a table, this scanner will iterate @@ -275,6 +277,11 @@ public class ClientScanner extends AbstractClientScanner { return true; } + @VisibleForTesting + boolean isAnyRPCcancelled() { + return callable.isAnyRPCcancelled(); + } + static Result[] call(Scan scan, ScannerCallableWithReplicas callable, RpcRetryingCaller caller, int scannerTimeout) throws IOException, RuntimeException { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index 90fb1c2c209..9fc9cc68041 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -169,7 +168,7 @@ public class ClientSmallScanner extends ClientScanner { ScanRequest request = RequestConverter.buildScanRequest(getLocation() .getRegionInfo().getRegionName(), getScan(), getCaching(), true); ScanResponse response = null; - PayloadCarryingRpcController controller = controllerFactory.newController(); + controller = controllerFactory.newController(); try { controller.setPriority(getTableName()); controller.setCallTimeout(timeout); @@ -183,8 +182,8 @@ public class ClientSmallScanner extends ClientScanner { @Override public ScannerCallable getScannerCallableForReplica(int id) { - return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(), scanMetrics, - controllerFactory, getCaching(), id); + return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(), + scanMetrics, controllerFactory, getCaching(), id); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java new file mode 100644 index 00000000000..1dab7765238 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java @@ -0,0 +1,165 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.htrace.Trace; + +/** + * A completion service for the RpcRetryingCallerFactory. + * Keeps the list of the futures, and allows to cancel them all. + * This means as well that it can be used for a small set of tasks only. + *
Implementation is not Thread safe. + */ +@InterfaceAudience.Private +public class ResultBoundedCompletionService { + private final RpcRetryingCallerFactory retryingCallerFactory; + private final Executor executor; + private final QueueingFuture[] tasks; // all the tasks + private volatile QueueingFuture completed = null; + + class QueueingFuture implements RunnableFuture { + private final RetryingCallable future; + private T result = null; + private ExecutionException exeEx = null; + private volatile boolean cancelled; + private final int callTimeout; + private final RpcRetryingCaller retryingCaller; + private boolean resultObtained = false; + + + public QueueingFuture(RetryingCallable future, int callTimeout) { + this.future = future; + this.callTimeout = callTimeout; + this.retryingCaller = retryingCallerFactory.newCaller(); + } + + @SuppressWarnings("unchecked") + @Override + public void run() { + try { + if (!cancelled) { + result = + this.retryingCaller.callWithRetries(future, callTimeout); + resultObtained = true; + } + } catch (Throwable t) { + exeEx = new ExecutionException(t); + } finally { + if (!cancelled && completed == null) { + completed = (QueueingFuture) QueueingFuture.this; + synchronized (tasks) { + tasks.notify(); + } + } + } + } + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (resultObtained || exeEx != null) return false; + retryingCaller.cancel(); + if (future instanceof Cancellable) ((Cancellable)future).cancel(); + cancelled = true; + return true; + } + + @Override + public boolean isCancelled() { + return cancelled; + } + + @Override + public boolean isDone() { + return resultObtained || exeEx != null; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + try { + return get(1000, TimeUnit.DAYS); + } catch (TimeoutException e) { + throw new RuntimeException("You did wait for 1000 days here?", e); + } + } + + @Override + public T get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + synchronized (tasks) { + if (resultObtained) { + return result; + } + if (exeEx != null) { + throw exeEx; + } + unit.timedWait(tasks, timeout); + } + if (resultObtained) { + return result; + } + if (exeEx != null) { + throw exeEx; + } + + throw new TimeoutException("timeout=" + timeout + ", " + unit); + } + } + + @SuppressWarnings("unchecked") + public ResultBoundedCompletionService( + RpcRetryingCallerFactory retryingCallerFactory, Executor executor, + int maxTasks) { + this.retryingCallerFactory = retryingCallerFactory; + this.executor = executor; + this.tasks = new QueueingFuture[maxTasks]; + } + + + public void submit(RetryingCallable task, int callTimeout, int id) { + QueueingFuture newFuture = new QueueingFuture(task, callTimeout); + executor.execute(Trace.wrap(newFuture)); + tasks[id] = newFuture; + } + + public QueueingFuture take() throws InterruptedException { + synchronized (tasks) { + while (completed == null) tasks.wait(); + } + return completed; + } + + public QueueingFuture poll(long timeout, TimeUnit unit) throws InterruptedException { + synchronized (tasks) { + if (completed == null) unit.timedWait(tasks, timeout); + } + return completed; + } + + public void cancelAll() { + for (QueueingFuture future : tasks) { + if (future != null) future.cancel(true); + } + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 57accced8d1..273a1e1d767 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -27,12 +27,9 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -53,7 +50,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import com.google.protobuf.ServiceException; -import org.htrace.Trace; /** * Caller that goes to replica if the primary region does no answer within a configurable @@ -99,7 +95,7 @@ public class RpcRetryingCallerWithReadReplicas { * - we need to stop retrying when the call is completed * - we can be interrupted */ - class ReplicaRegionServerCallable extends RegionServerCallable { + class ReplicaRegionServerCallable extends RegionServerCallable implements Cancellable { final int id; private final PayloadCarryingRpcController controller; @@ -112,7 +108,8 @@ public class RpcRetryingCallerWithReadReplicas { controller.setPriority(tableName); } - public void startCancel() { + @Override + public void cancel() { controller.startCancel(); } @@ -169,6 +166,11 @@ public class RpcRetryingCallerWithReadReplicas { throw ProtobufUtil.getRemoteException(se); } } + + @Override + public boolean isCancelled() { + return controller.isCanceled(); + } } /** @@ -194,7 +196,8 @@ public class RpcRetryingCallerWithReadReplicas { RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow()); - ResultBoundedCompletionService cs = new ResultBoundedCompletionService(pool, rl.size()); + ResultBoundedCompletionService cs = + new ResultBoundedCompletionService(this.rpcRetryingCallerFactory, pool, rl.size()); if(isTargetReplicaSpecified) { addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId()); @@ -273,12 +276,12 @@ public class RpcRetryingCallerWithReadReplicas { * @param min - the id of the first replica, inclusive * @param max - the id of the last replica, inclusive. */ - private void addCallsForReplica(ResultBoundedCompletionService cs, + private void addCallsForReplica(ResultBoundedCompletionService cs, RegionLocations rl, int min, int max) { for (int id = min; id <= max; id++) { HRegionLocation hrl = rl.getRegionLocation(id); ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl); - cs.submit(callOnReplica, callTimeout); + cs.submit(callOnReplica, callTimeout, id); } } @@ -308,137 +311,4 @@ public class RpcRetryingCallerWithReadReplicas { return rl; } - - - /** - * A completion service for the RpcRetryingCallerFactory. - * Keeps the list of the futures, and allows to cancel them all. - * This means as well that it can be used for a small set of tasks only. - *
Implementation is not Thread safe. - */ - public class ResultBoundedCompletionService { - private final Executor executor; - private final QueueingFuture[] tasks; // all the tasks - private volatile QueueingFuture completed = null; - - class QueueingFuture implements RunnableFuture { - private final ReplicaRegionServerCallable future; - private Result result = null; - private ExecutionException exeEx = null; - private volatile boolean canceled; - private final int callTimeout; - private final RpcRetryingCaller retryingCaller; - - - public QueueingFuture(ReplicaRegionServerCallable future, int callTimeout) { - this.future = future; - this.callTimeout = callTimeout; - this.retryingCaller = rpcRetryingCallerFactory.newCaller(); - } - - @Override - public void run() { - try { - if (!canceled) { - result = - rpcRetryingCallerFactory.newCaller().callWithRetries(future, callTimeout); - } - } catch (Throwable t) { - exeEx = new ExecutionException(t); - } finally { - if (!canceled && completed == null) { - completed = QueueingFuture.this; - synchronized (tasks) { - tasks.notify(); - } - } - } - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (result != null || exeEx != null) return false; - retryingCaller.cancel(); - future.startCancel(); - canceled = true; - return true; - } - - @Override - public boolean isCancelled() { - return canceled; - } - - @Override - public boolean isDone() { - return result != null || exeEx != null; - } - - @Override - public Result get() throws InterruptedException, ExecutionException { - try { - return get(1000, TimeUnit.DAYS); - } catch (TimeoutException e) { - throw new RuntimeException("You did wait for 1000 days here?", e); - } - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE", - justification="Is this an issue?") - @Override - public Result get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - synchronized (tasks) { - if (result != null) { - return result; - } - if (exeEx != null) { - throw exeEx; - } - unit.timedWait(tasks, timeout); - } - // Findbugs says this null check is redundant. Will result be set across the wait above? - if (result != null) { - return result; - } - if (exeEx != null) { - throw exeEx; - } - - throw new TimeoutException("timeout=" + timeout + ", " + unit); - } - } - - public ResultBoundedCompletionService(Executor executor, int maxTasks) { - this.executor = executor; - this.tasks = new QueueingFuture[maxTasks]; - } - - - public void submit(ReplicaRegionServerCallable task, int callTimeout) { - QueueingFuture newFuture = new QueueingFuture(task, callTimeout); - executor.execute(Trace.wrap(newFuture)); - tasks[task.id] = newFuture; - } - - public QueueingFuture take() throws InterruptedException { - synchronized (tasks) { - while (completed == null) tasks.wait(); - } - return completed; - } - - public QueueingFuture poll(long timeout, TimeUnit unit) throws InterruptedException { - synchronized (tasks) { - if (completed == null) unit.timedWait(tasks, timeout); - } - return completed; - } - - public void cancelAll() { - for (QueueingFuture future : tasks) { - if (future != null) future.cancel(true); - } - } - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 0aecef20a99..22f98a3b6dc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -88,6 +88,7 @@ public class ScannerCallable extends RegionServerCallable { protected boolean isRegionServerRemote = true; private long nextCallSeq = 0; protected RpcControllerFactory controllerFactory; + protected PayloadCarryingRpcController controller; /** * @param connection which connection @@ -123,6 +124,10 @@ public class ScannerCallable extends RegionServerCallable { this.controllerFactory = rpcControllerFactory; } + PayloadCarryingRpcController getController() { + return controller; + } + /** * @param reload force reload of server location * @throws IOException @@ -191,7 +196,7 @@ public class ScannerCallable extends RegionServerCallable { incRPCcallsMetrics(); request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); ScanResponse response = null; - PayloadCarryingRpcController controller = controllerFactory.newController(); + controller = controllerFactory.newController(); controller.setPriority(getTableName()); controller.setCallTimeout(callTimeout); try { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 0de658bfd2e..42099877de3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -39,8 +39,9 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.BoundedCompletionService; import org.apache.hadoop.hbase.util.Pair; + +import com.google.common.annotations.VisibleForTesting; /** * This class has the logic for handling scanners for regions with and without replicas. * 1. A scan is attempted on the default (primary) region @@ -69,8 +70,9 @@ class ScannerCallableWithReplicas implements RetryingCallable { private Configuration conf; private int scannerTimeout; private Set outstandingCallables = new HashSet(); + private boolean someRPCcancelled = false; //required for testing purposes only - public ScannerCallableWithReplicas (TableName tableName, ClusterConnection cConnection, + public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, int retries, int scannerTimeout, int caching, Configuration conf, RpcRetryingCaller caller) { @@ -134,8 +136,10 @@ class ScannerCallableWithReplicas implements RetryingCallable { // allocate a boundedcompletion pool of some multiple of number of replicas. // We want to accomodate some RPCs for redundant replica scans (but are still in progress) - BoundedCompletionService> cs = - new BoundedCompletionService>(pool, rl.size() * 5); + ResultBoundedCompletionService> cs = + new ResultBoundedCompletionService>( + new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf), pool, + rl.size() * 5); List exceptions = null; int submitted = 0, completed = 0; @@ -192,7 +196,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { } finally { // We get there because we were interrupted or because one or more of the // calls succeeded or failed. In all case, we stop all our tasks. - cs.cancelAll(true); + cs.cancelAll(); } if (exceptions != null && !exceptions.isEmpty()) { @@ -226,8 +230,14 @@ class ScannerCallableWithReplicas implements RetryingCallable { // want to wait for the "close" to happen yet. The "wait" will happen when // the table is closed (when the awaitTermination of the underlying pool is called) s.setClose(); - RetryingRPC r = new RetryingRPC(s); - pool.submit(r); + final RetryingRPC r = new RetryingRPC(s); + pool.submit(new Callable(){ + @Override + public Void call() throws Exception { + r.call(scannerTimeout); + return null; + } + }); } // now clear outstandingCallables since we scheduled a close for all the contained scanners outstandingCallables.clear(); @@ -244,16 +254,16 @@ class ScannerCallableWithReplicas implements RetryingCallable { } private int addCallsForCurrentReplica( - BoundedCompletionService> cs, RegionLocations rl) { + ResultBoundedCompletionService> cs, RegionLocations rl) { RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); outstandingCallables.add(currentScannerCallable); - cs.submit(retryingOnReplica); + cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id); return 1; } private int addCallsForOtherReplicas( - BoundedCompletionService> cs, RegionLocations rl, int min, - int max) { + ResultBoundedCompletionService> cs, RegionLocations rl, + int min, int max) { if (scan.getConsistency() == Consistency.STRONG) { return 0; // not scheduling on other replicas for strong consistency } @@ -267,32 +277,85 @@ class ScannerCallableWithReplicas implements RetryingCallable { } outstandingCallables.add(s); RetryingRPC retryingOnReplica = new RetryingRPC(s); - cs.submit(retryingOnReplica); + cs.submit(retryingOnReplica, scannerTimeout, id); } return max - min + 1; } - class RetryingRPC implements Callable> { + @VisibleForTesting + boolean isAnyRPCcancelled() { + return someRPCcancelled; + } + + class RetryingRPC implements RetryingCallable>, Cancellable { final ScannerCallable callable; + RpcRetryingCaller caller; + private volatile boolean cancelled = false; RetryingRPC(ScannerCallable callable) { this.callable = callable; - } - - @Override - public Pair call() throws IOException { // For the Consistency.STRONG (default case), we reuse the caller // to keep compatibility with what is done in the past // For the Consistency.TIMELINE case, we can't reuse the caller // since we could be making parallel RPCs (caller.callWithRetries is synchronized // and we can't invoke it multiple times at the same time) - RpcRetryingCaller caller = ScannerCallableWithReplicas.this.caller; + this.caller = ScannerCallableWithReplicas.this.caller; if (scan.getConsistency() == Consistency.TIMELINE) { - caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf). + this.caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf). newCaller(); } - Result[] res = caller.callWithRetries(callable, scannerTimeout); - return new Pair(res, callable); + } + + @Override + public Pair call(int callTimeout) throws IOException { + // since the retries is done within the ResultBoundedCompletionService, + // we don't invoke callWithRetries here + if (cancelled) { + return null; + } + Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout); + return new Pair(res, this.callable); + } + + @Override + public void prepare(boolean reload) throws IOException { + if (cancelled) return; + + if (Thread.interrupted()) { + throw new InterruptedIOException(); + } + + callable.prepare(reload); + } + + @Override + public void throwable(Throwable t, boolean retrying) { + callable.throwable(t, retrying); + } + + @Override + public String getExceptionMessageAdditionalDetail() { + return callable.getExceptionMessageAdditionalDetail(); + } + + @Override + public long sleep(long pause, int tries) { + return callable.sleep(pause, tries); + } + + @Override + public void cancel() { + cancelled = true; + caller.cancel(); + if (callable.getController() != null) { + callable.getController().startCancel(); + } + someRPCcancelled = true; + } + + @Override + public boolean isCancelled() { + return cancelled; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java index 3d1b1c80832..bb2d4db001d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java @@ -539,6 +539,54 @@ public class TestReplicasClient { runMultipleScansOfOneType(true, false); } + @Test + public void testCancelOfScan() throws Exception { + openRegion(hriSecondary); + int NUMROWS = 100; + try { + for (int i = 0; i < NUMROWS; i++) { + byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i); + Put p = new Put(b1); + p.add(f, b1, b1); + table.put(p); + } + LOG.debug("PUT done"); + int caching = 20; + byte[] start; + start = Bytes.toBytes("testUseRegionWithReplica" + 0); + + flushRegion(hriPrimary); + LOG.info("flush done"); + Thread.sleep(1000 + REFRESH_PERIOD * 2); + + // now make some 'next' calls slow + SlowMeCopro.slowDownNext.set(true); + SlowMeCopro.countOfNext.set(0); + SlowMeCopro.sleepTime.set(5000); + + Scan scan = new Scan(start); + scan.setCaching(caching); + scan.setConsistency(Consistency.TIMELINE); + ResultScanner scanner = table.getScanner(scan); + Iterator iter = scanner.iterator(); + iter.next(); + Assert.assertTrue(((ClientScanner)scanner).isAnyRPCcancelled()); + SlowMeCopro.slowDownNext.set(false); + SlowMeCopro.countOfNext.set(0); + } finally { + SlowMeCopro.getCdl().get().countDown(); + SlowMeCopro.sleepTime.set(0); + SlowMeCopro.slowDownNext.set(false); + SlowMeCopro.countOfNext.set(0); + for (int i = 0; i < NUMROWS; i++) { + byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i); + Delete d = new Delete(b1); + table.delete(d); + } + closeRegion(hriSecondary); + } + } + private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception { openRegion(hriSecondary); int NUMROWS = 100;