HBASE-19216 Implement a general framework to execute remote procedure on RS

This commit is contained in:
zhangduo 2017-12-15 21:06:44 +08:00
parent ee4f0c506a
commit f17198ff19
24 changed files with 1122 additions and 184 deletions

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -22,5 +22,5 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public enum LockedResourceType {
SERVER, NAMESPACE, TABLE, REGION
SERVER, NAMESPACE, TABLE, REGION, PEER
}

View File

@ -226,13 +226,30 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
/**
 * Remote procedure reference.
 * @param <TEnv> the procedure environment type passed to every callback
 * @param <TRemote> the type addressing the remote target (e.g. a server name)
 */
public interface RemoteProcedure<TEnv, TRemote> {
  /**
   * For building the remote operation.
   */
  RemoteOperation remoteCallBuild(TEnv env, TRemote remote);

  // NOTE(review): undocumented in source; presumably invoked with the response of the
  // remote call -- confirm against the dispatcher implementation.
  void remoteCallCompleted(TEnv env, TRemote remote, RemoteOperation response);

  /**
   * Called when the executeProcedure call is failed.
   * @param exception the error raised while dispatching/executing the remote call
   */
  void remoteCallFailed(TEnv env, TRemote remote, IOException exception);

  /**
   * Called when RS tells the remote procedure is succeeded through the
   * {@code reportProcedureDone} method.
   */
  void remoteOperationCompleted(TEnv env);

  /**
   * Called when RS tells the remote procedure is failed through the {@code reportProcedureDone}
   * method.
   * @param error the error message
   */
  void remoteOperationFailed(TEnv env, String error);
}
/**

View File

@ -256,14 +256,19 @@ message ClearRegionBlockCacheResponse {
required CacheEvictionStats stats = 1;
}
// Carries one procedure to be executed on a region server on behalf of the master.
message RemoteProcedureRequest {
  // Master-side procedure id; reported back through ReportProcedureDoneRequest.proc_id.
  required uint64 proc_id = 1;
  // Fully qualified class name of the region server side procedure implementation.
  required string proc_class = 2;
  // Serialized procedure-specific parameter; presumably decoded by proc_class -- confirm.
  optional bytes proc_data = 3;
}
// Batched dispatch of region open/close plus general remote procedures to one RS.
message ExecuteProceduresRequest {
  repeated OpenRegionRequest open_region = 1;
  repeated CloseRegionRequest close_region = 2;
  // General remote procedures to run on the region server (HBASE-19216).
  repeated RemoteProcedureRequest proc = 3;
}
// Note: no per-procedure result field for 'proc' entries; outcomes are reported back
// asynchronously via RegionServerStatusService.ReportProcedureDone.
message ExecuteProceduresResponse {
  repeated OpenRegionResponse open_region = 1;
  repeated CloseRegionResponse close_region = 2;
}
service AdminService {

View File

@ -365,3 +365,33 @@ message GCMergedRegionsStateData {
required RegionInfo parent_b = 2;
required RegionInfo merged_child = 3;
}
// States of the master-side procedure that modifies a replication peer.
enum PeerModificationState {
  UPDATE_PEER_STORAGE = 1;
  REFRESH_PEER_ON_RS = 2;
  POST_PEER_MODIFICATION = 3;
}
// Persisted state of a peer-modification procedure.
message PeerModificationStateData {
  // Id of the replication peer being modified.
  required string peer_id = 1;
}
// Kind of modification being applied to a replication peer.
enum PeerModificationType {
  ADD_PEER = 1;
  REMOVE_PEER = 2;
  ENABLE_PEER = 3;
  DISABLE_PEER = 4;
  UPDATE_PEER_CONFIG = 5;
}
// Persisted state of the RefreshPeer sub-procedure.
message RefreshPeerStateData {
  required string peer_id = 1;
  // The peer modification that triggered this refresh.
  required PeerModificationType type = 2;
  // The region server this refresh is dispatched to.
  required ServerName target_server = 3;
}
// NOTE(review): field-for-field identical to RefreshPeerStateData; presumably one is the
// persisted procedure state and this one the parameter shipped to the RS -- confirm.
message RefreshPeerParameter {
  required string peer_id = 1;
  required PeerModificationType type = 2;
  required ServerName target_server = 3;
}

View File

@ -143,7 +143,19 @@ message RegionSpaceUseReportRequest {
}
message RegionSpaceUseReportResponse {
}
// Region server -> master report that a dispatched remote procedure has finished.
message ReportProcedureDoneRequest {
  // Master-side procedure id (see RemoteProcedureRequest.proc_id).
  required uint64 proc_id = 1;
  enum Status {
    SUCCESS = 1;
    ERROR = 2;
  }
  required Status status = 2;
  // Human-readable failure reason; set when status is ERROR.
  optional string error = 3;
}
// Empty ack; the master acts on the proc_id/status carried in the request.
message ReportProcedureDoneResponse {
}
service RegionServerStatusService {
@ -181,4 +193,7 @@ service RegionServerStatusService {
*/
rpc ReportRegionSpaceUse(RegionSpaceUseReportRequest)
returns(RegionSpaceUseReportResponse);
rpc ReportProcedureDone(ReportProcedureDoneRequest)
returns(ReportProcedureDoneResponse);
}

View File

@ -20,15 +20,14 @@ package org.apache.hadoop.hbase.executor;
import org.apache.yetus.audience.InterfaceAudience;
/**
* List of all HBase event handler types. Event types are named by a
* convention: event type names specify the component from which the event
* originated and then where its destined -- e.g. RS2ZK_ prefix means the
* event came from a regionserver destined for zookeeper -- and then what
* the even is; e.g. REGION_OPENING.
*
* <p>We give the enums indices so we can add types later and keep them
* grouped together rather than have to add them always to the end as we
* would have to if we used raw enum ordinals.
* List of all HBase event handler types.
* <p>
* Event types are named by a convention: event type names specify the component from which the
* event originated and then where it's destined -- e.g. RS_ZK_ prefix means the event came from a
* regionserver destined for zookeeper -- and then what the event is; e.g. REGION_OPENING.
* <p>
* We give the enums indices so we can add types later and keep them grouped together rather than
* have to add them always to the end as we would have to if we used raw enum ordinals.
*/
@InterfaceAudience.Private
public enum EventType {
@ -275,7 +274,14 @@ public enum EventType {
*
* RS_COMPACTED_FILES_DISCHARGER
*/
RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER);
RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER),
/**
* RS refresh peer.<br>
*
* RS_REFRESH_PEER
*/
RS_REFRESH_PEER (84, ExecutorType.RS_REFRESH_PEER);
private final int code;
private final ExecutorType executor;

View File

@ -46,7 +46,8 @@ public enum ExecutorType {
RS_LOG_REPLAY_OPS (27),
RS_REGION_REPLICA_FLUSH_OPS (28),
RS_COMPACTED_FILES_DISCHARGER (29),
RS_OPEN_PRIORITY_REGION (30);
RS_OPEN_PRIORITY_REGION (30),
RS_REFRESH_PEER (31);
ExecutorType(int value) {
}

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -140,6 +139,7 @@ import org.apache.hadoop.hbase.procedure2.LockedResource;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.quotas.MasterSpaceQuotaObserver;
@ -328,8 +328,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// flag set after we become the active master (used for testing)
private volatile boolean activeMaster = false;
// flag set after we complete initialization once active,
// it is not private since it's used in unit tests
// flag set after we complete initialization once active
private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
// flag set after master services are started,
@ -3525,4 +3524,28 @@ public class HMaster extends HRegionServer implements MasterServices {
public SpaceQuotaSnapshotNotifier getSpaceQuotaSnapshotNotifier() {
return this.spaceQuotaSnapshotNotifier;
}
}
/**
 * Looks up the running procedure with the given id and returns it as a
 * {@link RemoteProcedure}, or null when no such procedure is currently known to the
 * executor (e.g. a stale report arriving after the procedure already finished).
 */
@SuppressWarnings("unchecked")
private RemoteProcedure<MasterProcedureEnv, ?> getRemoteProcedure(long procId) {
  Procedure<?> procedure = procedureExecutor.getProcedure(procId);
  if (procedure == null) {
    return null;
  }
  // Real guard instead of assert-only: with assertions disabled (-da) a mismatched id
  // would otherwise surface later as a ClassCastException at the caller.
  if (!(procedure instanceof RemoteProcedure)) {
    assert false : "Procedure " + procId + " is not a RemoteProcedure";
    return null;
  }
  return (RemoteProcedure<MasterProcedureEnv, ?>) procedure;
}
/**
 * Notification that a procedure dispatched to a region server finished successfully on
 * that server. No-op when the procedure is no longer known to the executor.
 */
public void remoteProcedureCompleted(long procId) {
  RemoteProcedure<MasterProcedureEnv, ?> remoteProc = getRemoteProcedure(procId);
  if (remoteProc == null) {
    return;
  }
  remoteProc.remoteOperationCompleted(procedureExecutor.getEnvironment());
}
/**
 * Notification that a procedure dispatched to a region server failed on that server.
 * No-op when the procedure is no longer known to the executor.
 * @param error the error message reported by the region server
 */
public void remoteProcedureFailed(long procId, String error) {
  RemoteProcedure<MasterProcedureEnv, ?> remoteProc = getRemoteProcedure(procId);
  if (remoteProc == null) {
    return;
  }
  remoteProc.remoteOperationFailed(procedureExecutor.getEnvironment(), error);
}
}

View File

@ -264,6 +264,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
@ -2247,4 +2249,15 @@ public class MasterRpcServices extends RSRpcServices
}
return response.build();
}
/**
 * RPC endpoint through which a region server reports the outcome of a remote procedure
 * dispatched to it; routes the report to the success or failure handler on the master.
 */
@Override
public ReportProcedureDoneResponse reportProcedureDone(RpcController controller,
    ReportProcedureDoneRequest request) throws ServiceException {
  boolean succeeded = request.getStatus() == ReportProcedureDoneRequest.Status.SUCCESS;
  if (succeeded) {
    master.remoteProcedureCompleted(request.getProcId());
  } else {
    master.remoteProcedureFailed(request.getProcId(), request.getError());
  }
  return ReportProcedureDoneResponse.getDefaultInstance();
}
}

View File

@ -172,12 +172,6 @@ public abstract class RegionTransitionProcedure
protected abstract boolean remoteCallFailed(MasterProcedureEnv env,
RegionStateNode regionNode, IOException exception);
@Override
public void remoteCallCompleted(final MasterProcedureEnv env,
final ServerName serverName, final RemoteOperation response) {
// Ignore the response? reportTransition() is the one that count?
}
@Override
public void remoteCallFailed(final MasterProcedureEnv env,
final ServerName serverName, final IOException exception) {
@ -413,4 +407,16 @@ public abstract class RegionTransitionProcedure
* @return ServerName the Assign or Unassign is going against.
*/
public abstract ServerName getServer(final MasterProcedureEnv env);
/**
 * Not supported for region transition procedures; they complete via region state
 * transition reports, not via reportProcedureDone.
 */
@Override
public void remoteOperationCompleted(MasterProcedureEnv env) {
  // should not be called for region operation until we modified the open/close region procedure
  // Include a message so a stray call is diagnosable from the stack trace alone.
  throw new UnsupportedOperationException(
    "remoteOperationCompleted is not expected for region transition procedures");
}
/**
 * Not supported for region transition procedures; failures are surfaced through
 * remoteCallFailed, not via reportProcedureDone.
 */
@Override
public void remoteOperationFailed(MasterProcedureEnv env, String error) {
  // should not be called for region operation until we modified the open/close region procedure
  // Include the reported error so a stray call is diagnosable from the stack trace alone.
  throw new UnsupportedOperationException(
    "remoteOperationFailed is not expected for region transition procedures: " + error);
}
}

View File

@ -24,7 +24,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.LockAndQueue;
@ -109,12 +110,17 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
new ServerQueueKeyComparator();
private final static TableQueueKeyComparator TABLE_QUEUE_KEY_COMPARATOR =
new TableQueueKeyComparator();
private final static PeerQueueKeyComparator PEER_QUEUE_KEY_COMPARATOR =
new PeerQueueKeyComparator();
private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
private final FairQueue<String> peerRunQueue = new FairQueue<>();
private final ServerQueue[] serverBuckets = new ServerQueue[128];
private TableQueue tableMap = null;
private PeerQueue peerMap = null;
private final SchemaLocking locking = new SchemaLocking();
/**
@ -161,6 +167,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
} else if (isServerProcedure(proc)) {
doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront);
} else if (isPeerProcedure(proc)) {
doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
} else {
// TODO: at the moment we only have Table, Server and Peer procedures
// if you are implementing a non-table/non-server procedure, you have two options: create
@ -172,7 +180,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq,
final Queue<T> queue, final Procedure proc, final boolean addFront) {
final Queue<T> queue, final Procedure<?> proc, final boolean addFront) {
queue.add(proc, addFront);
if (!queue.getLockStatus().hasExclusiveLock() || queue.getLockStatus().isLockOwner(proc.getProcId())) {
// if the queue was not remove for an xlock execution
@ -189,7 +197,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
@Override
protected boolean queueHasRunnables() {
  // Something is pollable if any of the three fair queues has runnables. The diff render
  // kept both the old two-queue return and the new three-queue return; only the new
  // statement (which includes peerRunQueue) is valid.
  return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables() ||
    peerRunQueue.hasRunnables();
}
@Override
@ -197,7 +206,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
// For now, let server handling have precedence over table handling; presumption is that it
// is more important handling crashed servers than it is running the
// enabling/disabling tables, etc.
Procedure pollResult = doPoll(serverRunQueue);
Procedure<?> pollResult = doPoll(serverRunQueue);
if (pollResult == null) {
pollResult = doPoll(peerRunQueue);
}
if (pollResult == null) {
pollResult = doPoll(tableRunQueue);
}
@ -267,60 +279,30 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
exclusiveLockOwnerProcedure, sharedLockCount, waitingProcedures);
}
/**
 * Appends a LockedResource entry to {@code lockedResources} for every held lock in
 * {@code locks}, converting each key to its display string via {@code keyTransformer}.
 */
private <T> void addToLockedResources(List<LockedResource> lockedResources,
    Map<T, LockAndQueue> locks, Function<T, String> keyTransformer,
    LockedResourceType resourcesType) {
  for (Map.Entry<T, LockAndQueue> entry : locks.entrySet()) {
    LockAndQueue queue = entry.getValue();
    if (queue.isLocked()) {
      lockedResources.add(
        createLockedResource(resourcesType, keyTransformer.apply(entry.getKey()), queue));
    }
  }
}
@Override
public List<LockedResource> getLocks() {
schedLock();
try {
List<LockedResource> lockedResources = new ArrayList<>();
for (Entry<ServerName, LockAndQueue> entry : locking.serverLocks
.entrySet()) {
String serverName = entry.getKey().getServerName();
LockAndQueue queue = entry.getValue();
if (queue.isLocked()) {
LockedResource lockedResource =
createLockedResource(LockedResourceType.SERVER, serverName, queue);
lockedResources.add(lockedResource);
}
}
for (Entry<String, LockAndQueue> entry : locking.namespaceLocks
.entrySet()) {
String namespaceName = entry.getKey();
LockAndQueue queue = entry.getValue();
if (queue.isLocked()) {
LockedResource lockedResource =
createLockedResource(LockedResourceType.NAMESPACE, namespaceName, queue);
lockedResources.add(lockedResource);
}
}
for (Entry<TableName, LockAndQueue> entry : locking.tableLocks
.entrySet()) {
String tableName = entry.getKey().getNameAsString();
LockAndQueue queue = entry.getValue();
if (queue.isLocked()) {
LockedResource lockedResource =
createLockedResource(LockedResourceType.TABLE, tableName, queue);
lockedResources.add(lockedResource);
}
}
for (Entry<String, LockAndQueue> entry : locking.regionLocks.entrySet()) {
String regionName = entry.getKey();
LockAndQueue queue = entry.getValue();
if (queue.isLocked()) {
LockedResource lockedResource =
createLockedResource(LockedResourceType.REGION, regionName, queue);
lockedResources.add(lockedResource);
}
}
addToLockedResources(lockedResources, locking.serverLocks, sn -> sn.getServerName(),
LockedResourceType.SERVER);
addToLockedResources(lockedResources, locking.namespaceLocks, Function.identity(),
LockedResourceType.NAMESPACE);
addToLockedResources(lockedResources, locking.tableLocks, tn -> tn.getNameAsString(),
LockedResourceType.TABLE);
addToLockedResources(lockedResources, locking.regionLocks, Function.identity(),
LockedResourceType.REGION);
addToLockedResources(lockedResources, locking.peerLocks, Function.identity(),
LockedResourceType.PEER);
return lockedResources;
} finally {
schedUnlock();
@ -328,8 +310,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
@Override
public LockedResource getLockResource(LockedResourceType resourceType,
String resourceName) {
public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
LockAndQueue queue = null;
schedLock();
try {
@ -346,8 +327,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
case REGION:
queue = locking.regionLocks.get(resourceName);
break;
case PEER:
queue = locking.peerLocks.get(resourceName);
break;
}
return queue != null ? createLockedResource(resourceType, resourceName, queue) : null;
} finally {
schedUnlock();
@ -431,6 +414,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
markTableAsDeleted(iProcTable.getTableName(), proc);
return;
}
} else if (proc instanceof PeerProcedureInterface) {
PeerProcedureInterface iProcPeer = (PeerProcedureInterface) proc;
if (iProcPeer.getPeerOperationType() == PeerOperationType.REMOVE) {
removePeerQueue(iProcPeer.getPeerId());
}
} else {
// No cleanup for ServerProcedureInterface types, yet.
return;
@ -468,12 +456,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
locking.removeTableLock(tableName);
}
private static boolean isTableProcedure(Procedure proc) {
private static boolean isTableProcedure(Procedure<?> proc) {
return proc instanceof TableProcedureInterface;
}
private static TableName getTableName(Procedure proc) {
private static TableName getTableName(Procedure<?> proc) {
return ((TableProcedureInterface)proc).getTableName();
}
@ -494,14 +481,41 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return Math.abs(hashCode) % buckets.length;
}
private static boolean isServerProcedure(Procedure proc) {
private static boolean isServerProcedure(Procedure<?> proc) {
return proc instanceof ServerProcedureInterface;
}
private static ServerName getServerName(Procedure proc) {
private static ServerName getServerName(Procedure<?> proc) {
return ((ServerProcedureInterface)proc).getServerName();
}
// ============================================================================
// Peer Queue Lookup Helpers
// ============================================================================
/**
 * Returns the run queue for the given peer, lazily creating it and registering it in
 * the AVL tree on first access.
 */
private PeerQueue getPeerQueue(String peerId) {
  PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
  if (queue == null) {
    queue = new PeerQueue(peerId, locking.getPeerLock(peerId));
    peerMap = AvlTree.insert(peerMap, queue);
  }
  return queue;
}
// Drops the peer's queue from the AVL tree and its lock entry from the locking table;
// invoked from completion cleanup after a REMOVE peer procedure finishes.
private void removePeerQueue(String peerId) {
  peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
  locking.removePeerLock(peerId);
}
// True if the procedure participates in per-peer scheduling and locking.
private static boolean isPeerProcedure(Procedure<?> proc) {
  return proc instanceof PeerProcedureInterface;
}
// Extracts the peer id used as queue/lock key; caller must ensure isPeerProcedure(proc).
private static String getPeerId(Procedure<?> proc) {
  return ((PeerProcedureInterface) proc).getPeerId();
}
// ============================================================================
// Table and Server Queue Implementation
// ============================================================================
@ -571,6 +585,26 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
}
// Lets the AVL tree look up PeerQueue nodes by their String peer id.
private static class PeerQueueKeyComparator implements AvlKeyComparator<PeerQueue> {

  @Override
  public int compareKey(PeerQueue node, Object key) {
    return node.compareKey((String) key);
  }
}
/**
 * Run queue holding the procedures that target a single replication peer, keyed by peer id.
 */
public static class PeerQueue extends Queue<String> {

  public PeerQueue(String peerId, LockStatus lockStatus) {
    super(peerId, lockStatus);
  }

  @Override
  public boolean requireExclusiveLock(Procedure proc) {
    // Every peer operation except REFRESH needs the exclusive peer lock
    // (see requirePeerExclusiveLock).
    return requirePeerExclusiveLock((PeerProcedureInterface) proc);
  }
}
// ============================================================================
// Table Locking Helpers
// ============================================================================
@ -958,7 +992,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param serverName Server to lock
* @return true if the procedure has to wait for the server to be available
*/
public boolean waitServerExclusiveLock(final Procedure procedure, final ServerName serverName) {
public boolean waitServerExclusiveLock(final Procedure<?> procedure,
final ServerName serverName) {
schedLock();
try {
final LockAndQueue lock = locking.getServerLock(serverName);
@ -980,7 +1015,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param procedure the procedure releasing the lock
* @param serverName the server that has the exclusive lock
*/
public void wakeServerExclusiveLock(final Procedure procedure, final ServerName serverName) {
public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
schedLock();
try {
final LockAndQueue lock = locking.getServerLock(serverName);
@ -993,6 +1028,56 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
}
// ============================================================================
// Peer Locking Helpers
// ============================================================================
// REFRESH operations may run concurrently; every other peer modification must hold the
// exclusive peer lock.
private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
  return proc.getPeerOperationType() != PeerOperationType.REFRESH;
}
/**
 * Try to acquire the exclusive lock on the specified peer.
 * @see #wakePeerExclusiveLock(Procedure, String)
 * @param procedure the procedure trying to acquire the lock
 * @param peerId peer to lock
 * @return true if the procedure has to wait for the peer to be available
 */
public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
  schedLock();
  try {
    final LockAndQueue lock = locking.getPeerLock(peerId);
    if (lock.tryExclusiveLock(procedure)) {
      // Lock acquired: pull the peer's queue out of the run queue so no other procedure
      // on this peer gets polled while the lock is held.
      removeFromRunQueue(peerRunQueue, getPeerQueue(peerId));
      return false;
    }
    // Lock held by someone else: park this procedure on the lock's wait queue.
    waitProcedure(lock, procedure);
    logLockedResource(LockedResourceType.PEER, peerId);
    return true;
  } finally {
    schedUnlock();
  }
}
/**
 * Wake the procedures waiting for the specified peer.
 * @see #waitPeerExclusiveLock(Procedure, String)
 * @param procedure the procedure releasing the lock
 * @param peerId the peer that has the exclusive lock
 */
public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
  schedLock();
  try {
    LockAndQueue peerLock = locking.getPeerLock(peerId);
    peerLock.releaseExclusiveLock(procedure);
    // Put the peer's queue back into rotation, then wake anybody parked on the lock.
    addToRunQueue(peerRunQueue, getPeerQueue(peerId));
    wakePollIfNeeded(wakeWaitingProcedures(peerLock));
  } finally {
    schedUnlock();
  }
}
// ============================================================================
// Generic Helpers
// ============================================================================
@ -1098,6 +1183,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
final Map<TableName, LockAndQueue> tableLocks = new HashMap<>();
// Single map for all regions irrespective of tables. Key is encoded region name.
final Map<String, LockAndQueue> regionLocks = new HashMap<>();
final Map<String, LockAndQueue> peerLocks = new HashMap<>();
private <T> LockAndQueue getLock(Map<T, LockAndQueue> map, T key) {
LockAndQueue lock = map.get(key);
@ -1132,6 +1218,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return getLock(serverLocks, serverName);
}
// Returns (lazily creating via getLock) the LockAndQueue tracking this peer's lock state.
LockAndQueue getPeerLock(String peerId) {
  return getLock(peerLocks, peerId);
}
// Removes the peer's lock entry entirely; returns the removed entry, or null if absent.
LockAndQueue removePeerLock(String peerId) {
  return peerLocks.remove(peerId);
}
/**
* Removes all locks by clearing the maps.
* Used when procedure executor is stopped for failure and recovery testing.
@ -1142,6 +1236,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
namespaceLocks.clear();
tableLocks.clear();
regionLocks.clear();
peerLocks.clear();
}
@Override
@ -1149,7 +1244,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return "serverLocks=" + filterUnlocked(this.serverLocks) +
", namespaceLocks=" + filterUnlocked(this.namespaceLocks) +
", tableLocks=" + filterUnlocked(this.tableLocks) +
", regionLocks=" + filterUnlocked(this.regionLocks);
", regionLocks=" + filterUnlocked(this.regionLocks) +
", peerLocks=" + filterUnlocked(this.peerLocks);
}
private String filterUnlocked(Map<?, LockAndQueue> locks) {

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/**
 * Implemented by procedures that operate on a single replication peer, so the master
 * procedure scheduler can queue and lock them per peer id.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface PeerProcedureInterface {

  /** Kind of peer operation; REFRESH is the only one not requiring the exclusive peer lock. */
  enum PeerOperationType {
    ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH
  }

  /** @return id of the replication peer this procedure targets */
  String getPeerId();

  /** @return the type of operation this procedure performs on the peer */
  PeerOperationType getPeerOperationType();
}

View File

@ -24,7 +24,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
@ -36,10 +35,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@ -49,6 +45,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProc
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
/**
* A remote procedure dispatcher for regionservers.
@ -222,7 +225,10 @@ public class RSProcedureDispatcher
/**
 * Receives the per-type operation lists that splitAndResolveOperation extracts from a
 * batch of remote procedures bound for one region server.
 */
private interface RemoteProcedureResolver {
  void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);

  void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);

  // Non-region server operations, e.g. refreshing a replication peer on the RS.
  void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations);
}
/**
 * Groups the remote operations built from {@code operations} by type, then uses
 * {@code resolver} to dispatch {@link RegionOpenOperation}s, {@link RegionCloseOperation}s
 * and {@link ServerOperation}s. The diff render interleaved the old and new signature and
 * body lines; only the new form (env taken from the master, ServerOperation branch added)
 * is kept.
 * @param serverName RegionServer to which the remote operations are sent
 * @param operations Remote procedures which are dispatched to the given server
 * @param resolver Used to dispatch remote procedures to given server.
 */
public void splitAndResolveOperation(ServerName serverName, Set<RemoteProcedure> operations,
    RemoteProcedureResolver resolver) {
  MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
  ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
    buildAndGroupRequestByType(env, serverName, operations);

  List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
  if (!openOps.isEmpty()) {
    resolver.dispatchOpenRequests(env, openOps);
  }

  List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
  if (!closeOps.isEmpty()) {
    resolver.dispatchCloseRequests(env, closeOps);
  }

  List<ServerOperation> refreshOps = fetchType(reqsByType, ServerOperation.class);
  if (!refreshOps.isEmpty()) {
    resolver.dispatchServerOperations(env, refreshOps);
  }
if (!reqsByType.isEmpty()) {
@ -277,8 +289,7 @@ public class RSProcedureDispatcher
splitAndResolveOperation(getServerName(), remoteProcedures, this);
try {
final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build());
remoteCallCompleted(procedureEnv, response);
sendRequest(getServerName(), request.build());
} catch (IOException e) {
e = unwrapException(e);
// TODO: In the future some operation may want to bail out early.
@ -302,6 +313,11 @@ public class RSProcedureDispatcher
}
}
@Override
public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
  // Convert each operation into its protobuf request and attach it to the batched call.
  // Method reference instead of `o -> o.buildRequest()` — same behavior, idiomatic form.
  operations.stream().map(ServerOperation::buildRequest).forEachOrdered(request::addProc);
}
protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
final ExecuteProceduresRequest request) throws IOException {
try {
@ -311,17 +327,8 @@ public class RSProcedureDispatcher
}
}
private void remoteCallCompleted(final MasterProcedureEnv env,
final ExecuteProceduresResponse response) {
/*
for (RemoteProcedure proc: operations) {
proc.remoteCallCompleted(env, getServerName(), response);
}*/
}
private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
for (RemoteProcedure proc: remoteProcedures) {
for (RemoteProcedure proc : remoteProcedures) {
proc.remoteCallFailed(env, getServerName(), e);
}
}
@ -362,8 +369,7 @@ public class RSProcedureDispatcher
buildOpenRegionRequest(procedureEnv, getServerName(), operations);
try {
OpenRegionResponse response = sendRequest(getServerName(), request);
remoteCallCompleted(procedureEnv, response);
sendRequest(getServerName(), request);
} catch (IOException e) {
e = unwrapException(e);
// TODO: In the future some operation may want to bail out early.
@ -384,16 +390,6 @@ public class RSProcedureDispatcher
}
}
private void remoteCallCompleted(final MasterProcedureEnv env,
final OpenRegionResponse response) {
int index = 0;
for (RegionOpenOperation op: operations) {
OpenRegionResponse.RegionOpeningState state = response.getOpeningState(index++);
op.setFailedOpen(state == OpenRegionResponse.RegionOpeningState.FAILED_OPENING);
op.getRemoteProcedure().remoteCallCompleted(env, getServerName(), op);
}
}
private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
for (RegionOpenOperation op: operations) {
op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
@ -443,7 +439,6 @@ public class RSProcedureDispatcher
private void remoteCallCompleted(final MasterProcedureEnv env,
final CloseRegionResponse response) {
operation.setClosed(response.getClosed());
operation.getRemoteProcedure().remoteCallCompleted(env, getServerName(), operation);
}
private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
@ -482,6 +477,11 @@ public class RSProcedureDispatcher
submitTask(new CloseRegionRemoteCall(serverName, op));
}
}
@Override
public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
throw new UnsupportedOperationException();
}
}
// ==========================================================================
@ -489,13 +489,28 @@ public class RSProcedureDispatcher
// - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
// - RegionOperation: open, close, flush, snapshot, ...
// ==========================================================================
/* Currently unused
public static abstract class ServerOperation extends RemoteOperation {
protected ServerOperation(final RemoteProcedure remoteProcedure) {
public static final class ServerOperation extends RemoteOperation {
private final long procId;
private final Class<?> rsProcClass;
private final byte[] rsProcData;
public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass,
byte[] rsProcData) {
super(remoteProcedure);
this.procId = procId;
this.rsProcClass = rsProcClass;
this.rsProcData = rsProcData;
}
public RemoteProcedureRequest buildRequest() {
return RemoteProcedureRequest.newBuilder().setProcId(procId)
.setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build();
}
}
*/
public static abstract class RegionOperation extends RemoteOperation {
private final RegionInfo regionInfo;

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
@InterfaceAudience.Private
public abstract class ModifyPeerProcedure
    extends StateMachineProcedure<MasterProcedureEnv, PeerModificationState>
    implements PeerProcedureInterface {

  private static final Log LOG = LogFactory.getLog(ModifyPeerProcedure.class);

  // Id of the replication peer this procedure operates on. Set by the
  // parameterized constructor, or restored from the peer-exclusive lock
  // bookkeeping on recovery (no serialization of this base class is visible here).
  protected String peerId;

  // Required by the procedure framework for deserialization on recovery.
  protected ModifyPeerProcedure() {
  }

  protected ModifyPeerProcedure(String peerId) {
    this.peerId = peerId;
  }

  @Override
  public String getPeerId() {
    return peerId;
  }

  /**
   * Return {@code false} means that the operation is invalid and we should give up, otherwise
   * {@code true}.
   * <p>
   * You need to call {@link #setFailure(String, Throwable)} to give the detail failure information.
   */
  protected abstract boolean updatePeerStorage() throws IOException;

  /**
   * Hook invoked once all region servers have refreshed the peer. Default is a no-op;
   * subclasses override to perform peer-type-specific cleanup.
   */
  protected void postPeerModification() {
  }

  /**
   * Drives the three-step modification flow:
   * UPDATE_PEER_STORAGE -> REFRESH_PEER_ON_RS -> POST_PEER_MODIFICATION.
   */
  @Override
  protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
    switch (state) {
      case UPDATE_PEER_STORAGE:
        try {
          if (!updatePeerStorage()) {
            // The subclass rejected the operation; it must already have recorded
            // the failure via setFailure, which the assert below checks.
            assert isFailed() : "setFailure is not called";
            return Flow.NO_MORE_STATE;
          }
        } catch (IOException e) {
          // IOException is treated as transient: yield and retry this state later.
          LOG.warn("update peer storage failed, retry", e);
          throw new ProcedureYieldException();
        }
        setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
        return Flow.HAS_MORE_STATE;
      case REFRESH_PEER_ON_RS:
        // Fan out one child RefreshPeerProcedure per currently-online region server;
        // this procedure does not advance until all children complete.
        addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
            .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn))
            .toArray(RefreshPeerProcedure[]::new));
        setNextState(PeerModificationState.POST_PEER_MODIFICATION);
        return Flow.HAS_MORE_STATE;
      case POST_PEER_MODIFICATION:
        postPeerModification();
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  }

  @Override
  protected LockState acquireLock(MasterProcedureEnv env) {
    // NOTE(review): assumes waitPeerExclusiveLock returns true when the lock is NOT
    // available and the procedure must wait on the event (the usual HBase scheduler
    // wait* convention) — confirm against MasterProcedureScheduler.
    return env.getProcedureScheduler().waitPeerExclusiveLock(this, peerId)
        ? LockState.LOCK_EVENT_WAIT
        : LockState.LOCK_ACQUIRED;
  }

  @Override
  protected void releaseLock(MasterProcedureEnv env) {
    env.getProcedureScheduler().wakePeerExclusiveLock(this, peerId);
  }

  // Rollback is not supported for peer modification; the procedure retries forward.
  @Override
  protected void rollbackState(MasterProcedureEnv env, PeerModificationState state)
      throws IOException, InterruptedException {
    throw new UnsupportedOperationException();
  }

  @Override
  protected PeerModificationState getState(int stateId) {
    return PeerModificationState.forNumber(stateId);
  }

  @Override
  protected int getStateId(PeerModificationState state) {
    return state.getNumber();
  }

  @Override
  protected PeerModificationState getInitialState() {
    return PeerModificationState.UPDATE_PEER_STORAGE;
  }
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter;
/**
* The callable executed at RS side to refresh the peer config/state.
* <p>
* TODO: only a dummy implementation for verifying the framework, will add implementation later.
*/
@InterfaceAudience.Private
public class RefreshPeerCallable implements RSProcedureCallable {

  // Hosting region server, captured in init().
  private HRegionServer regionServer;

  // Peer id decoded from the serialized RefreshPeerParameter.
  private String peerId;

  // Deferred init failure: parsing errors are stashed here and rethrown from call().
  private Exception initError;

  /**
   * Touches a marker file at /&lt;peerId&gt;/&lt;serverName&gt; so the master-side test can
   * observe that this server processed the refresh (dummy implementation for now).
   */
  @Override
  public Void call() throws Exception {
    if (initError != null) {
      throw initError;
    }
    Path marker = new Path("/" + peerId + "/" + regionServer.getServerName().toString());
    regionServer.getFileSystem().create(marker).close();
    return null;
  }

  @Override
  public void init(byte[] parameter, HRegionServer rs) {
    this.regionServer = rs;
    try {
      this.peerId = RefreshPeerParameter.parseFrom(parameter).getPeerId();
    } catch (InvalidProtocolBufferException e) {
      // Do not throw here; surface the problem when call() is invoked.
      this.initError = e;
    }
  }

  @Override
  public EventType getEventType() {
    return EventType.RS_REFRESH_PEER;
  }
}

View File

@ -0,0 +1,197 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData;
@InterfaceAudience.Private
public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
    implements PeerProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {

  private static final Log LOG = LogFactory.getLog(RefreshPeerProcedure.class);

  // Replication peer being refreshed.
  private String peerId;

  // The kind of peer modification that triggered this refresh.
  private PeerOperationType type;

  // The single region server this procedure targets.
  private ServerName targetServer;

  // True once the remote operation has been handed to the dispatcher.
  private boolean dispatched;

  // Event used to suspend this procedure until the RS reports back; null when
  // not suspended (including after recovery, where the event cannot be restored).
  private ProcedureEvent<?> event;

  // Outcome of the last dispatched remote call.
  private boolean succ;

  // Required by the procedure framework for deserialization on recovery.
  public RefreshPeerProcedure() {
  }

  public RefreshPeerProcedure(String peerId, PeerOperationType type, ServerName targetServer) {
    this.peerId = peerId;
    this.type = type;
    this.targetServer = targetServer;
  }

  @Override
  public String getPeerId() {
    return peerId;
  }

  @Override
  public PeerOperationType getPeerOperationType() {
    return PeerOperationType.REFRESH;
  }

  /** Maps the master-side operation type to its protobuf wire form. */
  private static PeerModificationType toPeerModificationType(PeerOperationType type) {
    switch (type) {
      case ADD:
        return PeerModificationType.ADD_PEER;
      case REMOVE:
        return PeerModificationType.REMOVE_PEER;
      case ENABLE:
        return PeerModificationType.ENABLE_PEER;
      case DISABLE:
        return PeerModificationType.DISABLE_PEER;
      case UPDATE_CONFIG:
        return PeerModificationType.UPDATE_PEER_CONFIG;
      default:
        throw new IllegalArgumentException("Unknown type: " + type);
    }
  }

  /** Inverse of {@link #toPeerModificationType(PeerOperationType)}. */
  private static PeerOperationType toPeerOperationType(PeerModificationType type) {
    switch (type) {
      case ADD_PEER:
        return PeerOperationType.ADD;
      case REMOVE_PEER:
        return PeerOperationType.REMOVE;
      case ENABLE_PEER:
        return PeerOperationType.ENABLE;
      case DISABLE_PEER:
        return PeerOperationType.DISABLE;
      case UPDATE_PEER_CONFIG:
        return PeerOperationType.UPDATE_CONFIG;
      default:
        throw new IllegalArgumentException("Unknown type: " + type);
    }
  }

  /**
   * Builds the ServerOperation that the dispatcher sends to the RS: it names
   * {@link RefreshPeerCallable} as the class to run remotely and carries the
   * serialized RefreshPeerParameter as its payload.
   */
  @Override
  public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
    assert targetServer.equals(remote);
    return new ServerOperation(this, getProcId(), RefreshPeerCallable.class,
      RefreshPeerParameter.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
          .setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray());
  }

  /**
   * Records the outcome and wakes this suspended procedure. If {@code event} is null
   * (e.g. the procedure was recovered and never suspended in this process), there is
   * nothing to wake and the report is dropped with a warning.
   */
  private void complete(MasterProcedureEnv env, boolean succ) {
    if (event == null) {
      LOG.warn("procedure event for " + getProcId() +
          " is null, maybe the procedure is created when recovery", new Exception());
      return;
    }
    LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer +
        (succ ? " succeeded" : " failed"));
    this.succ = succ;
    event.wake(env.getProcedureScheduler());
    event = null;
  }

  @Override
  public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName remote,
      IOException exception) {
    complete(env, false);
  }

  @Override
  public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
    complete(env, true);
  }

  @Override
  public synchronized void remoteOperationFailed(MasterProcedureEnv env, String error) {
    complete(env, false);
  }

  /**
   * Dispatches the remote operation and suspends until the RS reports back; on a
   * failed report the next wake-up re-dispatches. If the target server is no longer
   * known to the dispatcher (usually dead), the procedure completes immediately —
   * the server's replication state will be rebuilt when it restarts.
   */
  @Override
  protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
    if (dispatched) {
      if (succ) {
        return null;
      }
      // retry
      dispatched = false;
    }
    if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
      LOG.info("Can not add remote operation for refreshing peer " + peerId + " for " + type +
          " to " + targetServer + ", this usually because the server is already dead," +
          " give up and mark the procedure as complete");
      return null;
    }
    dispatched = true;
    event = new ProcedureEvent<>(this);
    event.suspendIfNotReady(this);
    throw new ProcedureSuspendedException();
  }

  // Refresh is forward-only; there is nothing meaningful to undo.
  @Override
  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
    throw new UnsupportedOperationException();
  }

  @Override
  protected boolean abort(MasterProcedureEnv env) {
    // TODO: no correctness problem if we just ignore this, implement later.
    return false;
  }

  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    serializer.serialize(
      RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
          .setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
  }

  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    RefreshPeerStateData data = serializer.deserialize(RefreshPeerStateData.class);
    peerId = data.getPeerId();
    type = toPeerOperationType(data.getType());
    targetServer = ProtobufUtil.toServerName(data.getTargetServer());
  }
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import java.util.concurrent.Callable;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;
/**
 * A general interface for a sub procedure runs at RS side.
 * <p>
 * Implementations are instantiated reflectively on the region server (so they need a
 * public no-arg constructor), initialized with the master-supplied payload via
 * {@link #init(byte[], HRegionServer)}, and then executed on an executor-service
 * thread pool chosen by {@link #getEventType()}.
 */
