HBASE-19216 Implement a general framework to execute remote procedure on RS

This commit is contained in:
zhangduo 2017-12-15 21:06:44 +08:00
parent 3576eb6bd8
commit 95af14fea6
26 changed files with 1109 additions and 115 deletions

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -22,5 +22,5 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public enum LockedResourceType {
SERVER, NAMESPACE, TABLE, REGION
SERVER, NAMESPACE, TABLE, REGION, PEER
}

View File

@ -226,13 +226,30 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
/**
 * Remote procedure reference.
 * @param <TEnv> environment type passed through to every callback
 * @param <TRemote> type identifying the remote endpoint the operation is dispatched to
 */
public interface RemoteProcedure<TEnv, TRemote> {
/**
 * For building the remote operation.
 */
RemoteOperation remoteCallBuild(TEnv env, TRemote remote);
/**
 * Called with the response when the remote call returns.
 * NOTE(review): this commit removes the dispatcher-side callers of this hook — confirm
 * whether the callback is still needed.
 */
void remoteCallCompleted(TEnv env, TRemote remote, RemoteOperation response);
/**
 * Called when the executeProcedure call is failed.
 */
void remoteCallFailed(TEnv env, TRemote remote, IOException exception);
/**
 * Called when RS tells the remote procedure is succeeded through the
 * {@code reportProcedureDone} method.
 */
void remoteOperationCompleted(TEnv env);
/**
 * Called when RS tells the remote procedure is failed through the {@code reportProcedureDone}
 * method.
 * @param error the error message
 */
void remoteOperationFailed(TEnv env, String error);
}
/**

View File

@ -256,14 +256,19 @@ message ClearRegionBlockCacheResponse {
required CacheEvictionStats stats = 1;
}
message RemoteProcedureRequest {
required uint64 proc_id = 1;
required string proc_class = 2;
optional bytes proc_data = 3;
}
message ExecuteProceduresRequest {
repeated OpenRegionRequest open_region = 1;
repeated CloseRegionRequest close_region = 2;
repeated RemoteProcedureRequest proc = 3;
}
message ExecuteProceduresResponse {
repeated OpenRegionResponse open_region = 1;
repeated CloseRegionResponse close_region = 2;
}
service AdminService {

View File

@ -368,3 +368,33 @@ message GCMergedRegionsStateData {
required RegionInfo parent_b = 2;
required RegionInfo merged_child = 3;
}
enum PeerModificationState {
UPDATE_PEER_STORAGE = 1;
REFRESH_PEER_ON_RS = 2;
POST_PEER_MODIFICATION = 3;
}
message PeerModificationStateData {
required string peer_id = 1;
}
enum PeerModificationType {
ADD_PEER = 1;
REMOVE_PEER = 2;
ENABLE_PEER = 3;
DISABLE_PEER = 4;
UPDATE_PEER_CONFIG = 5;
}
message RefreshPeerStateData {
required string peer_id = 1;
required PeerModificationType type = 2;
required ServerName target_server = 3;
}
message RefreshPeerParameter {
required string peer_id = 1;
required PeerModificationType type = 2;
required ServerName target_server = 3;
}

View File

@ -143,7 +143,19 @@ message RegionSpaceUseReportRequest {
}
message RegionSpaceUseReportResponse {
}
message ReportProcedureDoneRequest {
required uint64 proc_id = 1;
enum Status {
SUCCESS = 1;
ERROR = 2;
}
required Status status = 2;
optional string error = 3;
}
message ReportProcedureDoneResponse {
}
service RegionServerStatusService {
@ -181,4 +193,7 @@ service RegionServerStatusService {
*/
rpc ReportRegionSpaceUse(RegionSpaceUseReportRequest)
returns(RegionSpaceUseReportResponse);
rpc ReportProcedureDone(ReportProcedureDoneRequest)
returns(ReportProcedureDoneResponse);
}

View File

@ -20,15 +20,14 @@ package org.apache.hadoop.hbase.executor;
import org.apache.yetus.audience.InterfaceAudience;
/**
* List of all HBase event handler types. Event types are named by a
* convention: event type names specify the component from which the event
* originated and then where its destined -- e.g. RS2ZK_ prefix means the
* event came from a regionserver destined for zookeeper -- and then what
* the even is; e.g. REGION_OPENING.
*
* <p>We give the enums indices so we can add types later and keep them
* grouped together rather than have to add them always to the end as we
* would have to if we used raw enum ordinals.
* List of all HBase event handler types.
* <p>
* Event types are named by a convention: event type names specify the component from which the
* event originated and then where its destined -- e.g. RS_ZK_ prefix means the event came from a
regionserver destined for zookeeper -- and then what the event is; e.g. REGION_OPENING.
* <p>
* We give the enums indices so we can add types later and keep them grouped together rather than
* have to add them always to the end as we would have to if we used raw enum ordinals.
*/
@InterfaceAudience.Private
public enum EventType {
@ -275,7 +274,14 @@ public enum EventType {
*
* RS_COMPACTED_FILES_DISCHARGER
*/
RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER);
RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER),
/**
* RS refresh peer.<br>
*
* RS_REFRESH_PEER
*/
RS_REFRESH_PEER (84, ExecutorType.RS_REFRESH_PEER);
private final int code;
private final ExecutorType executor;

View File

