From ebab2fc913e7199c48f965ed5ab4e265ddd96930 Mon Sep 17 00:00:00 2001 From: Jingyun Tian Date: Thu, 28 Feb 2019 19:53:24 +0800 Subject: [PATCH] HBASE-21934 RemoteProcedureDispatcher should track the ongoing dispatched calls --- .../procedure2/RemoteProcedureDispatcher.java | 29 ++ .../assignment/RegionRemoteProcedureBase.java | 5 + .../assignment/RegionTransitionProcedure.java | 1 + .../procedure/ServerRemoteProcedure.java | 131 ++++++++ .../procedure/SplitWALRemoteProcedure.java | 84 +----- .../SwitchRpcThrottleRemoteProcedure.java | 61 +--- .../replication/RefreshPeerProcedure.java | 72 +---- .../procedure/TestServerRemoteProcedure.java | 282 ++++++++++++++++++ 8 files changed, 470 insertions(+), 195 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java index 4a65796a48e..de017ad6a45 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -172,6 +172,16 @@ public abstract class RemoteProcedureDispatcher implements RemoteNode { private Set operations; + private final Set dispatchedOperations = new HashSet<>(); protected BufferNode(final TRemote key) { super(key, 0); @@ -358,6 +379,8 @@ public abstract class RemoteProcedureDispatcher operation.storeInDispatchedQueue()) + .forEach(operation -> dispatchedOperations.add(operation)); this.operations = null; } } @@ -367,6 +390,12 @@ public abstract class RemoteProcedureDispatcher + implements RemoteProcedureDispatcher.RemoteProcedure { + protected static final Logger LOG = LoggerFactory.getLogger(ServerRemoteProcedure.class); + protected ProcedureEvent event; + protected ServerName targetServer; + protected boolean dispatched; + protected boolean succ; + + protected abstract void complete(MasterProcedureEnv env, Throwable error); + + @Override + protected synchronized Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (succ) { + return null; + } + dispatched = false; + } + try { + env.getRemoteDispatcher().addOperationToNode(targetServer, this); + } catch (FailedRemoteDispatchException frde) { + LOG.warn("Can not send remote operation {} to {}, this operation will " + + "be retried to send to another server", + this.getProcId(), targetServer); + return null; + } + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } + + @Override + protected synchronized void completionCleanup(MasterProcedureEnv env) { + env.getRemoteDispatcher().removeCompletedOperation(targetServer, this); + } + + @Override + public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, + IOException exception) { + remoteOperationDone(env, exception); + } + + @Override + public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { + remoteOperationDone(env, null); + } + + @Override + public synchronized void remoteOperationFailed(MasterProcedureEnv env, + RemoteProcedureException error) { + remoteOperationDone(env, error); + } + + synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) { + if (this.isFinished()) { + LOG.info("This procedure {} is already finished, skip the rest processes", this.getProcId()); + return; + } + if (event == null) { + LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", + getProcId()); + return; + } + complete(env, error); + event.wake(env.getProcedureScheduler()); + event = null; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java index fb2dbd7926c..d227022f267 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java @@ -23,16 +23,8 @@ import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.procedure2.NoNodeDispatchException; -import org.apache.hadoop.hbase.procedure2.NoServerDispatchException; -import org.apache.hadoop.hbase.procedure2.NullTargetServerDispatchException; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; -import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; -import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; -import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; import org.apache.hadoop.hbase.regionserver.SplitWALCallable; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; @@ -48,50 +40,21 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; * DoNotRetryIOException. Otherwise it will retry until succeed. */ @InterfaceAudience.Private -public class SplitWALRemoteProcedure extends Procedure - implements RemoteProcedureDispatcher.RemoteProcedure, - ServerProcedureInterface { +public class SplitWALRemoteProcedure extends ServerRemoteProcedure + implements ServerProcedureInterface { private static final Logger LOG = LoggerFactory.getLogger(SplitWALRemoteProcedure.class); private String walPath; - private ServerName worker; private ServerName crashedServer; - private boolean dispatched; - private ProcedureEvent event; - private boolean success = false; public SplitWALRemoteProcedure() { } public SplitWALRemoteProcedure(ServerName worker, ServerName crashedServer, String wal) { - this.worker = worker; + this.targetServer = worker; this.crashedServer = crashedServer; this.walPath = wal; } - @Override - protected Procedure[] execute(MasterProcedureEnv env) - throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { - if (dispatched) { - if (success) { - return null; - } - dispatched = false; - } - try { - env.getRemoteDispatcher().addOperationToNode(worker, this); - } catch (NoNodeDispatchException | NullTargetServerDispatchException - | NoServerDispatchException e) { - // When send to a wrong target server, it need construct a new SplitWALRemoteProcedure. - // Thus return null for this procedure and let SplitWALProcedure to handle this. - LOG.warn("dispatch WAL {} to {} failed, will retry on another server", walPath, worker, e); - return null; - } - dispatched = true; - event = new ProcedureEvent<>(this); - event.suspendIfNotReady(this); - throw new ProcedureSuspendedException(); - } - @Override protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { throw new UnsupportedOperationException(); @@ -106,7 +69,7 @@ public class SplitWALRemoteProcedure extends Procedure protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { MasterProcedureProtos.SplitWALRemoteData.Builder builder = MasterProcedureProtos.SplitWALRemoteData.newBuilder(); - builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(worker)) + builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(targetServer)) .setCrashedServer(ProtobufUtil.toServerName(crashedServer)); serializer.serialize(builder.build()); } @@ -116,7 +79,7 @@ public class SplitWALRemoteProcedure extends Procedure MasterProcedureProtos.SplitWALRemoteData data = serializer.deserialize(MasterProcedureProtos.SplitWALRemoteData.class); walPath = data.getWalPath(); - worker = ProtobufUtil.toServerName(data.getWorker()); + targetServer = ProtobufUtil.toServerName(data.getWorker()); crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); } @@ -129,48 +92,25 @@ public class SplitWALRemoteProcedure extends Procedure } @Override - public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, - IOException exception) { - complete(env, exception); - } - - @Override - public void remoteOperationCompleted(MasterProcedureEnv env) { - complete(env, null); - } - - private void complete(MasterProcedureEnv env, Throwable error) { - if (event == null) { - LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", - getProcId()); - return; - } + protected void complete(MasterProcedureEnv env, Throwable error) { if (error == null) { - LOG.info("split WAL {} on {} succeeded", walPath, worker); + LOG.info("split WAL {} on {} succeeded", walPath, targetServer); try { env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath); - } catch (IOException e){ + } catch (IOException e) { LOG.warn("remove WAL {} failed, ignore...", walPath, e); } - success = true; + succ = true; } else { if (error instanceof DoNotRetryIOException) { LOG.warn("WAL split task of {} send to a wrong server {}, will retry on another server", - walPath, worker, error); - success = true; + walPath, targetServer, error); + succ = true; } else { LOG.warn("split WAL {} failed, retry...", walPath, error); - success = false; + succ = false; } - } - event.wake(env.getProcedureScheduler()); - event = null; - } - - @Override - public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { - complete(env, error); } public String getWAL() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java index 9a56ddc3285..c69faf64151 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java @@ -18,17 +18,12 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; + import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; -import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; -import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; -import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; -import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; import org.apache.hadoop.hbase.replication.regionserver.SwitchRpcThrottleRemoteCallable; + import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,11 +35,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.S * The procedure to switch rpc throttle on region server */ @InterfaceAudience.Private -public class SwitchRpcThrottleRemoteProcedure extends Procedure - implements RemoteProcedure, ServerProcedureInterface { +public class SwitchRpcThrottleRemoteProcedure extends ServerRemoteProcedure + implements ServerProcedureInterface { private static final Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleRemoteProcedure.class); - private ServerName targetServer; private boolean rpcThrottleEnabled; public SwitchRpcThrottleRemoteProcedure() { @@ -55,32 +49,6 @@ public class SwitchRpcThrottleRemoteProcedure extends Procedure event; - private boolean succ; - - @Override - protected Procedure[] execute(MasterProcedureEnv env) - throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { - if (dispatched) { - if (succ) { - return null; - } - dispatched = false; - } - try { - env.getRemoteDispatcher().addOperationToNode(targetServer, this); - } catch (FailedRemoteDispatchException frde) { - LOG.warn("Can not add remote operation for switching rpc throttle to {} on {}", - rpcThrottleEnabled, targetServer); - return null; - } - dispatched = true; - event = new ProcedureEvent<>(this); - event.suspendIfNotReady(this); - throw new ProcedureSuspendedException(); - } - @Override protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { } @@ -117,22 +85,6 @@ public class SwitchRpcThrottleRemoteProcedure extends Procedure +public class RefreshPeerProcedure extends ServerRemoteProcedure implements PeerProcedureInterface, RemoteProcedure { private static final Logger LOG = LoggerFactory.getLogger(RefreshPeerProcedure.class); private String peerId; - private PeerOperationType type; - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", - justification = "Will never change after construction") - private ServerName targetServer; - - private boolean dispatched; - - private ProcedureEvent event; - - private boolean succ; - public RefreshPeerProcedure() { } @@ -122,12 +106,8 @@ public class RefreshPeerProcedure extends Procedure .setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray()); } - private void complete(MasterProcedureEnv env, Throwable error) { - if (event == null) { - LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", - getProcId()); - return; - } + @Override + protected void complete(MasterProcedureEnv env, Throwable error) { if (error != null) { LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, targetServer, error); this.succ = false; @@ -135,50 +115,6 @@ public class RefreshPeerProcedure extends Procedure LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, targetServer); this.succ = true; } - - event.wake(env.getProcedureScheduler()); - event = null; - } - - @Override - public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName remote, - IOException exception) { - complete(env, exception); - } - - @Override - public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { - complete(env, null); - } - - @Override - public synchronized void remoteOperationFailed(MasterProcedureEnv env, - RemoteProcedureException error) { - complete(env, error); - } - - @Override - protected synchronized Procedure[] execute(MasterProcedureEnv env) - throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { - if (dispatched) { - if (succ) { - return null; - } - // retry - dispatched = false; - } - try { - env.getRemoteDispatcher().addOperationToNode(targetServer, this); - } catch (FailedRemoteDispatchException frde) { - LOG.info("Can not add remote operation for refreshing peer {} for {} to {}, " + - "this is usually because the server is already dead, " + - "give up and mark the procedure as complete", peerId, type, targetServer, frde); - return null; - } - dispatched = true; - event = new ProcedureEvent<>(this); - event.suspendIfNotReady(this); - throw new ProcedureSuspendedException(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java new file mode 100644 index 00000000000..d4745b98408 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java @@ -0,0 +1,282 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SWITCH_RPC_THROTTLE; + +import java.io.IOException; +import java.util.NavigableMap; +import java.util.Set; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.MockMasterServices; +import org.apache.hadoop.hbase.master.assignment.OpenRegionProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestServerRemoteProcedure { + private static final Logger LOG = LoggerFactory.getLogger(TestServerRemoteProcedure.class); + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestServerRemoteProcedure.class); + @Rule + public TestName name = new TestName(); + @Rule + public final ExpectedException exception = ExpectedException.none(); + protected HBaseTestingUtility util; + protected MockRSProcedureDispatcher rsDispatcher; + protected MockMasterServices master; + protected AssignmentManager am; + protected NavigableMap> regionsToRegionServers = + new ConcurrentSkipListMap<>(); + // Simple executor to run some simple tasks. + protected ScheduledExecutorService executor; + + @Before + public void setUp() throws Exception { + util = new HBaseTestingUtility(); + this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build()); + master = new MockMasterServices(util.getConfiguration(), this.regionsToRegionServers); + rsDispatcher = new MockRSProcedureDispatcher(master); + rsDispatcher.setMockRsExecutor(new NoopRSExecutor()); + master.start(2, rsDispatcher); + am = master.getAssignmentManager(); + master.getServerManager().getOnlineServersList().stream() + .forEach(serverName -> am.getRegionStates().getOrCreateServer(serverName)); + } + + @After + public void tearDown() throws Exception { + master.stop("tearDown"); + this.executor.shutdownNow(); + } + + @Test + public void testSplitWALAndCrashBeforeResponse() throws Exception { + ServerName worker = master.getServerManager().getOnlineServersList().get(0); + ServerName crashedWorker = master.getServerManager().getOnlineServersList().get(1); + ServerRemoteProcedure splitWALRemoteProcedure = + new SplitWALRemoteProcedure(worker, crashedWorker, "test"); + Future future = submitProcedure(splitWALRemoteProcedure); + Thread.sleep(2000); + master.getServerManager().expireServer(worker); + // if remoteCallFailed is called for this procedure, this procedure should be finished. + future.get(5000, TimeUnit.MILLISECONDS); + Assert.assertTrue(splitWALRemoteProcedure.isSuccess()); + } + + @Test + public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception { + ServerName worker = master.getServerManager().getOnlineServersList().get(0); + ServerRemoteProcedure noopServerRemoteProcedure = new NoopServerRemoteProcedure(worker); + Future future = submitProcedure(noopServerRemoteProcedure); + Thread.sleep(2000); + // complete the process and fail the process at the same time + ExecutorService threadPool = Executors.newFixedThreadPool(2); + threadPool.execute(() -> noopServerRemoteProcedure + .remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), null)); + threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed( + master.getMasterProcedureExecutor().getEnvironment(), worker, new IOException())); + future.get(2000, TimeUnit.MILLISECONDS); + Assert.assertTrue(noopServerRemoteProcedure.isSuccess()); + } + + @Test + public void testRegionOpenProcedureIsNotHandledByDisPatcher() throws Exception { + TableName tableName = TableName.valueOf("testRegionOpenProcedureIsNotHandledByDisPatcher"); + RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(1)) + .setEndKey(Bytes.toBytes(2)).setSplit(false).setRegionId(0).build(); + master.getMasterProcedureExecutor().getEnvironment().getAssignmentManager().getRegionStates() + .getOrCreateRegionStateNode(hri); + ServerName worker = master.getServerManager().getOnlineServersList().get(0); + OpenRegionProcedure openRegionProcedure = new OpenRegionProcedure(hri, worker); + Future future = submitProcedure(openRegionProcedure); + Thread.sleep(2000); + rsDispatcher.removeNode(worker); + try { + future.get(2000, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + LOG.info("timeout is expected"); + } + Assert.assertFalse(openRegionProcedure.isFinished()); + } + + private Future submitProcedure(final Procedure proc) { + return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); + } + + private static class NoopServerRemoteProcedure extends ServerRemoteProcedure + implements ServerProcedureInterface { + + public NoopServerRemoteProcedure(ServerName targetServer) { + this.targetServer = targetServer; + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + return; + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + return; + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + return; + } + + @Override + public RemoteProcedureDispatcher.RemoteOperation remoteCallBuild(MasterProcedureEnv env, + ServerName serverName) { + return new RSProcedureDispatcher.ServerOperation(null, 0L, this.getClass(), new byte[0]); + } + + @Override + public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, null); + } + + @Override + public synchronized void remoteOperationFailed(MasterProcedureEnv env, + RemoteProcedureException error) { + complete(env, error); + } + + @Override + public void complete(MasterProcedureEnv env, Throwable error) { + this.succ = true; + return; + } + + @Override + public ServerName getServerName() { + return targetServer; + } + + @Override + public boolean hasMetaTableRegion() { + return false; + } + + @Override + public ServerOperationType getServerOperationType() { + return SWITCH_RPC_THROTTLE; + } + + } + + protected interface MockRSExecutor { + AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest req) throws IOException; + } + + protected static class NoopRSExecutor implements MockRSExecutor { + @Override + public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest req) throws IOException { + if (req.getOpenRegionCount() > 0) { + for (AdminProtos.OpenRegionRequest request : req.getOpenRegionList()) { + for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : request.getOpenInfoList()) { + execOpenRegion(server, openReq); + } + } + } + return AdminProtos.ExecuteProceduresResponse.getDefaultInstance(); + } + + protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server, + AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException { + return null; + } + } + + protected static class MockRSProcedureDispatcher extends RSProcedureDispatcher { + private MockRSExecutor mockRsExec; + + public MockRSProcedureDispatcher(final MasterServices master) { + super(master); + } + + public void setMockRsExecutor(final MockRSExecutor mockRsExec) { + this.mockRsExec = mockRsExec; + } + + @Override + protected void remoteDispatch(ServerName serverName, + @SuppressWarnings("rawtypes") Set remoteProcedures) { + submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures)); + } + + private class MockRemoteCall extends ExecuteProceduresRemoteCall { + public MockRemoteCall(final ServerName serverName, + @SuppressWarnings("rawtypes") final Set operations) { + super(serverName, operations); + } + + @Override + protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName, + final AdminProtos.ExecuteProceduresRequest request) throws IOException { + return mockRsExec.sendRequest(serverName, request); + } + } + } +}