@InterfaceAudience.Private
public interface RSProcedureCallable extends Callable<Void> {

  /**
   * Initialize the callable
   * @param parameter the parameter passed from master.
   * @param rs the regionserver instance
   */
  void init(byte[] parameter, HRegionServer rs);

  /**
   * Event type used to select thread pool.
   */
  EventType getEventType();
}

View File

@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.servlet.http.HttpServlet;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.MemoryType;
@ -50,7 +47,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.conf.Configuration;
@ -117,6 +116,7 @@ import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
@ -127,6 +127,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@ -173,14 +174,9 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import sun.misc.Signal;
import sun.misc.SignalHandler;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@ -206,12 +202,20 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import sun.misc.Signal;
import sun.misc.SignalHandler;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
@ -1932,6 +1936,8 @@ public class HRegionServer extends HasThread implements
conf.getInt("hbase.regionserver.region.replica.flusher.threads",
conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
}
this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
uncaughtExceptionHandler);
@ -3725,4 +3731,60 @@ public class HRegionServer extends HasThread implements
return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName,
this.rpcServices, this.rpcServices);
}
  /**
   * Run a remote procedure (dispatched by the master) asynchronously on this region server.
   * The handler reports success/failure back to the master via
   * {@code reportProcedureDone(procId, error)} when the callable finishes.
   * @param procId master-side procedure id, echoed back in the completion report
   * @param callable the already-initialized work to run
   */
  public void executeProcedure(long procId, RSProcedureCallable callable) {
    executorService.submit(new RSProcedureHandler(this, procId, callable));
  }