@ -46,7 +46,8 @@ public enum ExecutorType {
RS_LOG_REPLAY_OPS (27),
RS_REGION_REPLICA_FLUSH_OPS (28),
RS_COMPACTED_FILES_DISCHARGER (29),
RS_OPEN_PRIORITY_REGION (30);
RS_OPEN_PRIORITY_REGION (30),
RS_REFRESH_PEER (31);
ExecutorType(int value) {
}

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -140,6 +140,7 @@ import org.apache.hadoop.hbase.procedure2.LockedResource;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.quotas.MasterSpaceQuotaObserver;
@ -329,8 +330,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// flag set after we become the active master (used for testing)
private volatile boolean activeMaster = false;
// flag set after we complete initialization once active,
// it is not private since it's used in unit tests
// flag set after we complete initialization once active
private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
// flag set after master services are started,
@ -3588,6 +3588,30 @@ public class HMaster extends HRegionServer implements MasterServices {
return this.spaceQuotaSnapshotNotifier;
}
/**
 * Returns the procedure with the given id as a RemoteProcedure, or {@code null} when the
 * executor no longer knows the id (e.g. the procedure already finished).
 * <p>
 * The cast is only asserted, not checked at runtime: ids reported through this path are
 * expected to belong to remote procedures.
 * @param procId id of the procedure reported done by a region server
 */
@SuppressWarnings("unchecked")
private RemoteProcedure<MasterProcedureEnv, ?> getRemoteProcedure(long procId) {
Procedure<?> procedure = procedureExecutor.getProcedure(procId);
if (procedure == null) {
return null;
}
assert procedure instanceof RemoteProcedure;
return (RemoteProcedure<MasterProcedureEnv, ?>) procedure;
}
/**
 * Notification that the remote procedure with the given id has finished successfully on the
 * region server; forwards the completion to the procedure itself. Reports for ids that are
 * no longer known to the executor are silently ignored.
 * @param procId id of the remote procedure reported as done
 */
public void remoteProcedureCompleted(long procId) {
  RemoteProcedure<MasterProcedureEnv, ?> remoteProc = getRemoteProcedure(procId);
  if (remoteProc == null) {
    // Procedure already gone (finished or never dispatched); nothing to deliver.
    return;
  }
  remoteProc.remoteOperationCompleted(procedureExecutor.getEnvironment());
}
/**
 * Notification that the remote procedure with the given id has failed on the region server;
 * forwards the failure and its error message to the procedure itself. Reports for ids that
 * are no longer known to the executor are silently ignored.
 * @param procId id of the remote procedure reported as failed
 * @param error the error message reported by the region server
 */
public void remoteProcedureFailed(long procId, String error) {
  RemoteProcedure<MasterProcedureEnv, ?> remoteProc = getRemoteProcedure(procId);
  if (remoteProc == null) {
    // Procedure already gone (finished or never dispatched); nothing to deliver.
    return;
  }
  remoteProc.remoteOperationFailed(procedureExecutor.getEnvironment(), error);
}
/**
* This method modifies the master's configuration in order to inject replication-related features
*/

View File

@ -267,6 +267,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
@ -2229,4 +2231,15 @@ public class MasterRpcServices extends RSRpcServices
}
return response.build();
}
/**
 * Handles a region server's report that a dispatched remote procedure has finished: SUCCESS
 * reports are routed to {@code remoteProcedureCompleted}, anything else to
 * {@code remoteProcedureFailed} together with the error message carried in the request.
 */
@Override
public ReportProcedureDoneResponse reportProcedureDone(RpcController controller,
ReportProcedureDoneRequest request) throws ServiceException {
if (request.getStatus() == ReportProcedureDoneRequest.Status.SUCCESS) {
master.remoteProcedureCompleted(request.getProcId());
} else {
master.remoteProcedureFailed(request.getProcId(), request.getError());
}
return ReportProcedureDoneResponse.getDefaultInstance();
}
}

View File

