diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 3d4be0f5a18..7ff4cc21ca6 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1198,7 +1198,7 @@ possible configurations would overwhelm and obscure the important. hbase.coordinated.state.manager.class - org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager + org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager Fully qualified name of class implementing coordinated state manager. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java index 77ef2170939..caf3621f6a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hbase; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; import org.apache.hadoop.util.ReflectionUtils; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java similarity index 85% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseCoordinatedStateManager.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java index 7f4e510dd64..63697ee2fed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.consensus; +package org.apache.hadoop.hbase.coordination; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.CoordinatedStateException; @@ -25,7 +25,7 @@ import org.apache.hadoop.hbase.TableStateManager; /** * Base class for {@link org.apache.hadoop.hbase.CoordinatedStateManager} implementations. - * Defines methods to retrieve consensus objects for relevant areas. CoordinatedStateManager + * Defines methods to retrieve coordination objects for relevant areas. CoordinatedStateManager * reference returned from Server interface has to be casted to this type to * access those methods. */ @@ -52,4 +52,8 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan @Override public abstract TableStateManager getTableStateManager() throws InterruptedException, CoordinatedStateException; + /** + * Method to retrieve coordination for split transaction + */ + abstract public SplitTransactionCoordination getSplitTransactionCoordination(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitTransactionCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitTransactionCoordination.java new file mode 100644 index 00000000000..659d4e50676 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitTransactionCoordination.java @@ -0,0 +1,101 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coordination; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.SplitTransaction; + +/** + * Coordination operations for split transaction. The split operation should be coordinated at the + * following stages: + * 1. start - all preparation/initialization for split transaction should be done there. + * 2. waitForSplitTransaction - the coordination should perform all logic related to split + * transaction and wait till it's finished + * 3. completeSplitTransaction - all steps that are required to complete the transaction. + * Called after PONR (point of no return) + */ +@InterfaceAudience.Private +public interface SplitTransactionCoordination { + + /** + * Dummy interface for split transaction details. + */ + public static interface SplitTransactionDetails { + } + + SplitTransactionDetails getDefaultDetails(); + + + /** + * init coordination for split transaction + * @param parent region to be created as offline + * @param serverName server event originates from + * @param hri_a daughter region + * @param hri_b daughter region + * @throws IOException + */ + void startSplitTransaction(HRegion parent, ServerName serverName, + HRegionInfo hri_a, HRegionInfo hri_b) throws IOException; + + /** + * Wait while coordination process the transaction + * @param services Used to online/offline regions. + * @param parent region + * @param hri_a daughter region + * @param hri_b daughter region + * @param std split transaction details + * @throws IOException + */ + void waitForSplitTransaction(final RegionServerServices services, + HRegion parent, HRegionInfo hri_a, HRegionInfo hri_b, SplitTransactionDetails std) + throws IOException; + + /** + * Finish off split transaction + * @param services Used to online/offline regions. + * @param first daughter region + * @param second daughter region + * @param std split transaction details + * @param parent + * @throws IOException If thrown, transaction failed. Call + * {@link SplitTransaction#rollback(Server, RegionServerServices)} + */ + void completeSplitTransaction(RegionServerServices services, HRegion first, + HRegion second, SplitTransactionDetails std, HRegion parent) throws IOException; + + /** + * clean the split transaction + * @param hri node to delete + */ + void clean(final HRegionInfo hri); + + /** + * Required by AssignmentManager + */ + int processTransition(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b, + ServerName sn, SplitTransactionDetails std) throws IOException; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitTransactionCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitTransactionCoordination.java new file mode 100644 index 00000000000..de9f51f4aae --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitTransactionCoordination.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.coordination; + +import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT; +import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING; +import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.RegionTransition; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.SplitTransaction; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +public class ZKSplitTransactionCoordination implements SplitTransactionCoordination { + + private CoordinatedStateManager coordinationManager; + private final ZooKeeperWatcher watcher; + + private static final Log LOG = LogFactory.getLog(ZKSplitTransactionCoordination.class); + + public ZKSplitTransactionCoordination(CoordinatedStateManager coordinationProvider, + ZooKeeperWatcher watcher) { + this.coordinationManager = coordinationProvider; + this.watcher = watcher; + } + + /** + * Creates a new ephemeral node in the PENDING_SPLIT state for the specified region. Create it + * ephemeral in case regionserver dies mid-split. + *

+ * Does not transition nodes from other states. If a node already exists for this region, an + * Exception will be thrown. + * @param parent region to be created as offline + * @param serverName server event originates from + * @param hri_a daughter region + * @param hri_b daughter region + * @throws IOException + */ + + @Override + public void startSplitTransaction(HRegion parent, ServerName serverName, HRegionInfo hri_a, + HRegionInfo hri_b) throws IOException { + + HRegionInfo region = parent.getRegionInfo(); + try { + + LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName() + + " in PENDING_SPLIT state")); + byte[] payload = HRegionInfo.toDelimitedByteArray(hri_a, hri_b); + RegionTransition rt = + RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_SPLIT, + region.getRegionName(), serverName, payload); + String node = ZKAssign.getNodeName(watcher, region.getEncodedName()); + if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) { + throw new IOException("Failed create of ephemeral " + node); + } + + } catch (KeeperException e) { + throw new IOException("Failed creating PENDING_SPLIT znode on " + + parent.getRegionNameAsString(), e); + } + + } + + /** + * Transitions an existing ephemeral node for the specified region which is currently in the begin + * state to be in the end state. Master cleans up the final SPLIT znode when it reads it (or if we + * crash, zk will clean it up). + *

+ * Does not transition nodes from other states. If for some reason the node could not be + * transitioned, the method returns -1. If the transition is successful, the version of the node + * after transition is returned. + *

+ * This method can fail and return false for three different reasons: + *

+ *

+ * Does not set any watches. + *

+ * This method should only be used by a RegionServer when splitting a region. + * @param parent region to be transitioned to opened + * @param a Daughter a of split + * @param b Daughter b of split + * @param serverName server event originates from + * @param std split transaction details + * @param beginState the expected current state the znode should be + * @param endState the state to be transition to + * @return version of node after transition, -1 if unsuccessful transition + * @throws IOException + */ + + private int transitionSplittingNode(HRegionInfo parent, HRegionInfo a, HRegionInfo b, + ServerName serverName, SplitTransactionDetails std, final EventType beginState, + final EventType endState) throws IOException { + ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) std; + byte[] payload = HRegionInfo.toDelimitedByteArray(a, b); + try { + return ZKAssign.transitionNode(watcher, parent, serverName, beginState, endState, + zstd.getZnodeVersion(), payload); + } catch (KeeperException e) { + throw new IOException( + "Failed transition of splitting node " + parent.getRegionNameAsString(), e); + } + } + + /** + * Wait for the splitting node to be transitioned from pending_split to splitting by master. + * That's how we are sure master has processed the event and is good with us to move on. If we + * don't get any update, we periodically transition the node so that master gets the callback. If + * the node is removed or is not in pending_split state any more, we abort the split. + */ + @Override + public void waitForSplitTransaction(final RegionServerServices services, HRegion parent, + HRegionInfo hri_a, HRegionInfo hri_b, SplitTransactionDetails sptd) throws IOException { + ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) sptd; + + // After creating the split node, wait for master to transition it + // from PENDING_SPLIT to SPLITTING so that we can move on. We want master + // knows about it and won't transition any region which is splitting. + try { + int spins = 0; + Stat stat = new Stat(); + ServerName expectedServer = coordinationManager.getServer().getServerName(); + String node = parent.getRegionInfo().getEncodedName(); + while (!(coordinationManager.getServer().isStopped() || services.isStopping())) { + if (spins % 5 == 0) { + LOG.debug("Still waiting for master to process " + "the pending_split for " + node); + SplitTransactionDetails temp = getDefaultDetails(); + transitionSplittingNode(parent.getRegionInfo(), hri_a, hri_b, expectedServer, temp, + RS_ZK_REQUEST_REGION_SPLIT, RS_ZK_REQUEST_REGION_SPLIT); + } + Thread.sleep(100); + spins++; + byte[] data = ZKAssign.getDataNoWatch(watcher, node, stat); + if (data == null) { + throw new IOException("Data is null, splitting node " + node + " no longer exists"); + } + RegionTransition rt = RegionTransition.parseFrom(data); + EventType et = rt.getEventType(); + if (et == RS_ZK_REGION_SPLITTING) { + ServerName serverName = rt.getServerName(); + if (!serverName.equals(expectedServer)) { + throw new IOException("Splitting node " + node + " is for " + serverName + ", not us " + + expectedServer); + } + byte[] payloadOfSplitting = rt.getPayload(); + List splittingRegions = + HRegionInfo.parseDelimitedFrom(payloadOfSplitting, 0, payloadOfSplitting.length); + assert splittingRegions.size() == 2; + HRegionInfo a = splittingRegions.get(0); + HRegionInfo b = splittingRegions.get(1); + if (!(hri_a.equals(a) && hri_b.equals(b))) { + throw new IOException("Splitting node " + node + " is for " + a + ", " + b + + ", not expected daughters: " + hri_a + ", " + hri_b); + } + // Master has processed it. + zstd.setZnodeVersion(stat.getVersion()); + return; + } + if (et != RS_ZK_REQUEST_REGION_SPLIT) { + throw new IOException("Splitting node " + node + " moved out of splitting to " + et); + } + } + // Server is stopping/stopped + throw new IOException("Server is " + (services.isStopping() ? "stopping" : "stopped")); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new IOException("Failed getting SPLITTING znode on " + parent.getRegionNameAsString(), + e); + } + } + + /** + * Finish off split transaction, transition the zknode + * @param services Used to online/offline regions. + * @param a daughter region + * @param b daughter region + * @param std split transaction details + * @param parent + * @throws IOException If thrown, transaction failed. Call + * {@link SplitTransaction#rollback(Server, RegionServerServices)} + */ + @Override + public void completeSplitTransaction(final RegionServerServices services, HRegion a, HRegion b, + SplitTransactionDetails std, HRegion parent) throws IOException { + ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) std; + // Tell master about split by updating zk. If we fail, abort. + if (coordinationManager.getServer() != null) { + try { + zstd.setZnodeVersion(transitionSplittingNode(parent.getRegionInfo(), a.getRegionInfo(), + b.getRegionInfo(), coordinationManager.getServer().getServerName(), zstd, + RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT)); + + int spins = 0; + // Now wait for the master to process the split. We know it's done + // when the znode is deleted. The reason we keep tickling the znode is + // that it's possible for the master to miss an event. + do { + if (spins % 10 == 0) { + LOG.debug("Still waiting on the master to process the split for " + + parent.getRegionInfo().getEncodedName()); + } + Thread.sleep(100); + // When this returns -1 it means the znode doesn't exist + zstd.setZnodeVersion(transitionSplittingNode(parent.getRegionInfo(), a.getRegionInfo(), + b.getRegionInfo(), coordinationManager.getServer().getServerName(), zstd, + RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT)); + spins++; + } while (zstd.getZnodeVersion() != -1 && !coordinationManager.getServer().isStopped() + && !services.isStopping()); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new IOException("Failed telling master about split", e); + } + } + + // Leaving here, the splitdir with its dross will be in place but since the + // split was successful, just leave it; it'll be cleaned when parent is + // deleted and cleaned up. + } + + @Override + public void clean(final HRegionInfo hri) { + try { + // Only delete if its in expected state; could have been hijacked. + if (!ZKAssign.deleteNode(coordinationManager.getServer().getZooKeeper(), + hri.getEncodedName(), RS_ZK_REQUEST_REGION_SPLIT, coordinationManager.getServer() + .getServerName())) { + ZKAssign.deleteNode(coordinationManager.getServer().getZooKeeper(), hri.getEncodedName(), + RS_ZK_REGION_SPLITTING, coordinationManager.getServer().getServerName()); + } + } catch (KeeperException.NoNodeException e) { + LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e); + } catch (KeeperException e) { + coordinationManager.getServer().abort("Failed cleanup of " + hri.getRegionNameAsString(), e); + } + } + + /** + * ZK-based implementation. Has details about whether the state transition should be reflected in + * ZK, as well as expected version of znode. + */ + public static class ZkSplitTransactionDetails implements + SplitTransactionCoordination.SplitTransactionDetails { + private int znodeVersion; + + public ZkSplitTransactionDetails() { + } + + /** + * @return znode current version + */ + public int getZnodeVersion() { + return znodeVersion; + } + + /** + * @param znodeVersion znode new version + */ + public void setZnodeVersion(int znodeVersion) { + this.znodeVersion = znodeVersion; + } + } + + @Override + public SplitTransactionDetails getDefaultDetails() { + ZkSplitTransactionDetails zstd = new ZkSplitTransactionDetails(); + zstd.setZnodeVersion(-1); + return zstd; + } + + @Override + public int processTransition(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b, ServerName sn, + SplitTransactionDetails std) throws IOException { + return transitionSplittingNode(p, hri_a, hri_b, sn, std, RS_ZK_REQUEST_REGION_SPLIT, + RS_ZK_REGION_SPLITTING); + + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java similarity index 83% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkCoordinatedStateManager.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java index 27e09ca9ef8..2ef2db97f5c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.consensus; +package org.apache.hadoop.hbase.coordination; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,13 +33,16 @@ import org.apache.zookeeper.KeeperException; @InterfaceAudience.Private public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { private static final Log LOG = LogFactory.getLog(ZkCoordinatedStateManager.class); - private Server server; - private ZooKeeperWatcher watcher; + protected Server server; + protected ZooKeeperWatcher watcher; + protected SplitTransactionCoordination splitTransactionCoordination; @Override public void initialize(Server server) { this.server = server; this.watcher = server.getZooKeeper(); + + splitTransactionCoordination = new ZKSplitTransactionCoordination(this, watcher); } @Override @@ -56,4 +59,9 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { throw new CoordinatedStateException(e); } } + + @Override + public SplitTransactionCoordination getSplitTransactionCoordination() { + return splitTransactionCoordination; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 4dcd3e9abbb..1c4804d259b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -58,6 +58,8 @@ import org.apache.hadoop.hbase.TableStateManager; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; @@ -77,7 +79,6 @@ import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException; import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction; import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; -import org.apache.hadoop.hbase.regionserver.SplitTransaction; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.hadoop.hbase.util.Pair; @@ -3225,25 +3226,26 @@ public class AssignmentManager extends ZooKeeperListener { EventType et = rt.getEventType(); if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) { try { - if (SplitTransaction.transitionSplittingNode(watcher, p, - hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_SPLIT, - EventType.RS_ZK_REGION_SPLITTING) == -1) { + SplitTransactionDetails std = + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitTransactionCoordination().getDefaultDetails(); + if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) { byte[] data = ZKAssign.getData(watcher, encodedName); EventType currentType = null; if (data != null) { RegionTransition newRt = RegionTransition.parseFrom(data); currentType = newRt.getEventType(); } - if (currentType == null || (currentType != EventType.RS_ZK_REGION_SPLIT - && currentType != EventType.RS_ZK_REGION_SPLITTING)) { - LOG.warn("Failed to transition pending_split node " - + encodedName + " to splitting, it's now " + currentType); + if (currentType == null + || (currentType != EventType.RS_ZK_REGION_SPLIT && currentType != EventType.RS_ZK_REGION_SPLITTING)) { + LOG.warn("Failed to transition pending_split node " + encodedName + + " to splitting, it's now " + currentType); return false; } } } catch (Exception e) { - LOG.warn("Failed to transition pending_split node " - + encodedName + " to splitting", e); + LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e); return false; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java index 3394ccd6d7d..db4dad997bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java @@ -18,10 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT; -import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT; -import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING; - import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -41,25 +37,20 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.PairOfSameType; -import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.data.Stat; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -97,7 +88,7 @@ public class SplitTransaction { private HRegionInfo hri_a; private HRegionInfo hri_b; private long fileSplitTimeout = 30000; - private int znodeVersion = -1; + public SplitTransactionCoordination.SplitTransactionDetails std; /* * Row to split around @@ -113,7 +104,7 @@ public class SplitTransaction { /** * Set region as in transition, set it into SPLITTING state. */ - SET_SPLITTING_IN_ZK, + SET_SPLITTING, /** * We created the temporary split data directory. */ @@ -294,26 +285,24 @@ public class SplitTransaction { } return daughterRegions; } - public PairOfSameType stepsBeforePONR(final Server server, final RegionServerServices services, boolean testing) throws IOException { - // Set ephemeral SPLITTING znode up in zk. Mocked servers sometimes don't - // have zookeeper so don't do zk stuff if server or zookeeper is null - if (server != null && server.getZooKeeper() != null) { - try { - createNodeSplitting(server.getZooKeeper(), - parent.getRegionInfo(), server.getServerName(), hri_a, hri_b); - } catch (KeeperException e) { - throw new IOException("Failed creating PENDING_SPLIT znode on " + - this.parent.getRegionNameAsString(), e); + + if (server != null && server.getCoordinatedStateManager() != null) { + if (std == null) { + std = + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitTransactionCoordination().getDefaultDetails(); } + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitTransactionCoordination().startSplitTransaction(parent, server.getServerName(), + hri_a, hri_b); } - this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK); - if (server != null && server.getZooKeeper() != null) { - // After creating the split node, wait for master to transition it - // from PENDING_SPLIT to SPLITTING so that we can move on. We want master - // knows about it and won't transition any region which is splitting. - znodeVersion = getZKNode(server, services); + this.journal.add(JournalEntry.SET_SPLITTING); + if (server != null && server.getCoordinatedStateManager() != null) { + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitTransactionCoordination().waitForSplitTransaction(services, parent, hri_a, + hri_b, std); } this.parent.getRegionFileSystem().createSplitsDir(); @@ -369,8 +358,7 @@ public class SplitTransaction { /** * Perform time consuming opening of the daughter regions. - * @param server Hosting server instance. Can be null when testing (won't try - * and update in zk if a null server) + * @param server Hosting server instance. Can be null when testing * @param services Used to online/offline regions. * @param a first daughter region * @param a second daughter region @@ -424,137 +412,9 @@ public class SplitTransaction { } } - /** - * Finish off split transaction, transition the zknode - * @param server Hosting server instance. Can be null when testing (won't try - * and update in zk if a null server) - * @param services Used to online/offline regions. - * @param a first daughter region - * @param a second daughter region - * @throws IOException If thrown, transaction failed. - * Call {@link #rollback(Server, RegionServerServices)} - */ - /* package */void transitionZKNode(final Server server, - final RegionServerServices services, HRegion a, HRegion b) - throws IOException { - // Tell master about split by updating zk. If we fail, abort. - if (server != null && server.getZooKeeper() != null) { - try { - this.znodeVersion = transitionSplittingNode(server.getZooKeeper(), - parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(), - server.getServerName(), this.znodeVersion, - RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT); - - int spins = 0; - // Now wait for the master to process the split. We know it's done - // when the znode is deleted. The reason we keep tickling the znode is - // that it's possible for the master to miss an event. - do { - if (spins % 10 == 0) { - LOG.debug("Still waiting on the master to process the split for " + - this.parent.getRegionInfo().getEncodedName()); - } - Thread.sleep(100); - // When this returns -1 it means the znode doesn't exist - this.znodeVersion = transitionSplittingNode(server.getZooKeeper(), - parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(), - server.getServerName(), this.znodeVersion, - RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT); - spins++; - } while (this.znodeVersion != -1 && !server.isStopped() - && !services.isStopping()); - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new IOException("Failed telling master about split", e); - } - } - - // Coprocessor callback - if (this.parent.getCoprocessorHost() != null) { - this.parent.getCoprocessorHost().postSplit(a,b); - } - - // Leaving here, the splitdir with its dross will be in place but since the - // split was successful, just leave it; it'll be cleaned when parent is - // deleted and cleaned up. - } - - /** - * Wait for the splitting node to be transitioned from pending_split - * to splitting by master. That's how we are sure master has processed - * the event and is good with us to move on. If we don't get any update, - * we periodically transition the node so that master gets the callback. - * If the node is removed or is not in pending_split state any more, - * we abort the split. - */ - private int getZKNode(final Server server, - final RegionServerServices services) throws IOException { - // Wait for the master to process the pending_split. - try { - int spins = 0; - Stat stat = new Stat(); - ZooKeeperWatcher zkw = server.getZooKeeper(); - ServerName expectedServer = server.getServerName(); - String node = parent.getRegionInfo().getEncodedName(); - while (!(server.isStopped() || services.isStopping())) { - if (spins % 5 == 0) { - LOG.debug("Still waiting for master to process " - + "the pending_split for " + node); - transitionSplittingNode(zkw, parent.getRegionInfo(), - hri_a, hri_b, expectedServer, -1, RS_ZK_REQUEST_REGION_SPLIT, - RS_ZK_REQUEST_REGION_SPLIT); - } - Thread.sleep(100); - spins++; - byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat); - if (data == null) { - throw new IOException("Data is null, splitting node " - + node + " no longer exists"); - } - RegionTransition rt = RegionTransition.parseFrom(data); - EventType et = rt.getEventType(); - if (et == RS_ZK_REGION_SPLITTING) { - ServerName serverName = rt.getServerName(); - if (!serverName.equals(expectedServer)) { - throw new IOException("Splitting node " + node + " is for " - + serverName + ", not us " + expectedServer); - } - byte [] payloadOfSplitting = rt.getPayload(); - List splittingRegions = HRegionInfo.parseDelimitedFrom( - payloadOfSplitting, 0, payloadOfSplitting.length); - assert splittingRegions.size() == 2; - HRegionInfo a = splittingRegions.get(0); - HRegionInfo b = splittingRegions.get(1); - if (!(hri_a.equals(a) && hri_b.equals(b))) { - throw new IOException("Splitting node " + node + " is for " + a + ", " - + b + ", not expected daughters: " + hri_a + ", " + hri_b); - } - // Master has processed it. - return stat.getVersion(); - } - if (et != RS_ZK_REQUEST_REGION_SPLIT) { - throw new IOException("Splitting node " + node - + " moved out of splitting to " + et); - } - } - // Server is stopping/stopped - throw new IOException("Server is " - + (services.isStopping() ? "stopping" : "stopped")); - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new IOException("Failed getting SPLITTING znode on " - + parent.getRegionNameAsString(), e); - } - } - /** * Run the transaction. - * @param server Hosting server instance. Can be null when testing (won't try - * and update in zk if a null server) + * @param server Hosting server instance. Can be null when testing * @param services Used to online/offline regions. * @throws IOException If thrown, transaction failed. * Call {@link #rollback(Server, RegionServerServices)} @@ -565,6 +425,11 @@ public class SplitTransaction { public PairOfSameType execute(final Server server, final RegionServerServices services) throws IOException { + if (server != null && server.getCoordinatedStateManager() != null) { + std = + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitTransactionCoordination().getDefaultDetails(); + } PairOfSameType regions = createDaughters(server, services); if (this.parent.getCoprocessorHost() != null) { this.parent.getCoprocessorHost().preSplitAfterPONR(); @@ -576,7 +441,17 @@ public class SplitTransaction { final RegionServerServices services, PairOfSameType regions) throws IOException { openDaughters(server, services, regions.getFirst(), regions.getSecond()); - transitionZKNode(server, services, regions.getFirst(), regions.getSecond()); + if (server != null && server.getCoordinatedStateManager() != null) { + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitTransactionCoordination().completeSplitTransaction(services, regions.getFirst(), + regions.getSecond(), std, parent); + } + // Coprocessor callback + if (parent.getCoprocessorHost() != null) { + parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond()); + } + + return regions; } @@ -800,9 +675,10 @@ public class SplitTransaction { JournalEntry je = iterator.previous(); switch(je) { - case SET_SPLITTING_IN_ZK: - if (server != null && server.getZooKeeper() != null) { - cleanZK(server, this.parent.getRegionInfo()); + case SET_SPLITTING: + if (server != null && server instanceof HRegionServer) { + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitTransactionCoordination().clean(this.parent.getRegionInfo()); } break; @@ -864,88 +740,4 @@ public class SplitTransaction { return hri_b; } - private static void cleanZK(final Server server, final HRegionInfo hri) { - try { - // Only delete if its in expected state; could have been hijacked. - if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(), - RS_ZK_REQUEST_REGION_SPLIT, server.getServerName())) { - ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(), - RS_ZK_REGION_SPLITTING, server.getServerName()); - } - } catch (KeeperException.NoNodeException e) { - LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e); - } catch (KeeperException e) { - server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e); - } - } - - /** - * Creates a new ephemeral node in the PENDING_SPLIT state for the specified region. - * Create it ephemeral in case regionserver dies mid-split. - * - *

Does not transition nodes from other states. If a node already exists - * for this region, a {@link NodeExistsException} will be thrown. - * - * @param zkw zk reference - * @param region region to be created as offline - * @param serverName server event originates from - * @throws KeeperException - * @throws IOException - */ - public static void createNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo region, - final ServerName serverName, final HRegionInfo a, - final HRegionInfo b) throws KeeperException, IOException { - LOG.debug(zkw.prefix("Creating ephemeral node for " + - region.getEncodedName() + " in PENDING_SPLIT state")); - byte [] payload = HRegionInfo.toDelimitedByteArray(a, b); - RegionTransition rt = RegionTransition.createRegionTransition( - RS_ZK_REQUEST_REGION_SPLIT, region.getRegionName(), serverName, payload); - String node = ZKAssign.getNodeName(zkw, region.getEncodedName()); - if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) { - throw new IOException("Failed create of ephemeral " + node); - } - } - - /** - * Transitions an existing ephemeral node for the specified region which is - * currently in the begin state to be in the end state. Master cleans up the - * final SPLIT znode when it reads it (or if we crash, zk will clean it up). - * - *

Does not transition nodes from other states. If for some reason the - * node could not be transitioned, the method returns -1. If the transition - * is successful, the version of the node after transition is returned. - * - *

This method can fail and return false for three different reasons: - *

- * - *

Does not set any watches. - * - *

This method should only be used by a RegionServer when splitting a region. - * - * @param zkw zk reference - * @param parent region to be transitioned to opened - * @param a Daughter a of split - * @param b Daughter b of split - * @param serverName server event originates from - * @param znodeVersion expected version of data before modification - * @param beginState the expected current state the znode should be - * @param endState the state to be transition to - * @return version of node after transition, -1 if unsuccessful transition - * @throws KeeperException if unexpected zookeeper exception - * @throws IOException - */ - public static int transitionSplittingNode(ZooKeeperWatcher zkw, - HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName, - final int znodeVersion, final EventType beginState, - final EventType endState) throws KeeperException, IOException { - byte [] payload = HRegionInfo.toDelimitedByteArray(a, b); - return ZKAssign.transitionNode(zkw, parent, serverName, - beginState, endState, znodeVersion, payload); - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java index b3bf1f0db69..777bdb14cdb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java @@ -22,7 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.catalog.CatalogTracker; -import org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorType; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java index 5cf10f35c9a..c7d3f1f7b65 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java @@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.CoordinatedStateManager; -import org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.executor.ExecutorService; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java index 82f357f5154..14a44fa32c7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.client.MetaScanner; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; @@ -139,8 +140,9 @@ public class TestEndToEndSplitTransaction { assertTrue(test(con, tableName, lastRow, server)); // 4. phase III - split.transitionZKNode(server, server, regions.getFirst(), - regions.getSecond()); + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitTransactionCoordination().completeSplitTransaction(server, regions.getFirst(), + regions.getSecond(), split.std, region); assertTrue(test(con, tableName, firstRow, server)); assertTrue(test(con, tableName, lastRow, server)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 71bc6c6cfb3..44f56c117bf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.ZKSplitTransactionCoordination; +import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -190,6 +192,10 @@ public class TestSplitTransactionOnCluster { // find a splittable region final HRegion region = findSplittableRegion(regions); assertTrue("not able to find a splittable region", region != null); + MockedCoordinatedStateManager cp = new MockedCoordinatedStateManager(); + cp.initialize(regionServer, region); + cp.start(); + regionServer.csm = cp; new Thread() { @Override @@ -1083,18 +1089,52 @@ public class TestSplitTransactionOnCluster { TESTING_UTIL.deleteTable(tableName); } } + public static class MockedCoordinatedStateManager extends ZkCoordinatedStateManager { - public static class MockedSplitTransaction extends SplitTransaction { + public void initialize(Server server, HRegion region) { + this.server = server; + this.watcher = server.getZooKeeper(); + splitTransactionCoordination = new MockedSplitTransactionCoordination(this, watcher, region); + + } + } + + public static class MockedSplitTransaction extends SplitTransaction { + + private HRegion currentRegion; + public MockedSplitTransaction(HRegion region, byte[] splitrow) { + super(region, splitrow); + this.currentRegion = region; + } + @Override + public boolean rollback(Server server, RegionServerServices services) throws IOException { + if (this.currentRegion.getRegionInfo().getTable().getNameAsString() + .equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) { + if(secondSplit){ + super.rollback(server, services); + latch.countDown(); + return true; + } + } + return super.rollback(server, services); + } + + + } + + public static class MockedSplitTransactionCoordination extends ZKSplitTransactionCoordination { private HRegion currentRegion; - public MockedSplitTransaction(HRegion r, byte[] splitrow) { - super(r, splitrow); - this.currentRegion = r; + + public MockedSplitTransactionCoordination(CoordinatedStateManager coordinationProvider, + ZooKeeperWatcher watcher, HRegion region) { + super(coordinationProvider, watcher); + currentRegion = region; } @Override - void transitionZKNode(Server server, RegionServerServices services, HRegion a, HRegion b) - throws IOException { + public void completeSplitTransaction(RegionServerServices services, HRegion a, HRegion b, + SplitTransactionDetails std, HRegion parent) throws IOException { if (this.currentRegion.getRegionInfo().getTable().getNameAsString() .equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) { try { @@ -1106,25 +1146,12 @@ public class TestSplitTransactionOnCluster { } } - super.transitionZKNode(server, services, a, b); + super.completeSplitTransaction(services, a, b, std, parent); if (this.currentRegion.getRegionInfo().getTable().getNameAsString() .equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) { firstSplitCompleted = true; } } - @Override - public boolean rollback(Server server, RegionServerServices services) throws IOException { - if (this.currentRegion.getRegionInfo().getTable().getNameAsString() - .equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) { - if(secondSplit){ - super.rollback(server, services); - latch.countDown(); - return true; - } - } - return super.rollback(server, services); - } - } private HRegion findSplittableRegion(final List regions) throws InterruptedException {