HBASE-10355 Failover RPC's from client using region replicas

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1575261 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2014-03-07 14:00:21 +00:00 committed by Enis Soztutar
parent e34dae0c9a
commit c2f6f479ad
12 changed files with 941 additions and 37 deletions

View File

@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -92,7 +93,7 @@ class AsyncProcess {
protected static final Log LOG = LogFactory.getLog(AsyncProcess.class); protected static final Log LOG = LogFactory.getLog(AsyncProcess.class);
protected static final AtomicLong COUNTER = new AtomicLong(); protected static final AtomicLong COUNTER = new AtomicLong();
public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout"; public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
/** /**
* The context used to wait for results from one submit call. * The context used to wait for results from one submit call.
@ -183,7 +184,7 @@ class AsyncProcess {
protected int numTries; protected int numTries;
protected int serverTrackerTimeout; protected int serverTrackerTimeout;
protected int timeout; protected int timeout;
protected long primaryCallTimeout; protected long primaryCallTimeoutMicroseconds;
// End configuration settings. // End configuration settings.
protected static class BatchErrors { protected static class BatchErrors {
@ -242,7 +243,7 @@ class AsyncProcess {
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT); HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.primaryCallTimeout = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10); this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS); HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
@ -571,9 +572,9 @@ class AsyncProcess {
@Override @Override
public void run() { public void run() {
boolean done = false; boolean done = false;
if (primaryCallTimeout > 0) { if (primaryCallTimeoutMicroseconds > 0) {
try { try {
done = waitUntilDone(startTime + primaryCallTimeout); done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage()); LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
return; return;
@ -879,7 +880,7 @@ class AsyncProcess {
long startTime = EnvironmentEdgeManager.currentTimeMillis(); long startTime = EnvironmentEdgeManager.currentTimeMillis();
ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable( ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
actionsForReplicaThread, startTime); actionsForReplicaThread, startTime);
if (primaryCallTimeout == 0) { if (primaryCallTimeoutMicroseconds == 0) {
// Start replica calls immediately. // Start replica calls immediately.
replicaRunnable.run(); replicaRunnable.run();
} else { } else {
@ -1287,23 +1288,26 @@ class AsyncProcess {
private boolean waitUntilDone(long cutoff) throws InterruptedException { private boolean waitUntilDone(long cutoff) throws InterruptedException {
boolean hasWait = cutoff != Long.MAX_VALUE; boolean hasWait = cutoff != Long.MAX_VALUE;
long lastLog = hasWait ? 0 : EnvironmentEdgeManager.currentTimeMillis(); long lastLog = EnvironmentEdgeManager.currentTimeMillis();
long currentInProgress; long currentInProgress;
while (0 != (currentInProgress = actionsInProgress.get())) { while (0 != (currentInProgress = actionsInProgress.get())) {
long now = 0; long now = EnvironmentEdgeManager.currentTimeMillis();
if (hasWait && (now = EnvironmentEdgeManager.currentTimeMillis()) > cutoff) { if (hasWait && (now * 1000L) > cutoff) {
return false; return false;
} }
if (!hasWait) { if (!hasWait) { // Only log if wait is infinite.
// Only log if wait is infinite.
now = EnvironmentEdgeManager.currentTimeMillis();
if (now > lastLog + 10000) { if (now > lastLog + 10000) {
lastLog = now; lastLog = now;
LOG.info("#" + id + ", waiting for " + currentInProgress + " actions to finish"); LOG.info("#" + id + ", waiting for " + currentInProgress + " actions to finish");
} }
synchronized (actionsInProgress) { }
if (actionsInProgress.get() == 0) break; synchronized (actionsInProgress) {
actionsInProgress.wait(Math.min(100, hasWait ? (cutoff - now) : Long.MAX_VALUE)); if (actionsInProgress.get() == 0) break;
if (!hasWait) {
actionsInProgress.wait(100);
} else {
long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
} }
} }
} }

View File

@ -154,7 +154,17 @@ interface ClusterConnection extends HConnection {
final boolean useCache, final boolean useCache,
final boolean offlined) throws IOException; final boolean offlined) throws IOException;
/**
*
* @param tableName table to get regions of
* @param row the row
* @param useCache Should we use the cache to retrieve the region information.
* @param retry do we retry
* @return region locations for this row.
* @throws IOException
*/
RegionLocations locateRegion(TableName tableName,
byte[] row, boolean useCache, boolean retry) throws IOException;
/** /**
* Returns a {@link MasterKeepAliveConnection} to the active master * Returns a {@link MasterKeepAliveConnection} to the active master
*/ */

View File

@ -203,6 +203,12 @@ class ConnectionAdapter implements ClusterConnection {
return wrappedConnection.locateRegion(tableName, row); return wrappedConnection.locateRegion(tableName, row);
} }
@Override
public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
boolean retry) throws IOException {
return wrappedConnection.locateRegion(tableName, row, useCache, retry);
}
@Override @Override
public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException { public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
return wrappedConnection.locateRegionAll(tableName, row); return wrappedConnection.locateRegionAll(tableName, row);

View File

@ -986,7 +986,8 @@ class ConnectionManager {
} }
private RegionLocations locateRegion(final TableName tableName, @Override
public RegionLocations locateRegion(final TableName tableName,
final byte [] row, boolean useCache, boolean retry) final byte [] row, boolean useCache, boolean retry)
throws IOException { throws IOException {
if (this.closed) throw new IOException(toString() + " closed"); if (this.closed) throw new IOException(toString() + " closed");

View File

@ -138,8 +138,12 @@ public class HTable implements HTableInterface {
private ExecutorService pool; // For Multi private ExecutorService pool; // For Multi
private boolean closed; private boolean closed;
private int operationTimeout; private int operationTimeout;
private int retries;
private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final boolean cleanupConnectionOnClose; // close the connection in close() private final boolean cleanupConnectionOnClose; // close the connection in close()
private Consistency defaultConsistency = Consistency.STRONG;
private int primaryCallTimeoutMicroSecond;
/** The Async process for puts with autoflush set to false or multiputs */ /** The Async process for puts with autoflush set to false or multiputs */
protected AsyncProcess ap; protected AsyncProcess ap;
@ -361,6 +365,10 @@ public class HTable implements HTableInterface {
this.scannerCaching = this.configuration.getInt( this.scannerCaching = this.configuration.getInt(
HConstants.HBASE_CLIENT_SCANNER_CACHING, HConstants.HBASE_CLIENT_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
this.primaryCallTimeoutMicroSecond =
this.configuration.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10 ms
this.retries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
@ -817,27 +825,41 @@ public class HTable implements HTableInterface {
*/ */
@Override @Override
public Result get(final Get get) throws IOException { public Result get(final Get get) throws IOException {
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, if (get.getConsistency() == null){
getName(), get.getRow()) { get.setConsistency(defaultConsistency);
@Override }
public Result call(int callTimeout) throws IOException {
ClientProtos.GetRequest request = if (get.getConsistency() == Consistency.STRONG) {
RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get); // Good old call.
PayloadCarryingRpcController controller = rpcControllerFactory.newController(); RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
controller.setPriority(tableName); getName(), get.getRow()) {
controller.setCallTimeout(callTimeout); @Override
try { public Result call(int callTimeout) throws IOException {
ClientProtos.GetResponse response = getStub().get(controller, request); ClientProtos.GetRequest request =
if (response == null) return null; RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
return ProtobufUtil.toResult(response.getResult()); PayloadCarryingRpcController controller = rpcControllerFactory.newController();
} catch (ServiceException se) { controller.setPriority(tableName);
throw ProtobufUtil.getRemoteException(se); controller.setCallTimeout(callTimeout);
try {
ClientProtos.GetResponse response = getStub().get(controller, request);
if (response == null) return null;
return ProtobufUtil.toResult(response.getResult());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
} }
} };
}; return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout); }
// Call that takes into account the replica
RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
rpcControllerFactory, tableName, this.connection, get, pool, retries,
operationTimeout, primaryCallTimeoutMicroSecond);
return callable.call();
} }
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */

View File

@ -0,0 +1,282 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.BoundedCompletionService;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* Caller that goes to replica if the primary region does no answer within a configurable
* timeout. If the timeout is reached, it calls all the secondary replicas, and returns
* the first answer. If the answer comes from one of the secondary replica, it will
* be marked as stale.
*/
public class RpcRetryingCallerWithReadReplicas {
static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
protected final ExecutorService pool;
protected final ClusterConnection cConnection;
protected final Configuration conf;
protected final Get get;
protected final TableName tableName;
protected final int timeBeforeReplicas;
private final int callTimeout;
private final int retries;
private final RpcControllerFactory rpcControllerFactory;
public RpcRetryingCallerWithReadReplicas(
RpcControllerFactory rpcControllerFactory, TableName tableName,
ClusterConnection cConnection, final Get get,
ExecutorService pool, int retries, int callTimeout,
int timeBeforeReplicas) {
this.rpcControllerFactory = rpcControllerFactory;
this.tableName = tableName;
this.cConnection = cConnection;
this.conf = cConnection.getConfiguration();
this.get = get;
this.pool = pool;
this.retries = retries;
this.callTimeout = callTimeout;
this.timeBeforeReplicas = timeBeforeReplicas;
}
/**
* A RegionServerCallable that takes into account the replicas, i.e.
* - the call can be on any replica
* - we need to stop retrying when the call is completed
* - we can be interrupted
*/
class ReplicaRegionServerCallable extends RegionServerCallable<Result> {
final int id;
public ReplicaRegionServerCallable(int id, HRegionLocation location) {
super(RpcRetryingCallerWithReadReplicas.this.cConnection,
RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
this.id = id;
this.location = location;
}
/**
* Two responsibilities
* - if the call is already completed (by another replica) stops the retries.
* - set the location to the right region, depending on the replica.
*/
@Override
public void prepare(final boolean reload) throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
if (reload || location == null) {
RegionLocations rl = getRegionLocations(false);
location = id < rl.size() ? rl.getRegionLocation(id) : null;
}
if (location == null) {
// With this exception, there will be a retry. The location can be null for a replica
// when the table is created or after a split.
throw new HBaseIOException("There is no location for replica id #" + id);
}
ServerName dest = location.getServerName();
assert dest != null;
setStub(cConnection.getClient(dest));
}
@Override
public Result call(int callTimeout) throws Exception {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
byte[] reg = location.getRegionInfo().getRegionName();
ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(reg, get);
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
ClientProtos.GetResponse response = getStub().get(controller, request);
if (response == null) return null;
return ProtobufUtil.toResult(response.getResult());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
}
/**
* Adapter to put the HBase retrying caller into a Java callable.
*/
class RetryingRPC implements Callable<Result> {
final RetryingCallable<Result> callable;
RetryingRPC(RetryingCallable<Result> callable) {
this.callable = callable;
}
@Override
public Result call() throws IOException {
return new RpcRetryingCallerFactory(conf).<Result>newCaller().
callWithRetries(callable, callTimeout);
}
}
/**
* Algo:
* - we put the query into the execution pool.
* - after x ms, if we don't have a result, we add the queries for the secondary replicas
* - we take the first answer
* - when done, we cancel what's left. Cancelling means:
* - removing from the pool if the actual call was not started
* - interrupting the call if it has started
* Client side, we need to take into account
* - a call is not executed immediately after being put into the pool
* - a call is a thread. Let's not multiply the number of thread by the number of replicas.
* Server side, if we can cancel when it's still in the handler pool, it's much better, as a call
* can take some i/o.
* <p/>
* Globally, the number of retries, timeout and so on still applies, but it's per replica,
* not global. We continue until all retries are done, or all timeouts are exceeded.
*/
public synchronized Result call()
throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
RegionLocations rl = getRegionLocations(true);
BoundedCompletionService<Result> cs = new BoundedCompletionService<Result>(pool, rl.size());
addCallsForReplica(cs, rl, 0, 0); // primary.
try {
Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
if (f == null) {
addCallsForReplica(cs, rl, 1, rl.size() - 1); // secondaries
f = cs.take();
}
return f.get();
} catch (ExecutionException e) {
throwEnrichedException(e);
return null; // unreachable
} catch (CancellationException e) {
throw new InterruptedIOException();
} catch (InterruptedException e) {
throw new InterruptedIOException();
} finally {
// We get there because we were interrupted or because one or more of the
// calls succeeded or failed. In all case, we stop all our tasks.
cs.cancelAll(true);
}
}
/**
* Extract the real exception from the ExecutionException, and throws what makes more
* sense.
*/
private void throwEnrichedException(ExecutionException e)
throws RetriesExhaustedException, DoNotRetryIOException {
Throwable t = e.getCause();
assert t != null; // That's what ExecutionException is about: holding an exception
if (t instanceof RetriesExhaustedException) {
throw (RetriesExhaustedException) t;
}
if (t instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException) t;
}
RetriesExhaustedException.ThrowableWithExtraContext qt =
new RetriesExhaustedException.ThrowableWithExtraContext(t,
EnvironmentEdgeManager.currentTimeMillis(), toString());
List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
Collections.singletonList(qt);
throw new RetriesExhaustedException(retries, exceptions);
}
/**
* Creates the calls and submit them
*
* @param cs - the completion service to use for submitting
* @param rl - the region locations
* @param min - the id of the first replica, inclusive
* @param max - the id of the last replica, inclusive.
*/
private void addCallsForReplica(BoundedCompletionService<Result> cs,
RegionLocations rl, int min, int max) {
for (int id = min; id <= max; id++) {
HRegionLocation hrl = rl.getRegionLocation(id);
ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
RetryingRPC retryingOnReplica = new RetryingRPC(callOnReplica);
cs.submit(retryingOnReplica);
}
}
private RegionLocations getRegionLocations(boolean useCache)
throws RetriesExhaustedException, DoNotRetryIOException {
RegionLocations rl;
try {
rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true);
} catch (IOException e) {
if (e instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException) e;
} else if (e instanceof RetriesExhaustedException) {
throw (RetriesExhaustedException) e;
} else {
throw new RetriesExhaustedException("Can't get the location", e);
}
}
if (rl == null) {
throw new RetriesExhaustedException("Can't get the locations");
}
return rl;
}
}

View File

@ -968,7 +968,7 @@ public class TestAsyncProcess {
// that the replica call has happened and that way control the ordering. // that the replica call has happened and that way control the ordering.
Configuration conf = new Configuration(); Configuration conf = new Configuration();
ClusterConnection conn = createHConnectionWithReplicas(); ClusterConnection conn = createHConnectionWithReplicas();
conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs); conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000);
if (retries > 0) { if (retries > 0) {
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
} }

View File

@ -0,0 +1,81 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
/**
* A completion service, close to the one available in the JDK 1.7
* However, this ones keeps the list of the future, and allows to cancel them all.
* This means as well that it can be used for a small set of tasks only.
*/
public class BoundedCompletionService<V> {
private final Executor executor;
private final List<Future<V>> sent; // alls the call we sent
private final BlockingQueue<Future<V>> completed; // all the results we got so far.
class QueueingFuture extends FutureTask<V> {
public QueueingFuture(Callable<V> callable) {
super(callable);
}
protected void done() {
completed.add(QueueingFuture.this);
}
}
public BoundedCompletionService(Executor executor, int maxTasks) {
this.executor = executor;
this.sent = new ArrayList<Future<V>>(maxTasks);
this.completed = new ArrayBlockingQueue<Future<V>>(maxTasks);
}
public Future<V> submit(Callable<V> task) {
QueueingFuture newFuture = new QueueingFuture(task);
executor.execute(newFuture);
sent.add(newFuture);
return newFuture;
}
public Future<V> take() throws InterruptedException{
return completed.take();
}
public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException{
return completed.poll(timeout, unit);
}
public void cancelAll(boolean interrupt) {
for (Future<V> future : sent) {
future.cancel(interrupt);
}
}
}