public void reportProcedureDone(long procId, Throwable error) {
ReportProcedureDoneRequest.Builder builder =
ReportProcedureDoneRequest.newBuilder().setProcId(procId);
if (error != null) {
builder.setStatus(ReportProcedureDoneRequest.Status.ERROR)
.setError(Throwables.getStackTraceAsString(error));
} else {
builder.setStatus(ReportProcedureDoneRequest.Status.SUCCESS);
}
ReportProcedureDoneRequest request = builder.build();
int tries = 0;
long pauseTime = INIT_PAUSE_TIME_MS;
while (keepLooping()) {
RegionServerStatusService.BlockingInterface rss = rssStub;
try {
if (rss == null) {
createRegionServerStatusStub();
continue;
}
rss.reportProcedureDone(null, request);
// Log if we had to retry else don't log unless TRACE. We want to
// know if were successful after an attempt showed in logs as failed.
if (tries > 0 || LOG.isTraceEnabled()) {
LOG.info("PROCEDURE REPORTED " + request);
}
return;
} catch (ServiceException se) {
IOException ioe = ProtobufUtil.getRemoteException(se);
boolean pause =
ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException;
if (pause) {
// Do backoff else we flood the Master with requests.
pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
} else {
pauseTime = INIT_PAUSE_TIME_MS; // Reset.
}
LOG.info(
"Failed to report transition " + TextFormat.shortDebugString(request) + "; retry (#" +
tries + ")" + (pause ? " after " + pauseTime + "ms delay (Master is coming online...)."
: " immediately."),
ioe);
if (pause) {
Threads.sleep(pauseTime);
}
tries++;
if (rssStub == rss) {
rssStub = null;
}
}
}
}
}

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -100,6 +99,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
@ -172,6 +172,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionR
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
@ -3434,23 +3435,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
@Override
public ExecuteProceduresResponse executeProcedures(RpcController controller,
ExecuteProceduresRequest request) throws ServiceException {
ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder();
if (request.getOpenRegionCount() > 0) {
for (OpenRegionRequest req : request.getOpenRegionList()) {
builder.addOpenRegion(openRegion(controller, req));
}
}
if (request.getCloseRegionCount() > 0) {
for (CloseRegionRequest req : request.getCloseRegionList()) {
builder.addCloseRegion(closeRegion(controller, req));
}
}
return builder.build();
}
@Override
public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
ClearRegionBlockCacheRequest request) {
@ -3468,4 +3452,38 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
stats.withMaxCacheSize(regionServer.getCacheConfig().getBlockCache().getMaxSize());
return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
}
@Override
public ExecuteProceduresResponse executeProcedures(RpcController controller,
ExecuteProceduresRequest request) throws ServiceException {
if (request.getOpenRegionCount() > 0) {
for (OpenRegionRequest req : request.getOpenRegionList()) {
openRegion(controller, req);
}
}
if (request.getCloseRegionCount() > 0) {
for (CloseRegionRequest req : request.getCloseRegionList()) {
closeRegion(controller, req);
}
}
if (request.getProcCount() > 0) {
for (RemoteProcedureRequest req : request.getProcList()) {
RSProcedureCallable callable;
try {
callable =
Class.forName(req.getProcClass()).asSubclass(RSProcedureCallable.class).newInstance();
} catch (Exception e) {
// here we just ignore the error as this should not happen and we do not provide a general
// way to report errors for all types of remote procedure. The procedure will hang at
// master side but after you solve the problem and restart master it will be executed
// again and pass.
LOG.warn("create procedure of type " + req.getProcClass() + " failed, give up", e);
continue;
}
callable.init(req.getProcData().toByteArray(), regionServer);
regionServer.executeProcedure(req.getProcId(), callable);
}
}
return ExecuteProceduresResponse.getDefaultInstance();
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.handler;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;
/**
* A event handler for running procedure.
*/
@InterfaceAudience.Private
public class RSProcedureHandler extends EventHandler {

  // Master-side procedure id, echoed back when reporting completion.
  private final long procId;

  // The actual work to execute on this region server.
  private final RSProcedureCallable callable;

  public RSProcedureHandler(HRegionServer rs, long procId, RSProcedureCallable callable) {
    super(rs, callable.getEventType());
    this.procId = procId;
    this.callable = callable;
  }

  /**
   * Runs the callable and reports the outcome (null on success, the caught exception
   * otherwise) back to the master via reportProcedureDone.
   */
  @Override
  public void process() {
    Exception failure = null;
    try {
      callable.call();
    } catch (Exception e) {
      failure = e;
    }
    ((HRegionServer) server).reportProcedureDone(procId, failure);
  }
}

View File

@ -531,26 +531,16 @@ public class TestAssignmentManager {
@Override
public ExecuteProceduresResponse sendRequest(ServerName server,
ExecuteProceduresRequest request) throws IOException {
ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder();
if (request.getOpenRegionCount() > 0) {
for (OpenRegionRequest req: request.getOpenRegionList()) {
OpenRegionResponse.Builder resp = OpenRegionResponse.newBuilder();
for (RegionOpenInfo openReq: req.getOpenInfoList()) {
RegionOpeningState state = execOpenRegion(server, openReq);
if (state != null) {
resp.addOpeningState(state);
}
for (OpenRegionRequest req : request.getOpenRegionList()) {
for (RegionOpenInfo openReq : req.getOpenInfoList()) {
execOpenRegion(server, openReq);
}
builder.addOpenRegion(resp.build());
}
}
if (request.getCloseRegionCount() > 0) {
for (CloseRegionRequest req: request.getCloseRegionList()) {
CloseRegionResponse resp = execCloseRegion(server,
req.getRegion().getValue().toByteArray());
if (resp != null) {
builder.addCloseRegion(resp);
}
for (CloseRegionRequest req : request.getCloseRegionList()) {
execCloseRegion(server, req.getRegion().getValue().toByteArray());
}
}
return ExecuteProceduresResponse.newBuilder().build();

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
/**
 * A trivial {@link ModifyPeerProcedure} used only to exercise the remote-procedure
 * framework in tests: it claims to be an ADD operation and always reports the peer
 * storage update as successful without touching any storage.
 */
public class DummyModifyPeerProcedure extends ModifyPeerProcedure {

  // Required by the procedure framework for deserialization on recovery.
  public DummyModifyPeerProcedure() {
  }

  public DummyModifyPeerProcedure(String peerId) {
    super(peerId);
  }

  @Override
  public PeerOperationType getPeerOperationType() {
    return PeerOperationType.ADD;
  }

  // Always succeed so the procedure advances straight to REFRESH_PEER_ON_RS.
  @Override
  protected boolean updatePeerStorage() throws IOException {
    return true;
  }
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import static org.junit.Assert.assertTrue;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, LargeTests.class })
public class TestDummyModifyPeerProcedure {

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  // Assigned once in setUp, so they cannot be final.
  private static String PEER_ID;

  private static Path DIR;

  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.startMiniCluster(3);
    PEER_ID = "testPeer";
    DIR = new Path("/" + PEER_ID);
    UTIL.getTestFileSystem().mkdirs(DIR);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Submits a {@link DummyModifyPeerProcedure}, waits for it to finish, and then verifies
   * that every live region server (and nothing else) left a marker file under {@code DIR}.
   */
  @Test
  public void test() throws Exception {
    ProcedureExecutor<?> procExec =
        UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
    long procId = procExec.submitProcedure(new DummyModifyPeerProcedure(PEER_ID));
    // Wait up to 30s for the procedure to complete.
    UTIL.waitFor(30000, () -> procExec.isFinished(procId));
    // Collect the names of all live region servers.
    Set<String> pendingServers = new HashSet<>();
    UTIL.getHBaseCluster().getRegionServerThreads()
        .forEach(t -> pendingServers.add(t.getRegionServer().getServerName().toString()));
    // Each marker file under DIR must correspond to exactly one region server...
    for (FileStatus status : UTIL.getTestFileSystem().listStatus(DIR)) {
      assertTrue(pendingServers.remove(status.getPath().getName()));
    }
    // ...and every region server must have been accounted for.
    assertTrue(pendingServers.isEmpty());
  }
}

View File

@ -30,6 +30,7 @@ import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.ArrayList;