HBASE-21217 Revisit the executeProcedure method for open/close region

This commit is contained in:
zhangduo 2018-09-24 21:49:27 +08:00
parent df5310fc1e
commit fa2888ebf0
12 changed files with 588 additions and 61 deletions

View File

@ -27,19 +27,22 @@ import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@ -64,12 +67,15 @@ public class ExecutorService {
// Name of the server hosting this executor service.
private final String servername;
private final ListeningScheduledExecutorService delayedSubmitTimer =
MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("Event-Executor-Delay-Submit-Timer").build()));
/**
* Default constructor.
* @param servername Name of the hosting server.
*/
public ExecutorService(final String servername) {
super();
this.servername = servername;
}
@ -99,6 +105,7 @@ public class ExecutorService {
}
public void shutdown() {
this.delayedSubmitTimer.shutdownNow();
for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
List<Runnable> wasRunning =
entry.getValue().threadPoolExecutor.shutdownNow();
@ -146,6 +153,18 @@ public class ExecutorService {
}
}
// Submit the handler after the given delay. Used for retrying.
public void delayedSubmit(EventHandler eh, long delay, TimeUnit unit) {
ListenableFuture<?> future = delayedSubmitTimer.schedule(() -> submit(eh), delay, unit);
future.addListener(() -> {
try {
future.get();
} catch (Exception e) {
LOG.error("Failed to submit the event handler {} to executor", eh, e);
}
}, MoreExecutors.directExecutor());
}
public Map<String, ExecutorStatus> getAllExecutorStatuses() {
Map<String, ExecutorStatus> ret = Maps.newHashMap();
for (Map.Entry<String, Executor> e : executorMap.entrySet()) {

View File

@ -55,7 +55,6 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -76,7 +75,6 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
@ -842,6 +840,12 @@ public class HMaster extends HRegionServer implements MasterServices {
}
}
// Will be overriden in test to inject customized AssignmentManager
@VisibleForTesting
protected AssignmentManager createAssignmentManager(MasterServices master) {
return new AssignmentManager(master);
}
/**
* Finish initialization of HMaster after becoming the primary master.
* <p/>
@ -934,7 +938,7 @@ public class HMaster extends HRegionServer implements MasterServices {
checkUnsupportedProcedure(procsByType);
// Create Assignment Manager
this.assignmentManager = new AssignmentManager(this);
this.assignmentManager = createAssignmentManager(this);
this.assignmentManager.start();
// TODO: TRSP can perform as the sub procedure for other procedures, so even if it is marked as
// completed, it could still be in the procedure list. This is a bit strange but is another
@ -1333,11 +1337,6 @@ public class HMaster extends HRegionServer implements MasterServices {
return (hfileCleanerFlag && logCleanerFlag);
}
@Override
public TableDescriptors getTableDescriptors() {
return this.tableDescriptors;
}
@Override
public ServerManager getServerManager() {
return this.serverManager;

View File

@ -2155,8 +2155,7 @@ public class HRegionServer extends HasThread implements
}
@Override
public void postOpenDeployTasks(final PostOpenDeployContext context)
throws KeeperException, IOException {
public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
HRegion r = context.getRegion();
long masterSystemTime = context.getMasterSystemTime();
rpcServices.checkOpen();
@ -2170,16 +2169,16 @@ public class HRegionServer extends HasThread implements
long openSeqNum = r.getOpenSeqNum();
if (openSeqNum == HConstants.NO_SEQNUM) {
// If we opened a region, we should have read some sequence number from it.
LOG.error("No sequence number found when opening " +
r.getRegionInfo().getRegionNameAsString());
LOG.error(
"No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
openSeqNum = 0;
}
// Notify master
if (!reportRegionStateTransition(new RegionStateTransitionContext(
TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
throw new IOException("Failed to report opened region to master: "
+ r.getRegionInfo().getRegionNameAsString());
if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
openSeqNum, masterSystemTime, r.getRegionInfo()))) {
throw new IOException(
"Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
}
triggerFlushInPrimaryRegion(r);
@ -3577,6 +3576,7 @@ public class HRegionServer extends HasThread implements
/**
* @return Return table descriptors implementation.
*/
@Override
public TableDescriptors getTableDescriptors() {
return this.tableDescriptors;
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
import java.net.BindException;
import java.net.InetSocketAddress;
@ -118,9 +119,11 @@ import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
@ -3590,6 +3593,60 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
}
private void executeOpenRegionProcedures(OpenRegionRequest request,
Map<TableName, TableDescriptor> tdCache) {
long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
TableDescriptor tableDesc = tdCache.get(regionInfo.getTable());
if (tableDesc == null) {
try {
tableDesc = regionServer.getTableDescriptors().get(regionInfo.getTable());
} catch (IOException e) {
// Here we do not fail the whole method since we also need deal with other
// procedures, and we can not ignore this one, so we still schedule a
// AssignRegionHandler and it will report back to master if we still can not get the
// TableDescriptor.
LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
regionInfo.getTable(), e);
}
}
if (regionOpenInfo.getFavoredNodesCount() > 0) {
regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
regionOpenInfo.getFavoredNodesList());
}
regionServer.executorService
.submit(AssignRegionHandler.create(regionServer, regionInfo, tableDesc, masterSystemTime));
}
}
private void executeCloseRegionProcedures(CloseRegionRequest request) {
String encodedName;
try {
encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
} catch (DoNotRetryIOException e) {
throw new UncheckedIOException("Should not happen", e);
}
ServerName destination =
request.hasDestinationServer() ? ProtobufUtil.toServerName(request.getDestinationServer())
: null;
regionServer.executorService
.submit(UnassignRegionHandler.create(regionServer, encodedName, false, destination));
}
private void executeProcedures(RemoteProcedureRequest request) {
RSProcedureCallable callable;
try {
callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
.getDeclaredConstructor().newInstance();
} catch (Exception e) {
regionServer.remoteProcedureComplete(request.getProcId(), e);
return;
}
callable.init(request.getProcData().toByteArray(), regionServer);
regionServer.executeProcedure(request.getProcId(), callable);
}
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public ExecuteProceduresResponse executeProcedures(RpcController controller,
@ -3598,28 +3655,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
checkOpen();
regionServer.getRegionServerCoprocessorHost().preExecuteProcedures();
if (request.getOpenRegionCount() > 0) {
for (OpenRegionRequest req : request.getOpenRegionList()) {
openRegion(controller, req);
}
// Avoid reading from the TableDescritor every time(usually it will read from the file
// system)
Map<TableName, TableDescriptor> tdCache = new HashMap<>();
request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
}
if (request.getCloseRegionCount() > 0) {
for (CloseRegionRequest req : request.getCloseRegionList()) {
closeRegion(controller, req);
}
request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
}
if (request.getProcCount() > 0) {
for (RemoteProcedureRequest req : request.getProcList()) {
RSProcedureCallable callable;
try {
callable = Class.forName(req.getProcClass()).asSubclass(RSProcedureCallable.class)
.getDeclaredConstructor().newInstance();
} catch (Exception e) {
regionServer.remoteProcedureComplete(req.getProcId(), e);
continue;
}
callable.init(req.getProcData().toByteArray(), regionServer);
regionServer.executeProcedure(req.getProcId(), callable);
}
request.getProcList().forEach(this::executeProcedures);
}
regionServer.getRegionServerCoprocessorHost().postExecuteProcedures();
return ExecuteProceduresResponse.getDefaultInstance();

View File

@ -18,12 +18,13 @@
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.executor.ExecutorService;
@ -34,12 +35,9 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import com.google.protobuf.Service;
/**
* A curated subset of services provided by {@link HRegionServer}.
* For use internally only. Passed to Managers, Services and Chores so can pass less-than-a
@ -111,14 +109,10 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
}
/**
* Tasks to perform after region open to complete deploy of region on
* regionserver
*
* Tasks to perform after region open to complete deploy of region on regionserver
* @param context the context
* @throws KeeperException
* @throws IOException
*/
void postOpenDeployTasks(final PostOpenDeployContext context) throws KeeperException, IOException;
void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException;
class RegionStateTransitionContext {
private final TransitionCode code;
@ -239,4 +233,9 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
* @return True if cluster is up; false if cluster is not up (we are shutting down).
*/
boolean isClusterUp();
/**
* @return Return table descriptors implementation.
*/
TableDescriptors getTableDescriptors();
}

View File

@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.handler;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
/**
* Handles opening of a region on a region server.
* <p/>
* Just done the same thing with the old {@link OpenRegionHandler}, with some modifications on
* fencing and retrying. But we need to keep the {@link OpenRegionHandler} as is to keep compatible
* with the zk less assignment for 1.x, otherwise it is not possible to do rolling upgrade.
*/
@InterfaceAudience.Private
public class AssignRegionHandler extends EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(AssignRegionHandler.class);
private final RegionInfo regionInfo;
private final TableDescriptor tableDesc;
private final long masterSystemTime;
public AssignRegionHandler(RegionServerServices server, RegionInfo regionInfo,
@Nullable TableDescriptor tableDesc, long masterSystemTime, EventType eventType) {
super(server, eventType);
this.regionInfo = regionInfo;
this.tableDesc = tableDesc;
this.masterSystemTime = masterSystemTime;
}
private RegionServerServices getServer() {
return (RegionServerServices) server;
}
private void cleanUpAndReportFailure(IOException error) throws IOException {
LOG.warn("Failed to open region {}, will report to master", regionInfo.getRegionNameAsString(),
error);
RegionServerServices rs = getServer();
rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes(), Boolean.TRUE);
if (!rs.reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.FAILED_OPEN,
HConstants.NO_SEQNUM, masterSystemTime, regionInfo))) {
throw new IOException(
"Failed to report failed open to master: " + regionInfo.getRegionNameAsString());
}
}
@Override
public void process() throws IOException {
RegionServerServices rs = getServer();
String encodedName = regionInfo.getEncodedName();
byte[] encodedNameBytes = regionInfo.getEncodedNameAsBytes();
String regionName = regionInfo.getRegionNameAsString();
Region onlineRegion = rs.getRegion(encodedName);
if (onlineRegion != null) {
LOG.warn("Received OPEN for the region:{}, which is already online", regionName);
// Just follow the old behavior, do we need to call reportRegionStateTransition? Maybe not?
// For normal case, it could happen that the rpc call to schedule this handler is succeeded,
// but before returning to master the connection is broken. And when master tries again, we
// have already finished the opening. For this case we do not need to call
// reportRegionStateTransition any more.
return;
}
LOG.info("Open {}", regionName);
Boolean previous = rs.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);
if (previous != null) {
if (previous) {
// The region is opening and this maybe a retry on the rpc call, it is safe to ignore it.
LOG.info("Receiving OPEN for the region:{}, which we are already trying to OPEN" +
" - ignoring this new request for this region.", regionName);
} else {
// The region is closing. This is possible as we will update the region state to CLOSED when
// calling reportRegionStateTransition, so the HMaster will think the region is offline,
// before we actually close the region, as reportRegionStateTransition is part of the
// closing process.
LOG.info("Receiving OPEN for the region:{}, which we are trying to close, try again later",
regionName);
// TODO: backoff
rs.getExecutorService().delayedSubmit(this, 1, TimeUnit.SECONDS);
}
return;
}
HRegion region;
try {
TableDescriptor htd =
tableDesc != null ? tableDesc : rs.getTableDescriptors().get(regionInfo.getTable());
if (htd == null) {
throw new IOException("Missing table descriptor for " + regionName);
}
// pass null for the last parameter, which used to be a CancelableProgressable, as now the
// opening can not be interrupted by a close request any more.
region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), rs.getConfiguration(),
rs, null);
} catch (IOException e) {
cleanUpAndReportFailure(e);
return;
}
rs.postOpenDeployTasks(new PostOpenDeployContext(region, masterSystemTime));
rs.addRegion(region);
LOG.info("Opened {}", regionName);
Boolean current = rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes());
if (current == null) {
// Should NEVER happen, but let's be paranoid.
LOG.error("Bad state: we've just opened a region that was NOT in transition. Region={}",
regionName);
} else if (!current) {
// Should NEVER happen, but let's be paranoid.
LOG.error("Bad state: we've just opened a region that was closing. Region={}", regionName);
}
}
@Override
protected void handleException(Throwable t) {
LOG.warn("Fatal error occured while opening region {}, aborting...",
regionInfo.getRegionNameAsString(), t);
getServer().abort(
"Failed to open region " + regionInfo.getRegionNameAsString() + " and can not recover", t);
}
public static AssignRegionHandler create(RegionServerServices server, RegionInfo regionInfo,
TableDescriptor tableDesc, long masterSystemTime) {
EventType eventType;
if (regionInfo.isMetaRegion()) {
eventType = EventType.M_RS_CLOSE_META;
} else if (regionInfo.getTable().isSystemTable() ||
(tableDesc != null && tableDesc.getPriority() >= HConstants.ADMIN_QOS)) {
eventType = EventType.M_RS_OPEN_PRIORITY_REGION;
} else {
eventType = EventType.M_RS_OPEN_REGION;
}
return new AssignRegionHandler(server, regionInfo, tableDesc, masterSystemTime, eventType);
}
}

View File

@ -36,6 +36,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
/**
* Handles closing of a region on a region server.
* <p/>
* Now for regular close region request, we will use {@link UnassignRegionHandler} instead. But when
* shutting down the region server, will also close regions and the related methods still use this
* class so we keep it here.
* @see UnassignRegionHandler
*/
@InterfaceAudience.Private
public class CloseRegionHandler extends EventHandler {

View File

@ -41,7 +41,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
* Handles opening of a region on a region server.
* <p>
* This is executed after receiving an OPEN RPC from the master or client.
* @deprecated Keep it here only for compatible
* @see AssignRegionHandler
*/
@Deprecated
@InterfaceAudience.Private
public class OpenRegionHandler extends EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(OpenRegionHandler.class);
@ -172,7 +175,7 @@ public class OpenRegionHandler extends EventHandler {
* state meantime so master doesn't timeout our region-in-transition.
* Caller must cleanup region if this fails.
*/
boolean updateMeta(final HRegion r, long masterSystemTime) {
private boolean updateMeta(final HRegion r, long masterSystemTime) {
if (this.server.isStopped() || this.rsServices.isStopping()) {
return false;
}
@ -275,7 +278,7 @@ public class OpenRegionHandler extends EventHandler {
/**
* @return Instance of HRegion if successful open else null.
*/
HRegion openRegion() {
private HRegion openRegion() {
HRegion region = null;
try {
// Instantiate the region. This also periodically tickles OPENING
@ -304,7 +307,7 @@ public class OpenRegionHandler extends EventHandler {
return region;
}
void cleanupFailedOpen(final HRegion region) throws IOException {
private void cleanupFailedOpen(final HRegion region) throws IOException {
if (region != null) {
this.rsServices.removeRegion(region, null);
region.close();

View File

@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.handler;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
/**
* Handles closing of a region on a region server.
* <p/>
* Just done the same thing with the old {@link CloseRegionHandler}, with some modifications on
* fencing and retrying. But we need to keep the {@link CloseRegionHandler} as is to keep compatible
* with the zk less assignment for 1.x, otherwise it is not possible to do rolling upgrade.
*/
@InterfaceAudience.Private
public class UnassignRegionHandler extends EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(UnassignRegionHandler.class);
private final String encodedName;
// If true, the hosting server is aborting. Region close process is different
// when we are aborting.
private final boolean abort;
private final ServerName destination;
public UnassignRegionHandler(RegionServerServices server, String encodedName, boolean abort,
@Nullable ServerName destination, EventType eventType) {
super(server, eventType);
this.encodedName = encodedName;
this.abort = abort;
this.destination = destination;
}
private RegionServerServices getServer() {
return (RegionServerServices) server;
}
@Override
public void process() throws IOException {
RegionServerServices rs = getServer();
byte[] encodedNameBytes = Bytes.toBytes(encodedName);
Boolean previous = rs.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.FALSE);
if (previous != null) {
if (previous) {
// This could happen as we will update the region state to OPEN when calling
// reportRegionStateTransition, so the HMaster will think the region is online, before we
// actually open the region, as reportRegionStateTransition is part of the opening process.
LOG.warn("Received CLOSE for the region: {}, which we are already " +
"trying to OPEN. try again later.", encodedName);
// TODO: backoff
rs.getExecutorService().delayedSubmit(this, 1, TimeUnit.SECONDS);
} else {
LOG.info("Received CLOSE for the region: {}, which we are already trying to CLOSE," +
" but not completed yet", encodedName);
}
return;
}
HRegion region = (HRegion) rs.getRegion(encodedName);
if (region == null) {
LOG.debug(
"Received CLOSE for a region {} which is not online, and we're not opening/closing.",
encodedName);
rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
return;
}
String regionName = region.getRegionInfo().getEncodedName();
LOG.info("Close {}", regionName);
if (region.getCoprocessorHost() != null) {
// XXX: The behavior is a bit broken. At master side there is no FAILED_CLOSE state, so if
// there are exception thrown from the CP, we can not report the error to master, and if here
// we just return without calling reportRegionStateTransition, the TRSP at master side will
// hang there for ever. So here if the CP throws an exception out, the only way is to abort
// the RS...
region.getCoprocessorHost().preClose(abort);
}
if (region.close(abort) == null) {
// XXX: Is this still possible? The old comment says about split, but now split is done at
// master side, so...
LOG.warn("Can't close region {}, was already closed during close()", regionName);
rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
return;
}
rs.removeRegion(region, destination);
if (!rs.reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.CLOSED,
HConstants.NO_SEQNUM, -1, region.getRegionInfo()))) {
throw new IOException("Failed to report close to master: " + regionName);
}
rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
LOG.info("Closed {}", regionName);
}
@Override
protected void handleException(Throwable t) {
LOG.warn("Fatal error occured while closing region {}, aborting...", encodedName, t);
getServer().abort("Failed to close region " + encodedName + " and can not recover", t);
}
public static UnassignRegionHandler create(RegionServerServices server, String encodedName,
boolean abort, @Nullable ServerName destination) {
// Just try our best to determine whether it is for closing meta. It is not the end of the world
// if we put the handler into a wrong executor.
Region region = server.getRegion(encodedName);
EventType eventType =
region != null && region.getRegionInfo().isMetaRegion() ? EventType.M_RS_CLOSE_META
: EventType.M_RS_CLOSE_REGION;
return new UnassignRegionHandler(server, encodedName, abort, destination, eventType);
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase;
import com.google.protobuf.Service;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
@ -25,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.ClusterConnection;
@ -53,12 +53,10 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import com.google.protobuf.Service;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
/**
* Basic mock region server services. Should only be instantiated by HBaseTestingUtility.b
@ -121,8 +119,7 @@ public class MockRegionServerServices implements RegionServerServices {
}
@Override
public void postOpenDeployTasks(PostOpenDeployContext context) throws KeeperException,
IOException {
public void postOpenDeployTasks(PostOpenDeployContext context) throws IOException {
addRegion(context.getRegion());
}
@ -339,4 +336,9 @@ public class MockRegionServerServices implements RegionServerServices {
public boolean isClusterUp() {
return true;
}
@Override
public TableDescriptors getTableDescriptors() {
return null;
}
}

View File

@ -27,7 +27,6 @@ import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.ClusterConnection;
@ -66,10 +66,10 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
@ -334,8 +334,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
}
@Override
public void postOpenDeployTasks(PostOpenDeployContext context) throws KeeperException,
IOException {
public void postOpenDeployTasks(PostOpenDeployContext context) throws IOException {
}
@Override
@ -691,4 +690,9 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
public boolean isClusterUp() {
return true;
}
@Override
public TableDescriptors getTableDescriptors() {
return null;
}
}

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
@Category({ MasterTests.class, MediumTests.class })
public class TestCloseAnOpeningRegion {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCloseAnOpeningRegion.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("race");
private static byte[] CF = Bytes.toBytes("cf");
private static volatile CountDownLatch ARRIVE;
private static volatile CountDownLatch RESUME;
public static final class MockHMaster extends HMaster {
public MockHMaster(Configuration conf) throws IOException, KeeperException {
super(conf);
}
@Override
protected AssignmentManager createAssignmentManager(MasterServices master) {
return new AssignmentManager(master) {
@Override
public ReportRegionStateTransitionResponse reportRegionStateTransition(
ReportRegionStateTransitionRequest req) throws PleaseHoldException {
ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(req);
TransitionCode code = req.getTransition(0).getTransitionCode();
if (code == TransitionCode.OPENED && ARRIVE != null) {
ARRIVE.countDown();
try {
RESUME.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return resp;
}
};
}
}
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, 60000);
UTIL.startMiniCluster(
StartMiniClusterOption.builder().numRegionServers(2).masterClass(MockHMaster.class).build());
UTIL.createTable(TABLE_NAME, CF);
UTIL.getAdmin().balancerSwitch(false, true);
}
@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}
@Test
public void test() throws IOException, InterruptedException {
ARRIVE = new CountDownLatch(1);
RESUME = new CountDownLatch(1);
RegionInfo region = UTIL.getAdmin().getRegions(TABLE_NAME).get(0);
HRegionServer src = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
HRegionServer dst = UTIL.getOtherRegionServer(src);
Thread move0 = new Thread(() -> {
try {
UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
Bytes.toBytes(dst.getServerName().getServerName()));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
move0.start();
ARRIVE.await();
Thread move1 = new Thread(() -> {
try {
UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
Bytes.toBytes(src.getServerName().getServerName()));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
move1.start();
// No simple way to determine when it is safe to go on and produce the race so let's sleep for a
// well...
Thread.sleep(10000);
RESUME.countDown();
move0.join();
move1.join();
try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
// make sure that we can write to the table, which means the region is online
table.put(new Put(Bytes.toBytes(0)).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes(0)));
}
}
}