@ -172,12 +172,6 @@ public abstract class RegionTransitionProcedure
protected abstract boolean remoteCallFailed(MasterProcedureEnv env,
RegionStateNode regionNode, IOException exception);
@Override
public void remoteCallCompleted(final MasterProcedureEnv env,
final ServerName serverName, final RemoteOperation response) {
// Ignore the response? reportTransition() is the one that count?
}
@Override
public void remoteCallFailed(final MasterProcedureEnv env,
final ServerName serverName, final IOException exception) {
@ -414,4 +408,16 @@ public abstract class RegionTransitionProcedure
* @return ServerName the Assign or Unassign is going against.
*/
public abstract ServerName getServer(final MasterProcedureEnv env);
@Override
public void remoteOperationCompleted(MasterProcedureEnv env) {
// Should not be called for region operations until the open/close region procedures are
// reworked to report completion through reportProcedureDone.
throw new UnsupportedOperationException();
}
@Override
public void remoteOperationFailed(MasterProcedureEnv env, String error) {
// Should not be called for region operations until the open/close region procedures are
// reworked to report failure through reportProcedureDone.
throw new UnsupportedOperationException();
}
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.LockAndQueue;
@ -98,12 +99,17 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
(n, k) -> n.compareKey((ServerName) k);
private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
(n, k) -> n.compareKey((TableName) k);
private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR =
(n, k) -> n.compareKey((String) k);
private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
private final FairQueue<String> peerRunQueue = new FairQueue<>();
private final ServerQueue[] serverBuckets = new ServerQueue[128];
private TableQueue tableMap = null;
private PeerQueue peerMap = null;
private final SchemaLocking locking = new SchemaLocking();
@Override
@ -117,6 +123,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
} else if (isServerProcedure(proc)) {
doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront);
} else if (isPeerProcedure(proc)) {
doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
} else {
// TODO: at the moment we only have Table, Server and Peer procedures
// if you are implementing a non-table/non-server/non-peer procedure, you have two options: create
@ -128,7 +136,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq,
final Queue<T> queue, final Procedure proc, final boolean addFront) {
final Queue<T> queue, final Procedure<?> proc, final boolean addFront) {
queue.add(proc, addFront);
if (!queue.getLockStatus().hasExclusiveLock() || queue.getLockStatus().isLockOwner(proc.getProcId())) {
// if the queue was not remove for an xlock execution
@ -145,7 +153,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
@Override
protected boolean queueHasRunnables() {
return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables();
return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables() ||
peerRunQueue.hasRunnables();
}
@Override
@ -153,7 +162,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
// For now, let server handling have precedence over table handling; presumption is that it
// is more important handling crashed servers than it is running the
// enabling/disabling tables, etc.
Procedure pollResult = doPoll(serverRunQueue);
Procedure<?> pollResult = doPoll(serverRunQueue);
if (pollResult == null) {
pollResult = doPoll(peerRunQueue);
}
if (pollResult == null) {
pollResult = doPoll(tableRunQueue);
}
@ -290,6 +302,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
markTableAsDeleted(iProcTable.getTableName(), proc);
return;
}
} else if (proc instanceof PeerProcedureInterface) {
PeerProcedureInterface iProcPeer = (PeerProcedureInterface) proc;
if (iProcPeer.getPeerOperationType() == PeerOperationType.REMOVE) {
removePeerQueue(iProcPeer.getPeerId());
}
} else {
// No cleanup for ServerProcedureInterface types, yet.
return;
@ -327,12 +344,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
locking.removeTableLock(tableName);
}
private static boolean isTableProcedure(Procedure proc) {
private static boolean isTableProcedure(Procedure<?> proc) {
return proc instanceof TableProcedureInterface;
}
private static TableName getTableName(Procedure proc) {
private static TableName getTableName(Procedure<?> proc) {
return ((TableProcedureInterface)proc).getTableName();
}
@ -353,14 +369,41 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return Math.abs(hashCode) % buckets.length;
}
private static boolean isServerProcedure(Procedure proc) {
private static boolean isServerProcedure(Procedure<?> proc) {
return proc instanceof ServerProcedureInterface;
}
private static ServerName getServerName(Procedure proc) {
private static ServerName getServerName(Procedure<?> proc) {
return ((ServerProcedureInterface)proc).getServerName();
}
// ============================================================================
// Peer Queue Lookup Helpers
// ============================================================================
/**
 * Returns the scheduling queue for the given peer, creating and registering it on first use.
 * @param peerId id of the replication peer
 */
private PeerQueue getPeerQueue(String peerId) {
  PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
  if (queue == null) {
    // First procedure for this peer: create its queue, wired to the peer's lock state.
    queue = new PeerQueue(peerId, locking.getPeerLock(peerId));
    peerMap = AvlTree.insert(peerMap, queue);
  }
  return queue;
}
/**
 * Drops both the queue and the lock entry for the given peer, so no per-peer state is leaked
 * once the peer is removed.
 */
private void removePeerQueue(String peerId) {
peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
locking.removePeerLock(peerId);
}
/** @return true if the procedure operates on a replication peer */
private static boolean isPeerProcedure(Procedure<?> proc) {
return proc instanceof PeerProcedureInterface;
}
/** @return the id of the peer the given peer procedure operates on */
private static String getPeerId(Procedure<?> proc) {
return ((PeerProcedureInterface) proc).getPeerId();
}
// ============================================================================
// Table Locking Helpers
// ============================================================================
@ -716,7 +759,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param serverName Server to lock
* @return true if the procedure has to wait for the server to be available
*/
public boolean waitServerExclusiveLock(final Procedure procedure, final ServerName serverName) {
public boolean waitServerExclusiveLock(final Procedure<?> procedure,
final ServerName serverName) {
schedLock();
try {
final LockAndQueue lock = locking.getServerLock(serverName);
@ -738,7 +782,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param procedure the procedure releasing the lock
* @param serverName the server that has the exclusive lock
*/
public void wakeServerExclusiveLock(final Procedure procedure, final ServerName serverName) {
public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
schedLock();
try {
final LockAndQueue lock = locking.getServerLock(serverName);
@ -751,6 +795,55 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
}
// ============================================================================
// Peer Locking Helpers
// ============================================================================
// Every peer operation except REFRESH needs the exclusive lock on the peer.
private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
return proc.getPeerOperationType() != PeerOperationType.REFRESH;
}
/**
 * Try to acquire the exclusive lock on the specified peer.
 * @see #wakePeerExclusiveLock(Procedure, String)
 * @param procedure the procedure trying to acquire the lock
 * @param peerId peer to lock
 * @return true if the procedure has to wait for the peer to be available
 */
public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
schedLock();
try {
final LockAndQueue lock = locking.getPeerLock(peerId);
if (lock.tryExclusiveLock(procedure)) {
// Lock acquired: take the peer queue out of the run queue while it is held exclusively.
removeFromRunQueue(peerRunQueue, getPeerQueue(peerId));
return false;
}
// Lock held by someone else: park this procedure on the lock's wait queue.
waitProcedure(lock, procedure);
logLockedResource(LockedResourceType.PEER, peerId);
return true;
} finally {
schedUnlock();
}
}
/**
 * Wake the procedures waiting for the specified peer.
 * @see #waitPeerExclusiveLock(Procedure, String)
 * @param procedure the procedure releasing the lock
 * @param peerId the peer that has the exclusive lock
 */
public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
schedLock();
try {
final LockAndQueue lock = locking.getPeerLock(peerId);
lock.releaseExclusiveLock(procedure);
// Put the peer queue back into rotation and wake anything parked on the lock.
addToRunQueue(peerRunQueue, getPeerQueue(peerId));
int waitingCount = wakeWaitingProcedures(lock);
wakePollIfNeeded(waitingCount);
} finally {
schedUnlock();
}
}
/**
* For debugging. Expensive.
*/

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/**
 * Implemented by procedures that operate on a replication peer, so the master procedure
 * scheduler can queue and lock them by peer id.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface PeerProcedureInterface {
/** Kind of modification the procedure performs on the peer. */
enum PeerOperationType {
ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH
}
/** @return id of the replication peer this procedure targets */
String getPeerId();
/** @return the type of operation this procedure performs on the peer */
PeerOperationType getPeerOperationType();
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
import org.apache.hadoop.hbase.procedure2.LockStatus;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class PeerQueue extends Queue<String> {

  public PeerQueue(String peerId, LockStatus lockStatus) {
    super(peerId, lockStatus);
  }

  /**
   * A peer queue is runnable when it has work and either nobody holds the exclusive lock, or
   * the next procedure in line has access to the held lock (the owner or its children).
   */
  @Override
  public boolean isAvailable() {
    if (isEmpty()) {
      return false;
    }
    if (!getLockStatus().hasExclusiveLock()) {
      return true;
    }
    // Exclusive lock already taken: only procedures with lock access may be executed.
    Procedure<?> head = peek();
    return head != null && getLockStatus().hasLockAccess(head);
  }

  /**
   * Every peer operation except REFRESH must take the exclusive peer lock.
   */
  @Override
  public boolean requireExclusiveLock(Procedure<?> proc) {
    return ((PeerProcedureInterface) proc).getPeerOperationType() != PeerOperationType.REFRESH;
  }
}

View File

@ -24,7 +24,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
@ -36,10 +35,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@ -49,6 +45,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProc
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
/**
* A remote procedure dispatcher for regionservers.
@ -225,7 +228,10 @@ public class RSProcedureDispatcher
private interface RemoteProcedureResolver {
void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations);
}
/**
@ -234,22 +240,28 @@ public class RSProcedureDispatcher
* Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s and
* {@link RegionCloseOperation}s.
* @param serverName RegionServer to which the remote operations are sent
* @param remoteProcedures Remote procedures which are dispatched to the given server
* @param operations Remote procedures which are dispatched to the given server
* @param resolver Used to dispatch remote procedures to given server.
*/
public void splitAndResolveOperation(final ServerName serverName,
final Set<RemoteProcedure> remoteProcedures, final RemoteProcedureResolver resolver) {
final ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
buildAndGroupRequestByType(procedureEnv, serverName, remoteProcedures);
public void splitAndResolveOperation(ServerName serverName, Set<RemoteProcedure> operations,
RemoteProcedureResolver resolver) {
MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
buildAndGroupRequestByType(env, serverName, operations);
final List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
if (!openOps.isEmpty()) {
resolver.dispatchOpenRequests(procedureEnv, openOps);
resolver.dispatchOpenRequests(env, openOps);
}
final List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
if (!closeOps.isEmpty()) {
resolver.dispatchCloseRequests(procedureEnv, closeOps);
resolver.dispatchCloseRequests(env, closeOps);
}
List<ServerOperation> refreshOps = fetchType(reqsByType, ServerOperation.class);
if (!refreshOps.isEmpty()) {
resolver.dispatchServerOperations(env, refreshOps);
}
if (!reqsByType.isEmpty()) {
@ -281,8 +293,7 @@ public class RSProcedureDispatcher
splitAndResolveOperation(getServerName(), remoteProcedures, this);
try {
final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build());
remoteCallCompleted(procedureEnv, response);
sendRequest(getServerName(), request.build());
} catch (IOException e) {
e = unwrapException(e);
// TODO: In the future some operation may want to bail out early.
@ -308,6 +319,11 @@ public class RSProcedureDispatcher
}
}
@Override
public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc);
}
protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
final ExecuteProceduresRequest request) throws IOException {
try {
@ -317,17 +333,8 @@ public class RSProcedureDispatcher
}
}
private void remoteCallCompleted(final MasterProcedureEnv env,
final ExecuteProceduresResponse response) {
/*
for (RemoteProcedure proc: operations) {
proc.remoteCallCompleted(env, getServerName(), response);
}*/
}
private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
for (RemoteProcedure proc: remoteProcedures) {
for (RemoteProcedure proc : remoteProcedures) {
proc.remoteCallFailed(env, getServerName(), e);
}
}
@ -368,8 +375,7 @@ public class RSProcedureDispatcher
buildOpenRegionRequest(procedureEnv, getServerName(), operations);
try {
OpenRegionResponse response = sendRequest(getServerName(), request);
remoteCallCompleted(procedureEnv, response);
sendRequest(getServerName(), request);
} catch (IOException e) {
e = unwrapException(e);
// TODO: In the future some operation may want to bail out early.
@ -390,16 +396,6 @@ public class RSProcedureDispatcher
}
}
private void remoteCallCompleted(final MasterProcedureEnv env,
final OpenRegionResponse response) {
int index = 0;
for (RegionOpenOperation op: operations) {
OpenRegionResponse.RegionOpeningState state = response.getOpeningState(index++);
op.setFailedOpen(state == OpenRegionResponse.RegionOpeningState.FAILED_OPENING);
op.getRemoteProcedure().remoteCallCompleted(env, getServerName(), op);
}
}
private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
for (RegionOpenOperation op: operations) {
op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
@ -449,7 +445,6 @@ public class RSProcedureDispatcher
private void remoteCallCompleted(final MasterProcedureEnv env,
final CloseRegionResponse response) {
operation.setClosed(response.getClosed());
operation.getRemoteProcedure().remoteCallCompleted(env, getServerName(), operation);
}
private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
@ -490,6 +485,11 @@ public class RSProcedureDispatcher
submitTask(new CloseRegionRemoteCall(serverName, op));
}
}
@Override
public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
throw new UnsupportedOperationException();
}
}
// ==========================================================================
@ -497,13 +497,28 @@ public class RSProcedureDispatcher
// - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
// - RegionOperation: open, close, flush, snapshot, ...
// ==========================================================================
/* Currently unused
public static abstract class ServerOperation extends RemoteOperation {
protected ServerOperation(final RemoteProcedure remoteProcedure) {
public static final class ServerOperation extends RemoteOperation {
private final long procId;
private final Class<?> rsProcClass;
private final byte[] rsProcData;
public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass,
byte[] rsProcData) {
super(remoteProcedure);
this.procId = procId;
this.rsProcClass = rsProcClass;
this.rsProcData = rsProcData;
}
public RemoteProcedureRequest buildRequest() {
return RemoteProcedureRequest.newBuilder().setProcId(procId)
.setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build();
}
}
*/
public static abstract class RegionOperation extends RemoteOperation {
private final RegionInfo regionInfo;

View File

@ -143,6 +143,8 @@ class SchemaLocking {
LockedResourceType.TABLE);
addToLockedResources(lockedResources, regionLocks, Function.identity(),
LockedResourceType.REGION);
addToLockedResources(lockedResources, peerLocks, Function.identity(),
LockedResourceType.PEER);
return lockedResources;
}
@ -165,6 +167,9 @@ class SchemaLocking {
case REGION:
queue = regionLocks.get(resourceName);
break;
case PEER:
queue = peerLocks.get(resourceName);
break;
default:
queue = null;
}

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
/**
 * Base class for procedures that modify a replication peer.
 * <p>
 * Drives a three-step state machine: persist the change to the peer storage
 * (UPDATE_PEER_STORAGE), fan out a {@link RefreshPeerProcedure} to every online region server
 * (REFRESH_PEER_ON_RS), then run the post-modification hook (POST_PEER_MODIFICATION).
 * An exclusive lock on the peer id is held for the duration of the procedure.
 */
@InterfaceAudience.Private
public abstract class ModifyPeerProcedure
    extends StateMachineProcedure<MasterProcedureEnv, PeerModificationState>
    implements PeerProcedureInterface {

  private static final Log LOG = LogFactory.getLog(ModifyPeerProcedure.class);

  // Id of the peer being modified; set by the constructor (subclasses are expected to restore
  // it on deserialization).
  protected String peerId;

  protected ModifyPeerProcedure() {
  }

  protected ModifyPeerProcedure(String peerId) {
    this.peerId = peerId;
  }

  @Override
  public String getPeerId() {
    return peerId;
  }

  /**
   * Return {@code false} means that the operation is invalid and we should give up, otherwise
   * {@code true}.
   * <p>
   * You need to call {@link #setFailure(String, Throwable)} to give the detail failure information.
   */
  protected abstract boolean updatePeerStorage() throws IOException;

  /**
   * Hook invoked after the peer has been refreshed on all online region servers.
   * No-op by default; subclasses may override.
   */
  protected void postPeerModification() {
  }

  @Override
  protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
    switch (state) {
      case UPDATE_PEER_STORAGE:
        try {
          if (!updatePeerStorage()) {
            // Invalid operation: the subclass must have recorded the failure already.
            assert isFailed() : "setFailure is not called";
            return Flow.NO_MORE_STATE;
          }
        } catch (IOException e) {
          // IOException is treated as transient: yield and retry this state later.
          LOG.warn("update peer storage failed, retry", e);
          throw new ProcedureYieldException();
        }
        setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
        return Flow.HAS_MORE_STATE;
      case REFRESH_PEER_ON_RS:
        // One child RefreshPeerProcedure per currently-online region server; the state machine
        // only advances once all children have finished.
        addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
          .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn))
          .toArray(RefreshPeerProcedure[]::new));
        setNextState(PeerModificationState.POST_PEER_MODIFICATION);
        return Flow.HAS_MORE_STATE;
      case POST_PEER_MODIFICATION:
        postPeerModification();
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  }

  @Override
  protected LockState acquireLock(MasterProcedureEnv env) {
    // NOTE(review): assumes waitPeerExclusiveLock returns true when the lock is NOT available
    // and the procedure must wait — confirm against the scheduler's wait* convention.
    return env.getProcedureScheduler().waitPeerExclusiveLock(this, peerId)
      ? LockState.LOCK_EVENT_WAIT
      : LockState.LOCK_ACQUIRED;
  }

  @Override
  protected void releaseLock(MasterProcedureEnv env) {
    env.getProcedureScheduler().wakePeerExclusiveLock(this, peerId);
  }

  @Override
  protected void rollbackState(MasterProcedureEnv env, PeerModificationState state)
      throws IOException, InterruptedException {
    // Rollback is not supported for peer modification procedures.
    throw new UnsupportedOperationException();
  }

  @Override
  protected PeerModificationState getState(int stateId) {
    return PeerModificationState.forNumber(stateId);
  }

  @Override
  protected int getStateId(PeerModificationState state) {
    return state.getNumber();
  }

  @Override
  protected PeerModificationState getInitialState() {
    return PeerModificationState.UPDATE_PEER_STORAGE;
  }
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter;
/**
 * The callable executed at RS side to refresh the peer config/state.
 * <p>
 * TODO: only a dummy implementation for verifying the framework, will add implementation later.
 */
@InterfaceAudience.Private
public class RefreshPeerCallable implements RSProcedureCallable {

  private HRegionServer server;

  private String peerId;

  // Set when parsing the parameter fails in init(); rethrown from call().
  private Exception initException;

  @Override
  public void init(byte[] parameter, HRegionServer rs) {
    this.server = rs;
    try {
      this.peerId = RefreshPeerParameter.parseFrom(parameter).getPeerId();
    } catch (InvalidProtocolBufferException e) {
      this.initException = e;
    }
  }

  @Override
  public Void call() throws Exception {
    if (initException != null) {
      throw initException;
    }
    // Dummy behavior: touch a marker file named after this server under the peer directory.
    Path marker = new Path("/" + peerId + "/" + server.getServerName().toString());
    server.getFileSystem().create(marker).close();
    return null;
  }

  @Override
  public EventType getEventType() {
    return EventType.RS_REFRESH_PEER;
  }
}

View File

@ -0,0 +1,197 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData;
/**
 * The remote procedure which refreshes the config/state of a replication peer on one target
 * region server. Created as a child of {@link ModifyPeerProcedure} for every online server.
 * <p>
 * The procedure suspends itself on a {@link ProcedureEvent} after dispatching the remote call
 * and is woken up by the remote callbacks once the region server reports the result.
 */
