diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java index 342d44144ae..4f8909e62b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java @@ -27,19 +27,22 @@ import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - +import org.apache.hadoop.hbase.monitoring.ThreadMonitoring; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.monitoring.ThreadMonitoring; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -64,12 +67,15 @@ public class ExecutorService { // Name of the server hosting this executor service. private final String servername; + private final ListeningScheduledExecutorService delayedSubmitTimer = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("Event-Executor-Delay-Submit-Timer").build())); + /** * Default constructor. * @param servername Name of the hosting server. */ public ExecutorService(final String servername) { - super(); this.servername = servername; } @@ -99,6 +105,7 @@ public class ExecutorService { } public void shutdown() { + this.delayedSubmitTimer.shutdownNow(); for(Entry entry: this.executorMap.entrySet()) { List wasRunning = entry.getValue().threadPoolExecutor.shutdownNow(); @@ -146,6 +153,18 @@ public class ExecutorService { } } + // Submit the handler after the given delay. Used for retrying. + public void delayedSubmit(EventHandler eh, long delay, TimeUnit unit) { + ListenableFuture future = delayedSubmitTimer.schedule(() -> submit(eh), delay, unit); + future.addListener(() -> { + try { + future.get(); + } catch (Exception e) { + LOG.error("Failed to submit the event handler {} to executor", eh, e); + } + }, MoreExecutors.directExecutor()); + } + public Map getAllExecutorStatuses() { Map ret = Maps.newHashMap(); for (Map.Entry e : executorMap.entrySet()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 86e7260e7a2..185306248da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -55,7 +55,6 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -76,7 +75,6 @@ import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; @@ -848,6 +846,12 @@ public class HMaster extends HRegionServer implements MasterServices { } } + // Will be overriden in test to inject customized AssignmentManager + @VisibleForTesting + protected AssignmentManager createAssignmentManager(MasterServices master) { + return new AssignmentManager(master); + } + /** * Finish initialization of HMaster after becoming the primary master. *

@@ -940,7 +944,7 @@ public class HMaster extends HRegionServer implements MasterServices { checkUnsupportedProcedure(procsByType); // Create Assignment Manager - this.assignmentManager = new AssignmentManager(this); + this.assignmentManager = createAssignmentManager(this); this.assignmentManager.start(); // TODO: TRSP can perform as the sub procedure for other procedures, so even if it is marked as // completed, it could still be in the procedure list. This is a bit strange but is another @@ -1347,11 +1351,6 @@ public class HMaster extends HRegionServer implements MasterServices { return (hfileCleanerFlag && logCleanerFlag); } - @Override - public TableDescriptors getTableDescriptors() { - return this.tableDescriptors; - } - @Override public ServerManager getServerManager() { return this.serverManager; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 76175117afd..b5a74aa1d86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2180,8 +2180,7 @@ public class HRegionServer extends HasThread implements } @Override - public void postOpenDeployTasks(final PostOpenDeployContext context) - throws KeeperException, IOException { + public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException { HRegion r = context.getRegion(); long masterSystemTime = context.getMasterSystemTime(); rpcServices.checkOpen(); @@ -2195,16 +2194,16 @@ public class HRegionServer extends HasThread implements long openSeqNum = r.getOpenSeqNum(); if (openSeqNum == HConstants.NO_SEQNUM) { // If we opened a region, we should have read some sequence number from it. - LOG.error("No sequence number found when opening " + - r.getRegionInfo().getRegionNameAsString()); + LOG.error( + "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString()); openSeqNum = 0; } // Notify master - if (!reportRegionStateTransition(new RegionStateTransitionContext( - TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) { - throw new IOException("Failed to report opened region to master: " - + r.getRegionInfo().getRegionNameAsString()); + if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED, + openSeqNum, masterSystemTime, r.getRegionInfo()))) { + throw new IOException( + "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString()); } triggerFlushInPrimaryRegion(r); @@ -3602,6 +3601,7 @@ public class HRegionServer extends HasThread implements /** * @return Return table descriptors implementation. */ + @Override public TableDescriptors getTableDescriptors() { return this.tableDescriptors; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index e6c90219e7a..8bb2e9c0da1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; +import java.io.UncheckedIOException; import java.lang.reflect.InvocationTargetException; import java.net.BindException; import java.net.InetSocketAddress; @@ -118,9 +119,11 @@ import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; +import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; @@ -3682,6 +3685,60 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build(); } + private void executeOpenRegionProcedures(OpenRegionRequest request, + Map tdCache) { + long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1; + for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) { + RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion()); + TableDescriptor tableDesc = tdCache.get(regionInfo.getTable()); + if (tableDesc == null) { + try { + tableDesc = regionServer.getTableDescriptors().get(regionInfo.getTable()); + } catch (IOException e) { + // Here we do not fail the whole method since we also need deal with other + // procedures, and we can not ignore this one, so we still schedule a + // AssignRegionHandler and it will report back to master if we still can not get the + // TableDescriptor. + LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler", + regionInfo.getTable(), e); + } + } + if (regionOpenInfo.getFavoredNodesCount() > 0) { + regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(), + regionOpenInfo.getFavoredNodesList()); + } + regionServer.executorService + .submit(AssignRegionHandler.create(regionServer, regionInfo, tableDesc, masterSystemTime)); + } + } + + private void executeCloseRegionProcedures(CloseRegionRequest request) { + String encodedName; + try { + encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion()); + } catch (DoNotRetryIOException e) { + throw new UncheckedIOException("Should not happen", e); + } + ServerName destination = + request.hasDestinationServer() ? ProtobufUtil.toServerName(request.getDestinationServer()) + : null; + regionServer.executorService + .submit(UnassignRegionHandler.create(regionServer, encodedName, false, destination)); + } + + private void executeProcedures(RemoteProcedureRequest request) { + RSProcedureCallable callable; + try { + callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class) + .getDeclaredConstructor().newInstance(); + } catch (Exception e) { + regionServer.remoteProcedureComplete(request.getProcId(), e); + return; + } + callable.init(request.getProcData().toByteArray(), regionServer); + regionServer.executeProcedure(request.getProcId(), callable); + } + @Override @QosPriority(priority = HConstants.ADMIN_QOS) public ExecuteProceduresResponse executeProcedures(RpcController controller, @@ -3690,28 +3747,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, checkOpen(); regionServer.getRegionServerCoprocessorHost().preExecuteProcedures(); if (request.getOpenRegionCount() > 0) { - for (OpenRegionRequest req : request.getOpenRegionList()) { - openRegion(controller, req); - } + // Avoid reading from the TableDescritor every time(usually it will read from the file + // system) + Map tdCache = new HashMap<>(); + request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache)); } if (request.getCloseRegionCount() > 0) { - for (CloseRegionRequest req : request.getCloseRegionList()) { - closeRegion(controller, req); - } + request.getCloseRegionList().forEach(this::executeCloseRegionProcedures); } if (request.getProcCount() > 0) { - for (RemoteProcedureRequest req : request.getProcList()) { - RSProcedureCallable callable; - try { - callable = Class.forName(req.getProcClass()).asSubclass(RSProcedureCallable.class) - .getDeclaredConstructor().newInstance(); - } catch (Exception e) { - regionServer.remoteProcedureComplete(req.getProcId(), e); - continue; - } - callable.init(req.getProcData().toByteArray(), regionServer); - regionServer.executeProcedure(req.getProcId(), callable); - } + request.getProcList().forEach(this::executeProcedures); } regionServer.getRegionServerCoprocessorHost().postExecuteProcedures(); return ExecuteProceduresResponse.getDefaultInstance(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 7c4362c68fb..37a3606ede5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -18,14 +18,15 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.protobuf.Service; import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; - import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.locking.EntityLock; @@ -38,12 +39,9 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.wal.WAL; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; -import com.google.protobuf.Service; - /** * A curated subset of services provided by {@link HRegionServer}. * For use internally only. Passed to Managers, Services and Chores so can pass less-than-a @@ -115,14 +113,10 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo } /** - * Tasks to perform after region open to complete deploy of region on - * regionserver - * + * Tasks to perform after region open to complete deploy of region on regionserver * @param context the context - * @throws KeeperException - * @throws IOException */ - void postOpenDeployTasks(final PostOpenDeployContext context) throws KeeperException, IOException; + void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException; class RegionStateTransitionContext { private final TransitionCode code; @@ -267,4 +261,9 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo * @return Return the object that implements the replication source executorService. */ ReplicationSourceService getReplicationSourceService(); + + /** + * @return Return table descriptors implementation. + */ + TableDescriptors getTableDescriptors(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java new file mode 100644 index 00000000000..bf3d4763e6a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.handler; + +import edu.umd.cs.findbugs.annotations.Nullable; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext; +import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; + +/** + * Handles opening of a region on a region server. + *

+ * Just done the same thing with the old {@link OpenRegionHandler}, with some modifications on + * fencing and retrying. But we need to keep the {@link OpenRegionHandler} as is to keep compatible + * with the zk less assignment for 1.x, otherwise it is not possible to do rolling upgrade. + */ +@InterfaceAudience.Private +public class AssignRegionHandler extends EventHandler { + + private static final Logger LOG = LoggerFactory.getLogger(AssignRegionHandler.class); + + private final RegionInfo regionInfo; + + private final TableDescriptor tableDesc; + + private final long masterSystemTime; + + public AssignRegionHandler(RegionServerServices server, RegionInfo regionInfo, + @Nullable TableDescriptor tableDesc, long masterSystemTime, EventType eventType) { + super(server, eventType); + this.regionInfo = regionInfo; + this.tableDesc = tableDesc; + this.masterSystemTime = masterSystemTime; + } + + private RegionServerServices getServer() { + return (RegionServerServices) server; + } + + private void cleanUpAndReportFailure(IOException error) throws IOException { + LOG.warn("Failed to open region {}, will report to master", regionInfo.getRegionNameAsString(), + error); + RegionServerServices rs = getServer(); + rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes(), Boolean.TRUE); + if (!rs.reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.FAILED_OPEN, + HConstants.NO_SEQNUM, masterSystemTime, regionInfo))) { + throw new IOException( + "Failed to report failed open to master: " + regionInfo.getRegionNameAsString()); + } + } + + @Override + public void process() throws IOException { + RegionServerServices rs = getServer(); + String encodedName = regionInfo.getEncodedName(); + byte[] encodedNameBytes = regionInfo.getEncodedNameAsBytes(); + String regionName = regionInfo.getRegionNameAsString(); + Region onlineRegion = rs.getRegion(encodedName); + if (onlineRegion != null) { + LOG.warn("Received OPEN for the region:{}, which is already online", regionName); + // Just follow the old behavior, do we need to call reportRegionStateTransition? Maybe not? + // For normal case, it could happen that the rpc call to schedule this handler is succeeded, + // but before returning to master the connection is broken. And when master tries again, we + // have already finished the opening. For this case we do not need to call + // reportRegionStateTransition any more. + return; + } + LOG.info("Open {}", regionName); + Boolean previous = rs.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE); + if (previous != null) { + if (previous) { + // The region is opening and this maybe a retry on the rpc call, it is safe to ignore it. + LOG.info("Receiving OPEN for the region:{}, which we are already trying to OPEN" + + " - ignoring this new request for this region.", regionName); + } else { + // The region is closing. This is possible as we will update the region state to CLOSED when + // calling reportRegionStateTransition, so the HMaster will think the region is offline, + // before we actually close the region, as reportRegionStateTransition is part of the + // closing process. + LOG.info("Receiving OPEN for the region:{}, which we are trying to close, try again later", + regionName); + // TODO: backoff + rs.getExecutorService().delayedSubmit(this, 1, TimeUnit.SECONDS); + } + return; + } + HRegion region; + try { + TableDescriptor htd = + tableDesc != null ? tableDesc : rs.getTableDescriptors().get(regionInfo.getTable()); + if (htd == null) { + throw new IOException("Missing table descriptor for " + regionName); + } + // pass null for the last parameter, which used to be a CancelableProgressable, as now the + // opening can not be interrupted by a close request any more. + region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), rs.getConfiguration(), + rs, null); + } catch (IOException e) { + cleanUpAndReportFailure(e); + return; + } + rs.postOpenDeployTasks(new PostOpenDeployContext(region, masterSystemTime)); + rs.addRegion(region); + LOG.info("Opened {}", regionName); + Boolean current = rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes()); + if (current == null) { + // Should NEVER happen, but let's be paranoid. + LOG.error("Bad state: we've just opened a region that was NOT in transition. Region={}", + regionName); + } else if (!current) { + // Should NEVER happen, but let's be paranoid. + LOG.error("Bad state: we've just opened a region that was closing. Region={}", regionName); + } + } + + @Override + protected void handleException(Throwable t) { + LOG.warn("Fatal error occured while opening region {}, aborting...", + regionInfo.getRegionNameAsString(), t); + getServer().abort( + "Failed to open region " + regionInfo.getRegionNameAsString() + " and can not recover", t); + } + + public static AssignRegionHandler create(RegionServerServices server, RegionInfo regionInfo, + TableDescriptor tableDesc, long masterSystemTime) { + EventType eventType; + if (regionInfo.isMetaRegion()) { + eventType = EventType.M_RS_CLOSE_META; + } else if (regionInfo.getTable().isSystemTable() || + (tableDesc != null && tableDesc.getPriority() >= HConstants.ADMIN_QOS)) { + eventType = EventType.M_RS_OPEN_PRIORITY_REGION; + } else { + eventType = EventType.M_RS_OPEN_REGION; + } + return new AssignRegionHandler(server, regionInfo, tableDesc, masterSystemTime, eventType); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java index f851dfacbea..0e35a0be70d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java @@ -36,6 +36,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto /** * Handles closing of a region on a region server. + *

+ * Now for regular close region request, we will use {@link UnassignRegionHandler} instead. But when + * shutting down the region server, will also close regions and the related methods still use this + * class so we keep it here. + * @see UnassignRegionHandler */ @InterfaceAudience.Private public class CloseRegionHandler extends EventHandler { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java index 970911fde70..31177ef79cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java @@ -41,7 +41,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto * Handles opening of a region on a region server. *

* This is executed after receiving an OPEN RPC from the master or client. + * @deprecated Keep it here only for compatible + * @see AssignRegionHandler */ +@Deprecated @InterfaceAudience.Private public class OpenRegionHandler extends EventHandler { private static final Logger LOG = LoggerFactory.getLogger(OpenRegionHandler.class); @@ -172,7 +175,7 @@ public class OpenRegionHandler extends EventHandler { * state meantime so master doesn't timeout our region-in-transition. * Caller must cleanup region if this fails. */ - boolean updateMeta(final HRegion r, long masterSystemTime) { + private boolean updateMeta(final HRegion r, long masterSystemTime) { if (this.server.isStopped() || this.rsServices.isStopping()) { return false; } @@ -275,7 +278,7 @@ public class OpenRegionHandler extends EventHandler { /** * @return Instance of HRegion if successful open else null. */ - HRegion openRegion() { + private HRegion openRegion() { HRegion region = null; try { // Instantiate the region. This also periodically tickles OPENING @@ -304,7 +307,7 @@ public class OpenRegionHandler extends EventHandler { return region; } - void cleanupFailedOpen(final HRegion region) throws IOException { + private void cleanupFailedOpen(final HRegion region) throws IOException { if (region != null) { this.rsServices.removeRegion(region, null); region.close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java new file mode 100644 index 00000000000..2fb7393fc9b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.handler; + +import edu.umd.cs.findbugs.annotations.Nullable; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; + +/** + * Handles closing of a region on a region server. + *

+ * Just done the same thing with the old {@link CloseRegionHandler}, with some modifications on + * fencing and retrying. But we need to keep the {@link CloseRegionHandler} as is to keep compatible + * with the zk less assignment for 1.x, otherwise it is not possible to do rolling upgrade. + */ +@InterfaceAudience.Private +public class UnassignRegionHandler extends EventHandler { + + private static final Logger LOG = LoggerFactory.getLogger(UnassignRegionHandler.class); + + private final String encodedName; + // If true, the hosting server is aborting. Region close process is different + // when we are aborting. + private final boolean abort; + + private final ServerName destination; + + public UnassignRegionHandler(RegionServerServices server, String encodedName, boolean abort, + @Nullable ServerName destination, EventType eventType) { + super(server, eventType); + this.encodedName = encodedName; + this.abort = abort; + this.destination = destination; + } + + private RegionServerServices getServer() { + return (RegionServerServices) server; + } + + @Override + public void process() throws IOException { + RegionServerServices rs = getServer(); + byte[] encodedNameBytes = Bytes.toBytes(encodedName); + Boolean previous = rs.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.FALSE); + if (previous != null) { + if (previous) { + // This could happen as we will update the region state to OPEN when calling + // reportRegionStateTransition, so the HMaster will think the region is online, before we + // actually open the region, as reportRegionStateTransition is part of the opening process. + LOG.warn("Received CLOSE for the region: {}, which we are already " + + "trying to OPEN. try again later.", encodedName); + // TODO: backoff + rs.getExecutorService().delayedSubmit(this, 1, TimeUnit.SECONDS); + } else { + LOG.info("Received CLOSE for the region: {}, which we are already trying to CLOSE," + + " but not completed yet", encodedName); + } + return; + } + HRegion region = (HRegion) rs.getRegion(encodedName); + if (region == null) { + LOG.debug( + "Received CLOSE for a region {} which is not online, and we're not opening/closing.", + encodedName); + rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE); + return; + } + String regionName = region.getRegionInfo().getEncodedName(); + LOG.info("Close {}", regionName); + if (region.getCoprocessorHost() != null) { + // XXX: The behavior is a bit broken. At master side there is no FAILED_CLOSE state, so if + // there are exception thrown from the CP, we can not report the error to master, and if here + // we just return without calling reportRegionStateTransition, the TRSP at master side will + // hang there for ever. So here if the CP throws an exception out, the only way is to abort + // the RS... + region.getCoprocessorHost().preClose(abort); + } + if (region.close(abort) == null) { + // XXX: Is this still possible? The old comment says about split, but now split is done at + // master side, so... + LOG.warn("Can't close region {}, was already closed during close()", regionName); + rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE); + return; + } + rs.removeRegion(region, destination); + if (!rs.reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.CLOSED, + HConstants.NO_SEQNUM, -1, region.getRegionInfo()))) { + throw new IOException("Failed to report close to master: " + regionName); + } + rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE); + LOG.info("Closed {}", regionName); + } + + @Override + protected void handleException(Throwable t) { + LOG.warn("Fatal error occured while closing region {}, aborting...", encodedName, t); + getServer().abort("Failed to close region " + encodedName + " and can not recover", t); + } + + public static UnassignRegionHandler create(RegionServerServices server, String encodedName, + boolean abort, @Nullable ServerName destination) { + // Just try our best to determine whether it is for closing meta. It is not the end of the world + // if we put the handler into a wrong executor. + Region region = server.getRegion(encodedName); + EventType eventType = + region != null && region.getRegionInfo().isMetaRegion() ? EventType.M_RS_CLOSE_META + : EventType.M_RS_CLOSE_REGION; + return new UnassignRegionHandler(server, encodedName, abort, destination, eventType); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index a550bf56cd1..c0a2a8cd54b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase; +import com.google.protobuf.Service; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Collection; @@ -27,7 +28,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.client.ClusterConnection; @@ -57,12 +57,10 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; -import com.google.protobuf.Service; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; /** * Basic mock region server services. Should only be instantiated by HBaseTestingUtility.b @@ -125,8 +123,7 @@ public class MockRegionServerServices implements RegionServerServices { } @Override - public void postOpenDeployTasks(PostOpenDeployContext context) throws KeeperException, - IOException { + public void postOpenDeployTasks(PostOpenDeployContext context) throws IOException { addRegion(context.getRegion()); } @@ -359,4 +356,9 @@ public class MockRegionServerServices implements RegionServerServices { public ReplicationSourceService getReplicationSourceService() { return null; } + + @Override + public TableDescriptors getTableDescriptors() { + return null; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 23b0c80c124..f4c2a33234f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -29,7 +29,6 @@ import java.util.Map.Entry; import java.util.Random; import java.util.TreeMap; import java.util.concurrent.ConcurrentSkipListMap; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.Abortable; @@ -38,6 +37,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.ClusterConnection; @@ -70,10 +70,10 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.zookeeper.KeeperException; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; @@ -142,7 +142,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo * store that the get pulls from. */ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface, -ClientProtos.ClientService.BlockingInterface, RegionServerServices { + ClientProtos.ClientService.BlockingInterface, RegionServerServices { private final ServerName sn; private final ZKWatcher zkw; private final Configuration conf; @@ -338,8 +338,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { } @Override - public void postOpenDeployTasks(PostOpenDeployContext context) throws KeeperException, - IOException { + public void postOpenDeployTasks(PostOpenDeployContext context) throws IOException { } @Override @@ -710,4 +709,9 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { public ReplicationSourceService getReplicationSourceService() { return null; } + + @Override + public TableDescriptors getTableDescriptors() { + return null; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCloseAnOpeningRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCloseAnOpeningRegion.java new file mode 100644 index 00000000000..17cf8f4c12e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCloseAnOpeningRegion.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.PleaseHoldException; +import org.apache.hadoop.hbase.StartMiniClusterOption; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestCloseAnOpeningRegion { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCloseAnOpeningRegion.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("race"); + + private static byte[] CF = Bytes.toBytes("cf"); + + private static volatile CountDownLatch ARRIVE; + + private static volatile CountDownLatch RESUME; + + public static final class MockHMaster extends HMaster { + + public MockHMaster(Configuration conf) throws IOException, KeeperException { + super(conf); + } + + @Override + protected AssignmentManager createAssignmentManager(MasterServices master) { + return new AssignmentManager(master) { + + @Override + public ReportRegionStateTransitionResponse reportRegionStateTransition( + ReportRegionStateTransitionRequest req) throws PleaseHoldException { + ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(req); + TransitionCode code = req.getTransition(0).getTransitionCode(); + if (code == TransitionCode.OPENED && ARRIVE != null) { + ARRIVE.countDown(); + try { + RESUME.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return resp; + } + }; + } + } + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, 60000); + UTIL.startMiniCluster( + StartMiniClusterOption.builder().numRegionServers(2).masterClass(MockHMaster.class).build()); + UTIL.createTable(TABLE_NAME, CF); + UTIL.getAdmin().balancerSwitch(false, true); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws IOException, InterruptedException { + ARRIVE = new CountDownLatch(1); + RESUME = new CountDownLatch(1); + RegionInfo region = UTIL.getAdmin().getRegions(TABLE_NAME).get(0); + HRegionServer src = UTIL.getRSForFirstRegionInTable(TABLE_NAME); + HRegionServer dst = UTIL.getOtherRegionServer(src); + Thread move0 = new Thread(() -> { + try { + UTIL.getAdmin().move(region.getEncodedNameAsBytes(), + Bytes.toBytes(dst.getServerName().getServerName())); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + move0.start(); + ARRIVE.await(); + Thread move1 = new Thread(() -> { + try { + UTIL.getAdmin().move(region.getEncodedNameAsBytes(), + Bytes.toBytes(src.getServerName().getServerName())); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + move1.start(); + // No simple way to determine when it is safe to go on and produce the race so let's sleep for a + // well... + Thread.sleep(10000); + RESUME.countDown(); + move0.join(); + move1.join(); + try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) { + // make sure that we can write to the table, which means the region is online + table.put(new Put(Bytes.toBytes(0)).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes(0))); + } + } +} \ No newline at end of file