HBASE-21906 Backport the CallQueueTooBigException related changes in HBASE-21875 to branch-2.1/branch-2.0
This commit is contained in:
parent
7206ef6427
commit
3eff82f8d9
|
@ -22,6 +22,7 @@ import java.lang.Thread.UncaughtExceptionHandler;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
|
@ -175,43 +176,48 @@ public class RSProcedureDispatcher
|
|||
}
|
||||
|
||||
protected boolean scheduleForRetry(final IOException e) {
|
||||
LOG.debug("request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e);
|
||||
// Should we wait a little before retrying? If the server is starting it's yes.
|
||||
final boolean hold = (e instanceof ServerNotRunningYetException);
|
||||
if (hold) {
|
||||
LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d",
|
||||
serverName, numberOfAttemptsSoFar), e);
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
if (now < getMaxWaitTime()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(String.format("server is not yet up; waiting up to %dms",
|
||||
(getMaxWaitTime() - now)), e);
|
||||
}
|
||||
if (e instanceof ServerNotRunningYetException) {
|
||||
long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime();
|
||||
if (remainingTime > 0) {
|
||||
LOG.warn("waiting a little before trying on the same server={}," +
|
||||
" try={}, can wait up to {}ms", serverName, numberOfAttemptsSoFar, remainingTime);
|
||||
numberOfAttemptsSoFar++;
|
||||
submitTask(this, 100, TimeUnit.MILLISECONDS);
|
||||
return true;
|
||||
}
|
||||
|
||||
LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e);
|
||||
LOG.warn("server {} is not up for a while; try a new one", serverName);
|
||||
return false;
|
||||
}
|
||||
|
||||
// In case it is a connection exception and the region server is still online,
|
||||
// the openRegion RPC could have been accepted by the server and
|
||||
// just the response didn't go through. So we will retry to
|
||||
// open the region on the same server.
|
||||
final boolean retry = !hold && (ClientExceptionsUtil.isConnectionException(e)
|
||||
&& master.getServerManager().isServerOnline(serverName));
|
||||
if (retry) {
|
||||
boolean queueFull = e instanceof CallQueueTooBigException;
|
||||
// this exception is thrown in the rpc framework, where we can make sure that the call has not
|
||||
// been executed yet, so it is safe to mark it as fail. Especially for open a region, we'd
|
||||
// better choose another region server
|
||||
// notice that, it is safe to quit only if this is the first time we send request to region
|
||||
// server. Maybe the region server has accept our request the first time, and then there is a
|
||||
// network error which prevents we receive the response, and the second time we hit a
|
||||
// CallQueueTooBigException, obviously it is not safe to quit here, otherwise it may lead to a
|
||||
// double assign..
|
||||
if (queueFull && numberOfAttemptsSoFar == 0) {
|
||||
LOG.warn("request to {} failed due to {}, try={}, this usually because" +
|
||||
" server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar);
|
||||
return false;
|
||||
}
|
||||
// In case it is a connection exception and the region server is still online, the openRegion
|
||||
// RPC could have been accepted by the server and just the response didn't go through. So we
|
||||
// will retry to open the region on the same server.
|
||||
if ((queueFull || ClientExceptionsUtil.isConnectionException(e)) &&
|
||||
master.getServerManager().isServerOnline(serverName)) {
|
||||
// we want to retry as many times as needed as long as the RS is not dead.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(String.format("Retrying to same RegionServer %s because: %s",
|
||||
serverName, e.getMessage()), e);
|
||||
}
|
||||
LOG.debug("Retrying to same RegionServer {} because: {}", serverName, e.getMessage());
|
||||
numberOfAttemptsSoFar++;
|
||||
submitTask(this, 100, TimeUnit.MILLISECONDS);
|
||||
return true;
|
||||
}
|
||||
// trying to send the request elsewhere instead
|
||||
LOG.warn(String.format("Failed dispatch to server=%s try=%d",
|
||||
serverName, numberOfAttemptsSoFar), e);
|
||||
LOG.warn("Failed dispatch to server={} try={}", serverName, numberOfAttemptsSoFar, e);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ import java.util.concurrent.Future;
|
|||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
|||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
||||
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||
|
@ -55,8 +57,6 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
|
|||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
|
||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
|
||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
|
||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||
|
@ -438,6 +438,41 @@ public class TestAssignmentManager {
|
|||
am.wakeMetaLoadedEvent();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssignQueueFullOnce() throws Exception {
|
||||
TableName tableName = TableName.valueOf(this.name.getMethodName());
|
||||
RegionInfo hri = createRegionInfo(tableName, 1);
|
||||
|
||||
// collect AM metrics before test
|
||||
collectAssignmentManagerMetrics();
|
||||
|
||||
rsDispatcher.setMockRsExecutor(new CallQueueTooBigOnceRsExecutor());
|
||||
waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));
|
||||
|
||||
assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
|
||||
assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeoutThenQueueFull() throws Exception {
|
||||
TableName tableName = TableName.valueOf(this.name.getMethodName());
|
||||
RegionInfo hri = createRegionInfo(tableName, 1);
|
||||
|
||||
// collect AM metrics before test
|
||||
collectAssignmentManagerMetrics();
|
||||
|
||||
rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(10));
|
||||
waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));
|
||||
rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(15));
|
||||
waitOnFuture(submitProcedure(am.createUnassignProcedure(hri)));
|
||||
|
||||
assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
|
||||
assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
|
||||
assertEquals(unassignSubmittedCount + 1, unassignProcMetrics.getSubmittedCounter().getCount());
|
||||
assertEquals(unassignFailedCount, unassignProcMetrics.getFailedCounter().getCount());
|
||||
}
|
||||
|
||||
|
||||
private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
|
||||
return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
|
||||
}
|
||||
|
@ -810,6 +845,58 @@ public class TestAssignmentManager {
|
|||
}
|
||||
}
|
||||
|
||||
protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {
|
||||
|
||||
private boolean invoked = false;
|
||||
|
||||
private ServerName lastServer;
|
||||
|
||||
@Override
|
||||
public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
|
||||
throws IOException {
|
||||
if (!invoked) {
|
||||
lastServer = server;
|
||||
invoked = true;
|
||||
throw new CallQueueTooBigException("simulate queue full");
|
||||
}
|
||||
// better select another server since the server is over loaded, but anyway, it is fine to
|
||||
// still select the same server since it is not dead yet...
|
||||
if (lastServer.equals(server)) {
|
||||
LOG.warn("We still select the same server, which is not good.");
|
||||
}
|
||||
return super.sendRequest(server, req);
|
||||
}
|
||||
}
|
||||
|
||||
protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {
|
||||
|
||||
private final int queueFullTimes;
|
||||
|
||||
private int retries;
|
||||
|
||||
private ServerName lastServer;
|
||||
|
||||
public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
|
||||
this.queueFullTimes = queueFullTimes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
|
||||
throws IOException {
|
||||
retries++;
|
||||
if (retries == 1) {
|
||||
lastServer = server;
|
||||
throw new CallTimeoutException("simulate call timeout");
|
||||
}
|
||||
// should always retry on the same server
|
||||
assertEquals(lastServer, server);
|
||||
if (retries < queueFullTimes) {
|
||||
throw new CallQueueTooBigException("simulate queue full");
|
||||
}
|
||||
return super.sendRequest(server, req);
|
||||
}
|
||||
}
|
||||
|
||||
private interface MockRSExecutor {
|
||||
ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
|
||||
throws IOException;
|
||||
|
|
Loading…
Reference in New Issue