@InterfaceAudience.Private
public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
    implements PeerProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {

  private static final Log LOG = LogFactory.getLog(RefreshPeerProcedure.class);

  private String peerId;

  private PeerOperationType type;

  private ServerName targetServer;

  // Whether the remote call has been dispatched to the target server.
  private boolean dispatched;

  // Event used to suspend this procedure while waiting for the remote call; not persisted,
  // so it is null when the procedure is re-created during recovery (see complete()).
  private ProcedureEvent<?> event;

  // Whether the remote call succeeded.
  private boolean succ;

  public RefreshPeerProcedure() {
  }

  public RefreshPeerProcedure(String peerId, PeerOperationType type, ServerName targetServer) {
    this.peerId = peerId;
    this.type = type;
    this.targetServer = targetServer;
  }

  @Override
  public String getPeerId() {
    return peerId;
  }

  @Override
  public PeerOperationType getPeerOperationType() {
    return PeerOperationType.REFRESH;
  }

  /** Converts the master-side operation type to its protobuf representation. */
  private static PeerModificationType toPeerModificationType(PeerOperationType type) {
    switch (type) {
      case ADD:
        return PeerModificationType.ADD_PEER;
      case REMOVE:
        return PeerModificationType.REMOVE_PEER;
      case ENABLE:
        return PeerModificationType.ENABLE_PEER;
      case DISABLE:
        return PeerModificationType.DISABLE_PEER;
      case UPDATE_CONFIG:
        return PeerModificationType.UPDATE_PEER_CONFIG;
      default:
        throw new IllegalArgumentException("Unknown type: " + type);
    }
  }

  /** Converts the protobuf type back to the master-side operation type (deserialization). */
  private static PeerOperationType toPeerOperationType(PeerModificationType type) {
    switch (type) {
      case ADD_PEER:
        return PeerOperationType.ADD;
      case REMOVE_PEER:
        return PeerOperationType.REMOVE;
      case ENABLE_PEER:
        return PeerOperationType.ENABLE;
      case DISABLE_PEER:
        return PeerOperationType.DISABLE;
      case UPDATE_PEER_CONFIG:
        return PeerOperationType.UPDATE_CONFIG;
      default:
        throw new IllegalArgumentException("Unknown type: " + type);
    }
  }

  @Override
  public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
    assert targetServer.equals(remote);
    return new ServerOperation(this, getProcId(), RefreshPeerCallable.class,
        RefreshPeerParameter.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
            .setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray());
  }

  /**
   * Records the outcome and wakes the suspended procedure. If the procedure was re-created
   * during recovery the event is null and the wake-up is skipped (only logged).
   */
  private void complete(MasterProcedureEnv env, boolean succ) {
    if (event == null) {
      LOG.warn("procedure event for " + getProcId() +
          " is null, maybe the procedure is created when recovery", new Exception());
      return;
    }
    // Fixed typo in the log message: "suceeded" -> "succeeded".
    LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer +
        (succ ? " succeeded" : " failed"));
    this.succ = succ;
    event.wake(env.getProcedureScheduler());
    event = null;
  }

  @Override
  public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName remote,
      IOException exception) {
    complete(env, false);
  }

  @Override
  public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
    complete(env, true);
  }

  @Override
  public synchronized void remoteOperationFailed(MasterProcedureEnv env, String error) {
    complete(env, false);
  }

  @Override
  protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
    if (dispatched) {
      if (succ) {
        return null;
      }
      // The remote call failed; clear the flag so we dispatch again below (retry).
      dispatched = false;
    }
    if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
      LOG.info("Can not add remote operation for refreshing peer " + peerId + " for " + type +
          " to " + targetServer + ", this usually because the server is already dead," +
          " give up and mark the procedure as complete");
      return null;
    }
    dispatched = true;
    // Suspend until one of the remote* callbacks wakes us with the result.
    event = new ProcedureEvent<>(this);
    event.suspendIfNotReady(this);
    throw new ProcedureSuspendedException();
  }

  @Override
  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
    throw new UnsupportedOperationException();
  }

  @Override
  protected boolean abort(MasterProcedureEnv env) {
    // TODO: no correctness problem if we just ignore this, implement later.
    return false;
  }

  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    serializer.serialize(
        RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
            .setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
  }

  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    RefreshPeerStateData data = serializer.deserialize(RefreshPeerStateData.class);
    peerId = data.getPeerId();
    type = toPeerOperationType(data.getType());
    targetServer = ProtobufUtil.toServerName(data.getTargetServer());
  }
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import java.util.concurrent.Callable;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;
/**
 * A general interface for a sub procedure that runs at the region server side.
 * <p>
 * Implementations must have a public no-arg constructor, as instances are created reflectively
 * from the class name sent by the master and then initialized via {@link #init}.
 */
