HBASE-21934 RemoteProcedureDispatcher should track the ongoing dispatched calls
This commit is contained in:
parent
3c6ac6d3f8
commit
ebab2fc913
|
@ -172,6 +172,16 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) {
|
||||||
|
BufferNode node = nodeMap.get(key);
|
||||||
|
if (node == null) {
|
||||||
|
LOG.warn("since no node for this key {}, we can't removed the finished remote procedure",
|
||||||
|
key);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
node.operationCompleted(rp);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove a remote node
|
* Remove a remote node
|
||||||
* @param key the node identifier
|
* @param key the node identifier
|
||||||
|
@ -237,6 +247,16 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
||||||
* method.
|
* method.
|
||||||
*/
|
*/
|
||||||
void remoteOperationFailed(TEnv env, RemoteProcedureException error);
|
void remoteOperationFailed(TEnv env, RemoteProcedureException error);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether store this remote procedure in dispatched queue
|
||||||
|
* only OpenRegionProcedure and CloseRegionProcedure return false since they are
|
||||||
|
* not fully controlled by dispatcher
|
||||||
|
*/
|
||||||
|
default boolean storeInDispatchedQueue() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -330,6 +350,7 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
||||||
protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
|
protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
|
||||||
implements RemoteNode<TEnv, TRemote> {
|
implements RemoteNode<TEnv, TRemote> {
|
||||||
private Set<RemoteProcedure> operations;
|
private Set<RemoteProcedure> operations;
|
||||||
|
private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();
|
||||||
|
|
||||||
protected BufferNode(final TRemote key) {
|
protected BufferNode(final TRemote key) {
|
||||||
super(key, 0);
|
super(key, 0);
|
||||||
|
@ -358,6 +379,8 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
||||||
public synchronized void dispatch() {
|
public synchronized void dispatch() {
|
||||||
if (operations != null) {
|
if (operations != null) {
|
||||||
remoteDispatch(getKey(), operations);
|
remoteDispatch(getKey(), operations);
|
||||||
|
operations.stream().filter(operation -> operation.storeInDispatchedQueue())
|
||||||
|
.forEach(operation -> dispatchedOperations.add(operation));
|
||||||
this.operations = null;
|
this.operations = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -367,6 +390,12 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
||||||
abortPendingOperations(getKey(), operations);
|
abortPendingOperations(getKey(), operations);
|
||||||
this.operations = null;
|
this.operations = null;
|
||||||
}
|
}
|
||||||
|
abortPendingOperations(getKey(), dispatchedOperations);
|
||||||
|
this.dispatchedOperations.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void operationCompleted(final RemoteProcedure remoteProcedure){
|
||||||
|
this.dispatchedOperations.remove(remoteProcedure);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -179,6 +179,11 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean storeInDispatchedQueue() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
serializer.serialize(
|
serializer.serialize(
|
||||||
|
|
|
@ -174,4 +174,5 @@ public abstract class RegionTransitionProcedure extends Procedure<MasterProcedur
|
||||||
// should not be called for region operation until we modified the open/close region procedure
|
// should not be called for region operation until we modified the open/close region procedure
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,131 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
/**
|
||||||
|
* This extract the common used methods of procedures which are send to remote servers. Developers
|
||||||
|
* who extends this class only need to override remoteCallBuild() and complete(). This procedure
|
||||||
|
* will help add the operation to {@link RSProcedureDispatcher}
|
||||||
|
*
|
||||||
|
* If adding the operation to dispatcher failed, addOperationToNode will throw
|
||||||
|
* FailedRemoteDispatchException, and this procedure will return null which procedure Executor will
|
||||||
|
* mark this procedure as complete. Thus the upper layer of this procedure must have a way to
|
||||||
|
* check if this procedure really succeed and how to deal with it.
|
||||||
|
*
|
||||||
|
* If sending the operation to remote RS failed, dispatcher will call remoteCallFailed() to
|
||||||
|
* handle this, which actually call remoteOperationDone with the exception.
|
||||||
|
* If the targetServer crashed but this procedure has no response, than dispatcher will call
|
||||||
|
* remoteOperationFailed() to handle this, which also calls remoteOperationDone with the exception.
|
||||||
|
* If the operation is successful, then remoteOperationCompleted will be called and actually calls
|
||||||
|
* the remoteOperationDone without exception.
|
||||||
|
*
|
||||||
|
* In remoteOperationDone, we'll check if the procedure is already get wake up by others. Then
|
||||||
|
* developer could implement complete() based on their own purpose.
|
||||||
|
*
|
||||||
|
* But basic logic is that if operation succeed, set succ to true and do the clean work.
|
||||||
|
*
|
||||||
|
* If operation failed and require to resend it to the same server, leave the succ as false.
|
||||||
|
*
|
||||||
|
* If operation failed and require to resend it to another server, set succ to true and upper layer
|
||||||
|
* should be able to find out this operation not work and send a operation to another server.
|
||||||
|
*/
|
||||||
|
public abstract class ServerRemoteProcedure extends Procedure<MasterProcedureEnv>
|
||||||
|
implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName> {
|
||||||
|
protected static final Logger LOG = LoggerFactory.getLogger(ServerRemoteProcedure.class);
|
||||||
|
protected ProcedureEvent<?> event;
|
||||||
|
protected ServerName targetServer;
|
||||||
|
protected boolean dispatched;
|
||||||
|
protected boolean succ;
|
||||||
|
|
||||||
|
protected abstract void complete(MasterProcedureEnv env, Throwable error);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
|
||||||
|
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||||
|
if (dispatched) {
|
||||||
|
if (succ) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
dispatched = false;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
|
||||||
|
} catch (FailedRemoteDispatchException frde) {
|
||||||
|
LOG.warn("Can not send remote operation {} to {}, this operation will "
|
||||||
|
+ "be retried to send to another server",
|
||||||
|
this.getProcId(), targetServer);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
dispatched = true;
|
||||||
|
event = new ProcedureEvent<>(this);
|
||||||
|
event.suspendIfNotReady(this);
|
||||||
|
throw new ProcedureSuspendedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected synchronized void completionCleanup(MasterProcedureEnv env) {
|
||||||
|
env.getRemoteDispatcher().removeCompletedOperation(targetServer, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
|
||||||
|
IOException exception) {
|
||||||
|
remoteOperationDone(env, exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
|
||||||
|
remoteOperationDone(env, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void remoteOperationFailed(MasterProcedureEnv env,
|
||||||
|
RemoteProcedureException error) {
|
||||||
|
remoteOperationDone(env, error);
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) {
|
||||||
|
if (this.isFinished()) {
|
||||||
|
LOG.info("This procedure {} is already finished, skip the rest processes", this.getProcId());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (event == null) {
|
||||||
|
LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
|
||||||
|
getProcId());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
complete(env, error);
|
||||||
|
event.wake(env.getProcedureScheduler());
|
||||||
|
event = null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,16 +23,8 @@ import java.io.IOException;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.procedure2.NoNodeDispatchException;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.NoServerDispatchException;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.NullTargetServerDispatchException;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
|
||||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.SplitWALCallable;
|
import org.apache.hadoop.hbase.regionserver.SplitWALCallable;
|
||||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -48,50 +40,21 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
|
||||||
* DoNotRetryIOException. Otherwise it will retry until succeed.
|
* DoNotRetryIOException. Otherwise it will retry until succeed.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class SplitWALRemoteProcedure extends Procedure<MasterProcedureEnv>
|
public class SplitWALRemoteProcedure extends ServerRemoteProcedure
|
||||||
implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName>,
|
implements ServerProcedureInterface {
|
||||||
ServerProcedureInterface {
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(SplitWALRemoteProcedure.class);
|
private static final Logger LOG = LoggerFactory.getLogger(SplitWALRemoteProcedure.class);
|
||||||
private String walPath;
|
private String walPath;
|
||||||
private ServerName worker;
|
|
||||||
private ServerName crashedServer;
|
private ServerName crashedServer;
|
||||||
private boolean dispatched;
|
|
||||||
private ProcedureEvent<?> event;
|
|
||||||
private boolean success = false;
|
|
||||||
|
|
||||||
public SplitWALRemoteProcedure() {
|
public SplitWALRemoteProcedure() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public SplitWALRemoteProcedure(ServerName worker, ServerName crashedServer, String wal) {
|
public SplitWALRemoteProcedure(ServerName worker, ServerName crashedServer, String wal) {
|
||||||
this.worker = worker;
|
this.targetServer = worker;
|
||||||
this.crashedServer = crashedServer;
|
this.crashedServer = crashedServer;
|
||||||
this.walPath = wal;
|
this.walPath = wal;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
|
|
||||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
|
||||||
if (dispatched) {
|
|
||||||
if (success) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
dispatched = false;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
env.getRemoteDispatcher().addOperationToNode(worker, this);
|
|
||||||
} catch (NoNodeDispatchException | NullTargetServerDispatchException
|
|
||||||
| NoServerDispatchException e) {
|
|
||||||
// When send to a wrong target server, it need construct a new SplitWALRemoteProcedure.
|
|
||||||
// Thus return null for this procedure and let SplitWALProcedure to handle this.
|
|
||||||
LOG.warn("dispatch WAL {} to {} failed, will retry on another server", walPath, worker, e);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
dispatched = true;
|
|
||||||
event = new ProcedureEvent<>(this);
|
|
||||||
event.suspendIfNotReady(this);
|
|
||||||
throw new ProcedureSuspendedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
|
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
|
@ -106,7 +69,7 @@ public class SplitWALRemoteProcedure extends Procedure<MasterProcedureEnv>
|
||||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
MasterProcedureProtos.SplitWALRemoteData.Builder builder =
|
MasterProcedureProtos.SplitWALRemoteData.Builder builder =
|
||||||
MasterProcedureProtos.SplitWALRemoteData.newBuilder();
|
MasterProcedureProtos.SplitWALRemoteData.newBuilder();
|
||||||
builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(worker))
|
builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(targetServer))
|
||||||
.setCrashedServer(ProtobufUtil.toServerName(crashedServer));
|
.setCrashedServer(ProtobufUtil.toServerName(crashedServer));
|
||||||
serializer.serialize(builder.build());
|
serializer.serialize(builder.build());
|
||||||
}
|
}
|
||||||
|
@ -116,7 +79,7 @@ public class SplitWALRemoteProcedure extends Procedure<MasterProcedureEnv>
|
||||||
MasterProcedureProtos.SplitWALRemoteData data =
|
MasterProcedureProtos.SplitWALRemoteData data =
|
||||||
serializer.deserialize(MasterProcedureProtos.SplitWALRemoteData.class);
|
serializer.deserialize(MasterProcedureProtos.SplitWALRemoteData.class);
|
||||||
walPath = data.getWalPath();
|
walPath = data.getWalPath();
|
||||||
worker = ProtobufUtil.toServerName(data.getWorker());
|
targetServer = ProtobufUtil.toServerName(data.getWorker());
|
||||||
crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
|
crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,48 +92,25 @@ public class SplitWALRemoteProcedure extends Procedure<MasterProcedureEnv>
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
|
protected void complete(MasterProcedureEnv env, Throwable error) {
|
||||||
IOException exception) {
|
|
||||||
complete(env, exception);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void remoteOperationCompleted(MasterProcedureEnv env) {
|
|
||||||
complete(env, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void complete(MasterProcedureEnv env, Throwable error) {
|
|
||||||
if (event == null) {
|
|
||||||
LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
|
|
||||||
getProcId());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (error == null) {
|
if (error == null) {
|
||||||
LOG.info("split WAL {} on {} succeeded", walPath, worker);
|
LOG.info("split WAL {} on {} succeeded", walPath, targetServer);
|
||||||
try {
|
try {
|
||||||
env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath);
|
env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath);
|
||||||
} catch (IOException e){
|
} catch (IOException e) {
|
||||||
LOG.warn("remove WAL {} failed, ignore...", walPath, e);
|
LOG.warn("remove WAL {} failed, ignore...", walPath, e);
|
||||||
}
|
}
|
||||||
success = true;
|
succ = true;
|
||||||
} else {
|
} else {
|
||||||
if (error instanceof DoNotRetryIOException) {
|
if (error instanceof DoNotRetryIOException) {
|
||||||
LOG.warn("WAL split task of {} send to a wrong server {}, will retry on another server",
|
LOG.warn("WAL split task of {} send to a wrong server {}, will retry on another server",
|
||||||
walPath, worker, error);
|
walPath, targetServer, error);
|
||||||
success = true;
|
succ = true;
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("split WAL {} failed, retry...", walPath, error);
|
LOG.warn("split WAL {} failed, retry...", walPath, error);
|
||||||
success = false;
|
succ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
event.wake(env.getProcedureScheduler());
|
|
||||||
event = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
|
|
||||||
complete(env, error);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getWAL() {
|
public String getWAL() {
|
||||||
|
|
|
@ -18,17 +18,12 @@
|
||||||
package org.apache.hadoop.hbase.master.procedure;
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
|
||||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
|
||||||
import org.apache.hadoop.hbase.replication.regionserver.SwitchRpcThrottleRemoteCallable;
|
import org.apache.hadoop.hbase.replication.regionserver.SwitchRpcThrottleRemoteCallable;
|
||||||
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -40,11 +35,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.S
|
||||||
* The procedure to switch rpc throttle on region server
|
* The procedure to switch rpc throttle on region server
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class SwitchRpcThrottleRemoteProcedure extends Procedure<MasterProcedureEnv>
|
public class SwitchRpcThrottleRemoteProcedure extends ServerRemoteProcedure
|
||||||
implements RemoteProcedure<MasterProcedureEnv, ServerName>, ServerProcedureInterface {
|
implements ServerProcedureInterface {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleRemoteProcedure.class);
|
private static final Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleRemoteProcedure.class);
|
||||||
private ServerName targetServer;
|
|
||||||
private boolean rpcThrottleEnabled;
|
private boolean rpcThrottleEnabled;
|
||||||
|
|
||||||
public SwitchRpcThrottleRemoteProcedure() {
|
public SwitchRpcThrottleRemoteProcedure() {
|
||||||
|
@ -55,32 +49,6 @@ public class SwitchRpcThrottleRemoteProcedure extends Procedure<MasterProcedureE
|
||||||
this.rpcThrottleEnabled = rpcThrottleEnabled;
|
this.rpcThrottleEnabled = rpcThrottleEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean dispatched;
|
|
||||||
private ProcedureEvent<?> event;
|
|
||||||
private boolean succ;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
|
|
||||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
|
||||||
if (dispatched) {
|
|
||||||
if (succ) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
dispatched = false;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
|
|
||||||
} catch (FailedRemoteDispatchException frde) {
|
|
||||||
LOG.warn("Can not add remote operation for switching rpc throttle to {} on {}",
|
|
||||||
rpcThrottleEnabled, targetServer);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
dispatched = true;
|
|
||||||
event = new ProcedureEvent<>(this);
|
|
||||||
event.suspendIfNotReady(this);
|
|
||||||
throw new ProcedureSuspendedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
|
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
|
||||||
}
|
}
|
||||||
|
@ -117,22 +85,6 @@ public class SwitchRpcThrottleRemoteProcedure extends Procedure<MasterProcedureE
|
||||||
.toByteArray());
|
.toByteArray());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
|
|
||||||
IOException exception) {
|
|
||||||
complete(env, exception);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void remoteOperationCompleted(MasterProcedureEnv env) {
|
|
||||||
complete(env, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
|
|
||||||
complete(env, error);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ServerName getServerName() {
|
public ServerName getServerName() {
|
||||||
return targetServer;
|
return targetServer;
|
||||||
|
@ -148,7 +100,8 @@ public class SwitchRpcThrottleRemoteProcedure extends Procedure<MasterProcedureE
|
||||||
return ServerOperationType.SWITCH_RPC_THROTTLE;
|
return ServerOperationType.SWITCH_RPC_THROTTLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void complete(MasterProcedureEnv env, Throwable error) {
|
@Override
|
||||||
|
protected void complete(MasterProcedureEnv env, Throwable error) {
|
||||||
if (error != null) {
|
if (error != null) {
|
||||||
LOG.warn("Failed to switch rpc throttle to {} on server {}", rpcThrottleEnabled, targetServer,
|
LOG.warn("Failed to switch rpc throttle to {} on server {}", rpcThrottleEnabled, targetServer,
|
||||||
error);
|
error);
|
||||||
|
@ -156,8 +109,6 @@ public class SwitchRpcThrottleRemoteProcedure extends Procedure<MasterProcedureE
|
||||||
} else {
|
} else {
|
||||||
this.succ = true;
|
this.succ = true;
|
||||||
}
|
}
|
||||||
event.wake(env.getProcedureScheduler());
|
|
||||||
event = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -22,15 +22,10 @@ import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
|
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
|
||||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
|
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
|
||||||
import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
|
import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
|
||||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
|
||||||
import org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable;
|
import org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -42,25 +37,14 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
|
public class RefreshPeerProcedure extends ServerRemoteProcedure
|
||||||
implements PeerProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {
|
implements PeerProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(RefreshPeerProcedure.class);
|
private static final Logger LOG = LoggerFactory.getLogger(RefreshPeerProcedure.class);
|
||||||
|
|
||||||
private String peerId;
|
private String peerId;
|
||||||
|
|
||||||
private PeerOperationType type;
|
private PeerOperationType type;
|
||||||
|
|
||||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
|
|
||||||
justification = "Will never change after construction")
|
|
||||||
private ServerName targetServer;
|
|
||||||
|
|
||||||
private boolean dispatched;
|
|
||||||
|
|
||||||
private ProcedureEvent<?> event;
|
|
||||||
|
|
||||||
private boolean succ;
|
|
||||||
|
|
||||||
public RefreshPeerProcedure() {
|
public RefreshPeerProcedure() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,12 +106,8 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
|
||||||
.setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray());
|
.setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void complete(MasterProcedureEnv env, Throwable error) {
|
@Override
|
||||||
if (event == null) {
|
protected void complete(MasterProcedureEnv env, Throwable error) {
|
||||||
LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
|
|
||||||
getProcId());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (error != null) {
|
if (error != null) {
|
||||||
LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, targetServer, error);
|
LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, targetServer, error);
|
||||||
this.succ = false;
|
this.succ = false;
|
||||||
|
@ -135,50 +115,6 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
|
||||||
LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, targetServer);
|
LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, targetServer);
|
||||||
this.succ = true;
|
this.succ = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
event.wake(env.getProcedureScheduler());
|
|
||||||
event = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName remote,
|
|
||||||
IOException exception) {
|
|
||||||
complete(env, exception);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
|
|
||||||
complete(env, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void remoteOperationFailed(MasterProcedureEnv env,
|
|
||||||
RemoteProcedureException error) {
|
|
||||||
complete(env, error);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
|
|
||||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
|
||||||
if (dispatched) {
|
|
||||||
if (succ) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
// retry
|
|
||||||
dispatched = false;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
|
|
||||||
} catch (FailedRemoteDispatchException frde) {
|
|
||||||
LOG.info("Can not add remote operation for refreshing peer {} for {} to {}, " +
|
|
||||||
"this is usually because the server is already dead, " +
|
|
||||||
"give up and mark the procedure as complete", peerId, type, targetServer, frde);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
dispatched = true;
|
|
||||||
event = new ProcedureEvent<>(this);
|
|
||||||
event.suspendIfNotReady(this);
|
|
||||||
throw new ProcedureSuspendedException();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,282 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SWITCH_RPC_THROTTLE;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.NavigableMap;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.SortedSet;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
|
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||||
|
import org.apache.hadoop.hbase.master.assignment.MockMasterServices;
|
||||||
|
import org.apache.hadoop.hbase.master.assignment.OpenRegionProcedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||||
|
|
||||||
|
@Category({ MasterTests.class, MediumTests.class })
|
||||||
|
public class TestServerRemoteProcedure {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TestServerRemoteProcedure.class);
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestServerRemoteProcedure.class);
|
||||||
|
@Rule
|
||||||
|
public TestName name = new TestName();
|
||||||
|
@Rule
|
||||||
|
public final ExpectedException exception = ExpectedException.none();
|
||||||
|
protected HBaseTestingUtility util;
|
||||||
|
protected MockRSProcedureDispatcher rsDispatcher;
|
||||||
|
protected MockMasterServices master;
|
||||||
|
protected AssignmentManager am;
|
||||||
|
protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
|
||||||
|
new ConcurrentSkipListMap<>();
|
||||||
|
// Simple executor to run some simple tasks.
|
||||||
|
protected ScheduledExecutorService executor;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
util = new HBaseTestingUtility();
|
||||||
|
this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
|
||||||
|
.setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build());
|
||||||
|
master = new MockMasterServices(util.getConfiguration(), this.regionsToRegionServers);
|
||||||
|
rsDispatcher = new MockRSProcedureDispatcher(master);
|
||||||
|
rsDispatcher.setMockRsExecutor(new NoopRSExecutor());
|
||||||
|
master.start(2, rsDispatcher);
|
||||||
|
am = master.getAssignmentManager();
|
||||||
|
master.getServerManager().getOnlineServersList().stream()
|
||||||
|
.forEach(serverName -> am.getRegionStates().getOrCreateServer(serverName));
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
master.stop("tearDown");
|
||||||
|
this.executor.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSplitWALAndCrashBeforeResponse() throws Exception {
|
||||||
|
ServerName worker = master.getServerManager().getOnlineServersList().get(0);
|
||||||
|
ServerName crashedWorker = master.getServerManager().getOnlineServersList().get(1);
|
||||||
|
ServerRemoteProcedure splitWALRemoteProcedure =
|
||||||
|
new SplitWALRemoteProcedure(worker, crashedWorker, "test");
|
||||||
|
Future<byte[]> future = submitProcedure(splitWALRemoteProcedure);
|
||||||
|
Thread.sleep(2000);
|
||||||
|
master.getServerManager().expireServer(worker);
|
||||||
|
// if remoteCallFailed is called for this procedure, this procedure should be finished.
|
||||||
|
future.get(5000, TimeUnit.MILLISECONDS);
|
||||||
|
Assert.assertTrue(splitWALRemoteProcedure.isSuccess());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception {
|
||||||
|
ServerName worker = master.getServerManager().getOnlineServersList().get(0);
|
||||||
|
ServerRemoteProcedure noopServerRemoteProcedure = new NoopServerRemoteProcedure(worker);
|
||||||
|
Future<byte[]> future = submitProcedure(noopServerRemoteProcedure);
|
||||||
|
Thread.sleep(2000);
|
||||||
|
// complete the process and fail the process at the same time
|
||||||
|
ExecutorService threadPool = Executors.newFixedThreadPool(2);
|
||||||
|
threadPool.execute(() -> noopServerRemoteProcedure
|
||||||
|
.remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), null));
|
||||||
|
threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed(
|
||||||
|
master.getMasterProcedureExecutor().getEnvironment(), worker, new IOException()));
|
||||||
|
future.get(2000, TimeUnit.MILLISECONDS);
|
||||||
|
Assert.assertTrue(noopServerRemoteProcedure.isSuccess());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRegionOpenProcedureIsNotHandledByDisPatcher() throws Exception {
|
||||||
|
TableName tableName = TableName.valueOf("testRegionOpenProcedureIsNotHandledByDisPatcher");
|
||||||
|
RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(1))
|
||||||
|
.setEndKey(Bytes.toBytes(2)).setSplit(false).setRegionId(0).build();
|
||||||
|
master.getMasterProcedureExecutor().getEnvironment().getAssignmentManager().getRegionStates()
|
||||||
|
.getOrCreateRegionStateNode(hri);
|
||||||
|
ServerName worker = master.getServerManager().getOnlineServersList().get(0);
|
||||||
|
OpenRegionProcedure openRegionProcedure = new OpenRegionProcedure(hri, worker);
|
||||||
|
Future<byte[]> future = submitProcedure(openRegionProcedure);
|
||||||
|
Thread.sleep(2000);
|
||||||
|
rsDispatcher.removeNode(worker);
|
||||||
|
try {
|
||||||
|
future.get(2000, TimeUnit.MILLISECONDS);
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
LOG.info("timeout is expected");
|
||||||
|
}
|
||||||
|
Assert.assertFalse(openRegionProcedure.isFinished());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
|
||||||
|
return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class NoopServerRemoteProcedure extends ServerRemoteProcedure
|
||||||
|
implements ServerProcedureInterface {
|
||||||
|
|
||||||
|
public NoopServerRemoteProcedure(ServerName targetServer) {
|
||||||
|
this.targetServer = targetServer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean abort(MasterProcedureEnv env) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RemoteProcedureDispatcher.RemoteOperation remoteCallBuild(MasterProcedureEnv env,
|
||||||
|
ServerName serverName) {
|
||||||
|
return new RSProcedureDispatcher.ServerOperation(null, 0L, this.getClass(), new byte[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
|
||||||
|
complete(env, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void remoteOperationFailed(MasterProcedureEnv env,
|
||||||
|
RemoteProcedureException error) {
|
||||||
|
complete(env, error);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void complete(MasterProcedureEnv env, Throwable error) {
|
||||||
|
this.succ = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServerName getServerName() {
|
||||||
|
return targetServer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasMetaTableRegion() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServerOperationType getServerOperationType() {
|
||||||
|
return SWITCH_RPC_THROTTLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
protected interface MockRSExecutor {
|
||||||
|
AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
|
||||||
|
AdminProtos.ExecuteProceduresRequest req) throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static class NoopRSExecutor implements MockRSExecutor {
|
||||||
|
@Override
|
||||||
|
public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
|
||||||
|
AdminProtos.ExecuteProceduresRequest req) throws IOException {
|
||||||
|
if (req.getOpenRegionCount() > 0) {
|
||||||
|
for (AdminProtos.OpenRegionRequest request : req.getOpenRegionList()) {
|
||||||
|
for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : request.getOpenInfoList()) {
|
||||||
|
execOpenRegion(server, openReq);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return AdminProtos.ExecuteProceduresResponse.getDefaultInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server,
|
||||||
|
AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static class MockRSProcedureDispatcher extends RSProcedureDispatcher {
|
||||||
|
private MockRSExecutor mockRsExec;
|
||||||
|
|
||||||
|
public MockRSProcedureDispatcher(final MasterServices master) {
|
||||||
|
super(master);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
|
||||||
|
this.mockRsExec = mockRsExec;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void remoteDispatch(ServerName serverName,
|
||||||
|
@SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) {
|
||||||
|
submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures));
|
||||||
|
}
|
||||||
|
|
||||||
|
private class MockRemoteCall extends ExecuteProceduresRemoteCall {
|
||||||
|
public MockRemoteCall(final ServerName serverName,
|
||||||
|
@SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) {
|
||||||
|
super(serverName, operations);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
|
||||||
|
final AdminProtos.ExecuteProceduresRequest request) throws IOException {
|
||||||
|
return mockRsExec.sendRequest(serverName, request);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue