HBASE-21875 Change the retry logic in RSProcedureDispatcher to 'retry by default, only if xxx'
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
50a0e4f83e
commit
a24ef97073
|
@ -22,14 +22,15 @@ import java.lang.Thread.UncaughtExceptionHandler;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
|
||||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||||
import org.apache.hadoop.hbase.master.MasterServices;
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
import org.apache.hadoop.hbase.master.ServerListener;
|
import org.apache.hadoop.hbase.master.ServerListener;
|
||||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
@ -37,21 +38,18 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
|
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -67,8 +65,6 @@ public class RSProcedureDispatcher
|
||||||
"hbase.regionserver.rpc.startup.waittime";
|
"hbase.regionserver.rpc.startup.waittime";
|
||||||
private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;
|
private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;
|
||||||
|
|
||||||
private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0200000; // 2.0
|
|
||||||
|
|
||||||
protected final MasterServices master;
|
protected final MasterServices master;
|
||||||
private final long rsStartupWaitTime;
|
private final long rsStartupWaitTime;
|
||||||
private MasterProcedureEnv procedureEnv;
|
private MasterProcedureEnv procedureEnv;
|
||||||
|
@ -119,18 +115,11 @@ public class RSProcedureDispatcher
|
||||||
@Override
|
@Override
|
||||||
protected void remoteDispatch(final ServerName serverName,
|
protected void remoteDispatch(final ServerName serverName,
|
||||||
final Set<RemoteProcedure> remoteProcedures) {
|
final Set<RemoteProcedure> remoteProcedures) {
|
||||||
final int rsVersion = master.getServerManager().getVersionNumber(serverName);
|
if (!master.getServerManager().isServerOnline(serverName)) {
|
||||||
if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) {
|
// fail fast
|
||||||
LOG.trace("Using procedure batch rpc execution for serverName={} version={}", serverName,
|
|
||||||
rsVersion);
|
|
||||||
submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures));
|
|
||||||
} else if (rsVersion == 0 && !master.getServerManager().isServerOnline(serverName)) {
|
|
||||||
submitTask(new DeadRSRemoteCall(serverName, remoteProcedures));
|
submitTask(new DeadRSRemoteCall(serverName, remoteProcedures));
|
||||||
} else {
|
} else {
|
||||||
LOG.info(String.format(
|
submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures));
|
||||||
"Fallback to compat rpc execution for serverName=%s version=%s",
|
|
||||||
serverName, rsVersion));
|
|
||||||
submitTask(new CompatRemoteProcedureResolver(serverName, remoteProcedures));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,90 +143,6 @@ public class RSProcedureDispatcher
|
||||||
removeNode(serverName);
|
removeNode(serverName);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Base remote call
|
|
||||||
*/
|
|
||||||
protected abstract class AbstractRSRemoteCall implements Runnable {
|
|
||||||
|
|
||||||
private final ServerName serverName;
|
|
||||||
|
|
||||||
private int numberOfAttemptsSoFar = 0;
|
|
||||||
private long maxWaitTime = -1;
|
|
||||||
|
|
||||||
public AbstractRSRemoteCall(final ServerName serverName) {
|
|
||||||
this.serverName = serverName;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected AdminService.BlockingInterface getRsAdmin() throws IOException {
|
|
||||||
final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
|
|
||||||
if (admin == null) {
|
|
||||||
throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
|
|
||||||
" failed because no RPC connection found to this server");
|
|
||||||
}
|
|
||||||
return admin;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected ServerName getServerName() {
|
|
||||||
return serverName;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected boolean scheduleForRetry(final IOException e) {
|
|
||||||
// Should we wait a little before retrying? If the server is starting it's yes.
|
|
||||||
final boolean hold = (e instanceof ServerNotRunningYetException);
|
|
||||||
if (hold) {
|
|
||||||
LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d",
|
|
||||||
serverName, numberOfAttemptsSoFar), e);
|
|
||||||
long now = EnvironmentEdgeManager.currentTime();
|
|
||||||
if (now < getMaxWaitTime()) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(String.format("server is not yet up; waiting up to %dms",
|
|
||||||
(getMaxWaitTime() - now)), e);
|
|
||||||
}
|
|
||||||
submitTask(this, 100, TimeUnit.MILLISECONDS);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// In case it is a connection exception and the region server is still online,
|
|
||||||
// the openRegion RPC could have been accepted by the server and
|
|
||||||
// just the response didn't go through. So we will retry to
|
|
||||||
// open the region on the same server.
|
|
||||||
final boolean retry = !hold && (ClientExceptionsUtil.isConnectionException(e)
|
|
||||||
&& master.getServerManager().isServerOnline(serverName));
|
|
||||||
if (retry) {
|
|
||||||
// we want to retry as many times as needed as long as the RS is not dead.
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(String.format("Retrying to same RegionServer %s because: %s",
|
|
||||||
serverName, e.getMessage()), e);
|
|
||||||
}
|
|
||||||
submitTask(this, 100, TimeUnit.MILLISECONDS);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
// trying to send the request elsewhere instead
|
|
||||||
LOG.warn(String.format("Failed dispatch to server=%s try=%d",
|
|
||||||
serverName, numberOfAttemptsSoFar), e);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private long getMaxWaitTime() {
|
|
||||||
if (this.maxWaitTime < 0) {
|
|
||||||
// This is the max attempts, not retries, so it should be at least 1.
|
|
||||||
this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
|
|
||||||
}
|
|
||||||
return this.maxWaitTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected IOException unwrapException(IOException e) {
|
|
||||||
if (e instanceof RemoteException) {
|
|
||||||
e = ((RemoteException)e).unwrapRemoteException();
|
|
||||||
}
|
|
||||||
return e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private interface RemoteProcedureResolver {
|
private interface RemoteProcedureResolver {
|
||||||
void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
|
void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
|
||||||
|
|
||||||
|
@ -297,18 +202,106 @@ public class RSProcedureDispatcher
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
// Compatibility calls
|
// Compatibility calls
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall
|
protected class ExecuteProceduresRemoteCall implements RemoteProcedureResolver, Runnable {
|
||||||
implements RemoteProcedureResolver {
|
|
||||||
protected final Set<RemoteProcedure> remoteProcedures;
|
private final ServerName serverName;
|
||||||
|
|
||||||
|
private final Set<RemoteProcedure> remoteProcedures;
|
||||||
|
|
||||||
|
private int numberOfAttemptsSoFar = 0;
|
||||||
|
private long maxWaitTime = -1;
|
||||||
|
|
||||||
private ExecuteProceduresRequest.Builder request = null;
|
private ExecuteProceduresRequest.Builder request = null;
|
||||||
|
|
||||||
public ExecuteProceduresRemoteCall(final ServerName serverName,
|
public ExecuteProceduresRemoteCall(final ServerName serverName,
|
||||||
final Set<RemoteProcedure> remoteProcedures) {
|
final Set<RemoteProcedure> remoteProcedures) {
|
||||||
super(serverName);
|
this.serverName = serverName;
|
||||||
this.remoteProcedures = remoteProcedures;
|
this.remoteProcedures = remoteProcedures;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private AdminService.BlockingInterface getRsAdmin() throws IOException {
|
||||||
|
final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
|
||||||
|
if (admin == null) {
|
||||||
|
throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
|
||||||
|
" failed because no RPC connection found to this server");
|
||||||
|
}
|
||||||
|
return admin;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected final ServerName getServerName() {
|
||||||
|
return serverName;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean scheduleForRetry(IOException e) {
|
||||||
|
LOG.debug("request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e);
|
||||||
|
// Should we wait a little before retrying? If the server is starting it's yes.
|
||||||
|
if (e instanceof ServerNotRunningYetException) {
|
||||||
|
long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime();
|
||||||
|
if (remainingTime > 0) {
|
||||||
|
LOG.warn("waiting a little before trying on the same server={}," +
|
||||||
|
" try={}, can wait up to {}ms", serverName, numberOfAttemptsSoFar, remainingTime);
|
||||||
|
numberOfAttemptsSoFar++;
|
||||||
|
submitTask(this, 100, TimeUnit.MILLISECONDS);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
LOG.warn("server {} is not up for a while; try a new one", serverName);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (e instanceof DoNotRetryIOException) {
|
||||||
|
LOG.warn("server {} tells us do not retry due to {}, try={}, give up", serverName,
|
||||||
|
e.toString(), numberOfAttemptsSoFar);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// This exception is thrown by the rpc framework, so we can be sure that the call has not
|
||||||
|
// been executed yet, and it is safe to mark it as failed. Especially when opening a region,
|
||||||
|
// we had better choose another region server.
|
||||||
|
// Note that it is safe to quit only if this is the first time we send the request to the
|
||||||
|
// region server. The region server may have accepted our request the first time, and then a
|
||||||
|
// network error prevented us from receiving the response; if we then hit a
|
||||||
|
// CallQueueTooBigException on the second attempt, it is not safe to quit here, otherwise it
|
||||||
|
// may lead to a double assign...
|
||||||
|
if (e instanceof CallQueueTooBigException && numberOfAttemptsSoFar == 0) {
|
||||||
|
LOG.warn("request to {} failed due to {}, try={}, this usually because" +
|
||||||
|
" server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// Always retry for other exception types if the region server is not dead yet.
|
||||||
|
if (!master.getServerManager().isServerOnline(serverName)) {
|
||||||
|
LOG.warn("request to {} failed due to {}, try={}, and the server is dead, give up",
|
||||||
|
serverName, e.toString(), numberOfAttemptsSoFar);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (e instanceof RegionServerAbortedException || e instanceof RegionServerStoppedException) {
|
||||||
|
// A better way is to return true here to let the upper layer quit, and then schedule a
|
||||||
|
// background task to check whether the region server is dead. And if it is dead, call
|
||||||
|
// remoteCallFailed to tell the upper layer. Retrying here does not lead to an incorrect
|
||||||
|
// result, but it wastes some resources.
|
||||||
|
LOG.warn("server {} is aborted or stopped, for safety we still need to" +
|
||||||
|
" wait until it is fully dead, try={}", serverName, numberOfAttemptsSoFar);
|
||||||
|
} else {
|
||||||
|
LOG.warn("request to server {} failed due to {}, try={}, retrying...", serverName,
|
||||||
|
e.toString(), numberOfAttemptsSoFar);
|
||||||
|
}
|
||||||
|
numberOfAttemptsSoFar++;
|
||||||
|
submitTask(this, 100, TimeUnit.MILLISECONDS);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getMaxWaitTime() {
|
||||||
|
if (this.maxWaitTime < 0) {
|
||||||
|
// Lazily compute the deadline on first use: current time plus rsStartupWaitTime.
|
||||||
|
this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
|
||||||
|
}
|
||||||
|
return this.maxWaitTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
private IOException unwrapException(IOException e) {
|
||||||
|
if (e instanceof RemoteException) {
|
||||||
|
e = ((RemoteException)e).unwrapRemoteException();
|
||||||
|
}
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
request = ExecuteProceduresRequest.newBuilder();
|
request = ExecuteProceduresRequest.newBuilder();
|
||||||
|
@ -348,6 +341,8 @@ public class RSProcedureDispatcher
|
||||||
operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc);
|
operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// will be overridden in test.
|
||||||
|
@VisibleForTesting
|
||||||
protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
|
protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
|
||||||
final ExecuteProceduresRequest request) throws IOException {
|
final ExecuteProceduresRequest request) throws IOException {
|
||||||
try {
|
try {
|
||||||
|
@ -357,7 +352,7 @@ public class RSProcedureDispatcher
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
|
protected final void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
|
||||||
for (RemoteProcedure proc : remoteProcedures) {
|
for (RemoteProcedure proc : remoteProcedures) {
|
||||||
proc.remoteCallFailed(env, getServerName(), e);
|
proc.remoteCallFailed(env, getServerName(), e);
|
||||||
}
|
}
|
||||||
|
@ -375,144 +370,6 @@ public class RSProcedureDispatcher
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
// ==========================================================================
|
|
||||||
// Compatibility calls
|
|
||||||
// Since we don't have a "batch proc-exec" request on the target RS
|
|
||||||
// we have to chunk the requests by type and dispatch the specific request.
|
|
||||||
// ==========================================================================
|
|
||||||
/**
|
|
||||||
* Compatibility class used by {@link CompatRemoteProcedureResolver} to open regions using old
|
|
||||||
* {@link AdminService#openRegion(RpcController, OpenRegionRequest, RpcCallback)} rpc.
|
|
||||||
*/
|
|
||||||
private final class OpenRegionRemoteCall extends AbstractRSRemoteCall {
|
|
||||||
private final List<RegionOpenOperation> operations;
|
|
||||||
|
|
||||||
public OpenRegionRemoteCall(final ServerName serverName,
|
|
||||||
final List<RegionOpenOperation> operations) {
|
|
||||||
super(serverName);
|
|
||||||
this.operations = operations;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
final OpenRegionRequest request =
|
|
||||||
buildOpenRegionRequest(procedureEnv, getServerName(), operations);
|
|
||||||
|
|
||||||
try {
|
|
||||||
sendRequest(getServerName(), request);
|
|
||||||
} catch (IOException e) {
|
|
||||||
e = unwrapException(e);
|
|
||||||
// TODO: In the future some operation may want to bail out early.
|
|
||||||
// TODO: How many times should we retry (use numberOfAttemptsSoFar)
|
|
||||||
if (!scheduleForRetry(e)) {
|
|
||||||
remoteCallFailed(procedureEnv, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private OpenRegionResponse sendRequest(final ServerName serverName,
|
|
||||||
final OpenRegionRequest request) throws IOException {
|
|
||||||
try {
|
|
||||||
return getRsAdmin().openRegion(null, request);
|
|
||||||
} catch (ServiceException se) {
|
|
||||||
throw ProtobufUtil.getRemoteException(se);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
|
|
||||||
for (RegionOpenOperation op: operations) {
|
|
||||||
op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Compatibility class used by {@link CompatRemoteProcedureResolver} to close regions using old
|
|
||||||
* {@link AdminService#closeRegion(RpcController, CloseRegionRequest, RpcCallback)} rpc.
|
|
||||||
*/
|
|
||||||
private final class CloseRegionRemoteCall extends AbstractRSRemoteCall {
|
|
||||||
private final RegionCloseOperation operation;
|
|
||||||
|
|
||||||
public CloseRegionRemoteCall(final ServerName serverName,
|
|
||||||
final RegionCloseOperation operation) {
|
|
||||||
super(serverName);
|
|
||||||
this.operation = operation;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName());
|
|
||||||
try {
|
|
||||||
CloseRegionResponse response = sendRequest(getServerName(), request);
|
|
||||||
remoteCallCompleted(procedureEnv, response);
|
|
||||||
} catch (IOException e) {
|
|
||||||
e = unwrapException(e);
|
|
||||||
// TODO: In the future some operation may want to bail out early.
|
|
||||||
// TODO: How many times should we retry (use numberOfAttemptsSoFar)
|
|
||||||
if (!scheduleForRetry(e)) {
|
|
||||||
remoteCallFailed(procedureEnv, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private CloseRegionResponse sendRequest(final ServerName serverName,
|
|
||||||
final CloseRegionRequest request) throws IOException {
|
|
||||||
try {
|
|
||||||
return getRsAdmin().closeRegion(null, request);
|
|
||||||
} catch (ServiceException se) {
|
|
||||||
throw ProtobufUtil.getRemoteException(se);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void remoteCallCompleted(final MasterProcedureEnv env,
|
|
||||||
final CloseRegionResponse response) {
|
|
||||||
operation.setClosed(response.getClosed());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
|
|
||||||
operation.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Compatibility class to open and close regions using old endpoints (openRegion/closeRegion) in
|
|
||||||
* {@link AdminService}.
|
|
||||||
*/
|
|
||||||
protected class CompatRemoteProcedureResolver implements Runnable, RemoteProcedureResolver {
|
|
||||||
private final Set<RemoteProcedure> operations;
|
|
||||||
private final ServerName serverName;
|
|
||||||
|
|
||||||
public CompatRemoteProcedureResolver(final ServerName serverName,
|
|
||||||
final Set<RemoteProcedure> operations) {
|
|
||||||
this.serverName = serverName;
|
|
||||||
this.operations = operations;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
splitAndResolveOperation(serverName, operations, this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void dispatchOpenRequests(final MasterProcedureEnv env,
|
|
||||||
final List<RegionOpenOperation> operations) {
|
|
||||||
submitTask(new OpenRegionRemoteCall(serverName, operations));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void dispatchCloseRequests(final MasterProcedureEnv env,
|
|
||||||
final List<RegionCloseOperation> operations) {
|
|
||||||
for (RegionCloseOperation op: operations) {
|
|
||||||
submitTask(new CloseRegionRemoteCall(serverName, op));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
// RPC Messages
|
// RPC Messages
|
||||||
// - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
|
// - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
|
||||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.master.assignment;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
@ -75,32 +75,6 @@ public class TestAMServerFailedOpen extends TestAssignmentManagerBase {
|
||||||
// Assign the region (without problems)
|
// Assign the region (without problems)
|
||||||
rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
|
rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
|
||||||
waitOnFuture(submitProcedure(createAssignProcedure(hri)));
|
waitOnFuture(submitProcedure(createAssignProcedure(hri)));
|
||||||
|
|
||||||
// TODO: Currently unassign just keeps trying until it sees a server crash.
|
|
||||||
// There is no count on unassign.
|
|
||||||
/*
|
|
||||||
* // Test Unassign operation failure rsDispatcher.setMockRsExecutor(executor);
|
|
||||||
* waitOnFuture(submitProcedure(createUnassignProcedure(hri)));
|
|
||||||
* assertEquals(assignSubmittedCount + 2, assignProcMetrics.getSubmittedCounter().getCount());
|
|
||||||
* assertEquals(assignFailedCount + 1, assignProcMetrics.getFailedCounter().getCount());
|
|
||||||
* assertEquals(unassignSubmittedCount + 1,
|
|
||||||
* unassignProcMetrics.getSubmittedCounter().getCount()); // TODO: We supposed to have 1 failed
|
|
||||||
* assign, 1 successful assign and a failed unassign // operation. But ProcV2 framework marks
|
|
||||||
* aborted unassign operation as success. Fix it! assertEquals(unassignFailedCount,
|
|
||||||
* unassignProcMetrics.getFailedCounter().getCount());
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testIOExceptionOnAssignment() throws Exception {
|
|
||||||
// collect AM metrics before test
|
|
||||||
collectAssignmentManagerMetrics();
|
|
||||||
|
|
||||||
testFailedOpen(TableName.valueOf("testExceptionOnAssignment"),
|
|
||||||
new FaultyRsExecutor(new IOException("test fault")));
|
|
||||||
|
|
||||||
assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
|
|
||||||
assertEquals(assignFailedCount + 1, assignProcMetrics.getFailedCounter().getCount());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -131,4 +105,16 @@ public class TestAMServerFailedOpen extends TestAssignmentManagerBase {
|
||||||
assertEquals(true, am.getRegionStates().getRegionState(hri).isFailedOpen());
|
assertEquals(true, am.getRegionStates().getRegionState(hri).isFailedOpen());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCallQueueTooBigExceptionOnAssignment() throws Exception {
|
||||||
|
// collect AM metrics before test
|
||||||
|
collectAssignmentManagerMetrics();
|
||||||
|
|
||||||
|
testFailedOpen(TableName.valueOf("testCallQueueTooBigExceptionOnAssignment"),
|
||||||
|
new FaultyRsExecutor(new CallQueueTooBigException("test do not retry fault")));
|
||||||
|
|
||||||
|
assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
|
||||||
|
assertEquals(assignFailedCount + 1, assignProcMetrics.getFailedCounter().getCount());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -82,27 +81,53 @@ public class TestAssignmentManager extends TestAssignmentManagerBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Disabled for now. Since HBASE-18551, this mock is insufficient.
|
|
||||||
@Ignore
|
|
||||||
@Test
|
@Test
|
||||||
public void testSocketTimeout() throws Exception {
|
public void testAssignSocketTimeout() throws Exception {
|
||||||
TableName tableName = TableName.valueOf(this.name.getMethodName());
|
TableName tableName = TableName.valueOf(this.name.getMethodName());
|
||||||
RegionInfo hri = createRegionInfo(tableName, 1);
|
RegionInfo hri = createRegionInfo(tableName, 1);
|
||||||
|
|
||||||
// collect AM metrics before test
|
// collect AM metrics before test
|
||||||
collectAssignmentManagerMetrics();
|
collectAssignmentManagerMetrics();
|
||||||
|
|
||||||
rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20, 3));
|
rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20));
|
||||||
waitOnFuture(submitProcedure(createAssignProcedure(hri)));
|
waitOnFuture(submitProcedure(createAssignProcedure(hri)));
|
||||||
|
|
||||||
rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20, 1));
|
assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
|
||||||
// exception.expect(ServerCrashException.class);
|
assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAssignQueueFullOnce() throws Exception {
|
||||||
|
TableName tableName = TableName.valueOf(this.name.getMethodName());
|
||||||
|
RegionInfo hri = createRegionInfo(tableName, 1);
|
||||||
|
|
||||||
|
// collect AM metrics before test
|
||||||
|
collectAssignmentManagerMetrics();
|
||||||
|
|
||||||
|
rsDispatcher.setMockRsExecutor(new CallQueueTooBigOnceRsExecutor());
|
||||||
|
waitOnFuture(submitProcedure(createAssignProcedure(hri)));
|
||||||
|
|
||||||
|
assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
|
||||||
|
assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeoutThenQueueFull() throws Exception {
|
||||||
|
TableName tableName = TableName.valueOf(this.name.getMethodName());
|
||||||
|
RegionInfo hri = createRegionInfo(tableName, 1);
|
||||||
|
|
||||||
|
// collect AM metrics before test
|
||||||
|
collectAssignmentManagerMetrics();
|
||||||
|
|
||||||
|
rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(10));
|
||||||
|
waitOnFuture(submitProcedure(createAssignProcedure(hri)));
|
||||||
|
rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(15));
|
||||||
waitOnFuture(submitProcedure(createUnassignProcedure(hri)));
|
waitOnFuture(submitProcedure(createUnassignProcedure(hri)));
|
||||||
|
|
||||||
assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
|
assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
|
||||||
assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
|
assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
|
||||||
assertEquals(unassignSubmittedCount + 1, unassignProcMetrics.getSubmittedCounter().getCount());
|
assertEquals(unassignSubmittedCount + 1, unassignProcMetrics.getSubmittedCounter().getCount());
|
||||||
assertEquals(unassignFailedCount + 1, unassignProcMetrics.getFailedCounter().getCount());
|
assertEquals(unassignFailedCount, unassignProcMetrics.getFailedCounter().getCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testAssign(final MockRSExecutor executor) throws Exception {
|
private void testAssign(final MockRSExecutor executor) throws Exception {
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master.assignment;
|
package org.apache.hadoop.hbase.master.assignment;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotEquals;
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
|
||||||
|
@ -37,6 +38,7 @@ import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.ServerMetricsBuilder;
|
import org.apache.hadoop.hbase.ServerMetricsBuilder;
|
||||||
|
@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.YouAreDeadException;
|
import org.apache.hadoop.hbase.YouAreDeadException;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
|
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
||||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||||
import org.apache.hadoop.hbase.master.MasterServices;
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
|
||||||
|
@ -356,16 +359,13 @@ public abstract class TestAssignmentManagerBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected class SocketTimeoutRsExecutor extends GoodRsExecutor {
|
protected class SocketTimeoutRsExecutor extends GoodRsExecutor {
|
||||||
private final int maxSocketTimeoutRetries;
|
private final int timeoutTimes;
|
||||||
private final int maxServerRetries;
|
|
||||||
|
|
||||||
private ServerName lastServer;
|
private ServerName lastServer;
|
||||||
private int sockTimeoutRetries;
|
private int retries;
|
||||||
private int serverRetries;
|
|
||||||
|
|
||||||
public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int maxServerRetries) {
|
public SocketTimeoutRsExecutor(int timeoutTimes) {
|
||||||
this.maxServerRetries = maxServerRetries;
|
this.timeoutTimes = timeoutTimes;
|
||||||
this.maxSocketTimeoutRetries = maxSocketTimeoutRetries;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -373,24 +373,79 @@ public abstract class TestAssignmentManagerBase {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// SocketTimeoutException should be a temporary problem
|
// SocketTimeoutException should be a temporary problem
|
||||||
// unless the server will be declared dead.
|
// unless the server will be declared dead.
|
||||||
if (sockTimeoutRetries++ < maxSocketTimeoutRetries) {
|
retries++;
|
||||||
if (sockTimeoutRetries == 1) {
|
if (retries == 1) {
|
||||||
assertNotEquals(lastServer, server);
|
|
||||||
}
|
|
||||||
lastServer = server;
|
lastServer = server;
|
||||||
LOG.debug("Socket timeout for server=" + server + " retries=" + sockTimeoutRetries);
|
}
|
||||||
throw new SocketTimeoutException("simulate socket timeout");
|
if (retries <= timeoutTimes) {
|
||||||
} else if (serverRetries++ < maxServerRetries) {
|
LOG.debug("Socket timeout for server=" + server + " retries=" + retries);
|
||||||
LOG.info("Mark server=" + server + " as dead. serverRetries=" + serverRetries);
|
// should not change the server if the server is not dead yet.
|
||||||
|
assertEquals(lastServer, server);
|
||||||
|
if (retries == timeoutTimes) {
|
||||||
|
LOG.info("Mark server=" + server + " as dead. retries=" + retries);
|
||||||
master.getServerManager().moveFromOnlineToDeadServers(server);
|
master.getServerManager().moveFromOnlineToDeadServers(server);
|
||||||
sockTimeoutRetries = 0;
|
}
|
||||||
throw new SocketTimeoutException("simulate socket timeout");
|
throw new SocketTimeoutException("simulate socket timeout");
|
||||||
} else {
|
} else {
|
||||||
|
// should select another server
|
||||||
|
assertNotEquals(lastServer, server);
|
||||||
return super.sendRequest(server, req);
|
return super.sendRequest(server, req);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {
|
||||||
|
|
||||||
|
private boolean invoked = false;
|
||||||
|
|
||||||
|
private ServerName lastServer;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
|
||||||
|
throws IOException {
|
||||||
|
if (!invoked) {
|
||||||
|
lastServer = server;
|
||||||
|
invoked = true;
|
||||||
|
throw new CallQueueTooBigException("simulate queue full");
|
||||||
|
}
|
||||||
|
// better select another server since the server is over loaded, but anyway, it is fine to
|
||||||
|
// still select the same server since it is not dead yet...
|
||||||
|
if (lastServer.equals(server)) {
|
||||||
|
LOG.warn("We still select the same server, which is not good.");
|
||||||
|
}
|
||||||
|
return super.sendRequest(server, req);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {
|
||||||
|
|
||||||
|
private final int queueFullTimes;
|
||||||
|
|
||||||
|
private int retries;
|
||||||
|
|
||||||
|
private ServerName lastServer;
|
||||||
|
|
||||||
|
public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
|
||||||
|
this.queueFullTimes = queueFullTimes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
|
||||||
|
throws IOException {
|
||||||
|
retries++;
|
||||||
|
if (retries == 1) {
|
||||||
|
lastServer = server;
|
||||||
|
throw new CallTimeoutException("simulate call timeout");
|
||||||
|
}
|
||||||
|
// should always retry on the same server
|
||||||
|
assertEquals(lastServer, server);
|
||||||
|
if (retries < queueFullTimes) {
|
||||||
|
throw new CallQueueTooBigException("simulate queue full");
|
||||||
|
}
|
||||||
|
return super.sendRequest(server, req);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Takes open request and then returns nothing so acts like a RS that went zombie. No response (so
|
* Takes open request and then returns nothing so acts like a RS that went zombie. No response (so
|
||||||
* proc is stuck/suspended on the Master and won't wake up.). We then send in a crash for this
|
* proc is stuck/suspended on the Master and won't wake up.). We then send in a crash for this
|
||||||
|
|
Loading…
Reference in New Issue