From b73cffb10a192edd6e993f328bef34d8fdca0a80 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Fri, 15 Feb 2019 11:04:23 +0800 Subject: [PATCH] HBASE-21875 Change the retry logic in RSProcedureDispatcher to 'retry by default, only if xxx' Signed-off-by: Guanghao Zhang --- .../procedure/RSProcedureDispatcher.java | 345 +++++------------- .../assignment/TestAMServerFailedOpen.java | 40 +- .../assignment/TestAssignmentManager.java | 41 ++- .../assignment/TestAssignmentManagerBase.java | 89 ++++- 4 files changed, 219 insertions(+), 296 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java index 0b552f30f11..b8ba7b38b35 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java @@ -22,14 +22,15 @@ import java.lang.Thread.UncaughtExceptionHandler; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.ServerListener; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.ipc.RemoteException; @@ -37,21 +38,18 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; /** @@ -67,8 +65,6 @@ public class RSProcedureDispatcher "hbase.regionserver.rpc.startup.waittime"; private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000; - private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0200000; // 2.0 - protected final MasterServices master; private final long rsStartupWaitTime; private MasterProcedureEnv procedureEnv; @@ -119,18 +115,11 @@ public class RSProcedureDispatcher @Override protected void remoteDispatch(final ServerName serverName, final Set remoteProcedures) { - final int rsVersion = master.getServerManager().getVersionNumber(serverName); - if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) { - LOG.trace("Using procedure batch rpc execution for serverName={} version={}", serverName, - rsVersion); - submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures)); - } else if (rsVersion == 0 && !master.getServerManager().isServerOnline(serverName)) { + if (!master.getServerManager().isServerOnline(serverName)) { + // fail fast submitTask(new DeadRSRemoteCall(serverName, remoteProcedures)); } else { - LOG.info(String.format( - "Fallback to compat rpc execution for serverName=%s version=%s", - serverName, rsVersion)); - submitTask(new CompatRemoteProcedureResolver(serverName, remoteProcedures)); + submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures)); } } @@ -154,90 +143,6 @@ public class RSProcedureDispatcher removeNode(serverName); } - /** - * Base remote call - */ - protected abstract class AbstractRSRemoteCall implements Runnable { - - private final ServerName serverName; - - private int numberOfAttemptsSoFar = 0; - private long maxWaitTime = -1; - - public AbstractRSRemoteCall(final ServerName serverName) { - this.serverName = serverName; - } - - protected AdminService.BlockingInterface getRsAdmin() throws IOException { - final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName); - if (admin == null) { - throw new IOException("Attempting to send OPEN RPC to server " + getServerName() + - " failed because no RPC connection found to this server"); - } - return admin; - } - - protected ServerName getServerName() { - return serverName; - } - - protected boolean scheduleForRetry(final IOException e) { - // Should we wait a little before retrying? If the server is starting it's yes. - final boolean hold = (e instanceof ServerNotRunningYetException); - if (hold) { - LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d", - serverName, numberOfAttemptsSoFar), e); - long now = EnvironmentEdgeManager.currentTime(); - if (now < getMaxWaitTime()) { - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("server is not yet up; waiting up to %dms", - (getMaxWaitTime() - now)), e); - } - submitTask(this, 100, TimeUnit.MILLISECONDS); - return true; - } - - LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e); - return false; - } - - // In case it is a connection exception and the region server is still online, - // the openRegion RPC could have been accepted by the server and - // just the response didn't go through. So we will retry to - // open the region on the same server. - final boolean retry = !hold && (ClientExceptionsUtil.isConnectionException(e) - && master.getServerManager().isServerOnline(serverName)); - if (retry) { - // we want to retry as many times as needed as long as the RS is not dead. - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Retrying to same RegionServer %s because: %s", - serverName, e.getMessage()), e); - } - submitTask(this, 100, TimeUnit.MILLISECONDS); - return true; - } - // trying to send the request elsewhere instead - LOG.warn(String.format("Failed dispatch to server=%s try=%d", - serverName, numberOfAttemptsSoFar), e); - return false; - } - - private long getMaxWaitTime() { - if (this.maxWaitTime < 0) { - // This is the max attempts, not retries, so it should be at least 1. - this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime; - } - return this.maxWaitTime; - } - - protected IOException unwrapException(IOException e) { - if (e instanceof RemoteException) { - e = ((RemoteException)e).unwrapRemoteException(); - } - return e; - } - } - private interface RemoteProcedureResolver { void dispatchOpenRequests(MasterProcedureEnv env, List operations); @@ -297,18 +202,106 @@ public class RSProcedureDispatcher // ========================================================================== // Compatibility calls // ========================================================================== - protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall - implements RemoteProcedureResolver { - protected final Set remoteProcedures; + protected class ExecuteProceduresRemoteCall implements RemoteProcedureResolver, Runnable { + + private final ServerName serverName; + + private final Set remoteProcedures; + + private int numberOfAttemptsSoFar = 0; + private long maxWaitTime = -1; private ExecuteProceduresRequest.Builder request = null; public ExecuteProceduresRemoteCall(final ServerName serverName, final Set remoteProcedures) { - super(serverName); + this.serverName = serverName; this.remoteProcedures = remoteProcedures; } + private AdminService.BlockingInterface getRsAdmin() throws IOException { + final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName); + if (admin == null) { + throw new IOException("Attempting to send OPEN RPC to server " + getServerName() + + " failed because no RPC connection found to this server"); + } + return admin; + } + + protected final ServerName getServerName() { + return serverName; + } + + private boolean scheduleForRetry(IOException e) { + LOG.debug("request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e); + // Should we wait a little before retrying? If the server is starting it's yes. + if (e instanceof ServerNotRunningYetException) { + long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime(); + if (remainingTime > 0) { + LOG.warn("waiting a little before trying on the same server={}," + + " try={}, can wait up to {}ms", serverName, numberOfAttemptsSoFar, remainingTime); + numberOfAttemptsSoFar++; + submitTask(this, 100, TimeUnit.MILLISECONDS); + return true; + } + LOG.warn("server {} is not up for a while; try a new one", serverName); + return false; + } + if (e instanceof DoNotRetryIOException) { + LOG.warn("server {} tells us do not retry due to {}, try={}, give up", serverName, + e.toString(), numberOfAttemptsSoFar); + return false; + } + // this exception is thrown in the rpc framework, where we can make sure that the call has not + // been executed yet, so it is safe to mark it as fail. Especially for open a region, we'd + // better choose another region server + // notice that, it is safe to quit only if this is the first time we send request to region + // server. Maybe the region server has accept our request the first time, and then there is a + // network error which prevents we receive the response, and the second time we hit a + // CallQueueTooBigException, obviously it is not safe to quit here, otherwise it may lead to a + // double assign... + if (e instanceof CallQueueTooBigException && numberOfAttemptsSoFar == 0) { + LOG.warn("request to {} failed due to {}, try={}, this usually because" + + " server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar); + return false; + } + // Always retry for other exception types if the region server is not dead yet. + if (!master.getServerManager().isServerOnline(serverName)) { + LOG.warn("request to {} failed due to {}, try={}, and the server is dead, give up", + serverName, e.toString(), numberOfAttemptsSoFar); + return false; + } + if (e instanceof RegionServerAbortedException || e instanceof RegionServerStoppedException) { + // A better way is to return true here to let the upper layer quit, and then schedule a + // background task to check whether the region server is dead. And if it is dead, call + // remoteCallFailed to tell the upper layer. Keep retrying here does not lead to incorrect + // result, but waste some resources. + LOG.warn("server {} is aborted or stopped, for safety we still need to" + + " wait until it is fully dead, try={}", serverName, numberOfAttemptsSoFar); + } else { + LOG.warn("request to server {} failed due to {}, try={}, retrying...", serverName, + e.toString(), numberOfAttemptsSoFar); + } + numberOfAttemptsSoFar++; + submitTask(this, 100, TimeUnit.MILLISECONDS); + return true; + } + + private long getMaxWaitTime() { + if (this.maxWaitTime < 0) { + // This is the max attempts, not retries, so it should be at least 1. + this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime; + } + return this.maxWaitTime; + } + + private IOException unwrapException(IOException e) { + if (e instanceof RemoteException) { + e = ((RemoteException)e).unwrapRemoteException(); + } + return e; + } + @Override public void run() { request = ExecuteProceduresRequest.newBuilder(); @@ -348,6 +341,8 @@ public class RSProcedureDispatcher operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc); } + // will be overridden in test. + @VisibleForTesting protected ExecuteProceduresResponse sendRequest(final ServerName serverName, final ExecuteProceduresRequest request) throws IOException { try { @@ -357,7 +352,7 @@ public class RSProcedureDispatcher } } - protected void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { + protected final void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { for (RemoteProcedure proc : remoteProcedures) { proc.remoteCallFailed(env, getServerName(), e); } @@ -375,144 +370,6 @@ public class RSProcedureDispatcher return builder.build(); } - // ========================================================================== - // Compatibility calls - // Since we don't have a "batch proc-exec" request on the target RS - // we have to chunk the requests by type and dispatch the specific request. - // ========================================================================== - /** - * Compatibility class used by {@link CompatRemoteProcedureResolver} to open regions using old - * {@link AdminService#openRegion(RpcController, OpenRegionRequest, RpcCallback)} rpc. - */ - private final class OpenRegionRemoteCall extends AbstractRSRemoteCall { - private final List operations; - - public OpenRegionRemoteCall(final ServerName serverName, - final List operations) { - super(serverName); - this.operations = operations; - } - - @Override - public void run() { - final OpenRegionRequest request = - buildOpenRegionRequest(procedureEnv, getServerName(), operations); - - try { - sendRequest(getServerName(), request); - } catch (IOException e) { - e = unwrapException(e); - // TODO: In the future some operation may want to bail out early. - // TODO: How many times should we retry (use numberOfAttemptsSoFar) - if (!scheduleForRetry(e)) { - remoteCallFailed(procedureEnv, e); - } - } - } - - private OpenRegionResponse sendRequest(final ServerName serverName, - final OpenRegionRequest request) throws IOException { - try { - return getRsAdmin().openRegion(null, request); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - - private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { - for (RegionOpenOperation op: operations) { - op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e); - } - } - } - - /** - * Compatibility class used by {@link CompatRemoteProcedureResolver} to close regions using old - * {@link AdminService#closeRegion(RpcController, CloseRegionRequest, RpcCallback)} rpc. - */ - private final class CloseRegionRemoteCall extends AbstractRSRemoteCall { - private final RegionCloseOperation operation; - - public CloseRegionRemoteCall(final ServerName serverName, - final RegionCloseOperation operation) { - super(serverName); - this.operation = operation; - } - - @Override - public void run() { - final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName()); - try { - CloseRegionResponse response = sendRequest(getServerName(), request); - remoteCallCompleted(procedureEnv, response); - } catch (IOException e) { - e = unwrapException(e); - // TODO: In the future some operation may want to bail out early. - // TODO: How many times should we retry (use numberOfAttemptsSoFar) - if (!scheduleForRetry(e)) { - remoteCallFailed(procedureEnv, e); - } - } - } - - private CloseRegionResponse sendRequest(final ServerName serverName, - final CloseRegionRequest request) throws IOException { - try { - return getRsAdmin().closeRegion(null, request); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - - private void remoteCallCompleted(final MasterProcedureEnv env, - final CloseRegionResponse response) { - operation.setClosed(response.getClosed()); - } - - private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { - operation.getRemoteProcedure().remoteCallFailed(env, getServerName(), e); - } - } - - /** - * Compatibility class to open and close regions using old endpoints (openRegion/closeRegion) in - * {@link AdminService}. - */ - protected class CompatRemoteProcedureResolver implements Runnable, RemoteProcedureResolver { - private final Set operations; - private final ServerName serverName; - - public CompatRemoteProcedureResolver(final ServerName serverName, - final Set operations) { - this.serverName = serverName; - this.operations = operations; - } - - @Override - public void run() { - splitAndResolveOperation(serverName, operations, this); - } - - @Override - public void dispatchOpenRequests(final MasterProcedureEnv env, - final List operations) { - submitTask(new OpenRegionRemoteCall(serverName, operations)); - } - - @Override - public void dispatchCloseRequests(final MasterProcedureEnv env, - final List operations) { - for (RegionCloseOperation op: operations) { - submitTask(new CloseRegionRemoteCall(serverName, op)); - } - } - - @Override - public void dispatchServerOperations(MasterProcedureEnv env, List operations) { - throw new UnsupportedOperationException(); - } - } - // ========================================================================== // RPC Messages // - ServerOperation: refreshConfig, grant, revoke, ... (TODO) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMServerFailedOpen.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMServerFailedOpen.java index b4689e543a1..b69218a6e3a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMServerFailedOpen.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMServerFailedOpen.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.master.assignment; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import java.io.IOException; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.TableName; @@ -75,32 +75,6 @@ public class TestAMServerFailedOpen extends TestAssignmentManagerBase { // Assign the region (without problems) rsDispatcher.setMockRsExecutor(new GoodRsExecutor()); waitOnFuture(submitProcedure(createAssignProcedure(hri))); - - // TODO: Currently unassign just keeps trying until it sees a server crash. - // There is no count on unassign. - /* - * // Test Unassign operation failure rsDispatcher.setMockRsExecutor(executor); - * waitOnFuture(submitProcedure(createUnassignProcedure(hri))); - * assertEquals(assignSubmittedCount + 2, assignProcMetrics.getSubmittedCounter().getCount()); - * assertEquals(assignFailedCount + 1, assignProcMetrics.getFailedCounter().getCount()); - * assertEquals(unassignSubmittedCount + 1, - * unassignProcMetrics.getSubmittedCounter().getCount()); // TODO: We supposed to have 1 failed - * assign, 1 successful assign and a failed unassign // operation. But ProcV2 framework marks - * aborted unassign operation as success. Fix it! assertEquals(unassignFailedCount, - * unassignProcMetrics.getFailedCounter().getCount()); - */ - } - - @Test - public void testIOExceptionOnAssignment() throws Exception { - // collect AM metrics before test - collectAssignmentManagerMetrics(); - - testFailedOpen(TableName.valueOf("testExceptionOnAssignment"), - new FaultyRsExecutor(new IOException("test fault"))); - - assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount()); - assertEquals(assignFailedCount + 1, assignProcMetrics.getFailedCounter().getCount()); } @Test @@ -131,4 +105,16 @@ public class TestAMServerFailedOpen extends TestAssignmentManagerBase { assertEquals(true, am.getRegionStates().getRegionState(hri).isFailedOpen()); } } + + @Test + public void testCallQueueTooBigExceptionOnAssignment() throws Exception { + // collect AM metrics before test + collectAssignmentManagerMetrics(); + + testFailedOpen(TableName.valueOf("testCallQueueTooBigExceptionOnAssignment"), + new FaultyRsExecutor(new CallQueueTooBigException("test do not retry fault"))); + + assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount()); + assertEquals(assignFailedCount + 1, assignProcMetrics.getFailedCounter().getCount()); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java index 5ec7cc64e4e..4f0e2a98639 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; @@ -82,27 +81,53 @@ public class TestAssignmentManager extends TestAssignmentManagerBase { } } - // Disabled for now. Since HBASE-18551, this mock is insufficient. - @Ignore @Test - public void testSocketTimeout() throws Exception { + public void testAssignSocketTimeout() throws Exception { TableName tableName = TableName.valueOf(this.name.getMethodName()); RegionInfo hri = createRegionInfo(tableName, 1); // collect AM metrics before test collectAssignmentManagerMetrics(); - rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20, 3)); + rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20)); waitOnFuture(submitProcedure(createAssignProcedure(hri))); - rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20, 1)); - // exception.expect(ServerCrashException.class); + assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount()); + assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount()); + } + + @Test + public void testAssignQueueFullOnce() throws Exception { + TableName tableName = TableName.valueOf(this.name.getMethodName()); + RegionInfo hri = createRegionInfo(tableName, 1); + + // collect AM metrics before test + collectAssignmentManagerMetrics(); + + rsDispatcher.setMockRsExecutor(new CallQueueTooBigOnceRsExecutor()); + waitOnFuture(submitProcedure(createAssignProcedure(hri))); + + assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount()); + assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount()); + } + + @Test + public void testTimeoutThenQueueFull() throws Exception { + TableName tableName = TableName.valueOf(this.name.getMethodName()); + RegionInfo hri = createRegionInfo(tableName, 1); + + // collect AM metrics before test + collectAssignmentManagerMetrics(); + + rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(10)); + waitOnFuture(submitProcedure(createAssignProcedure(hri))); + rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(15)); waitOnFuture(submitProcedure(createUnassignProcedure(hri))); assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount()); assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount()); assertEquals(unassignSubmittedCount + 1, unassignProcMetrics.getSubmittedCounter().getCount()); - assertEquals(unassignFailedCount + 1, unassignProcMetrics.getFailedCounter().getCount()); + assertEquals(unassignFailedCount, unassignProcMetrics.getFailedCounter().getCount()); } private void testAssign(final MockRSExecutor executor) throws Exception { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java index f666ab8e38d..6a88d6ba855 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.master.assignment; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -37,6 +38,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ServerMetricsBuilder; @@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; @@ -356,16 +359,13 @@ public abstract class TestAssignmentManagerBase { } protected class SocketTimeoutRsExecutor extends GoodRsExecutor { - private final int maxSocketTimeoutRetries; - private final int maxServerRetries; + private final int timeoutTimes; private ServerName lastServer; - private int sockTimeoutRetries; - private int serverRetries; + private int retries; - public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int maxServerRetries) { - this.maxServerRetries = maxServerRetries; - this.maxSocketTimeoutRetries = maxSocketTimeoutRetries; + public SocketTimeoutRsExecutor(int timeoutTimes) { + this.timeoutTimes = timeoutTimes; } @Override @@ -373,24 +373,79 @@ public abstract class TestAssignmentManagerBase { throws IOException { // SocketTimeoutException should be a temporary problem // unless the server will be declared dead. - if (sockTimeoutRetries++ < maxSocketTimeoutRetries) { - if (sockTimeoutRetries == 1) { - assertNotEquals(lastServer, server); - } + retries++; + if (retries == 1) { lastServer = server; - LOG.debug("Socket timeout for server=" + server + " retries=" + sockTimeoutRetries); - throw new SocketTimeoutException("simulate socket timeout"); - } else if (serverRetries++ < maxServerRetries) { - LOG.info("Mark server=" + server + " as dead. serverRetries=" + serverRetries); - master.getServerManager().moveFromOnlineToDeadServers(server); - sockTimeoutRetries = 0; + } + if (retries <= timeoutTimes) { + LOG.debug("Socket timeout for server=" + server + " retries=" + retries); + // should not change the server if the server is not dead yet. + assertEquals(lastServer, server); + if (retries == timeoutTimes) { + LOG.info("Mark server=" + server + " as dead. retries=" + retries); + master.getServerManager().moveFromOnlineToDeadServers(server); + } throw new SocketTimeoutException("simulate socket timeout"); } else { + // should select another server + assertNotEquals(lastServer, server); return super.sendRequest(server, req); } } } + protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor { + + private boolean invoked = false; + + private ServerName lastServer; + + @Override + public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) + throws IOException { + if (!invoked) { + lastServer = server; + invoked = true; + throw new CallQueueTooBigException("simulate queue full"); + } + // better select another server since the server is over loaded, but anyway, it is fine to + // still select the same server since it is not dead yet... + if (lastServer.equals(server)) { + LOG.warn("We still select the same server, which is not good."); + } + return super.sendRequest(server, req); + } + } + + protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor { + + private final int queueFullTimes; + + private int retries; + + private ServerName lastServer; + + public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) { + this.queueFullTimes = queueFullTimes; + } + + @Override + public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) + throws IOException { + retries++; + if (retries == 1) { + lastServer = server; + throw new CallTimeoutException("simulate call timeout"); + } + // should always retry on the same server + assertEquals(lastServer, server); + if (retries < queueFullTimes) { + throw new CallQueueTooBigException("simulate queue full"); + } + return super.sendRequest(server, req); + } + } + /** * Takes open request and then returns nothing so acts like a RS that went zombie. No response (so * proc is stuck/suspended on the Master and won't wake up.). We then send in a crash for this