HBASE-21237 Use CompatRemoteProcedureResolver to dispatch open/close region requests to RS

This commit is contained in:
Duo Zhang 2018-10-30 09:41:23 +08:00
parent 422e98957b
commit bddd488c34
2 changed files with 24 additions and 55 deletions

View File

@ -109,17 +109,10 @@ public class RSProcedureDispatcher
protected void remoteDispatch(final ServerName serverName,
final Set<RemoteProcedure> remoteProcedures) {
final int rsVersion = master.getServerManager().getVersionNumber(serverName);
if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) {
LOG.trace("Using procedure batch rpc execution for serverName={} version={}", serverName,
rsVersion);
submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures));
} else if (rsVersion == 0 && !master.getServerManager().isServerOnline(serverName)) {
if (rsVersion == 0 && !master.getServerManager().isServerOnline(serverName)) {
submitTask(new DeadRSRemoteCall(serverName, remoteProcedures));
} else {
LOG.info(String.format(
"Fallback to compat rpc execution for serverName=%s version=%s",
serverName, rsVersion));
submitTask(new CompatRemoteProcedureResolver(serverName, remoteProcedures));
submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures));
}
}
@ -293,7 +286,7 @@ public class RSProcedureDispatcher
implements RemoteProcedureResolver {
protected final Set<RemoteProcedure> remoteProcedures;
private ExecuteProceduresRequest.Builder request = null;
protected ExecuteProceduresRequest.Builder request = null;
public ExecuteProceduresRemoteCall(final ServerName serverName,
final Set<RemoteProcedure> remoteProcedures) {
@ -325,14 +318,14 @@ public class RSProcedureDispatcher
@Override
public void dispatchOpenRequests(final MasterProcedureEnv env,
final List<RegionOpenOperation> operations) {
request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
submitTask(new OpenRegionRemoteCall(getServerName(), operations));
}
@Override
public void dispatchCloseRequests(final MasterProcedureEnv env,
final List<RegionCloseOperation> operations) {
for (RegionCloseOperation op: operations) {
request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
submitTask(new CloseRegionRemoteCall(getServerName(), op));
}
}
@ -357,7 +350,7 @@ public class RSProcedureDispatcher
}
}
private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
protected static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
final ServerName serverName, final List<RegionOpenOperation> operations) {
final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
builder.setServerStartCode(serverName.getStartcode());
@ -469,46 +462,6 @@ public class RSProcedureDispatcher
}
}
/**
* Compatibility class to open and close regions using old endpoints (openRegion/closeRegion) in
* {@link AdminService}.
*/
protected class CompatRemoteProcedureResolver implements Callable<Void>, RemoteProcedureResolver {
private final Set<RemoteProcedure> operations;
private final ServerName serverName;
public CompatRemoteProcedureResolver(final ServerName serverName,
final Set<RemoteProcedure> operations) {
this.serverName = serverName;
this.operations = operations;
}
@Override
public Void call() {
splitAndResolveOperation(serverName, operations, this);
return null;
}
@Override
public void dispatchOpenRequests(final MasterProcedureEnv env,
final List<RegionOpenOperation> operations) {
submitTask(new OpenRegionRemoteCall(serverName, operations));
}
@Override
public void dispatchCloseRequests(final MasterProcedureEnv env,
final List<RegionCloseOperation> operations) {
for (RegionCloseOperation op: operations) {
submitTask(new CloseRegionRemoteCall(serverName, op));
}
}
@Override
public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
throw new UnsupportedOperationException();
}
}
// ==========================================================================
// RPC Messages
// - ServerOperation: refreshConfig, grant, revoke, ... (TODO)

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
@ -54,6 +55,8 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
@ -829,11 +832,24 @@ public class TestAssignmentManager {
}
private class MockRemoteCall extends ExecuteProceduresRemoteCall {
public MockRemoteCall(final ServerName serverName,
final Set<RemoteProcedure> operations) {
public MockRemoteCall(final ServerName serverName, final Set<RemoteProcedure> operations) {
super(serverName, operations);
}
@Override
public void dispatchOpenRequests(MasterProcedureEnv env,
List<RegionOpenOperation> operations) {
request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
}
@Override
public void dispatchCloseRequests(MasterProcedureEnv env,
List<RegionCloseOperation> operations) {
for (RegionCloseOperation op : operations) {
request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
}
}
@Override
protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
final ExecuteProceduresRequest request) throws IOException {