@InterfaceAudience.Private
public interface RSProcedureCallable extends Callable<Void> {

  /**
   * Initialize the callable
   * @param parameter the parameter passed from master.
   * @param rs the regionserver instance
   */
  void init(byte[] parameter, HRegionServer rs);

  /**
   * Event type used to select thread pool.
   * @return the event type that routes this callable to the right executor pool
   */
  EventType getEventType();
}

View File

@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
@ -125,6 +126,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@ -208,6 +210,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
@ -1891,6 +1894,8 @@ public class HRegionServer extends HasThread implements
conf.getInt("hbase.regionserver.region.replica.flusher.threads",
conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
}
this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
uncaughtExceptionHandler);
@ -3670,6 +3675,62 @@ public class HRegionServer extends HasThread implements
this.rpcServices, this.rpcServices);
}
/**
 * Executes a remote procedure dispatched by the master asynchronously, on the executor pool
 * selected by the callable's event type. The outcome is reported back to the master by the
 * handler via {@code reportProcedureDone}.
 */
public void executeProcedure(long procId, RSProcedureCallable callable) {
  executorService.submit(new RSProcedureHandler(this, procId, callable));
}
/**
 * Reports to the active master that the remote procedure with the given id has finished.
 * Retries (with backoff while the master is not yet serving) until the report succeeds or the
 * server is stopping, recreating the master stub when it goes stale.
 * @param procId the id of the procedure assigned by the master
 * @param error the failure cause, or {@code null} if the procedure succeeded
 */
public void reportProcedureDone(long procId, Throwable error) {
  ReportProcedureDoneRequest.Builder builder =
      ReportProcedureDoneRequest.newBuilder().setProcId(procId);
  if (error != null) {
    builder.setStatus(ReportProcedureDoneRequest.Status.ERROR)
        .setError(Throwables.getStackTraceAsString(error));
  } else {
    builder.setStatus(ReportProcedureDoneRequest.Status.SUCCESS);
  }
  ReportProcedureDoneRequest request = builder.build();
  int tries = 0;
  long pauseTime = INIT_PAUSE_TIME_MS;
  while (keepLooping()) {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    try {
      if (rss == null) {
        createRegionServerStatusStub();
        continue;
      }
      rss.reportProcedureDone(null, request);
      // Log if we had to retry else don't log unless TRACE. We want to
      // know if were successful after an attempt showed in logs as failed.
      if (tries > 0 || LOG.isTraceEnabled()) {
        LOG.info("PROCEDURE REPORTED " + request);
      }
      return;
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      boolean pause =
          ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException;
      if (pause) {
        // Do backoff else we flood the Master with requests.
        pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
      } else {
        pauseTime = INIT_PAUSE_TIME_MS; // Reset.
      }
      // Fixed misleading message: this reports a procedure result, not a region transition
      // (the old text was copied from reportRegionStateTransition).
      LOG.info(
        "Failed to report procedure done " + TextFormat.shortDebugString(request) +
          "; retry (#" + tries + ")" +
          (pause ? " after " + pauseTime + "ms delay (Master is coming online...)."
              : " immediately."),
        ioe);
      if (pause) {
        Threads.sleep(pauseTime);
      }
      tries++;
      // Invalidate the stub only if no one replaced it concurrently.
      if (rssStub == rss) {
        rssStub = null;
      }
    }
  }
}
/**
* This method modifies the region server's configuration in order to inject replication-related
* features

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -42,7 +41,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -101,6 +99,7 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
@ -147,6 +146,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@ -177,6 +177,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionR
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
@ -3488,23 +3489,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
@Override
public ExecuteProceduresResponse executeProcedures(RpcController controller,
ExecuteProceduresRequest request) throws ServiceException {
ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder();
if (request.getOpenRegionCount() > 0) {
for (OpenRegionRequest req : request.getOpenRegionList()) {
builder.addOpenRegion(openRegion(controller, req));
}
}
if (request.getCloseRegionCount() > 0) {
for (CloseRegionRequest req : request.getCloseRegionList()) {
builder.addCloseRegion(closeRegion(controller, req));
}
}
return builder.build();
}
@Override
public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
ClearRegionBlockCacheRequest request) {
@ -3522,4 +3506,38 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
stats.withMaxCacheSize(regionServer.getCacheConfig().getBlockCache().getMaxSize());
return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
}
/**
 * Executes the open-region, close-region and general remote-procedure requests bundled in one
 * call from the master. Remote procedures are instantiated reflectively from the class name in
 * the request, initialized with the request payload, and submitted asynchronously.
 */
@Override
public ExecuteProceduresResponse executeProcedures(RpcController controller,
    ExecuteProceduresRequest request) throws ServiceException {
  if (request.getOpenRegionCount() > 0) {
    for (OpenRegionRequest req : request.getOpenRegionList()) {
      openRegion(controller, req);
    }
  }
  if (request.getCloseRegionCount() > 0) {
    for (CloseRegionRequest req : request.getCloseRegionList()) {
      closeRegion(controller, req);
    }
  }
  if (request.getProcCount() > 0) {
    for (RemoteProcedureRequest req : request.getProcList()) {
      RSProcedureCallable callable;
      try {
        // Use getDeclaredConstructor().newInstance() instead of the discouraged
        // Class.newInstance(), which propagates checked constructor exceptions undeclared.
        callable = Class.forName(req.getProcClass()).asSubclass(RSProcedureCallable.class)
            .getDeclaredConstructor().newInstance();
      } catch (Exception e) {
        // here we just ignore the error as this should not happen and we do not provide a general
        // way to report errors for all types of remote procedure. The procedure will hang at
        // master side but after you solve the problem and restart master it will be executed
        // again and pass.
        LOG.warn("create procedure of type " + req.getProcClass() + " failed, give up", e);
        continue;
      }
      callable.init(req.getProcData().toByteArray(), regionServer);
      regionServer.executeProcedure(req.getProcId(), callable);
    }
  }
  return ExecuteProceduresResponse.getDefaultInstance();
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.handler;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;
/**
 * An event handler that runs a remote procedure callable on the region server and reports the
 * outcome (success or the thrown exception) back to the master.
 */
@InterfaceAudience.Private
public class RSProcedureHandler extends EventHandler {

  private final long procId;

  private final RSProcedureCallable callable;

  public RSProcedureHandler(HRegionServer rs, long procId, RSProcedureCallable callable) {
    super(rs, callable.getEventType());
    this.procId = procId;
    this.callable = callable;
  }

  @Override
  public void process() {
    // A null failure means the callable completed successfully.
    Exception failure = null;
    try {
      callable.call();
    } catch (Exception e) {
      failure = e;
    }
    ((HRegionServer) server).reportProcedureDone(procId, failure);
  }
}

View File

@ -539,26 +539,16 @@ public class TestAssignmentManager {
@Override
public ExecuteProceduresResponse sendRequest(ServerName server,
ExecuteProceduresRequest request) throws IOException {
ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder();
if (request.getOpenRegionCount() > 0) {
for (OpenRegionRequest req: request.getOpenRegionList()) {
OpenRegionResponse.Builder resp = OpenRegionResponse.newBuilder();
for (RegionOpenInfo openReq: req.getOpenInfoList()) {
RegionOpeningState state = execOpenRegion(server, openReq);
if (state != null) {
resp.addOpeningState(state);
}
for (OpenRegionRequest req : request.getOpenRegionList()) {
for (RegionOpenInfo openReq : req.getOpenInfoList()) {
execOpenRegion(server, openReq);
}
builder.addOpenRegion(resp.build());
}
}
if (request.getCloseRegionCount() > 0) {
for (CloseRegionRequest req: request.getCloseRegionList()) {
CloseRegionResponse resp = execCloseRegion(server,
req.getRegion().getValue().toByteArray());
if (resp != null) {
builder.addCloseRegion(resp);
}
for (CloseRegionRequest req : request.getCloseRegionList()) {
execCloseRegion(server, req.getRegion().getValue().toByteArray());
}
}
return ExecuteProceduresResponse.newBuilder().build();

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
/**
 * A no-op {@link ModifyPeerProcedure} used in tests to drive the peer-modification framework
 * without touching any real peer storage.
 */
public class DummyModifyPeerProcedure extends ModifyPeerProcedure {

  public DummyModifyPeerProcedure() {
  }

  public DummyModifyPeerProcedure(String peerId) {
    super(peerId);
  }

  @Override
  protected boolean updatePeerStorage() throws IOException {
    // Nothing to persist for the dummy; report success so the state machine advances to
    // refreshing the peer on every region server.
    return true;
  }

  @Override
  public PeerOperationType getPeerOperationType() {
    return PeerOperationType.ADD;
  }
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import static org.junit.Assert.assertTrue;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * Verifies the remote-procedure framework end to end: submitting a {@link
 * DummyModifyPeerProcedure} must cause every region server to touch a marker file named after
 * itself under the peer directory (see RefreshPeerCallable).
 */
@Category({ MasterTests.class, LargeTests.class })
public class TestDummyModifyPeerProcedure {

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static String PEER_ID;

  private static Path DIR;

  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.startMiniCluster(3);
    PEER_ID = "testPeer";
    DIR = new Path("/" + PEER_ID);
    UTIL.getTestFileSystem().mkdirs(DIR);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Test
  public void test() throws Exception {
    ProcedureExecutor<?> executor =
        UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
    long procId = executor.submitProcedure(new DummyModifyPeerProcedure(PEER_ID));
    // Lambda instead of an anonymous Waiter.Predicate; this class already relies on Java 8.
    UTIL.waitFor(30000, () -> executor.isFinished(procId));
    // Every region server must have touched exactly one marker file under the peer dir.
    Set<String> serverNames = UTIL.getHBaseCluster().getRegionServerThreads().stream()
        .map(t -> t.getRegionServer().getServerName().toString())
        .collect(Collectors.toCollection(HashSet::new));
    for (FileStatus s : UTIL.getTestFileSystem().listStatus(DIR)) {
      assertTrue(serverNames.remove(s.getPath().getName()));
    }
    assertTrue(serverNames.isEmpty());
  }
}

View File

@ -30,6 +30,7 @@ import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.ArrayList;