From 3eff82f8d90e5d5543f48d55e549c9f631ad41a9 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Fri, 15 Feb 2019 11:55:52 +0800 Subject: [PATCH] HBASE-21906 Backport the CallQueueTooBigException related changes in HBASE-21875 to branch-2.1/branch-2.0 --- .../procedure/RSProcedureDispatcher.java | 56 +++++++----- .../assignment/TestAssignmentManager.java | 91 ++++++++++++++++++- 2 files changed, 120 insertions(+), 27 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java index 88a4db8fb03..dc9b99b152e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java @@ -22,6 +22,7 @@ import java.lang.Thread.UncaughtExceptionHandler; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; @@ -175,43 +176,48 @@ public class RSProcedureDispatcher } protected boolean scheduleForRetry(final IOException e) { + LOG.debug("request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e); // Should we wait a little before retrying? If the server is starting it's yes. 
- final boolean hold = (e instanceof ServerNotRunningYetException); - if (hold) { - LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d", - serverName, numberOfAttemptsSoFar), e); - long now = EnvironmentEdgeManager.currentTime(); - if (now < getMaxWaitTime()) { - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("server is not yet up; waiting up to %dms", - (getMaxWaitTime() - now)), e); - } + if (e instanceof ServerNotRunningYetException) { + long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime(); + if (remainingTime > 0) { + LOG.warn("waiting a little before trying on the same server={}," + + " try={}, can wait up to {}ms", serverName, numberOfAttemptsSoFar, remainingTime); + numberOfAttemptsSoFar++; submitTask(this, 100, TimeUnit.MILLISECONDS); return true; } - - LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e); + LOG.warn("server {} is not up for a while; try a new one", serverName); return false; } - // In case it is a connection exception and the region server is still online, - // the openRegion RPC could have been accepted by the server and - // just the response didn't go through. So we will retry to - // open the region on the same server. - final boolean retry = !hold && (ClientExceptionsUtil.isConnectionException(e) - && master.getServerManager().isServerOnline(serverName)); - if (retry) { + boolean queueFull = e instanceof CallQueueTooBigException; + // This exception is thrown in the rpc framework, where we can be sure that the call has not + // been executed yet, so it is safe to mark it as failed. Especially when opening a region, + // we had better choose another region server. + // Note that it is safe to quit only if this is the first time we send a request to the region + // server. 
Maybe the region server accepted our request the first time, and then there was a + // network error which prevented us from receiving the response, and the second time we hit a + // CallQueueTooBigException; obviously it is not safe to quit then, otherwise it may lead to a + // double assign. + if (queueFull && numberOfAttemptsSoFar == 0) { + LOG.warn("request to {} failed due to {}, try={}, this usually because" + + " server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar); + return false; + } + // In case it is a connection exception and the region server is still online, the openRegion + // RPC could have been accepted by the server and just the response didn't go through. So we + // will retry to open the region on the same server. + if ((queueFull || ClientExceptionsUtil.isConnectionException(e)) && + master.getServerManager().isServerOnline(serverName)) { // we want to retry as many times as needed as long as the RS is not dead. - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Retrying to same RegionServer %s because: %s", - serverName, e.getMessage()), e); - } + LOG.debug("Retrying to same RegionServer {} because: {}", serverName, e.getMessage()); + numberOfAttemptsSoFar++; submitTask(this, 100, TimeUnit.MILLISECONDS); return true; } // trying to send the request elsewhere instead - LOG.warn(String.format("Failed dispatch to server=%s try=%d", - serverName, numberOfAttemptsSoFar), e); + LOG.warn("Failed dispatch to server={} try={}", serverName, numberOfAttemptsSoFar, e); return false; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java index 81dac8e2a74..86186e3c791 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java @@ 
-38,6 +38,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionState.State; @@ -55,8 +57,6 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; -import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation; -import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; @@ -438,6 +438,41 @@ public class TestAssignmentManager { am.wakeMetaLoadedEvent(); } + @Test + public void testAssignQueueFullOnce() throws Exception { + TableName tableName = TableName.valueOf(this.name.getMethodName()); + RegionInfo hri = createRegionInfo(tableName, 1); + + // collect AM metrics before test + collectAssignmentManagerMetrics(); + + rsDispatcher.setMockRsExecutor(new CallQueueTooBigOnceRsExecutor()); + 
waitOnFuture(submitProcedure(am.createAssignProcedure(hri))); + + assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount()); + assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount()); + } + + @Test + public void testTimeoutThenQueueFull() throws Exception { + TableName tableName = TableName.valueOf(this.name.getMethodName()); + RegionInfo hri = createRegionInfo(tableName, 1); + + // collect AM metrics before test + collectAssignmentManagerMetrics(); + + rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(10)); + waitOnFuture(submitProcedure(am.createAssignProcedure(hri))); + rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(15)); + waitOnFuture(submitProcedure(am.createUnassignProcedure(hri))); + + assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount()); + assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount()); + assertEquals(unassignSubmittedCount + 1, unassignProcMetrics.getSubmittedCounter().getCount()); + assertEquals(unassignFailedCount, unassignProcMetrics.getFailedCounter().getCount()); + } + + private Future submitProcedure(final Procedure proc) { return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); } @@ -810,6 +845,58 @@ public class TestAssignmentManager { } } + protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor { + + private boolean invoked = false; + + private ServerName lastServer; + + @Override + public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) + throws IOException { + if (!invoked) { + lastServer = server; + invoked = true; + throw new CallQueueTooBigException("simulate queue full"); + } + // Better to select another server since this one is overloaded, but anyway, it is fine to + // still select the same server since it is not dead yet. 
+ if (lastServer.equals(server)) { + LOG.warn("We still select the same server, which is not good."); + } + return super.sendRequest(server, req); + } + } + + protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor { + + private final int queueFullTimes; + + private int retries; + + private ServerName lastServer; + + public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) { + this.queueFullTimes = queueFullTimes; + } + + @Override + public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) + throws IOException { + retries++; + if (retries == 1) { + lastServer = server; + throw new CallTimeoutException("simulate call timeout"); + } + // should always retry on the same server + assertEquals(lastServer, server); + if (retries < queueFullTimes) { + throw new CallQueueTooBigException("simulate queue full"); + } + return super.sendRequest(server, req); + } + } + private interface MockRSExecutor { ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) throws IOException;