View File

@ -287,6 +287,12 @@ class CoprocessorHConnection implements ClusterConnection {
return delegate.locateRegions(tableName, useCache, offlined); return delegate.locateRegions(tableName, useCache, offlined);
} }
@Override
public RegionLocations locateRegion(TableName tableName, byte[] row,
boolean useCache, boolean retry) throws IOException {
return delegate.locateRegion(tableName, row, useCache, retry);
}
@Override @Override
public List<HRegionLocation> locateRegions(byte[] tableName, boolean useCache, boolean offlined) public List<HRegionLocation> locateRegions(byte[] tableName, boolean useCache, boolean offlined)
throws IOException { throws IOException {

View File

@ -47,7 +47,7 @@ public class StorefileRefresherChore extends Chore {
/** /**
* The period (in milliseconds) for refreshing the store files for the secondary regions. * The period (in milliseconds) for refreshing the store files for the secondary regions.
*/ */
static final String REGIONSERVER_STOREFILE_REFRESH_PERIOD public static final String REGIONSERVER_STOREFILE_REFRESH_PERIOD
= "hbase.regionserver.storefile.refresh.period"; = "hbase.regionserver.storefile.refresh.period";
static final int DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD = 0; //disabled by default static final int DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD = 0; //disabled by default

View File

@ -0,0 +1,485 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
* Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
* cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
*/
@Category(MediumTests.class)
public class TestReplicasClient {
private static final Log LOG = LogFactory.getLog(TestReplicasClient.class);
private static final int NB_SERVERS = 1;
private static HTable table = null;
private static final byte[] row = TestReplicasClient.class.getName().getBytes();
private static HRegionInfo hriPrimary;
private static HRegionInfo hriSecondary;
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
private static final byte[] f = HConstants.CATALOG_FAMILY;
private final static int REFRESH_PERIOD = 1000;
/**
* This copro is used to synchronize the tests.
*/
public static class SlowMeCopro extends BaseRegionObserver {
static final AtomicLong sleepTime = new AtomicLong(0);
static final AtomicReference<CountDownLatch> cdl =
new AtomicReference<CountDownLatch>(new CountDownLatch(0));
public SlowMeCopro() {
}
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
CountDownLatch latch = cdl.get();
try {
if (sleepTime.get() > 0) {
LOG.info("Sleeping for " + sleepTime.get() + " ms");
Thread.sleep(sleepTime.get());
} else if (latch.getCount() > 0) {
LOG.info("Waiting for the counterCountDownLatch");
latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
if (latch.getCount() > 0) {
throw new RuntimeException("Can't wait more");
}
}
} catch (InterruptedException e1) {
LOG.error(e1);
}
} else {
LOG.info("We're not the primary replicas.");
}
}
}
@BeforeClass
public static void beforeClass() throws Exception {
// enable store file refreshing
HTU.getConfiguration().setInt(
StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
HTU.startMiniCluster(NB_SERVERS);
// Create table then get the single region for our new table.
HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName());
hdt.addCoprocessor(SlowMeCopro.class.getName());
table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
hriPrimary = table.getRegionLocation(row, false).getRegionInfo();
// mock a secondary region info to open
hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
// No master
LOG.info("Master is going to be stopped");
TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
Configuration c = new Configuration(HTU.getConfiguration());
c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
HBaseAdmin ha = new HBaseAdmin(c);
for (boolean masterRuns = true; masterRuns; ) {
Thread.sleep(100);
try {
masterRuns = false;
masterRuns = ha.isMasterRunning();
} catch (MasterNotRunningException ignored) {
}
}
LOG.info("Master has stopped");
}
@AfterClass
public static void afterClass() throws Exception {
if (table != null) table.close();
HTU.shutdownMiniCluster();
}
@Before
public void before() throws IOException {
HTU.getHBaseAdmin().getConnection().clearRegionCache();
}
@After
public void after() throws IOException, KeeperException {
try {
closeRegion(hriSecondary);
} catch (Exception ignored) {
}
ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriPrimary);
ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriSecondary);
HTU.getHBaseAdmin().getConnection().clearRegionCache();
}
private HRegionServer getRS() {
return HTU.getMiniHBaseCluster().getRegionServer(0);
}
private void openRegion(HRegionInfo hri) throws Exception {
ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
// first version is '0'
AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
getRS().getServerName(), hri, 0, null, null);
AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
Assert.assertEquals(responseOpen.getOpeningStateCount(), 1);
Assert.assertEquals(responseOpen.getOpeningState(0),
AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED);
checkRegionIsOpened(hri);
}
private void closeRegion(HRegionInfo hri) throws Exception {
ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(
getRS().getServerName(), hri.getEncodedName(), true);
AdminProtos.CloseRegionResponse responseClose = getRS()
.getRSRpcServices().closeRegion(null, crr);
Assert.assertTrue(responseClose.getClosed());
checkRegionIsClosed(hri.getEncodedName());
ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null);
}
private void checkRegionIsOpened(HRegionInfo hri) throws Exception {
while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
Thread.sleep(1);
}
Assert.assertTrue(
ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null));
}
private void checkRegionIsClosed(String encodedRegionName) throws Exception {
while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
Thread.sleep(1);
}
try {
Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
} catch (NotServingRegionException expected) {
// That's how it work: if the region is closed we have an exception.
}
// We don't delete the znode here, because there is not always a znode.
}
private void flushRegion(HRegionInfo regionInfo) throws IOException {
for (RegionServerThread rst : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
HRegion region = rst.getRegionServer().getRegionByEncodedName(regionInfo.getEncodedName());
if (region != null) {
region.flushcache();
return;
}
}
throw new IOException("Region to flush cannot be found");
}
@Test
public void testUseRegionWithoutReplica() throws Exception {
byte[] b1 = "testUseRegionWithoutReplica".getBytes();
openRegion(hriSecondary);
SlowMeCopro.cdl.set(new CountDownLatch(0));
try {
Get g = new Get(b1);
Result r = table.get(g);
Assert.assertFalse(r.isStale());
} finally {
closeRegion(hriSecondary);
}
}
@Test
public void testLocations() throws Exception {
byte[] b1 = "testLocations".getBytes();
openRegion(hriSecondary);
ClusterConnection hc = (ClusterConnection) HTU.getHBaseAdmin().getConnection();
try {
hc.clearRegionCache();
RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false);
Assert.assertEquals(2, rl.size());
rl = hc.locateRegion(table.getName(), b1, true, false);
Assert.assertEquals(2, rl.size());
hc.clearRegionCache();
rl = hc.locateRegion(table.getName(), b1, true, false);
Assert.assertEquals(2, rl.size());
rl = hc.locateRegion(table.getName(), b1, false, false);
Assert.assertEquals(2, rl.size());
} finally {
closeRegion(hriSecondary);
}
}
@Test
public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes();
openRegion(hriSecondary);
try {
// A get works and is not stale
Get g = new Get(b1);
Result r = table.get(g);
Assert.assertFalse(r.isStale());
} finally {
closeRegion(hriSecondary);
}
}
@Test
public void testGetNoResultStaleRegionWithReplica() throws Exception {
byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
openRegion(hriSecondary);
SlowMeCopro.cdl.set(new CountDownLatch(1));
try {
Get g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
Result r = table.get(g);
Assert.assertTrue(r.isStale());
} finally {
SlowMeCopro.cdl.get().countDown();
closeRegion(hriSecondary);
}
}
@Test
public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes();
openRegion(hriSecondary);
try {
// We sleep; but we won't go to the stale region as we don't get the stale by default.
SlowMeCopro.sleepTime.set(2000);
Get g = new Get(b1);
Result r = table.get(g);
Assert.assertFalse(r.isStale());
} finally {
SlowMeCopro.sleepTime.set(0);
closeRegion(hriSecondary);
}
}
@Test
public void testFlushTable() throws Exception {
openRegion(hriSecondary);
try {
flushRegion(hriPrimary);
flushRegion(hriSecondary);
Put p = new Put(row);
p.add(f, row, row);
table.put(p);
flushRegion(hriPrimary);
flushRegion(hriSecondary);
} finally {
Delete d = new Delete(row);
table.delete(d);
closeRegion(hriSecondary);
}
}
@Test
public void testFlushPrimary() throws Exception {
openRegion(hriSecondary);
try {
flushRegion(hriPrimary);
Put p = new Put(row);
p.add(f, row, row);
table.put(p);
flushRegion(hriPrimary);
} finally {
Delete d = new Delete(row);
table.delete(d);
closeRegion(hriSecondary);
}
}
@Test
public void testFlushSecondary() throws Exception {
openRegion(hriSecondary);
try {
flushRegion(hriSecondary);
Put p = new Put(row);
p.add(f, row, row);
table.put(p);
flushRegion(hriSecondary);
} catch (TableNotFoundException expected) {
} finally {
Delete d = new Delete(row);
table.delete(d);
closeRegion(hriSecondary);
}
}
@Test
public void testUseRegionWithReplica() throws Exception {
byte[] b1 = "testUseRegionWithReplica".getBytes();
openRegion(hriSecondary);
try {
// A simple put works, even if there here a second replica
Put p = new Put(b1);
p.add(f, b1, b1);
table.put(p);
LOG.info("Put done");
// A get works and is not stale
Get g = new Get(b1);
Result r = table.get(g);
Assert.assertFalse(r.isStale());
Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
LOG.info("get works and is not stale done");
// Even if it we have to wait a little on the main region
SlowMeCopro.sleepTime.set(2000);
g = new Get(b1);
r = table.get(g);
Assert.assertFalse(r.isStale());
Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
SlowMeCopro.sleepTime.set(0);
LOG.info("sleep and is not stale done");
// But if we ask for stale we will get it
SlowMeCopro.cdl.set(new CountDownLatch(1));
g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
SlowMeCopro.cdl.get().countDown();
LOG.info("stale done");
// exists works and is not stale
g = new Get(b1);
g.setCheckExistenceOnly(true);
r = table.get(g);
Assert.assertFalse(r.isStale());
Assert.assertTrue(r.getExists());
LOG.info("exists not stale done");
// exists works on stale but don't see the put
SlowMeCopro.cdl.set(new CountDownLatch(1));
g = new Get(b1);
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertFalse("The secondary has stale data", r.getExists());
SlowMeCopro.cdl.get().countDown();
LOG.info("exists stale before flush done");
flushRegion(hriPrimary);
flushRegion(hriSecondary);
LOG.info("flush done");
Thread.sleep(1000 + REFRESH_PERIOD * 2);
// get works and is not stale
SlowMeCopro.cdl.set(new CountDownLatch(1));
g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertFalse(r.isEmpty());
SlowMeCopro.cdl.get().countDown();
LOG.info("stale done");
// exists works on stale and we see the put after the flush
SlowMeCopro.cdl.set(new CountDownLatch(1));
g = new Get(b1);
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertTrue(r.getExists());
SlowMeCopro.cdl.get().countDown();
LOG.info("exists stale after flush done");
} finally {
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.sleepTime.set(0);
Delete d = new Delete(b1);
table.delete(d);
closeRegion(hriSecondary);
}
}
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker; import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -52,6 +53,7 @@ import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mortbay.log.Log;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
@ -95,6 +97,11 @@ public class TestRegionServerNoMaster {
// No master // No master
HTU.getHBaseCluster().getMaster().stopMaster(); HTU.getHBaseCluster().getMaster().stopMaster();
Log.info("Waiting until master thread exits");
while (HTU.getHBaseCluster().getMasterThread() != null
&& HTU.getHBaseCluster().getMasterThread().isAlive()) {
Threads.sleep(100);
}
// Master is down, so is the meta. We need to assign it somewhere // Master is down, so is the meta. We need to assign it somewhere
// so that regions can be assigned during the mocking phase. // so that regions can be assigned during the mocking phase.
HRegionServer hrs = HTU.getHBaseCluster().getRegionServer(0); HRegionServer hrs = HTU.getHBaseCluster().getRegionServer(0);