HBASE-8764 Some MasterMonitorCallable should retry

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1507495 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2013-07-26 22:38:15 +00:00
parent 77fabe7333
commit 50df1e2fd7
35 changed files with 933 additions and 682 deletions

View File

@ -84,7 +84,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/
class AsyncProcess<CResult> {
private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
protected final HConnection hConnection;
protected final byte[] tableName;
protected final ExecutorService pool;
@ -406,9 +405,9 @@ class AsyncProcess<CResult> {
public void run() {
MultiResponse res;
try {
ServerCallable<MultiResponse> callable = createCallable(loc, multi);
MultiServerCallable<Row> callable = createCallable(loc, multi);
try {
res = callable.withoutRetries();
res = createCaller(callable).callWithoutRetries(callable);
} catch (IOException e) {
LOG.warn("The call to the RS failed, we don't know where we stand. location="
+ loc, e);
@ -441,10 +440,19 @@ class AsyncProcess<CResult> {
/**
* Create a callable. Isolated to be easily overridden in the tests.
*/
protected ServerCallable<MultiResponse> createCallable(
final HRegionLocation loc, final MultiAction<Row> multi) {
protected MultiServerCallable<Row> createCallable(final HRegionLocation location,
final MultiAction<Row> multi) {
return new MultiServerCallable<Row>(hConnection, tableName, location, multi);
}
return new MultiServerCallable<Row>(hConnection, tableName, loc, multi);
/**
* For tests.
* @param callable
* @return Returns a caller.
*/
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
// callable is unused.
return new RpcRetryingCaller<MultiResponse>();
}
/**

View File

@ -66,6 +66,7 @@ public class ClientScanner extends AbstractClientScanner {
private final byte[] tableName;
private final int scannerTimeout;
private boolean scanMetricsPublished = false;
private ScannerCaller caller = new ScannerCaller();
/**
* Create a new ClientScanner for the specified table. An HConnection will be
@ -179,7 +180,7 @@ public class ClientScanner extends AbstractClientScanner {
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
callable.withRetries();
this.caller.callWithRetries(callable, getConnection().getConfiguration());
this.callable = null;
}
@ -216,7 +217,7 @@ public class ClientScanner extends AbstractClientScanner {
callable = getScannerCallable(localStartKey);
// Open a scanner on the region server starting at the
// beginning of the region
callable.withRetries();
this.caller.callWithRetries(callable, getConnection().getConfiguration());
this.currentRegion = callable.getHRegionInfo();
if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.incrementAndGet();
@ -276,10 +277,10 @@ public class ClientScanner extends AbstractClientScanner {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
values = callable.withRetries();
values = this.caller.callWithRetries(callable, getConnection().getConfiguration());
if (skipFirst && values != null && values.length == 1) {
skipFirst = false; // Already skipped, unset it before scanning again
values = callable.withRetries();
values = this.caller.callWithRetries(callable, getConnection().getConfiguration());
}
retryAfterOutOfOrderException = true;
} catch (DoNotRetryIOException e) {
@ -402,7 +403,7 @@ public class ClientScanner extends AbstractClientScanner {
if (callable != null) {
callable.setClose();
try {
callable.withRetries();
this.caller.callWithRetries(callable, getConnection().getConfiguration());
} catch (IOException e) {
// We used to catch this error, interpret, and rethrow. However, we
// have since decided that it's not nice for a scanner's close to

View File

@ -24,8 +24,7 @@ import org.apache.hadoop.hbase.HConstants;
import java.util.Random;
/**
* Utility used by client connections such as {@link HConnection} and
* {@link ServerCallable}
* Utility used by client connections.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving

View File

@ -25,7 +25,6 @@ import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@ -84,7 +83,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
@ -522,7 +520,7 @@ public class HBaseAdmin implements Abortable, Closeable {
}
}
execute(new MasterAdminCallable<Void>() {
executeCallable(new MasterAdminCallable<Void>(getConnection()) {
@Override
public Void call() throws ServiceException {
CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, splitKeys);
@ -555,7 +553,7 @@ public class HBaseAdmin implements Abortable, Closeable {
HRegionLocation firstMetaServer = getFirstMetaServerForTable(tableName);
boolean tableExists = true;
execute(new MasterAdminCallable<Void>() {
executeCallable(new MasterAdminCallable<Void>(getConnection()) {
@Override
public Void call() throws ServiceException {
DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName);
@ -753,7 +751,7 @@ public class HBaseAdmin implements Abortable, Closeable {
public void enableTableAsync(final byte [] tableName)
throws IOException {
HTableDescriptor.isLegalTableName(tableName);
execute(new MasterAdminCallable<Void>() {
executeCallable(new MasterAdminCallable<Void>(getConnection()) {
@Override
public Void call() throws ServiceException {
LOG.info("Started enable of " + Bytes.toString(tableName));
@ -824,7 +822,7 @@ public class HBaseAdmin implements Abortable, Closeable {
*/
public void disableTableAsync(final byte [] tableName) throws IOException {
HTableDescriptor.isLegalTableName(tableName);
execute(new MasterAdminCallable<Void>() {
executeCallable(new MasterAdminCallable<Void>(getConnection()) {
@Override
public Void call() throws ServiceException {
LOG.info("Started disable of " + Bytes.toString(tableName));
@ -1031,7 +1029,7 @@ public class HBaseAdmin implements Abortable, Closeable {
public Pair<Integer, Integer> getAlterStatus(final byte[] tableName)
throws IOException {
HTableDescriptor.isLegalTableName(tableName);
return execute(new MasterMonitorCallable<Pair<Integer, Integer>>() {
return executeCallable(new MasterMonitorCallable<Pair<Integer, Integer>>(getConnection()) {
@Override
public Pair<Integer, Integer> call() throws ServiceException {
GetSchemaAlterStatusRequest req = RequestConverter
@ -1067,7 +1065,7 @@ public class HBaseAdmin implements Abortable, Closeable {
*/
public void addColumn(final byte [] tableName, final HColumnDescriptor column)
throws IOException {
execute(new MasterAdminCallable<Void>() {
executeCallable(new MasterAdminCallable<Void>(getConnection()) {
@Override
public Void call() throws ServiceException {
AddColumnRequest req = RequestConverter.buildAddColumnRequest(tableName, column);
@ -1100,7 +1098,7 @@ public class HBaseAdmin implements Abortable, Closeable {
*/
public void deleteColumn(final byte [] tableName, final byte [] columnName)
throws IOException {
execute(new MasterAdminCallable<Void>() {
executeCallable(new MasterAdminCallable<Void>(getConnection()) {
@Override
public Void call() throws ServiceException {
DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(tableName, columnName);
@ -1135,7 +1133,7 @@ public class HBaseAdmin implements Abortable, Closeable {
*/
public void modifyColumn(final byte [] tableName, final HColumnDescriptor descriptor)
throws IOException {
execute(new MasterAdminCallable<Void>() {
executeCallable(new MasterAdminCallable<Void>(getConnection()) {
@Override
public Void call() throws ServiceException {
ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(tableName, descriptor);
@ -1542,7 +1540,7 @@ public class HBaseAdmin implements Abortable, Closeable {
*/
public void assign(final byte[] regionName) throws MasterNotRunningException,
ZooKeeperConnectionException, IOException {
execute(new MasterAdminCallable<Void>() {
executeCallable(new MasterAdminCallable<Void>(getConnection()) {
@Override
public Void call() throws ServiceException {
AssignRegionRequest request = RequestConverter.buildAssignRegionRequest(regionName);
@ -1568,7 +1566,7 @@ public class HBaseAdmin implements Abortable, Closeable {
*/
public void unassign(final byte [] regionName, final boolean force)
throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
execute(new MasterAdminCallable<Void>() {
executeCallable(new MasterAdminCallable<Void>(getConnection()) {
@Override
public Void call() throws ServiceException {
UnassignRegionRequest request =
@ -1817,7 +1815,7 @@ public class HBaseAdmin implements Abortable, Closeable {
"' doesn't match with the HTD one: " + htd.getNameAsString());
}
execute(new MasterAdminCallable<Void>() {
executeCallable(new MasterAdminCallable<Void>(getConnection()) {
@Override
public Void call() throws ServiceException {
ModifyTableRequest request = RequestConverter.buildModifyTableRequest(tableName, htd);
@ -1890,7 +1888,7 @@ public class HBaseAdmin implements Abortable, Closeable {
* @throws IOException if a remote or network exception occurs
*/
public synchronized void shutdown() throws IOException {
execute(new MasterAdminCallable<Void>() {
executeCallable(new MasterAdminCallable<Void>(getConnection()) {
@Override
public Void call() throws ServiceException {
masterAdmin.shutdown(null,ShutdownRequest.newBuilder().build());
@ -1906,7 +1904,7 @@ public class HBaseAdmin implements Abortable, Closeable {
* @throws IOException if a remote or network exception occurs
*/
public synchronized void stopMaster() throws IOException {
execute(new MasterAdminCallable<Void>() {
executeCallable(new MasterAdminCallable<Void>(getConnection()) {
@Override
public Void call() throws ServiceException {
masterAdmin.stopMaster(null,StopMasterRequest.newBuilder().build());
@ -1942,7 +1940,7 @@ public class HBaseAdmin implements Abortable, Closeable {
* @throws IOException if a remote or network exception occurs
*/
public ClusterStatus getClusterStatus() throws IOException {
return execute(new MasterMonitorCallable<ClusterStatus>() {
return executeCallable(new MasterMonitorCallable<ClusterStatus>(getConnection()) {
@Override
public ClusterStatus call() throws ServiceException {
GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest();
@ -2301,7 +2299,7 @@ public class HBaseAdmin implements Abortable, Closeable {
Thread.currentThread().interrupt();
}
LOG.debug("Getting current status of snapshot from master...");
done = execute(new MasterAdminCallable<IsSnapshotDoneResponse>() {
done = executeCallable(new MasterAdminCallable<IsSnapshotDoneResponse>(getConnection()) {
@Override
public IsSnapshotDoneResponse call() throws ServiceException {
return masterAdmin.isSnapshotDone(null, request);
@ -2330,7 +2328,7 @@ public class HBaseAdmin implements Abortable, Closeable {
final TakeSnapshotRequest request = TakeSnapshotRequest.newBuilder().setSnapshot(snapshot)
.build();
// run the snapshot on the master
return execute(new MasterAdminCallable<TakeSnapshotResponse>() {
return executeCallable(new MasterAdminCallable<TakeSnapshotResponse>(getConnection()) {
@Override
public TakeSnapshotResponse call() throws ServiceException {
return masterAdmin.snapshot(null, request);
@ -2361,7 +2359,7 @@ public class HBaseAdmin implements Abortable, Closeable {
public boolean isSnapshotFinished(final SnapshotDescription snapshot)
throws IOException, HBaseSnapshotException, UnknownSnapshotException {
return execute(new MasterAdminCallable<IsSnapshotDoneResponse>() {
return executeCallable(new MasterAdminCallable<IsSnapshotDoneResponse>(getConnection()) {
@Override
public IsSnapshotDoneResponse call() throws ServiceException {
return masterAdmin.isSnapshotDone(null,
@ -2503,7 +2501,8 @@ public class HBaseAdmin implements Abortable, Closeable {
Thread.currentThread().interrupt();
}
LOG.debug("Getting current status of snapshot restore from master...");
done = execute(new MasterAdminCallable<IsRestoreSnapshotDoneResponse>() {
done = executeCallable(new MasterAdminCallable<IsRestoreSnapshotDoneResponse>(
getConnection()) {
@Override
public IsRestoreSnapshotDoneResponse call() throws ServiceException {
return masterAdmin.isRestoreSnapshotDone(null, request);
@ -2533,7 +2532,7 @@ public class HBaseAdmin implements Abortable, Closeable {
.build();
// run the snapshot restore on the master
return execute(new MasterAdminCallable<RestoreSnapshotResponse>() {
return executeCallable(new MasterAdminCallable<RestoreSnapshotResponse>(getConnection()) {
@Override
public RestoreSnapshotResponse call() throws ServiceException {
return masterAdmin.restoreSnapshot(null, request);
@ -2547,7 +2546,7 @@ public class HBaseAdmin implements Abortable, Closeable {
* @throws IOException if a network error occurs
*/
public List<SnapshotDescription> listSnapshots() throws IOException {
return execute(new MasterAdminCallable<List<SnapshotDescription>>() {
return executeCallable(new MasterAdminCallable<List<SnapshotDescription>>(getConnection()) {
@Override
public List<SnapshotDescription> call() throws ServiceException {
return masterAdmin.getCompletedSnapshots(null, ListSnapshotRequest.newBuilder().build())
@ -2603,13 +2602,12 @@ public class HBaseAdmin implements Abortable, Closeable {
// make sure the snapshot is possibly valid
HTableDescriptor.isLegalTableName(Bytes.toBytes(snapshotName));
// do the delete
execute(new MasterAdminCallable<Void>() {
executeCallable(new MasterAdminCallable<Void>(getConnection()) {
@Override
public Void call() throws ServiceException {
masterAdmin.deleteSnapshot(
null,
DeleteSnapshotRequest.newBuilder()
.setSnapshot(SnapshotDescription.newBuilder().setName(snapshotName).build()).build());
masterAdmin.deleteSnapshot(null,
DeleteSnapshotRequest.newBuilder().
setSnapshot(SnapshotDescription.newBuilder().setName(snapshotName).build()).build());
return null;
}
});
@ -2633,75 +2631,88 @@ public class HBaseAdmin implements Abortable, Closeable {
List<SnapshotDescription> snapshots = listSnapshots(pattern);
for (final SnapshotDescription snapshot : snapshots) {
// do the delete
execute(new MasterAdminCallable<Void>() {
executeCallable(new MasterAdminCallable<Void>(getConnection()) {
@Override
public Void call() throws ServiceException {
masterAdmin.deleteSnapshot(
null,
this.masterAdmin.deleteSnapshot(null,
DeleteSnapshotRequest.newBuilder().setSnapshot(snapshot).build());
return null;
}
});
}
}
/**
* @see {@link #execute(MasterAdminCallable<V>)}
*/
private abstract static class MasterAdminCallable<V> implements Callable<V>{
abstract static class MasterAdminCallable<V> extends MasterCallable<V> {
protected MasterAdminKeepAliveConnection masterAdmin;
private final HConnection connection;
public MasterAdminCallable(final HConnection connection) {
this.connection = connection;
}
@Override
public void prepare(boolean reload) throws IOException {
this.masterAdmin = this.connection.getKeepAliveMasterAdminService();
}
@Override
public void close() throws IOException {
this.masterAdmin.close();
}
}
/**
* @see {@link #execute(MasterMonitorCallable<V>)}
*/
private abstract static class MasterMonitorCallable<V> implements Callable<V> {
abstract static class MasterMonitorCallable<V> extends MasterCallable<V> {
protected MasterMonitorKeepAliveConnection masterMonitor;
}
private final HConnection connection;
/**
* This method allows to execute a function requiring a connection to
* master without having to manage the connection creation/close.
* Create a {@link MasterAdminCallable} to use it.
*/
private <V> V execute(MasterAdminCallable<V> function) throws IOException {
function.masterAdmin = connection.getKeepAliveMasterAdminService();
try {
return executeCallable(function);
} finally {
function.masterAdmin.close();
public MasterMonitorCallable(final HConnection connection) {
this.connection = connection;
}
@Override
public void prepare(boolean reload) throws IOException {
this.masterMonitor = this.connection.getKeepAliveMasterMonitorService();
}
@Override
public void close() throws IOException {
this.masterMonitor.close();
}
}
/**
* This method allows to execute a function requiring a connection to
* master without having to manage the connection creation/close.
* Create a {@link MasterAdminCallable} to use it.
* Parent of {@link MasterMonitorCallable} and {@link MasterAdminCallable}.
* Has common methods.
* @param <V>
*/
private <V> V execute(MasterMonitorCallable<V> function) throws IOException {
function.masterMonitor = connection.getKeepAliveMasterMonitorService();
try {
return executeCallable(function);
} finally {
function.masterMonitor.close();
abstract static class MasterCallable<V> implements RetryingCallable<V>, Closeable {
@Override
public void throwable(Throwable t, boolean retrying) {
}
@Override
public String getExceptionMessageAdditionalDetail() {
return "";
}
@Override
public long sleep(long pause, int tries) {
return ConnectionUtils.getPauseTime(pause, tries);
}
}
/**
* Helper function called by other execute functions.
*/
private <V> V executeCallable(Callable<V> function) throws IOException {
private <V> V executeCallable(MasterCallable<V> callable) throws IOException {
RpcRetryingCaller<V> caller = new RpcRetryingCaller<V>();
try {
return function.call();
} catch (RemoteException re) {
throw re.unwrapRemoteException();
} catch (IOException e) {
throw e;
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
} catch (Exception e) {
// This should not happen...
throw new IOException("Unexpected exception when calling master", e);
return caller.callWithRetries(callable, getConfiguration());
} finally {
callable.close();
}
}
@ -2729,4 +2740,4 @@ public class HBaseAdmin implements Abortable, Closeable {
public CoprocessorRpcChannel coprocessorService() {
return new MasterCoprocessorRpcChannel(connection);
}
}
}

View File

@ -59,6 +59,12 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMoni
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface HConnection extends Abortable, Closeable {
/**
* Key for configuration in Configuration whose value is the class we implement making a
* new HConnection instance.
*/
public static final String HBASE_CLIENT_CONNECTION_IMPL = "hbase.client.connection.impl";
/**
* @return Configuration instance being used by this HConnection instance.
*/
@ -269,34 +275,6 @@ public interface HConnection extends Abortable, Closeable {
boolean reload)
throws IOException;
/**
* Pass in a ServerCallable with your particular bit of logic defined and
* this method will manage the process of doing retries with timed waits
* and refinds of missing regions.
*
* @param <T> the type of the return value
* @param callable callable to run
* @return an object of type T
* @throws IOException if a remote or network exception occurs
* @throws RuntimeException other unspecified error
*/
@Deprecated
<T> T getRegionServerWithRetries(ServerCallable<T> callable)
throws IOException, RuntimeException;
/**
* Pass in a ServerCallable with your particular bit of logic defined and
* this method will pass it to the defined region server.
* @param <T> the type of the return value
* @param callable callable to run
* @return an object of type T
* @throws IOException if a remote or network exception occurs
* @throws RuntimeException other unspecified error
*/
@Deprecated
<T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
throws IOException, RuntimeException;
/**
* Process a mixed batch of Get, Put and Delete actions. All actions for a
* RegionServer are forwarded in one RPC call.

View File

@ -1954,18 +1954,6 @@ public class HConnectionManager {
}
}
@Override
public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
throws IOException, RuntimeException {
return callable.withRetries();
}
@Override
public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
throws IOException, RuntimeException {
return callable.withoutRetries();
}
void updateCachedLocation(HRegionInfo hri, HRegionLocation source,
ServerName serverName, long seqNum) {
HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);

View File

@ -222,18 +222,6 @@ public class HConnectionWrapper implements HConnection {
return hconnection.getRegionLocation(tableName, row, reload);
}
@Override
public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
throws IOException, RuntimeException {
return hconnection.getRegionServerWithRetries(callable);
}
@Override
public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
throws IOException, RuntimeException {
return hconnection.getRegionServerWithoutRetries(callable);
}
@Override
public void processBatch(List<? extends Row> actions, byte[] tableName,
ExecutorService pool, Object[] results) throws IOException,

View File

@ -592,12 +592,15 @@ public class HTable implements HTableInterface {
@Override
public Result getRowOrBefore(final byte[] row, final byte[] family)
throws IOException {
return new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
tableName, row) {
public Result call() throws IOException {
return ProtobufUtil.getRowOrBefore(stub,
location.getRegionInfo().getRegionName(), row, family);
return ProtobufUtil.getRowOrBefore(getStub(),
getLocation().getRegionInfo().getRegionName(), row, family);
}
}.withRetries();
};
return new RpcRetryingCaller<Result>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
}
/**
@ -637,12 +640,14 @@ public class HTable implements HTableInterface {
*/
@Override
public Result get(final Get get) throws IOException {
return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
public Result call() throws IOException {
return ProtobufUtil.get(stub,
location.getRegionInfo().getRegionName(), get);
}
}.withRetries();
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
getTableName(), get.getRow()) {
public Result call() throws IOException {
return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
}
};
return new RpcRetryingCaller<Result>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
}
/**
@ -704,18 +709,21 @@ public class HTable implements HTableInterface {
@Override
public void delete(final Delete delete)
throws IOException {
new ServerCallable<Boolean>(connection, tableName, delete.getRow(), operationTimeout) {
public Boolean call() throws IOException {
try {
MutateRequest request = RequestConverter.buildMutateRequest(
location.getRegionInfo().getRegionName(), delete);
MutateResponse response = stub.mutate(null, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
}.withRetries();
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
tableName, delete.getRow()) {
public Boolean call() throws IOException {
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), delete);
MutateResponse response = getStub().mutate(null, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
new RpcRetryingCaller<Boolean>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
}
/**
@ -838,19 +846,21 @@ public class HTable implements HTableInterface {
*/
@Override
public void mutateRow(final RowMutations rm) throws IOException {
new ServerCallable<Void>(connection, tableName, rm.getRow(),
operationTimeout) {
RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(connection, getTableName(), rm.getRow()) {
public Void call() throws IOException {
try {
MultiRequest request = RequestConverter.buildMultiRequest(
location.getRegionInfo().getRegionName(), rm);
stub.multi(null, request);
getLocation().getRegionInfo().getRegionName(), rm);
getStub().multi(null, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
return null;
}
}.withRetries();
};
new RpcRetryingCaller<Void>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
}
/**
@ -862,21 +872,23 @@ public class HTable implements HTableInterface {
throw new IOException(
"Invalid arguments to append, no columns specified");
}
return new ServerCallable<Result>(connection, tableName, append.getRow(), operationTimeout) {
public Result call() throws IOException {
try {
MutateRequest request = RequestConverter.buildMutateRequest(
location.getRegionInfo().getRegionName(), append);
PayloadCarryingRpcController rpcController =
new PayloadCarryingRpcController();
MutateResponse response = stub.mutate(rpcController, request);
if (!response.hasResult()) return null;
return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
RegionServerCallable<Result> callable =
new RegionServerCallable<Result>(this.connection, getTableName(), append.getRow()) {
public Result call() throws IOException {
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), append);
PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
MutateResponse response = getStub().mutate(rpcController, request);
if (!response.hasResult()) return null;
return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}.withRetries();
}
};
return new RpcRetryingCaller<Result>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
}
/**
@ -888,19 +900,22 @@ public class HTable implements HTableInterface {
throw new IOException(
"Invalid arguments to increment, no columns specified");
}
return new ServerCallable<Result>(connection, tableName, increment.getRow(), operationTimeout) {
public Result call() throws IOException {
try {
MutateRequest request = RequestConverter.buildMutateRequest(
location.getRegionInfo().getRegionName(), increment);
PayloadCarryingRpcController rpcContoller = new PayloadCarryingRpcController();
MutateResponse response = stub.mutate(rpcContoller, request);
return ProtobufUtil.toResult(response.getResult(), rpcContoller.cellScanner());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
getTableName(), increment.getRow()) {
public Result call() throws IOException {
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), increment);
PayloadCarryingRpcController rpcContoller = new PayloadCarryingRpcController();
MutateResponse response = getStub().mutate(rpcContoller, request);
return ProtobufUtil.toResult(response.getResult(), rpcContoller.cellScanner());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}.withRetries();
}
};
return new RpcRetryingCaller<Result>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
}
/**
@ -932,22 +947,26 @@ public class HTable implements HTableInterface {
throw new IOException(
"Invalid arguments to incrementColumnValue", npe);
}
return new ServerCallable<Long>(connection, tableName, row, operationTimeout) {
public Long call() throws IOException {
try {
MutateRequest request = RequestConverter.buildMutateRequest(
location.getRegionInfo().getRegionName(), row, family,
qualifier, amount, durability);
PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
MutateResponse response = stub.mutate(rpcController, request);
Result result =
ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
RegionServerCallable<Long> callable =
new RegionServerCallable<Long>(connection, getTableName(), row) {
public Long call() throws IOException {
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family,
qualifier, amount, durability);
PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
MutateResponse response = getStub().mutate(rpcController, request);
Result result =
ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}.withRetries();
}
};
return new RpcRetryingCaller<Long>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
}
/**
@ -958,19 +977,22 @@ public class HTable implements HTableInterface {
final byte [] family, final byte [] qualifier, final byte [] value,
final Put put)
throws IOException {
return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
public Boolean call() throws IOException {
try {
MutateRequest request = RequestConverter.buildMutateRequest(
location.getRegionInfo().getRegionName(), row, family, qualifier,
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getTableName(), row) {
public Boolean call() throws IOException {
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), CompareType.EQUAL, put);
MutateResponse response = stub.mutate(null, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
MutateResponse response = getStub().mutate(null, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}.withRetries();
}
};
return new RpcRetryingCaller<Boolean>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
}
@ -982,19 +1004,22 @@ public class HTable implements HTableInterface {
final byte [] family, final byte [] qualifier, final byte [] value,
final Delete delete)
throws IOException {
return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
public Boolean call() throws IOException {
try {
MutateRequest request = RequestConverter.buildMutateRequest(
location.getRegionInfo().getRegionName(), row, family, qualifier,
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getTableName(), row) {
public Boolean call() throws IOException {
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), CompareType.EQUAL, delete);
MutateResponse response = stub.mutate(null, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
MutateResponse response = getStub().mutate(null, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}.withRetries();
}
};
return new RpcRetryingCaller<Boolean>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
}
/**
@ -1002,19 +1027,21 @@ public class HTable implements HTableInterface {
*/
@Override
public boolean exists(final Get get) throws IOException {
return new ServerCallable<Boolean>(connection, tableName, get.getRow(), operationTimeout) {
public Boolean call() throws IOException {
try {
GetRequest request = RequestConverter.buildGetRequest(
location.getRegionInfo().getRegionName(), get, true);
GetResponse response = stub.get(null, request);
return response.getExists();
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
}.withRetries();
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getTableName(), get.getRow()) {
public Boolean call() throws IOException {
try {
GetRequest request = RequestConverter.buildGetRequest(
getLocation().getRegionInfo().getRegionName(), get, true);
GetResponse response = getStub().get(null, request);
return response.getExists();
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
return new RpcRetryingCaller<Boolean>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
}
/**
@ -1105,19 +1132,23 @@ public class HTable implements HTableInterface {
for (final Map.Entry<Integer, List<Get>> getsByRegionEntry : getsByRegion.entrySet()) {
Callable<List<Boolean>> callable = new Callable<List<Boolean>>() {
public List<Boolean> call() throws Exception {
return new ServerCallable<List<Boolean>>(connection, tableName, getsByRegionEntry.getValue()
.get(0).getRow(), operationTimeout) {
RegionServerCallable<List<Boolean>> callable =
new RegionServerCallable<List<Boolean>>(connection, getTableName(),
getsByRegionEntry.getValue().get(0).getRow()) {
public List<Boolean> call() throws IOException {
try {
MultiGetRequest requests = RequestConverter.buildMultiGetRequest(location
.getRegionInfo().getRegionName(), getsByRegionEntry.getValue(), true, false);
MultiGetResponse responses = stub.multiGet(null, requests);
MultiGetRequest requests = RequestConverter.buildMultiGetRequest(
getLocation().getRegionInfo().getRegionName(), getsByRegionEntry.getValue(),
true, false);
MultiGetResponse responses = getStub().multiGet(null, requests);
return responses.getExistsList();
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
}.withRetries();
};
return new RpcRetryingCaller<List<Boolean>>().
callWithRetries(callable, getConfiguration(), operationTimeout);
}
};
futures.put(getsByRegionEntry.getKey(), pool.submit(callable));

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.HRegionLocation;
@ -36,19 +35,22 @@ import com.google.protobuf.ServiceException;
/**
* Callable that handles the <code>multi</code> method call going against a single
* regionserver; i.e. A {@link ServerCallable} for the multi call (It is not a
* {@link Callable} that goes against multiple regions.
* regionserver; i.e. A {@link RegionServerCallable} for the multi call (It is not a
* {@link RegionServerCallable} that goes against multiple regions.
* @param <R>
*/
class MultiServerCallable<R> extends ServerCallable<MultiResponse> {
class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
private final MultiAction<R> multi;
private final HRegionLocation loc;
MultiServerCallable(final HConnection connection, final byte [] tableName,
final HRegionLocation loc, final MultiAction<R> multi) {
final HRegionLocation location, final MultiAction<R> multi) {
super(connection, tableName, null);
this.multi = multi;
this.loc = loc;
setLocation(location);
}
MultiAction<R> getMulti() {
return this.multi;
}
@Override
@ -74,7 +76,7 @@ class MultiServerCallable<R> extends ServerCallable<MultiResponse> {
RequestConverter.buildNoDataMultiRequest(regionName, rms, cells);
// Carry the cells over the proxy/pb Service interface using the payload carrying
// rpc controller.
stub.multi(new PayloadCarryingRpcController(cells), multiRequest);
getStub().multi(new PayloadCarryingRpcController(cells), multiRequest);
// This multi call does not return results.
response.add(regionName, action.getOriginalIndex(), Result.EMPTY_RESULT);
} catch (ServiceException se) {
@ -99,7 +101,7 @@ class MultiServerCallable<R> extends ServerCallable<MultiResponse> {
// Controller optionally carries cell data over the proxy/service boundary and also
// optionally ferries cell response data back out again.
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
ClientProtos.MultiResponse responseProto = stub.multi(controller, multiRequest);
ClientProtos.MultiResponse responseProto = getStub().multi(controller, multiRequest);
results = ResponseConverter.getResults(responseProto, controller.cellScanner());
} catch (ServiceException se) {
ex = ProtobufUtil.getRemoteException(se);
@ -115,6 +117,7 @@ class MultiServerCallable<R> extends ServerCallable<MultiResponse> {
@Override
public void prepare(boolean reload) throws IOException {
stub = connection.getClient(loc.getServerName());
// Use the location we were given in the constructor rather than go look it up.
setStub(getConnection().getClient(getLocation().getServerName()));
}
}
}

View File

@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.exceptions.RegionException;
/** Thrown when a table can not be located */
@InterfaceAudience.Public
@InterfaceStability.Stable
@InterfaceStability.Evolving
public class RegionOfflineException extends RegionException {
private static final long serialVersionUID = 466008402L;
/** default constructor */
@ -36,4 +36,4 @@ public class RegionOfflineException extends RegionException {
public RegionOfflineException(String s) {
super(s);
}
}
}

View File

@ -0,0 +1,145 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Implementations call a RegionServer and implement {@link #call()}.
* Passed to a {@link RpcRetryingCaller} so we retry on fail.
* @param <T> the class that the ServerCallable handles
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
// Public because used outside of this package over in ipc.
static final Log LOG = LogFactory.getLog(RegionServerCallable.class);
private final HConnection connection;
private final byte [] tableName;
private final byte [] row;
private HRegionLocation location;
private ClientService.BlockingInterface stub;
protected final static int MIN_WAIT_DEAD_SERVER = 10000;
/**
* @param connection Connection to use.
* @param tableName Table name to which <code>row</code> belongs.
* @param row The row we want in <code>tableName</code>.
*/
public RegionServerCallable(HConnection connection, byte [] tableName, byte [] row) {
this.connection = connection;
this.tableName = tableName;
this.row = row;
}
/**
* Prepare for connection to the server hosting region with row from tablename. Does lookup
* to find region location and hosting server.
* @param reload Set this to true if connection should re-find the region
* @throws IOException e
*/
public void prepare(final boolean reload) throws IOException {
this.location = connection.getRegionLocation(tableName, row, reload);
if (this.location == null) {
throw new IOException("Failed to find location, tableName=" + Bytes.toString(tableName) +
", row=" + Bytes.toString(row) + ", reload=" + reload);
}
setStub(getConnection().getClient(getLocation().getServerName()));
}
/**
* @return {@link HConnection} instance used by this Callable.
*/
HConnection getConnection() {
return this.connection;
}
protected ClientService.BlockingInterface getStub() {
return this.stub;
}
void setStub(final ClientService.BlockingInterface stub) {
this.stub = stub;
}
protected HRegionLocation getLocation() {
return this.location;
}
protected void setLocation(final HRegionLocation location) {
this.location = location;
}
public byte [] getTableName() {
return this.tableName;
}
public byte [] getRow() {
return this.row;
}
@Override
public void throwable(Throwable t, boolean retrying) {
if (t instanceof SocketTimeoutException ||
t instanceof ConnectException ||
t instanceof RetriesExhaustedException ||
(location != null && getConnection().isDeadServer(location.getServerName()))) {
// if thrown these exceptions, we clear all the cache entries that
// map to that slow/dead server; otherwise, let cache miss and ask
// .META. again to find the new location
getConnection().clearCaches(location.getServerName());
} else if (t instanceof RegionMovedException) {
getConnection().updateCachedLocations(tableName, row, t, location);
} else if (t instanceof NotServingRegionException && !retrying) {
// Purge cache entries for this specific region from META cache
// since we don't call connect(true) when number of retries is 1.
getConnection().deleteCachedRegionLocation(location);
}
}
@Override
public String getExceptionMessageAdditionalDetail() {
return "row '" + Bytes.toString(row) + "' on table '" + Bytes.toString(tableName);
}
@Override
public long sleep(long pause, int tries) {
// Tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
long sleep = ConnectionUtils.getPauseTime(pause, tries + 1);
if (sleep < MIN_WAIT_DEAD_SERVER
&& (location == null || getConnection().isDeadServer(location.getServerName()))) {
sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
}
return sleep;
}
}

View File

@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
/**
* Exception thrown by HTable methods when an attempt to do something (like
@ -61,7 +62,7 @@ public class RetriesExhaustedException extends IOException {
/**
* Create a new RetriesExhaustedException from the list of prior failures.
* @param callableVitals Details from the {@link ServerCallable} we were using
* @param callableVitals Details from the Callable we were using
* when we got this exception.
* @param numTries The number of tries we made
* @param exceptions List of exceptions that failed before giving up

View File

@ -0,0 +1,66 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.Callable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A Callable<T> that will be retried. If {@link #call()} invocation throws exceptions,
* we will call {@link #throwable(Throwable, boolean)} with whatever the exception was.
* @param <T>
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface RetryingCallable<T> extends Callable<T> {
/**
* Prepare by setting up any connections to servers, etc., ahead of {@link #call()} invocation.
* @param reload Set this to true if need to requery locations (usually set on second invocation
* to {@link #call()} or whatever
* @throws IOException e
*/
void prepare(final boolean reload) throws IOException;
/**
* Called when {@link #call()} throws an exception and we are going to retry; take action to
* make it so we succeed on next call (clear caches, do relookup of locations, etc.).
* @param t
* @param retrying True if we are in retrying mode (we are not in retrying mode when max
* retries == 1; we ARE in retrying mode if retries > 1 even when we are the last attempt)
*/
void throwable(final Throwable t, boolean retrying);
/**
* @return Some details from the implementation that we would like to add to a terminating
* exception; i.e. a fatal exception is being thrown ending retries and we might like to add
* more implementation-specific detail on to the exception being thrown.
*/
String getExceptionMessageAdditionalDetail();
/**
* @param pause
* @param tries
* @return Suggestion on how much to sleep between retries
*/
long sleep(final long pause, final int tries);
}

View File

@ -0,0 +1,228 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import com.google.protobuf.ServiceException;
/**
* Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client
* threadlocal outstanding timeouts as so we don't persist too much.
* Dynamic rather than static so can set the generic appropriately.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class RpcRetryingCaller<T> {
static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class);
/**
* Timeout for the call including retries
*/
private int callTimeout;
/**
* When we started making calls.
*/
private long globalStartTime;
/**
* Start and end times for a single call.
*/
private long startTime, endTime;
private final static int MIN_RPC_TIMEOUT = 2000;
public RpcRetryingCaller() {
super();
}
private void beforeCall() {
this.startTime = EnvironmentEdgeManager.currentTimeMillis();
int remaining = (int)(callTimeout - (this.startTime - this.globalStartTime));
if (remaining < MIN_RPC_TIMEOUT) {
// If there is no time left, we're trying anyway. It's too late.
// 0 means no timeout, and it's not the intent here. So we secure both cases by
// resetting to the minimum.
remaining = MIN_RPC_TIMEOUT;
}
RpcClient.setRpcTimeout(remaining);
}
private void afterCall() {
RpcClient.resetRpcTimeout();
this.endTime = EnvironmentEdgeManager.currentTimeMillis();
}
public synchronized T callWithRetries(RetryingCallable<T> callable, final Configuration conf)
throws IOException, RuntimeException {
return callWithRetries(callable, conf, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
}
public synchronized T callWithRetries(RetryingCallable<T> callable, final Configuration conf,
final int callTimeout)
throws IOException, RuntimeException {
final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
final int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
return callWithRetries(callable, callTimeout, pause, numRetries);
}
/**
* Retries if invocation fails.
* @param conf
* @param callTimeout Timeout for this call
* @param callable The {@link RetryingCallable} to run.
* @return an object of type T
* @throws IOException if a remote or network exception occurs
* @throws RuntimeException other unspecified error
*/
synchronized T callWithRetries(RetryingCallable<T> callable, int callTimeout, final long pause,
final int retries)
throws IOException, RuntimeException {
this.callTimeout = callTimeout;
List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
for (int tries = 0;; tries++) {
long expectedSleep = 0;
try {
beforeCall();
callable.prepare(tries != 0); // if called with false, check table status on ZK
return callable.call();
} catch (Throwable t) {
LOG.warn("Call exception, tries=" + tries + ", retries=" + retries + ", retryTime=" +
(this.globalStartTime - System.currentTimeMillis()) + "ms", t);
// translateException throws exception when should not retry: i.e. when request is bad.
t = translateException(t);
callable.throwable(t, retries != 1);
RetriesExhaustedException.ThrowableWithExtraContext qt =
new RetriesExhaustedException.ThrowableWithExtraContext(t,
EnvironmentEdgeManager.currentTimeMillis(), toString());
exceptions.add(qt);
if (tries >= retries - 1) {
throw new RetriesExhaustedException(tries, exceptions);
}
// If the server is dead, we need to wait a little before retrying, to give
// a chance to the regions to be
// tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
expectedSleep = callable.sleep(pause, tries + 1);
// If, after the planned sleep, there won't be enough time left, we stop now.
long duration = singleCallDuration(expectedSleep);
if (duration > this.callTimeout) {
String msg = "callTimeout=" + this.callTimeout + ", callDuration=" + duration +
": " + callable.getExceptionMessageAdditionalDetail();
throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
}
} finally {
afterCall();
}
try {
Thread.sleep(expectedSleep);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries);
}
}
}
/**
* @param expectedSleep
* @return Calculate how long a single call took
*/
private long singleCallDuration(final long expectedSleep) {
return (this.endTime - this.globalStartTime) + MIN_RPC_TIMEOUT + expectedSleep;
}
/**
* Call the server once only.
* {@link RetryingCallable} has a strange shape so we can do retrys. Use this invocation if you
* want to do a single call only (A call to {@link RetryingCallable#call()} will not likely
* succeed).
* @return an object of type T
* @throws IOException if a remote or network exception occurs
* @throws RuntimeException other unspecified error
*/
public T callWithoutRetries(RetryingCallable<T> callable)
throws IOException, RuntimeException {
// The code of this method should be shared with withRetries.
this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
try {
beforeCall();
callable.prepare(false);
return callable.call();
} catch (Throwable t) {
Throwable t2 = translateException(t);
// It would be nice to clear the location cache here.
if (t2 instanceof IOException) {
throw (IOException)t2;
} else {
throw new RuntimeException(t2);
}
} finally {
afterCall();
}
}
/**
* Get the good or the remote exception if any, throws the DoNotRetryIOException.
* @param t the throwable to analyze
* @return the translated exception, if it's not a DoNotRetryIOException
* @throws DoNotRetryIOException - if we find it, we throw it instead of translating.
*/
static Throwable translateException(Throwable t) throws DoNotRetryIOException {
if (t instanceof UndeclaredThrowableException) {
if (t.getCause() != null) {
t = t.getCause();
}
}
if (t instanceof RemoteException) {
t = ((RemoteException)t).unwrapRemoteException();
}
if (t instanceof ServiceException) {
ServiceException se = (ServiceException)t;
Throwable cause = se.getCause();
if (cause != null && cause instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException)cause;
}
// Don't let ServiceException out; its rpc specific.
t = cause;
// t could be a RemoteException so go aaround again.
translateException(t);
} else if (t instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException)t;
}
return t;
}
}

View File

@ -49,12 +49,13 @@ import java.io.IOException;
import java.net.UnknownHostException;
/**
* Retries scanner operations such as create, next, etc.
* Used by {@link ResultScanner}s made by {@link HTable}.
* Scanner operations such as create, next, etc.
* Used by {@link ResultScanner}s made by {@link HTable}. Passed to a retrying caller such as
* {@link RpcRetryingCaller} so fails are retried.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ScannerCallable extends ServerCallable<Result[]> {
public class ScannerCallable extends RegionServerCallable<Result[]> {
public static final String LOG_SCANNER_LATENCY_CUTOFF
= "hbase.client.log.scanner.latency.cutoff";
public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
@ -126,7 +127,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
* to decide if hbase client connects to a remote region server
*/
private void checkIfRegionServerIsRemote() {
if (this.location.getHostname().equalsIgnoreCase(myAddress)) {
if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
isRegionServerRemote = false;
} else {
isRegionServerRemote = true;
@ -154,7 +155,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
ScanResponse response = null;
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
try {
response = stub.scan(controller, request);
response = getStub().scan(controller, request);
// Client and RS maintain a nextCallSeq number during the scan. Every next() call
// from client to server will increment this number in both sides. Client passes this
// number along with the request and at RS side both the incoming nextCallSeq and its
@ -198,7 +199,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
try {
HRegionLocation location =
connection.relocateRegion(tableName, scan.getStartRow());
getConnection().relocateRegion(getTableName(), scan.getStartRow());
LOG.info("Scanner=" + scannerId
+ " expired, current region location is " + location.toString()
+ " ip:" + location.getHostnamePort());
@ -270,7 +271,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
ScanRequest request =
RequestConverter.buildScanRequest(this.scannerId, 0, true);
try {
stub.scan(null, request);
getStub().scan(null, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
@ -284,15 +285,15 @@ public class ScannerCallable extends ServerCallable<Result[]> {
incRPCcallsMetrics();
ScanRequest request =
RequestConverter.buildScanRequest(
this.location.getRegionInfo().getRegionName(),
getLocation().getRegionInfo().getRegionName(),
this.scan, 0, false);
try {
ScanResponse response = stub.scan(null, request);
ScanResponse response = getStub().scan(null, request);
long id = response.getScannerId();
if (logScannerActivity) {
LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
+ " on region " + this.location.toString() + " ip:"
+ this.location.getHostnamePort());
+ " on region " + getLocation().toString() + " ip:"
+ getLocation().getHostnamePort());
}
return id;
} catch (ServiceException se) {
@ -318,7 +319,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
if (!instantiated) {
return null;
}
return location.getRegionInfo();
return getLocation().getRegionInfo();
}
/**
@ -336,4 +337,4 @@ public class ScannerCallable extends ServerCallable<Result[]> {
public void setCaching(int caching) {
this.caching = caching;
}
}
}

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
class ScannerCaller extends RpcRetryingCaller<Result []> {}

View File

@ -1,299 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
/**
* Abstract class that implements {@link Callable}. Implementation stipulates
* return type and method we actually invoke on remote Server. Usually
* used inside a try/catch that fields usual connection failures all wrapped
* up in a retry loop.
* <p>Call {@link #prepare(boolean)} to connect to server hosting region
* that contains the passed row in the passed table before invoking
* {@link #call()}.
* @see HConnection#getRegionServerWithoutRetries(ServerCallable)
* @param <T> the class that the ServerCallable handles
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class ServerCallable<T> implements Callable<T> {
static final Log LOG = LogFactory.getLog(ServerCallable.class);
protected final HConnection connection;
protected final byte [] tableName;
protected final byte [] row;
protected HRegionLocation location;
protected ClientService.BlockingInterface stub;
protected int callTimeout;
protected long globalStartTime;
protected long startTime, endTime;
protected final static int MIN_RPC_TIMEOUT = 2000;
protected final static int MIN_WAIT_DEAD_SERVER = 10000;
/**
* @param connection Connection to use.
* @param tableName Table name to which <code>row</code> belongs.
* @param row The row we want in <code>tableName</code>.
*/
public ServerCallable(HConnection connection, byte [] tableName, byte [] row) {
this(connection, tableName, row, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
}
public ServerCallable(HConnection connection, byte [] tableName, byte [] row, int callTimeout) {
this.connection = connection;
this.tableName = tableName;
this.row = row;
this.callTimeout = callTimeout;
}
/**
* Prepare for connection to the server hosting region with row from tablename. Does lookup
* to find region location and hosting server.
* @param reload Set this to true if connection should re-find the region
* @throws IOException e
*/
public void prepare(final boolean reload) throws IOException {
this.location = connection.getRegionLocation(tableName, row, reload);
if (this.location == null) {
throw new IOException("Failed to find location, tableName=" + Bytes.toString(tableName) + ", row=" +
Bytes.toString(row) + ", reload=" + reload);
}
this.stub = connection.getClient(location.getServerName());
}
/** @return the server name
* @deprecated Just use {@link #toString()} instead.
*/
public String getServerName() {
if (location == null) return null;
return location.getHostnamePort();
}
/** @return the region name
* @deprecated Just use {@link #toString()} instead.
*/
public byte[] getRegionName() {
if (location == null) return null;
return location.getRegionInfo().getRegionName();
}
/** @return the row
* @deprecated Just use {@link #toString()} instead.
*/
public byte [] getRow() {
return row;
}
public void beforeCall() {
this.startTime = EnvironmentEdgeManager.currentTimeMillis();
int remaining = (int)(callTimeout - (this.startTime - this.globalStartTime));
if (remaining < MIN_RPC_TIMEOUT) {
// If there is no time left, we're trying anyway. It's too late.
// 0 means no timeout, and it's not the intent here. So we secure both cases by
// resetting to the minimum.
remaining = MIN_RPC_TIMEOUT;
}
RpcClient.setRpcTimeout(remaining);
}
public void afterCall() {
RpcClient.resetRpcTimeout();
this.endTime = EnvironmentEdgeManager.currentTimeMillis();
}
/**
* @return {@link HConnection} instance used by this Callable.
*/
HConnection getConnection() {
return this.connection;
}
/**
* Run this instance with retries, timed waits,
* and refinds of missing regions.
*
* @return an object of type T
* @throws IOException if a remote or network exception occurs
* @throws RuntimeException other unspecified error
*/
public T withRetries()
throws IOException, RuntimeException {
Configuration c = getConnection().getConfiguration();
final long pause = c.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
final int numRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
for (int tries = 0;; tries++) {
long expectedSleep = 0;
try {
beforeCall();
prepare(tries != 0); // if called with false, check table status on ZK
return call();
} catch (Throwable t) {
LOG.warn("Call exception, tries=" + tries + ", numRetries=" + numRetries + ", retryTime=" +
(this.globalStartTime - System.currentTimeMillis()) + "ms", t);
t = translateException(t);
// translateException throws an exception when we should not retry, i.e. when it's the
// request that is bad.
if (t instanceof SocketTimeoutException ||
t instanceof ConnectException ||
t instanceof RetriesExhaustedException ||
(location != null && getConnection().isDeadServer(location.getServerName()))) {
// if thrown these exceptions, we clear all the cache entries that
// map to that slow/dead server; otherwise, let cache miss and ask
// .META. again to find the new location
getConnection().clearCaches(location.getServerName());
} else if (t instanceof RegionMovedException) {
getConnection().updateCachedLocations(tableName, row, t, location);
} else if (t instanceof NotServingRegionException && numRetries == 1) {
// Purge cache entries for this specific region from META cache
// since we don't call connect(true) when number of retries is 1.
getConnection().deleteCachedRegionLocation(location);
}
RetriesExhaustedException.ThrowableWithExtraContext qt =
new RetriesExhaustedException.ThrowableWithExtraContext(t,
EnvironmentEdgeManager.currentTimeMillis(), toString());
exceptions.add(qt);
if (tries >= numRetries - 1) {
throw new RetriesExhaustedException(tries, exceptions);
}
// If the server is dead, we need to wait a little before retrying, to give
// a chance to the regions to be
// tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
expectedSleep = ConnectionUtils.getPauseTime(pause, tries + 1);
if (expectedSleep < MIN_WAIT_DEAD_SERVER
&& (location == null || getConnection().isDeadServer(location.getServerName()))) {
expectedSleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
}
// If, after the planned sleep, there won't be enough time left, we stop now.
long duration = singleCallDuration(expectedSleep);
if (duration > this.callTimeout) {
throw (SocketTimeoutException) new SocketTimeoutException(
"Call to access row '" + Bytes.toString(row) + "' on table '"
+ Bytes.toString(tableName)
+ "' failed on timeout. " + " callTimeout=" + this.callTimeout +
", callDuration=" + duration).initCause(t);
}
} finally {
afterCall();
}
try {
Thread.sleep(expectedSleep);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted after " + tries + " tries on " + numRetries, e);
}
}
}
/**
* @param expectedSleep
* @return Calculate how long a single call took
*/
private long singleCallDuration(final long expectedSleep) {
return (this.endTime - this.globalStartTime) + MIN_RPC_TIMEOUT + expectedSleep;
}
/**
* Run this instance against the server once.
* @return an object of type T
* @throws IOException if a remote or network exception occurs
* @throws RuntimeException other unspecified error
*/
public T withoutRetries()
throws IOException, RuntimeException {
// The code of this method should be shared with withRetries.
this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
try {
beforeCall();
prepare(false);
return call();
} catch (Throwable t) {
Throwable t2 = translateException(t);
// It would be nice to clear the location cache here.
if (t2 instanceof IOException) {
throw (IOException)t2;
} else {
throw new RuntimeException(t2);
}
} finally {
afterCall();
}
}
/**
* Get the good or the remote exception if any, throws the DoNotRetryIOException.
* @param t the throwable to analyze
* @return the translated exception, if it's not a DoNotRetryIOException
* @throws DoNotRetryIOException - if we find it, we throw it instead of translating.
*/
protected static Throwable translateException(Throwable t) throws DoNotRetryIOException {
if (t instanceof UndeclaredThrowableException) {
if(t.getCause() != null) {
t = t.getCause();
}
}
if (t instanceof RemoteException) {
t = ((RemoteException)t).unwrapRemoteException();
}
if (t instanceof ServiceException) {
ServiceException se = (ServiceException)t;
Throwable cause = se.getCause();
if (cause != null && cause instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException)cause;
}
} else if (t instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException)t;
}
return t;
}
}

View File

@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.HBaseIOException;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class DoNotRetryIOException extends HBaseIOException {
// TODO: This would be more useful as a marker interface than as a class.
private static final long serialVersionUID = 1197446454511704139L;
/**

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.exceptions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
/**
@ -28,7 +27,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescriptio
@SuppressWarnings("serial")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HBaseSnapshotException extends HBaseIOException {
public class HBaseSnapshotException extends DoNotRetryIOException {
private SnapshotDescription description;

View File

@ -29,7 +29,7 @@ import java.io.IOException;
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class InvalidFamilyOperationException extends IOException {
public class InvalidFamilyOperationException extends DoNotRetryIOException {
private static final long serialVersionUID = 1L << 22 - 1L;
/** default constructor */
public InvalidFamilyOperationException() {

View File

@ -19,9 +19,7 @@
*/
package org.apache.hadoop.hbase.exceptions;
import java.io.IOException;
public class LockTimeoutException extends IOException {
public class LockTimeoutException extends DoNotRetryIOException {
private static final long serialVersionUID = -1770764924258999825L;
@ -33,5 +31,4 @@ public class LockTimeoutException extends IOException {
public LockTimeoutException(String s) {
super(s);
}
}
}

View File

@ -43,5 +43,4 @@ public class RegionException extends HBaseIOException {
public RegionException(String s) {
super(s);
}
}
}

View File

@ -17,14 +17,12 @@ package org.apache.hadoop.hbase.exceptions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.IOException;
/**
* Thrown when a table exists but should not
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableExistsException extends IOException {
public class TableExistsException extends DoNotRetryIOException {
private static final long serialVersionUID = 1L << 7 - 1L;
/** default constructor */
public TableExistsException() {
@ -39,4 +37,4 @@ public class TableExistsException extends IOException {
public TableExistsException(String s) {
super(s);
}
}
}

View File

@ -22,14 +22,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
/**
* Thrown if a table should be offline but is not
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableNotDisabledException extends IOException {
public class TableNotDisabledException extends DoNotRetryIOException {
private static final long serialVersionUID = 1L << 19 - 1L;
/** default constructor */
public TableNotDisabledException() {

View File

@ -22,14 +22,13 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
/**
* Thrown if a table should be enabled but is not
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableNotEnabledException extends IOException {
public class TableNotEnabledException extends DoNotRetryIOException {
private static final long serialVersionUID = 262144L;
/** default constructor */
public TableNotEnabledException() {
@ -50,4 +49,4 @@ public class TableNotEnabledException extends IOException {
public TableNotEnabledException(byte[] tableName) {
this(Bytes.toString(tableName));
}
}
}

View File

@ -24,7 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability;
/** Thrown when a table can not be located */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableNotFoundException extends RegionException {
public class TableNotFoundException extends DoNotRetryIOException {
private static final long serialVersionUID = 993179627856392526L;
/** default constructor */
@ -36,4 +36,4 @@ public class TableNotFoundException extends RegionException {
public TableNotFoundException(String s) {
super(s);
}
}
}

View File

@ -24,7 +24,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
@ -75,14 +76,15 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
.setServiceName(method.getService().getFullName())
.setMethodName(method.getName())
.setRequest(request.toByteString()).build();
ServerCallable<CoprocessorServiceResponse> callable =
new ServerCallable<CoprocessorServiceResponse>(connection, table, row) {
RegionServerCallable<CoprocessorServiceResponse> callable =
new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
public CoprocessorServiceResponse call() throws Exception {
byte[] regionName = location.getRegionInfo().getRegionName();
return ProtobufUtil.execService(stub, call, regionName);
byte[] regionName = getLocation().getRegionInfo().getRegionName();
return ProtobufUtil.execService(getStub(), call, regionName);
}
};
CoprocessorServiceResponse result = callable.withRetries();
CoprocessorServiceResponse result = new RpcRetryingCaller<CoprocessorServiceResponse>().
callWithRetries(callable, this.connection.getConfiguration());
Message response = null;
if (result.getValue().hasValue()) {
response = responsePrototype.newBuilderForType()

View File

@ -45,7 +45,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Category(MediumTests.class)
public class TestAsyncProcess {
private static final byte[] DUMMY_TABLE = "DUMMY_TABLE".getBytes();
@ -65,42 +64,40 @@ public class TestAsyncProcess {
private static Exception failure = new Exception("failure");
static class MyAsyncProcess<Res> extends AsyncProcess<Res> {
public MyAsyncProcess(HConnection hc,
AsyncProcessCallback<Res> callback, Configuration conf) {
public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf) {
super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
Threads.newDaemonThreadFactory("test-TestAsyncProcess"))
, callback, conf);
new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("test-TestAsyncProcess")),
callback, conf);
}
/**
* Do not call a server, fails if the rowkey of the operation is{@link #FAILS}
*/
@Override
protected ServerCallable<MultiResponse> createCallable(
final HRegionLocation loc, final MultiAction<Row> multi) {
final MultiResponse mr = new MultiResponse();
for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) {
for (Action a : entry.getValue()) {
if (Arrays.equals(FAILS, a.getAction().getRow())) {
mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), failure);
} else {
mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), success);
}
}
}
return new MultiServerCallable<Row>(hConnection, tableName, null, null) {
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
final MultiResponse mr = createMultiResponse(callable.getLocation(), callable.getMulti());
return new RpcRetryingCaller<MultiResponse>() {
@Override
public MultiResponse withoutRetries() {
public MultiResponse callWithoutRetries( RetryingCallable<MultiResponse> callable)
throws IOException, RuntimeException {
return mr;
}
};
}
}
static MultiResponse createMultiResponse(final HRegionLocation loc,
final MultiAction<Row> multi) {
final MultiResponse mr = new MultiResponse();
for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) {
for (Action a : entry.getValue()) {
if (Arrays.equals(FAILS, a.getAction().getRow())) {
mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), failure);
} else {
mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), success);
}
}
}
return mr;
}
/**
* Returns our async process.
*/

View File

@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.util;
import java.io.InterruptedIOException;
import java.io.PrintWriter;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.LinkedBlockingQueue;
@ -129,6 +130,7 @@ public class Threads {
}
/**
* If interrupted, just prints out the interrupt on STDOUT, resets interrupt and returns
* @param millis How long to sleep for in milliseconds.
*/
public static void sleep(long millis) {
@ -136,6 +138,7 @@ public class Threads {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}

View File

@ -60,7 +60,8 @@ import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
@ -536,23 +537,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
}
final ServerCallable<Boolean> svrCallable = new ServerCallable<Boolean>(conn,
tableName, first) {
final RegionServerCallable<Boolean> svrCallable =
new RegionServerCallable<Boolean>(conn, tableName, first) {
@Override
public Boolean call() throws Exception {
SecureBulkLoadClient secureClient = null;
boolean success = false;
try {
LOG.debug("Going to connect to server " + location + " for row "
+ Bytes.toStringBinary(row));
byte[] regionName = location.getRegionInfo().getRegionName();
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
byte[] regionName = getLocation().getRegionInfo().getRegionName();
if(!useSecure) {
success = ProtobufUtil.bulkLoadHFile(stub, famPaths, regionName, assignSeqIds);
success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
} else {
HTable table = new HTable(conn.getConfiguration(), tableName);
HTable table = new HTable(conn.getConfiguration(), getTableName());
secureClient = new SecureBulkLoadClient(table);
success = secureClient.bulkLoadHFiles(famPaths, userToken, bulkToken, location.getRegionInfo().getStartKey());
success = secureClient.bulkLoadHFiles(famPaths, userToken, bulkToken,
getLocation().getRegionInfo().getStartKey());
}
return success;
} finally {
@ -586,7 +588,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try {
List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
boolean success = svrCallable.withRetries();
boolean success = new RpcRetryingCaller<Boolean>().callWithRetries(svrCallable, getConf());
if (!success) {
LOG.warn("Attempt to bulk load region containing "
+ Bytes.toStringBinary(first) + " into table "

View File

@ -34,15 +34,11 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
@ -166,7 +162,7 @@ public class WALEditsReplaySink {
try {
ReplayServerCallable<MultiResponse> callable = new ReplayServerCallable<MultiResponse>(
this.conn, this.tableName, regionLoc, regionInfo, actions);
callable.withRetries();
new RpcRetryingCaller<MultiResponse>().callWithRetries(callable, conf, this.replayTimeout);
} catch (IOException ie) {
if (skipErrors) {
LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
@ -181,19 +177,17 @@ public class WALEditsReplaySink {
* Callable that handles the <code>replay</code> method call going against a single regionserver
* @param <R>
*/
class ReplayServerCallable<R> extends ServerCallable<MultiResponse> {
class ReplayServerCallable<R> extends RegionServerCallable<MultiResponse> {
private HRegionInfo regionInfo;
private List<Action<Row>> actions;
private Map<HRegionLocation, Map<HRegionInfo, List<Action<Row>>>> retryActions = null;
ReplayServerCallable(final HConnection connection, final byte [] tableName,
final HRegionLocation regionLoc, final HRegionInfo regionInfo,
final List<Action<Row>> actions) {
super(connection, tableName, null, replayTimeout);
super(connection, tableName, null);
this.actions = actions;
this.regionInfo = regionInfo;
this.location = regionLoc;
setLocation(regionLoc);
}
@Override
@ -208,7 +202,7 @@ public class WALEditsReplaySink {
private void replayToServer(HRegionInfo regionInfo, List<Action<Row>> actions)
throws IOException, ServiceException {
AdminService.BlockingInterface remoteSvr = connection.getAdmin(location.getServerName());
AdminService.BlockingInterface remoteSvr = conn.getAdmin(getLocation().getServerName());
MultiRequest request = RequestConverter.buildMultiRequest(regionInfo.getRegionName(),
actions);
MultiResponse protoResults = remoteSvr.replay(null, request);
@ -235,16 +229,14 @@ public class WALEditsReplaySink {
@Override
public void prepare(boolean reload) throws IOException {
if (!reload) return;
// relocate regions in case we have a new dead server or network hiccup
// if not due to connection issue, the following code should run fast because it uses
// cached location
for (Action<Row> action : actions) {
// use first row to relocate region because all actions are for one region
this.location = this.connection.locateRegion(tableName, action.getAction().getRow());
setLocation(conn.locateRegion(tableName, action.getAction().getRow()));
break;
}
}
}
}
}

View File

@ -43,7 +43,7 @@ public class HConnectionTestingUtility {
* configuration instance. Minimally the mock will return
* <code>conf</conf> when {@link HConnection#getConfiguration()} is invoked.
* Be sure to shutdown the connection when done by calling
* {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)} else it
* {@link HConnectionManager#deleteConnection(Configuration)} else it
* will stick around; this is probably not what you want.
* @param conf configuration
* @return HConnection object for <code>conf</code>
@ -69,7 +69,7 @@ public class HConnectionTestingUtility {
* more of the popular {@link HConnection} methods so they do 'normal'
* operation (see return doc below for list). Be sure to shutdown the
* connection when done by calling
* {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)} else it
* {@link HConnectionManager#deleteConnection(Configuration)} else it
* will stick around; this is probably not what you want.
*
* @param conf Configuration to use
@ -88,7 +88,7 @@ public class HConnectionTestingUtility {
* {@link HConnection#getAdmin(ServerName)} is called, returns the passed
* {@link ClientProtos.ClientService.BlockingInterface} instance when
* {@link HConnection#getClient(ServerName)} is called (Be sure to call
* {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)}
* {@link HConnectionManager#deleteConnection(Configuration)}
* when done with this mocked Connection.
* @throws IOException
*/
@ -123,7 +123,7 @@ public class HConnectionTestingUtility {
* Get a Mockito spied-upon {@link HConnection} that goes with the passed
* <code>conf</code> configuration instance.
* Be sure to shutdown the connection when done by calling
* {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)} else it
* {@link HConnectionManager#deleteConnection(Configuration)} else it
* will stick around; this is probably not what you want.
* @param conf configuration
* @return HConnection object for <code>conf</code>

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
import org.apache.hadoop.hbase.exceptions.PleaseHoldException;
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mortbay.log.Log;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@Category(SmallTests.class)
public class TestHBaseAdminNoCluster {
/**
* Verify that PleaseHoldException gets retried.
* HBASE-8764
* @throws IOException
* @throws ZooKeeperConnectionException
* @throws MasterNotRunningException
* @throws ServiceException
*/
@Test
public void testMasterMonitorCollableRetries()
throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ServiceException {
Configuration configuration = HBaseConfiguration.create();
// Set the pause and retry count way down.
configuration.setLong(HConstants.HBASE_CLIENT_PAUSE, 1);
final int count = 10;
configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, count);
// Get mocked connection. Getting the connection will register it so when HBaseAdmin is
// constructed with same configuration, it will find this mocked connection.
HConnection connection = HConnectionTestingUtility.getMockedConnection(configuration);
// Mock so we get back the master interface. Make it so when createTable is called, we throw
// the PleaseHoldException.
MasterAdminKeepAliveConnection masterAdmin =
Mockito.mock(MasterAdminKeepAliveConnection.class);
Mockito.when(masterAdmin.createTable((RpcController)Mockito.any(),
(MasterAdminProtos.CreateTableRequest)Mockito.any())).
thenThrow(new ServiceException("Test fail").initCause(new PleaseHoldException("test")));
Mockito.when(connection.getKeepAliveMasterAdminService()).thenReturn(masterAdmin);
// Mock up our admin Interfaces
HBaseAdmin admin = new HBaseAdmin(configuration);
try {
HTableDescriptor htd = new HTableDescriptor("testMasterMonitorCollableRetries");
// Pass any old htable descriptor; not important
try {
admin.createTable(htd, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
fail();
} catch (RetriesExhaustedException e) {
Log.info("Expected fail", e);
}
// Assert we were called 'count' times.
Mockito.verify(masterAdmin, Mockito.atLeast(count)).createTable((RpcController)Mockito.any(),
(MasterAdminProtos.CreateTableRequest)Mockito.any());
} finally {
admin.close();
if (connection != null)HConnectionManager.deleteConnection(configuration);
}
}
}

View File

@ -32,10 +32,11 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.exceptions.TableExistsException;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -141,41 +142,43 @@ public class TestHRegionServerBulkLoad {
}
// bulk load HFiles
HConnection conn = UTIL.getHBaseAdmin().getConnection();
final HConnection conn = UTIL.getHBaseAdmin().getConnection();
byte[] tbl = Bytes.toBytes(tableName);
new ServerCallable<Void>(conn, tbl, Bytes
.toBytes("aaa")) {
RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(conn, tbl, Bytes.toBytes("aaa")) {
@Override
public Void call() throws Exception {
LOG.debug("Going to connect to server " + location + " for row "
+ Bytes.toStringBinary(row));
byte[] regionName = location.getRegionInfo().getRegionName();
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
byte[] regionName = getLocation().getRegionInfo().getRegionName();
BulkLoadHFileRequest request =
RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
stub.bulkLoadHFile(null, request);
getStub().bulkLoadHFile(null, request);
return null;
}
}.withRetries();
};
RpcRetryingCaller<Void> caller = new RpcRetryingCaller<Void>();
caller.callWithRetries(callable, UTIL.getConfiguration());
// Periodically do compaction to reduce the number of open file handles.
if (numBulkLoads.get() % 10 == 0) {
// 10 * 50 = 500 open file handles!
new ServerCallable<Void>(conn, tbl,
Bytes.toBytes("aaa")) {
callable = new RegionServerCallable<Void>(conn, tbl, Bytes.toBytes("aaa")) {
@Override
public Void call() throws Exception {
LOG.debug("compacting " + location + " for row "
+ Bytes.toStringBinary(row));
LOG.debug("compacting " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
AdminProtos.AdminService.BlockingInterface server =
connection.getAdmin(location.getServerName());
conn.getAdmin(getLocation().getServerName());
CompactRegionRequest request =
RequestConverter.buildCompactRegionRequest(
location.getRegionInfo().getRegionName(), true, null);
getLocation().getRegionInfo().getRegionName(), true, null);
server.compactRegion(null, request);
numCompactions.incrementAndGet();
return null;
}
}.withRetries();
};
caller.callWithRetries(callable, UTIL.getConfiguration());
}
}
}

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.util.TreeMap;
import java.util.List;
import java.util.Map;
@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
@ -377,7 +379,7 @@ public class TestHLog {
* [FSNamesystem.nextGenerationStampForBlock])
* 3. HDFS-142 (on restart, maintain pendingCreates)
*/
@Test
@Test (timeout=300000)
public void testAppendClose() throws Exception {
byte [] tableName = Bytes.toBytes(getName());
HRegionInfo regioninfo = new HRegionInfo(tableName,
@ -422,16 +424,16 @@ public class TestHLog {
Thread.sleep(1000);
}
assertFalse(cluster.isClusterUp());
// Workaround a strange issue with Hadoop's RPC system - if we don't
// sleep here, the new datanodes will pick up a cached IPC connection to
// the old (dead) NN and fail to start. Sleeping 2 seconds goes past
// the idle time threshold configured in the conf above
Thread.sleep(2000);
LOG.info("Waiting a few seconds before re-starting HDFS");
Thread.sleep(5000);
cluster = TEST_UTIL.startMiniDFSClusterForTestHLog(namenodePort);
cluster = null;
for (int i = 0; i < 100; i++) {
try {
cluster = TEST_UTIL.startMiniDFSClusterForTestHLog(namenodePort);
break;
} catch (BindException e) {
LOG.info("Sleeping. BindException bringing up new cluster");
Threads.sleep(1000);
}
}
cluster.waitActive();
fs = cluster.getFileSystem();
LOG.info("STARTED second instance.");