HBASE-21217 Revisit the executeProcedure method for open/close region
This commit is contained in:
parent
c686b535c2
commit
8eaaa63114
|
@ -27,19 +27,22 @@ import java.util.Map.Entry;
|
|||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
|
@ -64,12 +67,15 @@ public class ExecutorService {
|
|||
// Name of the server hosting this executor service.
|
||||
private final String servername;
|
||||
|
||||
private final ListeningScheduledExecutorService delayedSubmitTimer =
|
||||
MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
|
||||
.setDaemon(true).setNameFormat("Event-Executor-Delay-Submit-Timer").build()));
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
* @param servername Name of the hosting server.
|
||||
*/
|
||||
public ExecutorService(final String servername) {
|
||||
super();
|
||||
this.servername = servername;
|
||||
}
|
||||
|
||||
|
@ -99,6 +105,7 @@ public class ExecutorService {
|
|||
}
|
||||
|
||||
public void shutdown() {
|
||||
this.delayedSubmitTimer.shutdownNow();
|
||||
for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
|
||||
List<Runnable> wasRunning =
|
||||
entry.getValue().threadPoolExecutor.shutdownNow();
|
||||
|
@ -146,6 +153,18 @@ public class ExecutorService {
|
|||
}
|
||||
}
|
||||
|
||||
// Submit the handler after the given delay. Used for retrying.
|
||||
public void delayedSubmit(EventHandler eh, long delay, TimeUnit unit) {
|
||||
ListenableFuture<?> future = delayedSubmitTimer.schedule(() -> submit(eh), delay, unit);
|
||||
future.addListener(() -> {
|
||||
try {
|
||||
future.get();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to submit the event handler {} to executor", eh, e);
|
||||
}
|
||||
}, MoreExecutors.directExecutor());
|
||||
}
|
||||
|
||||
public Map<String, ExecutorStatus> getAllExecutorStatuses() {
|
||||
Map<String, ExecutorStatus> ret = Maps.newHashMap();
|
||||
for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
|
||||
|
|
|
@ -55,7 +55,6 @@ import javax.servlet.ServletException;
|
|||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -76,7 +75,6 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
|
|||
import org.apache.hadoop.hbase.PleaseHoldException;
|
||||
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableDescriptors;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotDisabledException;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
|
@ -848,6 +846,12 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
}
|
||||
|
||||
// Will be overriden in test to inject customized AssignmentManager
|
||||
@VisibleForTesting
|
||||
protected AssignmentManager createAssignmentManager(MasterServices master) {
|
||||
return new AssignmentManager(master);
|
||||
}
|
||||
|
||||
/**
|
||||
* Finish initialization of HMaster after becoming the primary master.
|
||||
* <p/>
|
||||
|
@ -940,7 +944,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
checkUnsupportedProcedure(procsByType);
|
||||
|
||||
// Create Assignment Manager
|
||||
this.assignmentManager = new AssignmentManager(this);
|
||||
this.assignmentManager = createAssignmentManager(this);
|
||||
this.assignmentManager.start();
|
||||
// TODO: TRSP can perform as the sub procedure for other procedures, so even if it is marked as
|
||||
// completed, it could still be in the procedure list. This is a bit strange but is another
|
||||
|
@ -1347,11 +1351,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
return (hfileCleanerFlag && logCleanerFlag);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableDescriptors getTableDescriptors() {
|
||||
return this.tableDescriptors;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerManager getServerManager() {
|
||||
return this.serverManager;
|
||||
|
|
|
@ -2180,8 +2180,7 @@ public class HRegionServer extends HasThread implements
|
|||
}
|
||||
|
||||
@Override
|
||||
public void postOpenDeployTasks(final PostOpenDeployContext context)
|
||||
throws KeeperException, IOException {
|
||||
public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
|
||||
HRegion r = context.getRegion();
|
||||
long masterSystemTime = context.getMasterSystemTime();
|
||||
rpcServices.checkOpen();
|
||||
|
@ -2195,16 +2194,16 @@ public class HRegionServer extends HasThread implements
|
|||
long openSeqNum = r.getOpenSeqNum();
|
||||
if (openSeqNum == HConstants.NO_SEQNUM) {
|
||||
// If we opened a region, we should have read some sequence number from it.
|
||||
LOG.error("No sequence number found when opening " +
|
||||
r.getRegionInfo().getRegionNameAsString());
|
||||
LOG.error(
|
||||
"No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
|
||||
openSeqNum = 0;
|
||||
}
|
||||
|
||||
// Notify master
|
||||
if (!reportRegionStateTransition(new RegionStateTransitionContext(
|
||||
TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
|
||||
throw new IOException("Failed to report opened region to master: "
|
||||
+ r.getRegionInfo().getRegionNameAsString());
|
||||
if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
|
||||
openSeqNum, masterSystemTime, r.getRegionInfo()))) {
|
||||
throw new IOException(
|
||||
"Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
|
||||
}
|
||||
|
||||
triggerFlushInPrimaryRegion(r);
|
||||
|
@ -3602,6 +3601,7 @@ public class HRegionServer extends HasThread implements
|
|||
/**
|
||||
* @return Return table descriptors implementation.
|
||||
*/
|
||||
@Override
|
||||
public TableDescriptors getTableDescriptors() {
|
||||
return this.tableDescriptors;
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.BindException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
@ -118,9 +119,11 @@ import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
|
|||
import org.apache.hadoop.hbase.regionserver.Region.Operation;
|
||||
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
|
||||
|
@ -3682,6 +3685,60 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
|
||||
}
|
||||
|
||||
private void executeOpenRegionProcedures(OpenRegionRequest request,
|
||||
Map<TableName, TableDescriptor> tdCache) {
|
||||
long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
|
||||
for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
|
||||
RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
|
||||
TableDescriptor tableDesc = tdCache.get(regionInfo.getTable());
|
||||
if (tableDesc == null) {
|
||||
try {
|
||||
tableDesc = regionServer.getTableDescriptors().get(regionInfo.getTable());
|
||||
} catch (IOException e) {
|
||||
// Here we do not fail the whole method since we also need deal with other
|
||||
// procedures, and we can not ignore this one, so we still schedule a
|
||||
// AssignRegionHandler and it will report back to master if we still can not get the
|
||||
// TableDescriptor.
|
||||
LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
|
||||
regionInfo.getTable(), e);
|
||||
}
|
||||
}
|
||||
if (regionOpenInfo.getFavoredNodesCount() > 0) {
|
||||
regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
|
||||
regionOpenInfo.getFavoredNodesList());
|
||||
}
|
||||
regionServer.executorService
|
||||
.submit(AssignRegionHandler.create(regionServer, regionInfo, tableDesc, masterSystemTime));
|
||||
}
|
||||
}
|
||||
|
||||
private void executeCloseRegionProcedures(CloseRegionRequest request) {
|
||||
String encodedName;
|
||||
try {
|
||||
encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
|
||||
} catch (DoNotRetryIOException e) {
|
||||
throw new UncheckedIOException("Should not happen", e);
|
||||
}
|
||||
ServerName destination =
|
||||
request.hasDestinationServer() ? ProtobufUtil.toServerName(request.getDestinationServer())
|
||||
: null;
|
||||
regionServer.executorService
|
||||
.submit(UnassignRegionHandler.create(regionServer, encodedName, false, destination));
|
||||
}
|
||||
|
||||
private void executeProcedures(RemoteProcedureRequest request) {
|
||||
RSProcedureCallable callable;
|
||||
try {
|
||||
callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
|
||||
.getDeclaredConstructor().newInstance();
|
||||
} catch (Exception e) {
|
||||
regionServer.remoteProcedureComplete(request.getProcId(), e);
|
||||
return;
|
||||
}
|
||||
callable.init(request.getProcData().toByteArray(), regionServer);
|
||||
regionServer.executeProcedure(request.getProcId(), callable);
|
||||
}
|
||||
|
||||
@Override
|
||||
@QosPriority(priority = HConstants.ADMIN_QOS)
|
||||
public ExecuteProceduresResponse executeProcedures(RpcController controller,
|
||||
|
@ -3690,28 +3747,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
checkOpen();
|
||||
regionServer.getRegionServerCoprocessorHost().preExecuteProcedures();
|
||||
if (request.getOpenRegionCount() > 0) {
|
||||
for (OpenRegionRequest req : request.getOpenRegionList()) {
|
||||
openRegion(controller, req);
|
||||
}
|
||||
// Avoid reading from the TableDescritor every time(usually it will read from the file
|
||||
// system)
|
||||
Map<TableName, TableDescriptor> tdCache = new HashMap<>();
|
||||
request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
|
||||
}
|
||||
if (request.getCloseRegionCount() > 0) {
|
||||
for (CloseRegionRequest req : request.getCloseRegionList()) {
|
||||
closeRegion(controller, req);
|
||||
}
|
||||
request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
|
||||
}
|
||||
if (request.getProcCount() > 0) {
|
||||
for (RemoteProcedureRequest req : request.getProcList()) {
|
||||
RSProcedureCallable callable;
|
||||
try {
|
||||
callable = Class.forName(req.getProcClass()).asSubclass(RSProcedureCallable.class)
|
||||
.getDeclaredConstructor().newInstance();
|
||||
} catch (Exception e) {
|
||||
regionServer.remoteProcedureComplete(req.getProcId(), e);
|
||||
continue;
|
||||
}
|
||||
callable.init(req.getProcData().toByteArray(), regionServer);
|
||||
regionServer.executeProcedure(req.getProcId(), callable);
|
||||
}
|
||||
request.getProcList().forEach(this::executeProcedures);
|
||||
}
|
||||
regionServer.getRegionServerCoprocessorHost().postExecuteProcedures();
|
||||
return ExecuteProceduresResponse.getDefaultInstance();
|
||||
|
|
|
@ -18,14 +18,15 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import com.google.protobuf.Service;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.TableDescriptors;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.locking.EntityLock;
|
||||
|
@ -38,12 +39,9 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
|
|||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
/**
|
||||
* A curated subset of services provided by {@link HRegionServer}.
|
||||
* For use internally only. Passed to Managers, Services and Chores so can pass less-than-a
|
||||
|
@ -115,14 +113,10 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
|
|||
}
|
||||
|
||||
/**
|
||||
* Tasks to perform after region open to complete deploy of region on
|
||||
* regionserver
|
||||
*
|
||||
* Tasks to perform after region open to complete deploy of region on regionserver
|
||||
* @param context the context
|
||||
* @throws KeeperException
|
||||
* @throws IOException
|
||||
*/
|
||||
void postOpenDeployTasks(final PostOpenDeployContext context) throws KeeperException, IOException;
|
||||
void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException;
|
||||
|
||||
class RegionStateTransitionContext {
|
||||
private final TransitionCode code;
|
||||
|
@ -267,4 +261,9 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
|
|||
* @return Return the object that implements the replication source executorService.
|
||||
*/
|
||||
ReplicationSourceService getReplicationSourceService();
|
||||
|
||||
/**
|
||||
* @return Return table descriptors implementation.
|
||||
*/
|
||||
TableDescriptors getTableDescriptors();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,166 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.handler;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
|
||||
/**
|
||||
* Handles opening of a region on a region server.
|
||||
* <p/>
|
||||
* Just done the same thing with the old {@link OpenRegionHandler}, with some modifications on
|
||||
* fencing and retrying. But we need to keep the {@link OpenRegionHandler} as is to keep compatible
|
||||
* with the zk less assignment for 1.x, otherwise it is not possible to do rolling upgrade.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class AssignRegionHandler extends EventHandler {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AssignRegionHandler.class);
|
||||
|
||||
private final RegionInfo regionInfo;
|
||||
|
||||
private final TableDescriptor tableDesc;
|
||||
|
||||
private final long masterSystemTime;
|
||||
|
||||
public AssignRegionHandler(RegionServerServices server, RegionInfo regionInfo,
|
||||
@Nullable TableDescriptor tableDesc, long masterSystemTime, EventType eventType) {
|
||||
super(server, eventType);
|
||||
this.regionInfo = regionInfo;
|
||||
this.tableDesc = tableDesc;
|
||||
this.masterSystemTime = masterSystemTime;
|
||||
}
|
||||
|
||||
private RegionServerServices getServer() {
|
||||
return (RegionServerServices) server;
|
||||
}
|
||||
|
||||
private void cleanUpAndReportFailure(IOException error) throws IOException {
|
||||
LOG.warn("Failed to open region {}, will report to master", regionInfo.getRegionNameAsString(),
|
||||
error);
|
||||
RegionServerServices rs = getServer();
|
||||
rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes(), Boolean.TRUE);
|
||||
if (!rs.reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.FAILED_OPEN,
|
||||
HConstants.NO_SEQNUM, masterSystemTime, regionInfo))) {
|
||||
throw new IOException(
|
||||
"Failed to report failed open to master: " + regionInfo.getRegionNameAsString());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process() throws IOException {
|
||||
RegionServerServices rs = getServer();
|
||||
String encodedName = regionInfo.getEncodedName();
|
||||
byte[] encodedNameBytes = regionInfo.getEncodedNameAsBytes();
|
||||
String regionName = regionInfo.getRegionNameAsString();
|
||||
Region onlineRegion = rs.getRegion(encodedName);
|
||||
if (onlineRegion != null) {
|
||||
LOG.warn("Received OPEN for the region:{}, which is already online", regionName);
|
||||
// Just follow the old behavior, do we need to call reportRegionStateTransition? Maybe not?
|
||||
// For normal case, it could happen that the rpc call to schedule this handler is succeeded,
|
||||
// but before returning to master the connection is broken. And when master tries again, we
|
||||
// have already finished the opening. For this case we do not need to call
|
||||
// reportRegionStateTransition any more.
|
||||
return;
|
||||
}
|
||||
LOG.info("Open {}", regionName);
|
||||
Boolean previous = rs.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);
|
||||
if (previous != null) {
|
||||
if (previous) {
|
||||
// The region is opening and this maybe a retry on the rpc call, it is safe to ignore it.
|
||||
LOG.info("Receiving OPEN for the region:{}, which we are already trying to OPEN" +
|
||||
" - ignoring this new request for this region.", regionName);
|
||||
} else {
|
||||
// The region is closing. This is possible as we will update the region state to CLOSED when
|
||||
// calling reportRegionStateTransition, so the HMaster will think the region is offline,
|
||||
// before we actually close the region, as reportRegionStateTransition is part of the
|
||||
// closing process.
|
||||
LOG.info("Receiving OPEN for the region:{}, which we are trying to close, try again later",
|
||||
regionName);
|
||||
// TODO: backoff
|
||||
rs.getExecutorService().delayedSubmit(this, 1, TimeUnit.SECONDS);
|
||||
}
|
||||
return;
|
||||
}
|
||||
HRegion region;
|
||||
try {
|
||||
TableDescriptor htd =
|
||||
tableDesc != null ? tableDesc : rs.getTableDescriptors().get(regionInfo.getTable());
|
||||
if (htd == null) {
|
||||
throw new IOException("Missing table descriptor for " + regionName);
|
||||
}
|
||||
// pass null for the last parameter, which used to be a CancelableProgressable, as now the
|
||||
// opening can not be interrupted by a close request any more.
|
||||
region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), rs.getConfiguration(),
|
||||
rs, null);
|
||||
} catch (IOException e) {
|
||||
cleanUpAndReportFailure(e);
|
||||
return;
|
||||
}
|
||||
rs.postOpenDeployTasks(new PostOpenDeployContext(region, masterSystemTime));
|
||||
rs.addRegion(region);
|
||||
LOG.info("Opened {}", regionName);
|
||||
Boolean current = rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes());
|
||||
if (current == null) {
|
||||
// Should NEVER happen, but let's be paranoid.
|
||||
LOG.error("Bad state: we've just opened a region that was NOT in transition. Region={}",
|
||||
regionName);
|
||||
} else if (!current) {
|
||||
// Should NEVER happen, but let's be paranoid.
|
||||
LOG.error("Bad state: we've just opened a region that was closing. Region={}", regionName);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleException(Throwable t) {
|
||||
LOG.warn("Fatal error occured while opening region {}, aborting...",
|
||||
regionInfo.getRegionNameAsString(), t);
|
||||
getServer().abort(
|
||||
"Failed to open region " + regionInfo.getRegionNameAsString() + " and can not recover", t);
|
||||
}
|
||||
|
||||
public static AssignRegionHandler create(RegionServerServices server, RegionInfo regionInfo,
|
||||
TableDescriptor tableDesc, long masterSystemTime) {
|
||||
EventType eventType;
|
||||
if (regionInfo.isMetaRegion()) {
|
||||
eventType = EventType.M_RS_CLOSE_META;
|
||||
} else if (regionInfo.getTable().isSystemTable() ||
|
||||
(tableDesc != null && tableDesc.getPriority() >= HConstants.ADMIN_QOS)) {
|
||||
eventType = EventType.M_RS_OPEN_PRIORITY_REGION;
|
||||
} else {
|
||||
eventType = EventType.M_RS_OPEN_REGION;
|
||||
}
|
||||
return new AssignRegionHandler(server, regionInfo, tableDesc, masterSystemTime, eventType);
|
||||
}
|
||||
}
|
|
@ -36,6 +36,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
|
|||
|
||||
/**
|
||||
* Handles closing of a region on a region server.
|
||||
* <p/>
|
||||
* Now for regular close region request, we will use {@link UnassignRegionHandler} instead. But when
|
||||
* shutting down the region server, will also close regions and the related methods still use this
|
||||
* class so we keep it here.
|
||||
* @see UnassignRegionHandler
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class CloseRegionHandler extends EventHandler {
|
||||
|
|
|
@ -41,7 +41,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
|
|||
* Handles opening of a region on a region server.
|
||||
* <p>
|
||||
* This is executed after receiving an OPEN RPC from the master or client.
|
||||
* @deprecated Keep it here only for compatible
|
||||
* @see AssignRegionHandler
|
||||
*/
|
||||
@Deprecated
|
||||
@InterfaceAudience.Private
|
||||
public class OpenRegionHandler extends EventHandler {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(OpenRegionHandler.class);
|
||||
|
@ -172,7 +175,7 @@ public class OpenRegionHandler extends EventHandler {
|
|||
* state meantime so master doesn't timeout our region-in-transition.
|
||||
* Caller must cleanup region if this fails.
|
||||
*/
|
||||
boolean updateMeta(final HRegion r, long masterSystemTime) {
|
||||
private boolean updateMeta(final HRegion r, long masterSystemTime) {
|
||||
if (this.server.isStopped() || this.rsServices.isStopping()) {
|
||||
return false;
|
||||
}
|
||||
|
@ -275,7 +278,7 @@ public class OpenRegionHandler extends EventHandler {
|
|||
/**
|
||||
* @return Instance of HRegion if successful open else null.
|
||||
*/
|
||||
HRegion openRegion() {
|
||||
private HRegion openRegion() {
|
||||
HRegion region = null;
|
||||
try {
|
||||
// Instantiate the region. This also periodically tickles OPENING
|
||||
|
@ -304,7 +307,7 @@ public class OpenRegionHandler extends EventHandler {
|
|||
return region;
|
||||
}
|
||||
|
||||
void cleanupFailedOpen(final HRegion region) throws IOException {
|
||||
private void cleanupFailedOpen(final HRegion region) throws IOException {
|
||||
if (region != null) {
|
||||
this.rsServices.removeRegion(region, null);
|
||||
region.close();
|
||||
|
|
|
@ -0,0 +1,139 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.handler;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
|
||||
/**
|
||||
* Handles closing of a region on a region server.
|
||||
* <p/>
|
||||
* Just done the same thing with the old {@link CloseRegionHandler}, with some modifications on
|
||||
* fencing and retrying. But we need to keep the {@link CloseRegionHandler} as is to keep compatible
|
||||
* with the zk less assignment for 1.x, otherwise it is not possible to do rolling upgrade.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class UnassignRegionHandler extends EventHandler {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(UnassignRegionHandler.class);
|
||||
|
||||
private final String encodedName;
|
||||
// If true, the hosting server is aborting. Region close process is different
|
||||
// when we are aborting.
|
||||
private final boolean abort;
|
||||
|
||||
private final ServerName destination;
|
||||
|
||||
public UnassignRegionHandler(RegionServerServices server, String encodedName, boolean abort,
|
||||
@Nullable ServerName destination, EventType eventType) {
|
||||
super(server, eventType);
|
||||
this.encodedName = encodedName;
|
||||
this.abort = abort;
|
||||
this.destination = destination;
|
||||
}
|
||||
|
||||
private RegionServerServices getServer() {
|
||||
return (RegionServerServices) server;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process() throws IOException {
|
||||
RegionServerServices rs = getServer();
|
||||
byte[] encodedNameBytes = Bytes.toBytes(encodedName);
|
||||
Boolean previous = rs.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.FALSE);
|
||||
if (previous != null) {
|
||||
if (previous) {
|
||||
// This could happen as we will update the region state to OPEN when calling
|
||||
// reportRegionStateTransition, so the HMaster will think the region is online, before we
|
||||
// actually open the region, as reportRegionStateTransition is part of the opening process.
|
||||
LOG.warn("Received CLOSE for the region: {}, which we are already " +
|
||||
"trying to OPEN. try again later.", encodedName);
|
||||
// TODO: backoff
|
||||
rs.getExecutorService().delayedSubmit(this, 1, TimeUnit.SECONDS);
|
||||
} else {
|
||||
LOG.info("Received CLOSE for the region: {}, which we are already trying to CLOSE," +
|
||||
" but not completed yet", encodedName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
HRegion region = (HRegion) rs.getRegion(encodedName);
|
||||
if (region == null) {
|
||||
LOG.debug(
|
||||
"Received CLOSE for a region {} which is not online, and we're not opening/closing.",
|
||||
encodedName);
|
||||
rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
|
||||
return;
|
||||
}
|
||||
String regionName = region.getRegionInfo().getEncodedName();
|
||||
LOG.info("Close {}", regionName);
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
// XXX: The behavior is a bit broken. At master side there is no FAILED_CLOSE state, so if
|
||||
// there are exception thrown from the CP, we can not report the error to master, and if here
|
||||
// we just return without calling reportRegionStateTransition, the TRSP at master side will
|
||||
// hang there for ever. So here if the CP throws an exception out, the only way is to abort
|
||||
// the RS...
|
||||
region.getCoprocessorHost().preClose(abort);
|
||||
}
|
||||
if (region.close(abort) == null) {
|
||||
// XXX: Is this still possible? The old comment says about split, but now split is done at
|
||||
// master side, so...
|
||||
LOG.warn("Can't close region {}, was already closed during close()", regionName);
|
||||
rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
|
||||
return;
|
||||
}
|
||||
rs.removeRegion(region, destination);
|
||||
if (!rs.reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.CLOSED,
|
||||
HConstants.NO_SEQNUM, -1, region.getRegionInfo()))) {
|
||||
throw new IOException("Failed to report close to master: " + regionName);
|
||||
}
|
||||
rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
|
||||
LOG.info("Closed {}", regionName);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleException(Throwable t) {
|
||||
LOG.warn("Fatal error occured while closing region {}, aborting...", encodedName, t);
|
||||
getServer().abort("Failed to close region " + encodedName + " and can not recover", t);
|
||||
}
|
||||
|
||||
public static UnassignRegionHandler create(RegionServerServices server, String encodedName,
|
||||
boolean abort, @Nullable ServerName destination) {
|
||||
// Just try our best to determine whether it is for closing meta. It is not the end of the world
|
||||
// if we put the handler into a wrong executor.
|
||||
Region region = server.getRegion(encodedName);
|
||||
EventType eventType =
|
||||
region != null && region.getRegionInfo().isMetaRegion() ? EventType.M_RS_CLOSE_META
|
||||
: EventType.M_RS_CLOSE_REGION;
|
||||
return new UnassignRegionHandler(server, encodedName, abort, destination, eventType);
|
||||
}
|
||||
}
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import com.google.protobuf.Service;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
|
@ -27,7 +28,6 @@ import java.util.Map;
|
|||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
|
@ -57,12 +57,10 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
|
||||
import com.google.protobuf.Service;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
|
||||
/**
|
||||
* Basic mock region server services. Should only be instantiated by HBaseTestingUtility.b
|
||||
|
@ -125,8 +123,7 @@ public class MockRegionServerServices implements RegionServerServices {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void postOpenDeployTasks(PostOpenDeployContext context) throws KeeperException,
|
||||
IOException {
|
||||
public void postOpenDeployTasks(PostOpenDeployContext context) throws IOException {
|
||||
addRegion(context.getRegion());
|
||||
}
|
||||
|
||||
|
@ -359,4 +356,9 @@ public class MockRegionServerServices implements RegionServerServices {
|
|||
public ReplicationSourceService getReplicationSourceService() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableDescriptors getTableDescriptors() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.util.Map.Entry;
|
|||
import java.util.Random;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
|
@ -38,6 +37,7 @@ import org.apache.hadoop.hbase.CellUtil;
|
|||
import org.apache.hadoop.hbase.ChoreService;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableDescriptors;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
|
@ -70,10 +70,10 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
|
||||
|
@ -142,7 +142,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo
|
|||
* store that the get pulls from.
|
||||
*/
|
||||
class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
|
||||
ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||
ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||
private final ServerName sn;
|
||||
private final ZKWatcher zkw;
|
||||
private final Configuration conf;
|
||||
|
@ -338,8 +338,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void postOpenDeployTasks(PostOpenDeployContext context) throws KeeperException,
|
||||
IOException {
|
||||
public void postOpenDeployTasks(PostOpenDeployContext context) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -710,4 +709,9 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
|||
public ReplicationSourceService getReplicationSourceService() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableDescriptors getTableDescriptors() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,146 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.PleaseHoldException;
|
||||
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
|
||||
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestCloseAnOpeningRegion {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestCloseAnOpeningRegion.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static TableName TABLE_NAME = TableName.valueOf("race");
|
||||
|
||||
private static byte[] CF = Bytes.toBytes("cf");
|
||||
|
||||
private static volatile CountDownLatch ARRIVE;
|
||||
|
||||
private static volatile CountDownLatch RESUME;
|
||||
|
||||
public static final class MockHMaster extends HMaster {
|
||||
|
||||
public MockHMaster(Configuration conf) throws IOException, KeeperException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AssignmentManager createAssignmentManager(MasterServices master) {
|
||||
return new AssignmentManager(master) {
|
||||
|
||||
@Override
|
||||
public ReportRegionStateTransitionResponse reportRegionStateTransition(
|
||||
ReportRegionStateTransitionRequest req) throws PleaseHoldException {
|
||||
ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(req);
|
||||
TransitionCode code = req.getTransition(0).getTransitionCode();
|
||||
if (code == TransitionCode.OPENED && ARRIVE != null) {
|
||||
ARRIVE.countDown();
|
||||
try {
|
||||
RESUME.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
return resp;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, 60000);
|
||||
UTIL.startMiniCluster(
|
||||
StartMiniClusterOption.builder().numRegionServers(2).masterClass(MockHMaster.class).build());
|
||||
UTIL.createTable(TABLE_NAME, CF);
|
||||
UTIL.getAdmin().balancerSwitch(false, true);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws IOException, InterruptedException {
|
||||
ARRIVE = new CountDownLatch(1);
|
||||
RESUME = new CountDownLatch(1);
|
||||
RegionInfo region = UTIL.getAdmin().getRegions(TABLE_NAME).get(0);
|
||||
HRegionServer src = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
|
||||
HRegionServer dst = UTIL.getOtherRegionServer(src);
|
||||
Thread move0 = new Thread(() -> {
|
||||
try {
|
||||
UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
|
||||
Bytes.toBytes(dst.getServerName().getServerName()));
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
});
|
||||
move0.start();
|
||||
ARRIVE.await();
|
||||
Thread move1 = new Thread(() -> {
|
||||
try {
|
||||
UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
|
||||
Bytes.toBytes(src.getServerName().getServerName()));
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
});
|
||||
move1.start();
|
||||
// No simple way to determine when it is safe to go on and produce the race so let's sleep for a
|
||||
// well...
|
||||
Thread.sleep(10000);
|
||||
RESUME.countDown();
|
||||
move0.join();
|
||||
move1.join();
|
||||
try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
|
||||
// make sure that we can write to the table, which means the region is online
|
||||
table.put(new Put(Bytes.toBytes(0)).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes(0)));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue