HBASE-10985 Decouple Split Transaction from Zookeeper (Sergey Soldatov)
This commit is contained in:
parent
6fdf737171
commit
43be19794a
@ -1198,7 +1198,7 @@ possible configurations would overwhelm and obscure the important.
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.coordinated.state.manager.class</name>
|
||||
<value>org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager</value>
|
||||
<value>org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager</value>
|
||||
<description>Fully qualified name of class implementing coordinated state manager.</description>
|
||||
</property>
|
||||
</configuration>
|
||||
|
@ -19,7 +19,7 @@ package org.apache.hadoop.hbase;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
|
@ -15,7 +15,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.consensus;
|
||||
package org.apache.hadoop.hbase.coordination;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateException;
|
||||
@ -25,7 +25,7 @@ import org.apache.hadoop.hbase.TableStateManager;
|
||||
|
||||
/**
|
||||
* Base class for {@link org.apache.hadoop.hbase.CoordinatedStateManager} implementations.
|
||||
* Defines methods to retrieve consensus objects for relevant areas. CoordinatedStateManager
|
||||
* Defines methods to retrieve coordination objects for relevant areas. CoordinatedStateManager
|
||||
* reference returned from Server interface has to be casted to this type to
|
||||
* access those methods.
|
||||
*/
|
||||
@ -52,4 +52,8 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
|
||||
@Override
|
||||
public abstract TableStateManager getTableStateManager() throws InterruptedException,
|
||||
CoordinatedStateException;
|
||||
/**
|
||||
* Method to retrieve coordination for split transaction
|
||||
*/
|
||||
abstract public SplitTransactionCoordination getSplitTransactionCoordination();
|
||||
}
|
@ -0,0 +1,101 @@
|
||||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coordination;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.SplitTransaction;
|
||||
|
||||
/**
|
||||
* Coordination operations for split transaction. The split operation should be coordinated at the
|
||||
* following stages:
|
||||
* 1. start - all preparation/initialization for split transaction should be done there.
|
||||
* 2. waitForSplitTransaction - the coordination should perform all logic related to split
|
||||
* transaction and wait till it's finished
|
||||
* 3. completeSplitTransaction - all steps that are required to complete the transaction.
|
||||
* Called after PONR (point of no return)
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface SplitTransactionCoordination {
|
||||
|
||||
/**
|
||||
* Dummy interface for split transaction details.
|
||||
*/
|
||||
public static interface SplitTransactionDetails {
|
||||
}
|
||||
|
||||
SplitTransactionDetails getDefaultDetails();
|
||||
|
||||
|
||||
/**
|
||||
* init coordination for split transaction
|
||||
* @param parent region to be created as offline
|
||||
* @param serverName server event originates from
|
||||
* @param hri_a daughter region
|
||||
* @param hri_b daughter region
|
||||
* @throws IOException
|
||||
*/
|
||||
void startSplitTransaction(HRegion parent, ServerName serverName,
|
||||
HRegionInfo hri_a, HRegionInfo hri_b) throws IOException;
|
||||
|
||||
/**
|
||||
* Wait while coordination process the transaction
|
||||
* @param services Used to online/offline regions.
|
||||
* @param parent region
|
||||
* @param hri_a daughter region
|
||||
* @param hri_b daughter region
|
||||
* @param std split transaction details
|
||||
* @throws IOException
|
||||
*/
|
||||
void waitForSplitTransaction(final RegionServerServices services,
|
||||
HRegion parent, HRegionInfo hri_a, HRegionInfo hri_b, SplitTransactionDetails std)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Finish off split transaction
|
||||
* @param services Used to online/offline regions.
|
||||
* @param first daughter region
|
||||
* @param second daughter region
|
||||
* @param std split transaction details
|
||||
* @param parent
|
||||
* @throws IOException If thrown, transaction failed. Call
|
||||
* {@link SplitTransaction#rollback(Server, RegionServerServices)}
|
||||
*/
|
||||
void completeSplitTransaction(RegionServerServices services, HRegion first,
|
||||
HRegion second, SplitTransactionDetails std, HRegion parent) throws IOException;
|
||||
|
||||
/**
|
||||
* clean the split transaction
|
||||
* @param hri node to delete
|
||||
*/
|
||||
void clean(final HRegionInfo hri);
|
||||
|
||||
/**
|
||||
* Required by AssignmentManager
|
||||
*/
|
||||
int processTransition(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b,
|
||||
ServerName sn, SplitTransactionDetails std) throws IOException;
|
||||
}
|
@ -0,0 +1,314 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||
* for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coordination;
|
||||
|
||||
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT;
|
||||
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING;
|
||||
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.RegionTransition;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.SplitTransaction;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
public class ZKSplitTransactionCoordination implements SplitTransactionCoordination {
|
||||
|
||||
private CoordinatedStateManager coordinationManager;
|
||||
private final ZooKeeperWatcher watcher;
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ZKSplitTransactionCoordination.class);
|
||||
|
||||
public ZKSplitTransactionCoordination(CoordinatedStateManager coordinationProvider,
|
||||
ZooKeeperWatcher watcher) {
|
||||
this.coordinationManager = coordinationProvider;
|
||||
this.watcher = watcher;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new ephemeral node in the PENDING_SPLIT state for the specified region. Create it
|
||||
* ephemeral in case regionserver dies mid-split.
|
||||
* <p>
|
||||
* Does not transition nodes from other states. If a node already exists for this region, an
|
||||
* Exception will be thrown.
|
||||
* @param parent region to be created as offline
|
||||
* @param serverName server event originates from
|
||||
* @param hri_a daughter region
|
||||
* @param hri_b daughter region
|
||||
* @throws IOException
|
||||
*/
|
||||
|
||||
@Override
|
||||
public void startSplitTransaction(HRegion parent, ServerName serverName, HRegionInfo hri_a,
|
||||
HRegionInfo hri_b) throws IOException {
|
||||
|
||||
HRegionInfo region = parent.getRegionInfo();
|
||||
try {
|
||||
|
||||
LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName()
|
||||
+ " in PENDING_SPLIT state"));
|
||||
byte[] payload = HRegionInfo.toDelimitedByteArray(hri_a, hri_b);
|
||||
RegionTransition rt =
|
||||
RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_SPLIT,
|
||||
region.getRegionName(), serverName, payload);
|
||||
String node = ZKAssign.getNodeName(watcher, region.getEncodedName());
|
||||
if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) {
|
||||
throw new IOException("Failed create of ephemeral " + node);
|
||||
}
|
||||
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed creating PENDING_SPLIT znode on "
|
||||
+ parent.getRegionNameAsString(), e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Transitions an existing ephemeral node for the specified region which is currently in the begin
|
||||
* state to be in the end state. Master cleans up the final SPLIT znode when it reads it (or if we
|
||||
* crash, zk will clean it up).
|
||||
* <p>
|
||||
* Does not transition nodes from other states. If for some reason the node could not be
|
||||
* transitioned, the method returns -1. If the transition is successful, the version of the node
|
||||
* after transition is returned.
|
||||
* <p>
|
||||
* This method can fail and return false for three different reasons:
|
||||
* <ul>
|
||||
* <li>Node for this region does not exist</li>
|
||||
* <li>Node for this region is not in the begin state</li>
|
||||
* <li>After verifying the begin state, update fails because of wrong version (this should never
|
||||
* actually happen since an RS only does this transition following a transition to the begin
|
||||
* state. If two RS are conflicting, one would fail the original transition to the begin state and
|
||||
* not this transition)</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* Does not set any watches.
|
||||
* <p>
|
||||
* This method should only be used by a RegionServer when splitting a region.
|
||||
* @param parent region to be transitioned to opened
|
||||
* @param a Daughter a of split
|
||||
* @param b Daughter b of split
|
||||
* @param serverName server event originates from
|
||||
* @param std split transaction details
|
||||
* @param beginState the expected current state the znode should be
|
||||
* @param endState the state to be transition to
|
||||
* @return version of node after transition, -1 if unsuccessful transition
|
||||
* @throws IOException
|
||||
*/
|
||||
|
||||
private int transitionSplittingNode(HRegionInfo parent, HRegionInfo a, HRegionInfo b,
|
||||
ServerName serverName, SplitTransactionDetails std, final EventType beginState,
|
||||
final EventType endState) throws IOException {
|
||||
ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) std;
|
||||
byte[] payload = HRegionInfo.toDelimitedByteArray(a, b);
|
||||
try {
|
||||
return ZKAssign.transitionNode(watcher, parent, serverName, beginState, endState,
|
||||
zstd.getZnodeVersion(), payload);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException(
|
||||
"Failed transition of splitting node " + parent.getRegionNameAsString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the splitting node to be transitioned from pending_split to splitting by master.
|
||||
* That's how we are sure master has processed the event and is good with us to move on. If we
|
||||
* don't get any update, we periodically transition the node so that master gets the callback. If
|
||||
* the node is removed or is not in pending_split state any more, we abort the split.
|
||||
*/
|
||||
@Override
|
||||
public void waitForSplitTransaction(final RegionServerServices services, HRegion parent,
|
||||
HRegionInfo hri_a, HRegionInfo hri_b, SplitTransactionDetails sptd) throws IOException {
|
||||
ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) sptd;
|
||||
|
||||
// After creating the split node, wait for master to transition it
|
||||
// from PENDING_SPLIT to SPLITTING so that we can move on. We want master
|
||||
// knows about it and won't transition any region which is splitting.
|
||||
try {
|
||||
int spins = 0;
|
||||
Stat stat = new Stat();
|
||||
ServerName expectedServer = coordinationManager.getServer().getServerName();
|
||||
String node = parent.getRegionInfo().getEncodedName();
|
||||
while (!(coordinationManager.getServer().isStopped() || services.isStopping())) {
|
||||
if (spins % 5 == 0) {
|
||||
LOG.debug("Still waiting for master to process " + "the pending_split for " + node);
|
||||
SplitTransactionDetails temp = getDefaultDetails();
|
||||
transitionSplittingNode(parent.getRegionInfo(), hri_a, hri_b, expectedServer, temp,
|
||||
RS_ZK_REQUEST_REGION_SPLIT, RS_ZK_REQUEST_REGION_SPLIT);
|
||||
}
|
||||
Thread.sleep(100);
|
||||
spins++;
|
||||
byte[] data = ZKAssign.getDataNoWatch(watcher, node, stat);
|
||||
if (data == null) {
|
||||
throw new IOException("Data is null, splitting node " + node + " no longer exists");
|
||||
}
|
||||
RegionTransition rt = RegionTransition.parseFrom(data);
|
||||
EventType et = rt.getEventType();
|
||||
if (et == RS_ZK_REGION_SPLITTING) {
|
||||
ServerName serverName = rt.getServerName();
|
||||
if (!serverName.equals(expectedServer)) {
|
||||
throw new IOException("Splitting node " + node + " is for " + serverName + ", not us "
|
||||
+ expectedServer);
|
||||
}
|
||||
byte[] payloadOfSplitting = rt.getPayload();
|
||||
List<HRegionInfo> splittingRegions =
|
||||
HRegionInfo.parseDelimitedFrom(payloadOfSplitting, 0, payloadOfSplitting.length);
|
||||
assert splittingRegions.size() == 2;
|
||||
HRegionInfo a = splittingRegions.get(0);
|
||||
HRegionInfo b = splittingRegions.get(1);
|
||||
if (!(hri_a.equals(a) && hri_b.equals(b))) {
|
||||
throw new IOException("Splitting node " + node + " is for " + a + ", " + b
|
||||
+ ", not expected daughters: " + hri_a + ", " + hri_b);
|
||||
}
|
||||
// Master has processed it.
|
||||
zstd.setZnodeVersion(stat.getVersion());
|
||||
return;
|
||||
}
|
||||
if (et != RS_ZK_REQUEST_REGION_SPLIT) {
|
||||
throw new IOException("Splitting node " + node + " moved out of splitting to " + et);
|
||||
}
|
||||
}
|
||||
// Server is stopping/stopped
|
||||
throw new IOException("Server is " + (services.isStopping() ? "stopping" : "stopped"));
|
||||
} catch (Exception e) {
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
throw new IOException("Failed getting SPLITTING znode on " + parent.getRegionNameAsString(),
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Finish off split transaction, transition the zknode
|
||||
* @param services Used to online/offline regions.
|
||||
* @param a daughter region
|
||||
* @param b daughter region
|
||||
* @param std split transaction details
|
||||
* @param parent
|
||||
* @throws IOException If thrown, transaction failed. Call
|
||||
* {@link SplitTransaction#rollback(Server, RegionServerServices)}
|
||||
*/
|
||||
@Override
|
||||
public void completeSplitTransaction(final RegionServerServices services, HRegion a, HRegion b,
|
||||
SplitTransactionDetails std, HRegion parent) throws IOException {
|
||||
ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) std;
|
||||
// Tell master about split by updating zk. If we fail, abort.
|
||||
if (coordinationManager.getServer() != null) {
|
||||
try {
|
||||
zstd.setZnodeVersion(transitionSplittingNode(parent.getRegionInfo(), a.getRegionInfo(),
|
||||
b.getRegionInfo(), coordinationManager.getServer().getServerName(), zstd,
|
||||
RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT));
|
||||
|
||||
int spins = 0;
|
||||
// Now wait for the master to process the split. We know it's done
|
||||
// when the znode is deleted. The reason we keep tickling the znode is
|
||||
// that it's possible for the master to miss an event.
|
||||
do {
|
||||
if (spins % 10 == 0) {
|
||||
LOG.debug("Still waiting on the master to process the split for "
|
||||
+ parent.getRegionInfo().getEncodedName());
|
||||
}
|
||||
Thread.sleep(100);
|
||||
// When this returns -1 it means the znode doesn't exist
|
||||
zstd.setZnodeVersion(transitionSplittingNode(parent.getRegionInfo(), a.getRegionInfo(),
|
||||
b.getRegionInfo(), coordinationManager.getServer().getServerName(), zstd,
|
||||
RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT));
|
||||
spins++;
|
||||
} while (zstd.getZnodeVersion() != -1 && !coordinationManager.getServer().isStopped()
|
||||
&& !services.isStopping());
|
||||
} catch (Exception e) {
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
throw new IOException("Failed telling master about split", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Leaving here, the splitdir with its dross will be in place but since the
|
||||
// split was successful, just leave it; it'll be cleaned when parent is
|
||||
// deleted and cleaned up.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clean(final HRegionInfo hri) {
|
||||
try {
|
||||
// Only delete if its in expected state; could have been hijacked.
|
||||
if (!ZKAssign.deleteNode(coordinationManager.getServer().getZooKeeper(),
|
||||
hri.getEncodedName(), RS_ZK_REQUEST_REGION_SPLIT, coordinationManager.getServer()
|
||||
.getServerName())) {
|
||||
ZKAssign.deleteNode(coordinationManager.getServer().getZooKeeper(), hri.getEncodedName(),
|
||||
RS_ZK_REGION_SPLITTING, coordinationManager.getServer().getServerName());
|
||||
}
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
|
||||
} catch (KeeperException e) {
|
||||
coordinationManager.getServer().abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* ZK-based implementation. Has details about whether the state transition should be reflected in
|
||||
* ZK, as well as expected version of znode.
|
||||
*/
|
||||
public static class ZkSplitTransactionDetails implements
|
||||
SplitTransactionCoordination.SplitTransactionDetails {
|
||||
private int znodeVersion;
|
||||
|
||||
public ZkSplitTransactionDetails() {
|
||||
}
|
||||
|
||||
/**
|
||||
* @return znode current version
|
||||
*/
|
||||
public int getZnodeVersion() {
|
||||
return znodeVersion;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param znodeVersion znode new version
|
||||
*/
|
||||
public void setZnodeVersion(int znodeVersion) {
|
||||
this.znodeVersion = znodeVersion;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SplitTransactionDetails getDefaultDetails() {
|
||||
ZkSplitTransactionDetails zstd = new ZkSplitTransactionDetails();
|
||||
zstd.setZnodeVersion(-1);
|
||||
return zstd;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int processTransition(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b, ServerName sn,
|
||||
SplitTransactionDetails std) throws IOException {
|
||||
return transitionSplittingNode(p, hri_a, hri_b, sn, std, RS_ZK_REQUEST_REGION_SPLIT,
|
||||
RS_ZK_REGION_SPLITTING);
|
||||
|
||||
}
|
||||
}
|
@ -15,7 +15,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.consensus;
|
||||
package org.apache.hadoop.hbase.coordination;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -33,13 +33,16 @@ import org.apache.zookeeper.KeeperException;
|
||||
@InterfaceAudience.Private
|
||||
public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
|
||||
private static final Log LOG = LogFactory.getLog(ZkCoordinatedStateManager.class);
|
||||
private Server server;
|
||||
private ZooKeeperWatcher watcher;
|
||||
protected Server server;
|
||||
protected ZooKeeperWatcher watcher;
|
||||
protected SplitTransactionCoordination splitTransactionCoordination;
|
||||
|
||||
@Override
|
||||
public void initialize(Server server) {
|
||||
this.server = server;
|
||||
this.watcher = server.getZooKeeper();
|
||||
|
||||
splitTransactionCoordination = new ZKSplitTransactionCoordination(this, watcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -56,4 +59,9 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
|
||||
throw new CoordinatedStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SplitTransactionCoordination getSplitTransactionCoordination() {
|
||||
return splitTransactionCoordination;
|
||||
}
|
||||
}
|
@ -58,6 +58,8 @@ import org.apache.hadoop.hbase.TableStateManager;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.catalog.MetaReader;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
@ -77,7 +79,6 @@ import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||
import org.apache.hadoop.hbase.regionserver.SplitTransaction;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.KeyLocker;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
@ -3225,25 +3226,26 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||
EventType et = rt.getEventType();
|
||||
if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
|
||||
try {
|
||||
if (SplitTransaction.transitionSplittingNode(watcher, p,
|
||||
hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_SPLIT,
|
||||
EventType.RS_ZK_REGION_SPLITTING) == -1) {
|
||||
SplitTransactionDetails std =
|
||||
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getSplitTransactionCoordination().getDefaultDetails();
|
||||
if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) {
|
||||
byte[] data = ZKAssign.getData(watcher, encodedName);
|
||||
EventType currentType = null;
|
||||
if (data != null) {
|
||||
RegionTransition newRt = RegionTransition.parseFrom(data);
|
||||
currentType = newRt.getEventType();
|
||||
}
|
||||
if (currentType == null || (currentType != EventType.RS_ZK_REGION_SPLIT
|
||||
&& currentType != EventType.RS_ZK_REGION_SPLITTING)) {
|
||||
LOG.warn("Failed to transition pending_split node "
|
||||
+ encodedName + " to splitting, it's now " + currentType);
|
||||
if (currentType == null
|
||||
|| (currentType != EventType.RS_ZK_REGION_SPLIT && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
|
||||
LOG.warn("Failed to transition pending_split node " + encodedName
|
||||
+ " to splitting, it's now " + currentType);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to transition pending_split node "
|
||||
+ encodedName + " to splitting", e);
|
||||
LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -18,10 +18,6 @@
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT;
|
||||
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT;
|
||||
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
@ -41,25 +37,20 @@ import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.RegionTransition;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.catalog.MetaEditor;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hadoop.hbase.util.PairOfSameType;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.KeeperException.NodeExistsException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
@ -97,7 +88,7 @@ public class SplitTransaction {
|
||||
private HRegionInfo hri_a;
|
||||
private HRegionInfo hri_b;
|
||||
private long fileSplitTimeout = 30000;
|
||||
private int znodeVersion = -1;
|
||||
public SplitTransactionCoordination.SplitTransactionDetails std;
|
||||
|
||||
/*
|
||||
* Row to split around
|
||||
@ -113,7 +104,7 @@ public class SplitTransaction {
|
||||
/**
|
||||
* Set region as in transition, set it into SPLITTING state.
|
||||
*/
|
||||
SET_SPLITTING_IN_ZK,
|
||||
SET_SPLITTING,
|
||||
/**
|
||||
* We created the temporary split data directory.
|
||||
*/
|
||||
@ -294,26 +285,24 @@ public class SplitTransaction {
|
||||
}
|
||||
return daughterRegions;
|
||||
}
|
||||
|
||||
public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
|
||||
final RegionServerServices services, boolean testing) throws IOException {
|
||||
// Set ephemeral SPLITTING znode up in zk. Mocked servers sometimes don't
|
||||
// have zookeeper so don't do zk stuff if server or zookeeper is null
|
||||
if (server != null && server.getZooKeeper() != null) {
|
||||
try {
|
||||
createNodeSplitting(server.getZooKeeper(),
|
||||
parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed creating PENDING_SPLIT znode on " +
|
||||
this.parent.getRegionNameAsString(), e);
|
||||
|
||||
if (server != null && server.getCoordinatedStateManager() != null) {
|
||||
if (std == null) {
|
||||
std =
|
||||
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getSplitTransactionCoordination().getDefaultDetails();
|
||||
}
|
||||
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getSplitTransactionCoordination().startSplitTransaction(parent, server.getServerName(),
|
||||
hri_a, hri_b);
|
||||
}
|
||||
this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
|
||||
if (server != null && server.getZooKeeper() != null) {
|
||||
// After creating the split node, wait for master to transition it
|
||||
// from PENDING_SPLIT to SPLITTING so that we can move on. We want master
|
||||
// knows about it and won't transition any region which is splitting.
|
||||
znodeVersion = getZKNode(server, services);
|
||||
this.journal.add(JournalEntry.SET_SPLITTING);
|
||||
if (server != null && server.getCoordinatedStateManager() != null) {
|
||||
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getSplitTransactionCoordination().waitForSplitTransaction(services, parent, hri_a,
|
||||
hri_b, std);
|
||||
}
|
||||
|
||||
this.parent.getRegionFileSystem().createSplitsDir();
|
||||
@ -369,8 +358,7 @@ public class SplitTransaction {
|
||||
|
||||
/**
|
||||
* Perform time consuming opening of the daughter regions.
|
||||
* @param server Hosting server instance. Can be null when testing (won't try
|
||||
* and update in zk if a null server)
|
||||
* @param server Hosting server instance. Can be null when testing
|
||||
* @param services Used to online/offline regions.
|
||||
* @param a first daughter region
|
||||
* @param a second daughter region
|
||||
@ -424,137 +412,9 @@ public class SplitTransaction {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Finish off split transaction, transition the zknode
|
||||
* @param server Hosting server instance. Can be null when testing (won't try
|
||||
* and update in zk if a null server)
|
||||
* @param services Used to online/offline regions.
|
||||
* @param a first daughter region
|
||||
* @param a second daughter region
|
||||
* @throws IOException If thrown, transaction failed.
|
||||
* Call {@link #rollback(Server, RegionServerServices)}
|
||||
*/
|
||||
/* package */void transitionZKNode(final Server server,
|
||||
final RegionServerServices services, HRegion a, HRegion b)
|
||||
throws IOException {
|
||||
// Tell master about split by updating zk. If we fail, abort.
|
||||
if (server != null && server.getZooKeeper() != null) {
|
||||
try {
|
||||
this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
|
||||
parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
|
||||
server.getServerName(), this.znodeVersion,
|
||||
RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT);
|
||||
|
||||
int spins = 0;
|
||||
// Now wait for the master to process the split. We know it's done
|
||||
// when the znode is deleted. The reason we keep tickling the znode is
|
||||
// that it's possible for the master to miss an event.
|
||||
do {
|
||||
if (spins % 10 == 0) {
|
||||
LOG.debug("Still waiting on the master to process the split for " +
|
||||
this.parent.getRegionInfo().getEncodedName());
|
||||
}
|
||||
Thread.sleep(100);
|
||||
// When this returns -1 it means the znode doesn't exist
|
||||
this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
|
||||
parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
|
||||
server.getServerName(), this.znodeVersion,
|
||||
RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT);
|
||||
spins++;
|
||||
} while (this.znodeVersion != -1 && !server.isStopped()
|
||||
&& !services.isStopping());
|
||||
} catch (Exception e) {
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
throw new IOException("Failed telling master about split", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Coprocessor callback
|
||||
if (this.parent.getCoprocessorHost() != null) {
|
||||
this.parent.getCoprocessorHost().postSplit(a,b);
|
||||
}
|
||||
|
||||
// Leaving here, the splitdir with its dross will be in place but since the
|
||||
// split was successful, just leave it; it'll be cleaned when parent is
|
||||
// deleted and cleaned up.
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the splitting node to be transitioned from pending_split
|
||||
* to splitting by master. That's how we are sure master has processed
|
||||
* the event and is good with us to move on. If we don't get any update,
|
||||
* we periodically transition the node so that master gets the callback.
|
||||
* If the node is removed or is not in pending_split state any more,
|
||||
* we abort the split.
|
||||
*/
|
||||
private int getZKNode(final Server server,
|
||||
final RegionServerServices services) throws IOException {
|
||||
// Wait for the master to process the pending_split.
|
||||
try {
|
||||
int spins = 0;
|
||||
Stat stat = new Stat();
|
||||
ZooKeeperWatcher zkw = server.getZooKeeper();
|
||||
ServerName expectedServer = server.getServerName();
|
||||
String node = parent.getRegionInfo().getEncodedName();
|
||||
while (!(server.isStopped() || services.isStopping())) {
|
||||
if (spins % 5 == 0) {
|
||||
LOG.debug("Still waiting for master to process "
|
||||
+ "the pending_split for " + node);
|
||||
transitionSplittingNode(zkw, parent.getRegionInfo(),
|
||||
hri_a, hri_b, expectedServer, -1, RS_ZK_REQUEST_REGION_SPLIT,
|
||||
RS_ZK_REQUEST_REGION_SPLIT);
|
||||
}
|
||||
Thread.sleep(100);
|
||||
spins++;
|
||||
byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat);
|
||||
if (data == null) {
|
||||
throw new IOException("Data is null, splitting node "
|
||||
+ node + " no longer exists");
|
||||
}
|
||||
RegionTransition rt = RegionTransition.parseFrom(data);
|
||||
EventType et = rt.getEventType();
|
||||
if (et == RS_ZK_REGION_SPLITTING) {
|
||||
ServerName serverName = rt.getServerName();
|
||||
if (!serverName.equals(expectedServer)) {
|
||||
throw new IOException("Splitting node " + node + " is for "
|
||||
+ serverName + ", not us " + expectedServer);
|
||||
}
|
||||
byte [] payloadOfSplitting = rt.getPayload();
|
||||
List<HRegionInfo> splittingRegions = HRegionInfo.parseDelimitedFrom(
|
||||
payloadOfSplitting, 0, payloadOfSplitting.length);
|
||||
assert splittingRegions.size() == 2;
|
||||
HRegionInfo a = splittingRegions.get(0);
|
||||
HRegionInfo b = splittingRegions.get(1);
|
||||
if (!(hri_a.equals(a) && hri_b.equals(b))) {
|
||||
throw new IOException("Splitting node " + node + " is for " + a + ", "
|
||||
+ b + ", not expected daughters: " + hri_a + ", " + hri_b);
|
||||
}
|
||||
// Master has processed it.
|
||||
return stat.getVersion();
|
||||
}
|
||||
if (et != RS_ZK_REQUEST_REGION_SPLIT) {
|
||||
throw new IOException("Splitting node " + node
|
||||
+ " moved out of splitting to " + et);
|
||||
}
|
||||
}
|
||||
// Server is stopping/stopped
|
||||
throw new IOException("Server is "
|
||||
+ (services.isStopping() ? "stopping" : "stopped"));
|
||||
} catch (Exception e) {
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
throw new IOException("Failed getting SPLITTING znode on "
|
||||
+ parent.getRegionNameAsString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the transaction.
|
||||
* @param server Hosting server instance. Can be null when testing (won't try
|
||||
* and update in zk if a null server)
|
||||
* @param server Hosting server instance. Can be null when testing
|
||||
* @param services Used to online/offline regions.
|
||||
* @throws IOException If thrown, transaction failed.
|
||||
* Call {@link #rollback(Server, RegionServerServices)}
|
||||
@ -565,6 +425,11 @@ public class SplitTransaction {
|
||||
public PairOfSameType<HRegion> execute(final Server server,
|
||||
final RegionServerServices services)
|
||||
throws IOException {
|
||||
if (server != null && server.getCoordinatedStateManager() != null) {
|
||||
std =
|
||||
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getSplitTransactionCoordination().getDefaultDetails();
|
||||
}
|
||||
PairOfSameType<HRegion> regions = createDaughters(server, services);
|
||||
if (this.parent.getCoprocessorHost() != null) {
|
||||
this.parent.getCoprocessorHost().preSplitAfterPONR();
|
||||
@ -576,7 +441,17 @@ public class SplitTransaction {
|
||||
final RegionServerServices services, PairOfSameType<HRegion> regions)
|
||||
throws IOException {
|
||||
openDaughters(server, services, regions.getFirst(), regions.getSecond());
|
||||
transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
|
||||
if (server != null && server.getCoordinatedStateManager() != null) {
|
||||
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getSplitTransactionCoordination().completeSplitTransaction(services, regions.getFirst(),
|
||||
regions.getSecond(), std, parent);
|
||||
}
|
||||
// Coprocessor callback
|
||||
if (parent.getCoprocessorHost() != null) {
|
||||
parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
|
||||
}
|
||||
|
||||
|
||||
return regions;
|
||||
}
|
||||
|
||||
@ -800,9 +675,10 @@ public class SplitTransaction {
|
||||
JournalEntry je = iterator.previous();
|
||||
switch(je) {
|
||||
|
||||
case SET_SPLITTING_IN_ZK:
|
||||
if (server != null && server.getZooKeeper() != null) {
|
||||
cleanZK(server, this.parent.getRegionInfo());
|
||||
case SET_SPLITTING:
|
||||
if (server != null && server instanceof HRegionServer) {
|
||||
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getSplitTransactionCoordination().clean(this.parent.getRegionInfo());
|
||||
}
|
||||
break;
|
||||
|
||||
@ -864,88 +740,4 @@ public class SplitTransaction {
|
||||
return hri_b;
|
||||
}
|
||||
|
||||
private static void cleanZK(final Server server, final HRegionInfo hri) {
|
||||
try {
|
||||
// Only delete if its in expected state; could have been hijacked.
|
||||
if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
|
||||
RS_ZK_REQUEST_REGION_SPLIT, server.getServerName())) {
|
||||
ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
|
||||
RS_ZK_REGION_SPLITTING, server.getServerName());
|
||||
}
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
|
||||
} catch (KeeperException e) {
|
||||
server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new ephemeral node in the PENDING_SPLIT state for the specified region.
|
||||
* Create it ephemeral in case regionserver dies mid-split.
|
||||
*
|
||||
* <p>Does not transition nodes from other states. If a node already exists
|
||||
* for this region, a {@link NodeExistsException} will be thrown.
|
||||
*
|
||||
* @param zkw zk reference
|
||||
* @param region region to be created as offline
|
||||
* @param serverName server event originates from
|
||||
* @throws KeeperException
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void createNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo region,
|
||||
final ServerName serverName, final HRegionInfo a,
|
||||
final HRegionInfo b) throws KeeperException, IOException {
|
||||
LOG.debug(zkw.prefix("Creating ephemeral node for " +
|
||||
region.getEncodedName() + " in PENDING_SPLIT state"));
|
||||
byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
|
||||
RegionTransition rt = RegionTransition.createRegionTransition(
|
||||
RS_ZK_REQUEST_REGION_SPLIT, region.getRegionName(), serverName, payload);
|
||||
String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
|
||||
if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
|
||||
throw new IOException("Failed create of ephemeral " + node);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transitions an existing ephemeral node for the specified region which is
|
||||
* currently in the begin state to be in the end state. Master cleans up the
|
||||
* final SPLIT znode when it reads it (or if we crash, zk will clean it up).
|
||||
*
|
||||
* <p>Does not transition nodes from other states. If for some reason the
|
||||
* node could not be transitioned, the method returns -1. If the transition
|
||||
* is successful, the version of the node after transition is returned.
|
||||
*
|
||||
* <p>This method can fail and return false for three different reasons:
|
||||
* <ul><li>Node for this region does not exist</li>
|
||||
* <li>Node for this region is not in the begin state</li>
|
||||
* <li>After verifying the begin state, update fails because of wrong version
|
||||
* (this should never actually happen since an RS only does this transition
|
||||
* following a transition to the begin state. If two RS are conflicting, one would
|
||||
* fail the original transition to the begin state and not this transition)</li>
|
||||
* </ul>
|
||||
*
|
||||
* <p>Does not set any watches.
|
||||
*
|
||||
* <p>This method should only be used by a RegionServer when splitting a region.
|
||||
*
|
||||
* @param zkw zk reference
|
||||
* @param parent region to be transitioned to opened
|
||||
* @param a Daughter a of split
|
||||
* @param b Daughter b of split
|
||||
* @param serverName server event originates from
|
||||
* @param znodeVersion expected version of data before modification
|
||||
* @param beginState the expected current state the znode should be
|
||||
* @param endState the state to be transition to
|
||||
* @return version of node after transition, -1 if unsuccessful transition
|
||||
* @throws KeeperException if unexpected zookeeper exception
|
||||
* @throws IOException
|
||||
*/
|
||||
public static int transitionSplittingNode(ZooKeeperWatcher zkw,
|
||||
HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName,
|
||||
final int znodeVersion, final EventType beginState,
|
||||
final EventType endState) throws KeeperException, IOException {
|
||||
byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
|
||||
return ZKAssign.transitionNode(zkw, parent, serverName,
|
||||
beginState, endState, znodeVersion, payload);
|
||||
}
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorType;
|
||||
|
@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
|
@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.client.MetaScanner;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
@ -139,8 +140,9 @@ public class TestEndToEndSplitTransaction {
|
||||
assertTrue(test(con, tableName, lastRow, server));
|
||||
|
||||
// 4. phase III
|
||||
split.transitionZKNode(server, server, regions.getFirst(),
|
||||
regions.getSecond());
|
||||
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getSplitTransactionCoordination().completeSplitTransaction(server, regions.getFirst(),
|
||||
regions.getSecond(), split.std, region);
|
||||
assertTrue(test(con, tableName, firstRow, server));
|
||||
assertTrue(test(con, tableName, lastRow, server));
|
||||
}
|
||||
|
@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.coordination.ZKSplitTransactionCoordination;
|
||||
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
@ -190,6 +192,10 @@ public class TestSplitTransactionOnCluster {
|
||||
// find a splittable region
|
||||
final HRegion region = findSplittableRegion(regions);
|
||||
assertTrue("not able to find a splittable region", region != null);
|
||||
MockedCoordinatedStateManager cp = new MockedCoordinatedStateManager();
|
||||
cp.initialize(regionServer, region);
|
||||
cp.start();
|
||||
regionServer.csm = cp;
|
||||
|
||||
new Thread() {
|
||||
@Override
|
||||
@ -1083,18 +1089,52 @@ public class TestSplitTransactionOnCluster {
|
||||
TESTING_UTIL.deleteTable(tableName);
|
||||
}
|
||||
}
|
||||
public static class MockedCoordinatedStateManager extends ZkCoordinatedStateManager {
|
||||
|
||||
public static class MockedSplitTransaction extends SplitTransaction {
|
||||
public void initialize(Server server, HRegion region) {
|
||||
this.server = server;
|
||||
this.watcher = server.getZooKeeper();
|
||||
splitTransactionCoordination = new MockedSplitTransactionCoordination(this, watcher, region);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public static class MockedSplitTransaction extends SplitTransaction {
|
||||
|
||||
private HRegion currentRegion;
|
||||
public MockedSplitTransaction(HRegion region, byte[] splitrow) {
|
||||
super(region, splitrow);
|
||||
this.currentRegion = region;
|
||||
}
|
||||
@Override
|
||||
public boolean rollback(Server server, RegionServerServices services) throws IOException {
|
||||
if (this.currentRegion.getRegionInfo().getTable().getNameAsString()
|
||||
.equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) {
|
||||
if(secondSplit){
|
||||
super.rollback(server, services);
|
||||
latch.countDown();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return super.rollback(server, services);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
public static class MockedSplitTransactionCoordination extends ZKSplitTransactionCoordination {
|
||||
|
||||
private HRegion currentRegion;
|
||||
public MockedSplitTransaction(HRegion r, byte[] splitrow) {
|
||||
super(r, splitrow);
|
||||
this.currentRegion = r;
|
||||
|
||||
public MockedSplitTransactionCoordination(CoordinatedStateManager coordinationProvider,
|
||||
ZooKeeperWatcher watcher, HRegion region) {
|
||||
super(coordinationProvider, watcher);
|
||||
currentRegion = region;
|
||||
}
|
||||
|
||||
@Override
|
||||
void transitionZKNode(Server server, RegionServerServices services, HRegion a, HRegion b)
|
||||
throws IOException {
|
||||
public void completeSplitTransaction(RegionServerServices services, HRegion a, HRegion b,
|
||||
SplitTransactionDetails std, HRegion parent) throws IOException {
|
||||
if (this.currentRegion.getRegionInfo().getTable().getNameAsString()
|
||||
.equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) {
|
||||
try {
|
||||
@ -1106,25 +1146,12 @@ public class TestSplitTransactionOnCluster {
|
||||
}
|
||||
|
||||
}
|
||||
super.transitionZKNode(server, services, a, b);
|
||||
super.completeSplitTransaction(services, a, b, std, parent);
|
||||
if (this.currentRegion.getRegionInfo().getTable().getNameAsString()
|
||||
.equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) {
|
||||
firstSplitCompleted = true;
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public boolean rollback(Server server, RegionServerServices services) throws IOException {
|
||||
if (this.currentRegion.getRegionInfo().getTable().getNameAsString()
|
||||
.equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) {
|
||||
if(secondSplit){
|
||||
super.rollback(server, services);
|
||||
latch.countDown();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return super.rollback(server, services);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private HRegion findSplittableRegion(final List<HRegion> regions) throws InterruptedException {
|
||||
|
Loading…
x
Reference in New Issue
Block a user