HBASE-11069 Decouple region merging from ZooKeeper (Sergey Soldatov)
This commit is contained in:
parent
6304eb2cce
commit
e476947d3f
|
@ -67,4 +67,9 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
|
|||
* Method to retrieve coordination for opening region operations.
|
||||
*/
|
||||
public abstract OpenRegionCoordination getOpenRegionCoordination();
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to retrieve coordination for region merge transaction.
|
||||
*/
|
||||
public abstract RegionMergeCoordination getRegionMergeCoordination();
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||
* for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coordination;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
|
||||
/**
|
||||
* Coordination operations for region merge transaction. The operation should be coordinated at the
|
||||
* following stages:<br>
|
||||
* 1. startRegionMergeTransaction - all preparation/initialization for merge region transaction<br>
|
||||
* 2. waitForRegionMergeTransaction - wait until coordination complete all works related
|
||||
* to merge<br>
|
||||
* 3. confirmRegionMergeTransaction - confirm that the merge could be completed and none of merging
|
||||
* regions moved somehow<br>
|
||||
* 4. completeRegionMergeTransaction - all steps that are required to complete the transaction.
|
||||
* Called after PONR (point of no return) <br>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface RegionMergeCoordination {
|
||||
|
||||
RegionMergeDetails getDefaultDetails();
|
||||
|
||||
/**
|
||||
* Dummy interface for region merge transaction details.
|
||||
*/
|
||||
public static interface RegionMergeDetails {
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the region merge transaction
|
||||
* @param region region to be created as offline
|
||||
* @param serverName server event originates from
|
||||
* @throws IOException
|
||||
*/
|
||||
void startRegionMergeTransaction(HRegionInfo region, ServerName serverName, HRegionInfo a,
|
||||
HRegionInfo b) throws IOException;
|
||||
|
||||
/**
|
||||
* Get everything ready for region merge
|
||||
* @throws IOException
|
||||
*/
|
||||
void waitForRegionMergeTransaction(RegionServerServices services, HRegionInfo mergedRegionInfo,
|
||||
HRegion region_a, HRegion region_b, RegionMergeDetails details) throws IOException;
|
||||
|
||||
/**
|
||||
* Confirm that the region merge can be performed
|
||||
* @param merged region
|
||||
* @param a merging region A
|
||||
* @param b merging region B
|
||||
* @param serverName server event originates from
|
||||
* @param rmd region merge details
|
||||
* @throws IOException If thrown, transaction failed.
|
||||
*/
|
||||
void confirmRegionMergeTransaction(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
|
||||
ServerName serverName, RegionMergeDetails rmd) throws IOException;
|
||||
|
||||
/**
|
||||
* @param merged region
|
||||
* @param a merging region A
|
||||
* @param b merging region B
|
||||
* @param serverName server event originates from
|
||||
* @param rmd region merge details
|
||||
* @throws IOException
|
||||
*/
|
||||
void processRegionMergeRequest(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
|
||||
ServerName serverName, RegionMergeDetails rmd) throws IOException;
|
||||
|
||||
/**
|
||||
* Finish off merge transaction
|
||||
* @param services Used to online/offline regions.
|
||||
* @param merged region
|
||||
* @param region_a merging region A
|
||||
* @param region_b merging region B
|
||||
* @param rmd region merge details
|
||||
* @param mergedRegion
|
||||
* @throws IOException If thrown, transaction failed. Call
|
||||
* {@link RegionMergeTransaction#rollback(Server, RegionServerServices)}
|
||||
*/
|
||||
void completeRegionMergeTransaction(RegionServerServices services, HRegionInfo merged,
|
||||
HRegion region_a, HRegion region_b, RegionMergeDetails rmd, HRegion mergedRegion)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* This method is used during rollback
|
||||
* @param merged region to be rolled back
|
||||
*/
|
||||
void clean(HRegionInfo merged);
|
||||
|
||||
}
|
|
@ -38,6 +38,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
|
|||
protected SplitTransactionCoordination splitTransactionCoordination;
|
||||
protected CloseRegionCoordination closeRegionCoordination;
|
||||
protected OpenRegionCoordination openRegionCoordination;
|
||||
protected RegionMergeCoordination regionMergeCoordination;
|
||||
|
||||
@Override
|
||||
public void initialize(Server server) {
|
||||
|
@ -47,6 +48,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
|
|||
splitTransactionCoordination = new ZKSplitTransactionCoordination(this, watcher);
|
||||
closeRegionCoordination = new ZkCloseRegionCoordination(this, watcher);
|
||||
openRegionCoordination = new ZkOpenRegionCoordination(this, watcher);
|
||||
regionMergeCoordination = new ZkRegionMergeCoordination(this, watcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -78,4 +80,9 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
|
|||
public OpenRegionCoordination getOpenRegionCoordination() {
|
||||
return openRegionCoordination;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionMergeCoordination getRegionMergeCoordination() {
|
||||
return regionMergeCoordination;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,326 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coordination;
|
||||
|
||||
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED;
|
||||
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING;
|
||||
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.RegionTransition;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.KeeperException.NodeExistsException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
public class ZkRegionMergeCoordination implements RegionMergeCoordination {
|
||||
|
||||
// Both collaborators are assigned exactly once, in the constructor, so both are final.
private final CoordinatedStateManager manager;
private final ZooKeeperWatcher watcher;

private static final Log LOG = LogFactory.getLog(ZkRegionMergeCoordination.class);

/**
 * Creates the ZK-based region merge coordination.
 * @param manager coordinated state manager; its server is consulted for the server name and
 *          for stop/abort handling
 * @param watcher ZooKeeper watcher used for all znode operations of this class
 */
public ZkRegionMergeCoordination(CoordinatedStateManager manager,
    ZooKeeperWatcher watcher) {
  this.manager = manager;
  this.watcher = watcher;
}
|
||||
|
||||
/**
|
||||
* ZK-based implementation. Has details about whether the state transition should be reflected in
|
||||
* ZK, as well as expected version of znode.
|
||||
*/
|
||||
public static class ZkRegionMergeDetails implements RegionMergeCoordination.RegionMergeDetails {
|
||||
private int znodeVersion;
|
||||
|
||||
public ZkRegionMergeDetails() {
|
||||
}
|
||||
|
||||
public int getZnodeVersion() {
|
||||
return znodeVersion;
|
||||
}
|
||||
|
||||
public void setZnodeVersion(int znodeVersion) {
|
||||
this.znodeVersion = znodeVersion;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionMergeDetails getDefaultDetails() {
|
||||
ZkRegionMergeDetails zstd = new ZkRegionMergeDetails();
|
||||
zstd.setZnodeVersion(-1);
|
||||
return zstd;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the merging node to be transitioned from pending_merge
|
||||
* to merging by master. That's how we are sure master has processed
|
||||
* the event and is good with us to move on. If we don't get any update,
|
||||
* we periodically transition the node so that master gets the callback.
|
||||
* If the node is removed or is not in pending_merge state any more,
|
||||
* we abort the merge.
|
||||
* @throws IOException
|
||||
*/
|
||||
|
||||
@Override
|
||||
public void waitForRegionMergeTransaction(RegionServerServices services,
|
||||
HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails details)
|
||||
throws IOException {
|
||||
try {
|
||||
int spins = 0;
|
||||
Stat stat = new Stat();
|
||||
ServerName expectedServer = manager.getServer().getServerName();
|
||||
String node = mergedRegionInfo.getEncodedName();
|
||||
ZkRegionMergeDetails zdetails = (ZkRegionMergeDetails) details;
|
||||
while (!(manager.getServer().isStopped() || services.isStopping())) {
|
||||
if (spins % 5 == 0) {
|
||||
LOG.debug("Still waiting for master to process " + "the pending_merge for " + node);
|
||||
ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) getDefaultDetails();
|
||||
transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(),
|
||||
region_b.getRegionInfo(), expectedServer, zrmd, RS_ZK_REQUEST_REGION_MERGE,
|
||||
RS_ZK_REQUEST_REGION_MERGE);
|
||||
}
|
||||
Thread.sleep(100);
|
||||
spins++;
|
||||
byte[] data = ZKAssign.getDataNoWatch(watcher, node, stat);
|
||||
if (data == null) {
|
||||
throw new IOException("Data is null, merging node " + node + " no longer exists");
|
||||
}
|
||||
RegionTransition rt = RegionTransition.parseFrom(data);
|
||||
EventType et = rt.getEventType();
|
||||
if (et == RS_ZK_REGION_MERGING) {
|
||||
ServerName serverName = rt.getServerName();
|
||||
if (!serverName.equals(expectedServer)) {
|
||||
throw new IOException("Merging node " + node + " is for " + serverName + ", not us "
|
||||
+ expectedServer);
|
||||
}
|
||||
byte[] payloadOfMerging = rt.getPayload();
|
||||
List<HRegionInfo> mergingRegions =
|
||||
HRegionInfo.parseDelimitedFrom(payloadOfMerging, 0, payloadOfMerging.length);
|
||||
assert mergingRegions.size() == 3;
|
||||
HRegionInfo a = mergingRegions.get(1);
|
||||
HRegionInfo b = mergingRegions.get(2);
|
||||
HRegionInfo hri_a = region_a.getRegionInfo();
|
||||
HRegionInfo hri_b = region_b.getRegionInfo();
|
||||
if (!(hri_a.equals(a) && hri_b.equals(b))) {
|
||||
throw new IOException("Merging node " + node + " is for " + a + ", " + b
|
||||
+ ", not expected regions: " + hri_a + ", " + hri_b);
|
||||
}
|
||||
// Master has processed it.
|
||||
zdetails.setZnodeVersion(stat.getVersion());
|
||||
return;
|
||||
}
|
||||
if (et != RS_ZK_REQUEST_REGION_MERGE) {
|
||||
throw new IOException("Merging node " + node + " moved out of merging to " + et);
|
||||
}
|
||||
}
|
||||
// Server is stopping/stopped
|
||||
throw new IOException("Server is " + (services.isStopping() ? "stopping" : "stopped"));
|
||||
} catch (Exception e) {
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
throw new IOException("Failed getting MERGING znode on "
|
||||
+ mergedRegionInfo.getRegionNameAsString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new ephemeral node in the PENDING_MERGE state for the merged region.
|
||||
* Create it ephemeral in case regionserver dies mid-merge.
|
||||
*
|
||||
* <p>
|
||||
* Does not transition nodes from other states. If a node already exists for
|
||||
* this region, a {@link NodeExistsException} will be thrown.
|
||||
*
|
||||
* @param region region to be created as offline
|
||||
* @param serverName server event originates from
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void startRegionMergeTransaction(final HRegionInfo region, final ServerName serverName,
|
||||
final HRegionInfo a, final HRegionInfo b) throws IOException {
|
||||
LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName()
|
||||
+ " in PENDING_MERGE state"));
|
||||
byte[] payload = HRegionInfo.toDelimitedByteArray(region, a, b);
|
||||
RegionTransition rt =
|
||||
RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(),
|
||||
serverName, payload);
|
||||
String node = ZKAssign.getNodeName(watcher, region.getEncodedName());
|
||||
try {
|
||||
if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) {
|
||||
throw new IOException("Failed create of ephemeral " + node);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see
|
||||
* org.apache.hadoop.hbase.regionserver.coordination.RegionMergeCoordination#clean(org.apache.hadoop
|
||||
* .hbase.Server, org.apache.hadoop.hbase.HRegionInfo)
|
||||
*/
|
||||
@Override
|
||||
public void clean(final HRegionInfo hri) {
|
||||
try {
|
||||
// Only delete if its in expected state; could have been hijacked.
|
||||
if (!ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REQUEST_REGION_MERGE, manager
|
||||
.getServer().getServerName())) {
|
||||
ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REGION_MERGING, manager
|
||||
.getServer().getServerName());
|
||||
}
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
|
||||
} catch (KeeperException e) {
|
||||
manager.getServer().abort("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* ZooKeeper implementation of finishRegionMergeTransaction
|
||||
*/
|
||||
@Override
|
||||
public void completeRegionMergeTransaction(final RegionServerServices services,
|
||||
HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails rmd,
|
||||
HRegion mergedRegion) throws IOException {
|
||||
ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd;
|
||||
if (manager.getServer() == null
|
||||
|| manager.getServer().getCoordinatedStateManager() == null) {
|
||||
return;
|
||||
}
|
||||
// Tell master about merge by updating zk. If we fail, abort.
|
||||
try {
|
||||
transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
|
||||
manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
|
||||
|
||||
long startTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||
int spins = 0;
|
||||
// Now wait for the master to process the merge. We know it's done
|
||||
// when the znode is deleted. The reason we keep tickling the znode is
|
||||
// that it's possible for the master to miss an event.
|
||||
do {
|
||||
if (spins % 10 == 0) {
|
||||
LOG.debug("Still waiting on the master to process the merge for "
|
||||
+ mergedRegionInfo.getEncodedName() + ", waited "
|
||||
+ (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
|
||||
}
|
||||
Thread.sleep(100);
|
||||
// When this returns -1 it means the znode doesn't exist
|
||||
transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
|
||||
manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
|
||||
spins++;
|
||||
} while (zrmd.getZnodeVersion() != -1 && !manager.getServer().isStopped()
|
||||
&& !services.isStopping());
|
||||
} catch (Exception e) {
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
throw new IOException("Failed telling master about merge "
|
||||
+ mergedRegionInfo.getEncodedName(), e);
|
||||
}
|
||||
// Leaving here, the mergedir with its dross will be in place but since the
|
||||
// merge was successful, just leave it; it'll be cleaned when region_a is
|
||||
// cleaned up by CatalogJanitor on master
|
||||
}
|
||||
|
||||
/*
|
||||
* Zookeeper implementation of region merge confirmation
|
||||
*/
|
||||
@Override
|
||||
public void confirmRegionMergeTransaction(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
|
||||
ServerName serverName, RegionMergeDetails rmd) throws IOException {
|
||||
transitionMergingNode(merged, a, b, serverName, rmd, RS_ZK_REGION_MERGING,
|
||||
RS_ZK_REGION_MERGING);
|
||||
}
|
||||
|
||||
/*
|
||||
* Zookeeper implementation of region merge processing
|
||||
*/
|
||||
@Override
|
||||
public void processRegionMergeRequest(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b,
|
||||
ServerName sn, RegionMergeDetails rmd) throws IOException {
|
||||
transitionMergingNode(p, hri_a, hri_b, sn, rmd, EventType.RS_ZK_REQUEST_REGION_MERGE,
|
||||
EventType.RS_ZK_REGION_MERGING);
|
||||
}
|
||||
|
||||
/**
|
||||
* Transitions an existing ephemeral node for the specified region which is
|
||||
* currently in the begin state to be in the end state. Master cleans up the
|
||||
* final MERGE znode when it reads it (or if we crash, zk will clean it up).
|
||||
*
|
||||
* <p>
|
||||
* Does not transition nodes from other states. If for some reason the node
|
||||
* could not be transitioned, the method returns -1. If the transition is
|
||||
* successful, the version of the node after transition is updated in details.
|
||||
*
|
||||
* <p>
|
||||
* This method can fail and return false for three different reasons:
|
||||
* <ul>
|
||||
* <li>Node for this region does not exist</li>
|
||||
* <li>Node for this region is not in the begin state</li>
|
||||
* <li>After verifying the begin state, update fails because of wrong version
|
||||
* (this should never actually happen since an RS only does this transition
|
||||
* following a transition to the begin state. If two RS are conflicting, one would
|
||||
* fail the original transition to the begin state and not this transition)</li>
|
||||
* </ul>
|
||||
*
|
||||
* <p>
|
||||
* Does not set any watches.
|
||||
*
|
||||
* <p>
|
||||
* This method should only be used by a RegionServer when merging two regions.
|
||||
*
|
||||
* @param merged region to be transitioned to opened
|
||||
* @param a merging region A
|
||||
* @param b merging region B
|
||||
* @param serverName server event originates from
|
||||
* @param rmd region merge details
|
||||
* @param beginState the expected current state the node should be
|
||||
* @param endState the state to be transition to
|
||||
* @throws IOException
|
||||
*/
|
||||
private void transitionMergingNode(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
|
||||
ServerName serverName, RegionMergeDetails rmd, final EventType beginState,
|
||||
final EventType endState) throws IOException {
|
||||
ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd;
|
||||
byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
|
||||
try {
|
||||
zrmd.setZnodeVersion(ZKAssign.transitionNode(watcher, merged, serverName, beginState,
|
||||
endState, zrmd.getZnodeVersion(), payload));
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -60,10 +60,12 @@ import org.apache.hadoop.hbase.TableStateManager;
|
|||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.catalog.MetaReader;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
|
||||
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
|
||||
import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
|
||||
import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
|
||||
import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
|
@ -82,7 +84,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
|
@ -3538,11 +3539,14 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
EventType et = rt.getEventType();
|
||||
if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
|
||||
try {
|
||||
if (RegionMergeTransaction.transitionMergingNode(watcher, p,
|
||||
hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_MERGE,
|
||||
EventType.RS_ZK_REGION_MERGING) == -1) {
|
||||
RegionMergeCoordination.RegionMergeDetails std =
|
||||
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getRegionMergeCoordination().getDefaultDetails();
|
||||
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std);
|
||||
if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) {
|
||||
byte[] data = ZKAssign.getData(watcher, encodedName);
|
||||
EventType currentType = null;
|
||||
EventType currentType = null;
|
||||
if (data != null) {
|
||||
RegionTransition newRt = RegionTransition.parseFrom(data);
|
||||
currentType = newRt.getEventType();
|
||||
|
|
|
@ -13,15 +13,11 @@
|
|||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED;
|
||||
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING;
|
||||
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -35,7 +31,6 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.MetaMutationAnnotation;
|
||||
import org.apache.hadoop.hbase.RegionTransition;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
|
@ -44,19 +39,15 @@ import org.apache.hadoop.hbase.catalog.MetaReader;
|
|||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
|
||||
import org.apache.hadoop.hbase.coordination.RegionMergeCoordination.RegionMergeDetails;
|
||||
import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ConfigUtil;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.KeeperException.NodeExistsException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
/**
|
||||
* Executes region merge as a "transaction". It is similar with
|
||||
|
@ -97,10 +88,9 @@ public class RegionMergeTransaction {
|
|||
private final HRegion region_b;
|
||||
// merges dir is under region_a
|
||||
private final Path mergesdir;
|
||||
private int znodeVersion = -1;
|
||||
// We only merge adjacent regions if forcible is false
|
||||
private final boolean forcible;
|
||||
private boolean useZKForAssignment;
|
||||
private boolean useCoordinationForAssignment;
|
||||
|
||||
/**
|
||||
* Types to add to the transaction journal. Each enum is a step in the merge
|
||||
|
@ -110,7 +100,7 @@ public class RegionMergeTransaction {
|
|||
/**
|
||||
* Set region as in transition, set it into MERGING state.
|
||||
*/
|
||||
SET_MERGING_IN_ZK,
|
||||
SET_MERGING,
|
||||
/**
|
||||
* We created the temporary merge data directory.
|
||||
*/
|
||||
|
@ -152,6 +142,8 @@ public class RegionMergeTransaction {
|
|||
|
||||
private RegionServerCoprocessorHost rsCoprocessorHost = null;
|
||||
|
||||
private RegionMergeDetails rmd;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param a region a to merge
|
||||
|
@ -230,8 +222,7 @@ public class RegionMergeTransaction {
|
|||
|
||||
/**
|
||||
* Run the transaction.
|
||||
* @param server Hosting server instance. Can be null when testing (won't try
|
||||
* and update in zk if a null server)
|
||||
* @param server Hosting server instance. Can be null when testing
|
||||
* @param services Used to online/offline regions.
|
||||
* @throws IOException If thrown, transaction failed. Call
|
||||
* {@link #rollback(Server, RegionServerServices)}
|
||||
|
@ -240,9 +231,15 @@ public class RegionMergeTransaction {
|
|||
* @see #rollback(Server, RegionServerServices)
|
||||
*/
|
||||
public HRegion execute(final Server server,
|
||||
final RegionServerServices services) throws IOException {
|
||||
useZKForAssignment = server == null ? true :
|
||||
ConfigUtil.useZKForAssignment(server.getConfiguration());
|
||||
final RegionServerServices services) throws IOException {
|
||||
useCoordinationForAssignment =
|
||||
server == null ? true : ConfigUtil.useZKForAssignment(server.getConfiguration());
|
||||
if (rmd == null) {
|
||||
rmd =
|
||||
server != null && server.getCoordinatedStateManager() != null ? ((BaseCoordinatedStateManager) server
|
||||
.getCoordinatedStateManager()).getRegionMergeCoordination().getDefaultDetails()
|
||||
: null;
|
||||
}
|
||||
if (rsCoprocessorHost == null) {
|
||||
rsCoprocessorHost = server != null ?
|
||||
((HRegionServer) server).getRegionServerCoprocessorHost() : null;
|
||||
|
@ -257,14 +254,20 @@ public class RegionMergeTransaction {
|
|||
public HRegion stepsAfterPONR(final Server server, final RegionServerServices services,
|
||||
HRegion mergedRegion) throws IOException {
|
||||
openMergedRegion(server, services, mergedRegion);
|
||||
transitionZKNode(server, services, mergedRegion);
|
||||
if (useCoordination(server)) {
|
||||
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getRegionMergeCoordination().completeRegionMergeTransaction(services, mergedRegionInfo,
|
||||
region_a, region_b, rmd, mergedRegion);
|
||||
}
|
||||
if (rsCoprocessorHost != null) {
|
||||
rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
|
||||
}
|
||||
return mergedRegion;
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare the merged region and region files.
|
||||
* @param server Hosting server instance. Can be null when testing (won't try
|
||||
* and update in zk if a null server)
|
||||
* @param server Hosting server instance. Can be null when testing
|
||||
* @param services Used to online/offline regions.
|
||||
* @return merged region
|
||||
* @throws IOException If thrown, transaction failed. Call
|
||||
|
@ -286,7 +289,7 @@ public class RegionMergeTransaction {
|
|||
}
|
||||
}
|
||||
|
||||
// If true, no cluster to write meta edits to or to update znodes in.
|
||||
// If true, no cluster to write meta edits to or to use coordination.
|
||||
boolean testing = server == null ? true : server.getConfiguration()
|
||||
.getBoolean("hbase.testing.nocluster", false);
|
||||
|
||||
|
@ -320,7 +323,7 @@ public class RegionMergeTransaction {
|
|||
// will determine whether the region is merged or not in case of failures.
|
||||
// If it is successful, master will roll-forward, if not, master will
|
||||
// rollback
|
||||
if (!testing && useZKForAssignment) {
|
||||
if (!testing && useCoordinationForAssignment) {
|
||||
if (metaEntries.isEmpty()) {
|
||||
MetaEditor.mergeRegions(server.getCatalogTracker(), mergedRegion.getRegionInfo(), region_a
|
||||
.getRegionInfo(), region_b.getRegionInfo(), server.getServerName());
|
||||
|
@ -328,7 +331,7 @@ public class RegionMergeTransaction {
|
|||
mergeRegionsAndPutMetaEntries(server.getCatalogTracker(), mergedRegion.getRegionInfo(),
|
||||
region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), metaEntries);
|
||||
}
|
||||
} else if (services != null && !useZKForAssignment) {
|
||||
} else if (services != null && !useCoordinationForAssignment) {
|
||||
if (!services.reportRegionTransition(TransitionCode.MERGE_PONR,
|
||||
mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
|
||||
// Passed PONR, let SSH clean it up
|
||||
|
@ -377,17 +380,24 @@ public class RegionMergeTransaction {
|
|||
|
||||
public HRegion stepsBeforePONR(final Server server, final RegionServerServices services,
|
||||
boolean testing) throws IOException {
|
||||
// Set ephemeral MERGING znode up in zk. Mocked servers sometimes don't
|
||||
// have zookeeper so don't do zk stuff if server or zookeeper is null
|
||||
if (useZKAndZKIsSet(server)) {
|
||||
if (rmd == null) {
|
||||
rmd =
|
||||
server != null && server.getCoordinatedStateManager() != null ? ((BaseCoordinatedStateManager) server
|
||||
.getCoordinatedStateManager()).getRegionMergeCoordination().getDefaultDetails()
|
||||
: null;
|
||||
}
|
||||
|
||||
// If server doesn't have a coordination state manager, don't do coordination actions.
|
||||
if (useCoordination(server)) {
|
||||
try {
|
||||
createNodeMerging(server.getZooKeeper(), this.mergedRegionInfo,
|
||||
server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo());
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed creating PENDING_MERGE znode on "
|
||||
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getRegionMergeCoordination().startRegionMergeTransaction(mergedRegionInfo,
|
||||
server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo());
|
||||
} catch (IOException e) {
|
||||
throw new IOException("Failed to start region merge transaction for "
|
||||
+ this.mergedRegionInfo.getRegionNameAsString(), e);
|
||||
}
|
||||
} else if (services != null && !useZKForAssignment) {
|
||||
} else if (services != null && !useCoordinationForAssignment) {
|
||||
if (!services.reportRegionTransition(TransitionCode.READY_TO_MERGE,
|
||||
mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
|
||||
throw new IOException("Failed to get ok from master to merge "
|
||||
|
@ -395,12 +405,14 @@ public class RegionMergeTransaction {
|
|||
+ region_b.getRegionInfo().getRegionNameAsString());
|
||||
}
|
||||
}
|
||||
this.journal.add(JournalEntry.SET_MERGING_IN_ZK);
|
||||
if (useZKAndZKIsSet(server)) {
|
||||
this.journal.add(JournalEntry.SET_MERGING);
|
||||
if (useCoordination(server)) {
|
||||
// After creating the merge node, wait for master to transition it
|
||||
// from PENDING_MERGE to MERGING so that we can move on. We want master
|
||||
// to know about it so that it won't transition any region which is merging.
|
||||
znodeVersion = getZKNode(server, services);
|
||||
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getRegionMergeCoordination().waitForRegionMergeTransaction(services, mergedRegionInfo,
|
||||
region_a, region_b, rmd);
|
||||
}
|
||||
|
||||
this.region_a.getRegionFileSystem().createMergesDir();
|
||||
|
@ -420,16 +432,15 @@ public class RegionMergeTransaction {
|
|||
// clean this up.
|
||||
mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
|
||||
|
||||
if (server != null && useZKAndZKIsSet(server)) {
|
||||
if (useCoordination(server)) {
|
||||
try {
|
||||
// Do one more check on the merging znode (before it is too late) in case
|
||||
// any merging region is moved somehow. If so, the znode transition will fail.
|
||||
this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
|
||||
this.mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
|
||||
server.getServerName(), this.znodeVersion,
|
||||
RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGING);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed setting MERGING znode on "
|
||||
// Do the final check in case any merging region is moved somehow. If so, the transition
|
||||
// will fail.
|
||||
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getRegionMergeCoordination().confirmRegionMergeTransaction(this.mergedRegionInfo,
|
||||
region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), rmd);
|
||||
} catch (IOException e) {
|
||||
throw new IOException("Failed setting MERGING on "
|
||||
+ this.mergedRegionInfo.getRegionNameAsString(), e);
|
||||
}
|
||||
}
|
||||
|
@ -545,8 +556,7 @@ public class RegionMergeTransaction {
|
|||
|
||||
/**
|
||||
* Perform time consuming opening of the merged region.
|
||||
* @param server Hosting server instance. Can be null when testing (won't try
|
||||
* and update in zk if a null server)
|
||||
* @param server Hosting server instance. Can be null when testing
|
||||
* @param services Used to online/offline regions.
|
||||
* @param merged the merged region
|
||||
* @throws IOException If thrown, transaction failed. Call
|
||||
|
@ -569,7 +579,7 @@ public class RegionMergeTransaction {
|
|||
|
||||
if (services != null) {
|
||||
try {
|
||||
if (useZKForAssignment) {
|
||||
if (useCoordinationForAssignment) {
|
||||
services.postOpenDeployTasks(merged, server.getCatalogTracker());
|
||||
} else if (!services.reportRegionTransition(TransitionCode.MERGED,
|
||||
mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
|
||||
|
@ -584,134 +594,6 @@ public class RegionMergeTransaction {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Finish off merge transaction, transition the zknode
|
||||
* @param server Hosting server instance. Can be null when testing (won't try
|
||||
* and update in zk if a null server)
|
||||
* @param services Used to online/offline regions.
|
||||
* @throws IOException If thrown, transaction failed. Call
|
||||
* {@link #rollback(Server, RegionServerServices)}
|
||||
*/
|
||||
void transitionZKNode(final Server server, final RegionServerServices services,
|
||||
HRegion mergedRegion) throws IOException {
|
||||
if (useZKAndZKIsSet(server)) {
|
||||
// Tell master about merge by updating zk. If we fail, abort.
|
||||
try {
|
||||
this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
|
||||
this.mergedRegionInfo, region_a.getRegionInfo(),
|
||||
region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
|
||||
RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
|
||||
|
||||
long startTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||
int spins = 0;
|
||||
// Now wait for the master to process the merge. We know it's done
|
||||
// when the znode is deleted. The reason we keep tickling the znode is
|
||||
// that it's possible for the master to miss an event.
|
||||
do {
|
||||
if (spins % 10 == 0) {
|
||||
LOG.debug("Still waiting on the master to process the merge for "
|
||||
+ this.mergedRegionInfo.getEncodedName() + ", waited "
|
||||
+ (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
|
||||
}
|
||||
Thread.sleep(100);
|
||||
// When this returns -1 it means the znode doesn't exist
|
||||
this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
|
||||
this.mergedRegionInfo, region_a.getRegionInfo(),
|
||||
region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
|
||||
RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
|
||||
spins++;
|
||||
} while (this.znodeVersion != -1 && !server.isStopped()
|
||||
&& !services.isStopping());
|
||||
} catch (Exception e) {
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
throw new IOException("Failed telling master about merge "
|
||||
+ mergedRegionInfo.getEncodedName(), e);
|
||||
}
|
||||
}
|
||||
|
||||
if (rsCoprocessorHost != null) {
|
||||
rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
|
||||
}
|
||||
|
||||
// Leaving here, the mergedir with its dross will be in place but since the
|
||||
// merge was successful, just leave it; it'll be cleaned when region_a is
|
||||
// cleaned up by CatalogJanitor on master
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the merging node to be transitioned from pending_merge
|
||||
* to merging by master. That's how we are sure master has processed
|
||||
* the event and is good with us to move on. If we don't get any update,
|
||||
* we periodically transition the node so that master gets the callback.
|
||||
* If the node is removed or is not in pending_merge state any more,
|
||||
* we abort the merge.
|
||||
*/
|
||||
private int getZKNode(final Server server,
|
||||
final RegionServerServices services) throws IOException {
|
||||
// Wait for the master to process the pending_merge.
|
||||
try {
|
||||
int spins = 0;
|
||||
Stat stat = new Stat();
|
||||
ZooKeeperWatcher zkw = server.getZooKeeper();
|
||||
ServerName expectedServer = server.getServerName();
|
||||
String node = mergedRegionInfo.getEncodedName();
|
||||
while (!(server.isStopped() || services.isStopping())) {
|
||||
if (spins % 5 == 0) {
|
||||
LOG.debug("Still waiting for master to process "
|
||||
+ "the pending_merge for " + node);
|
||||
transitionMergingNode(zkw, mergedRegionInfo, region_a.getRegionInfo(),
|
||||
region_b.getRegionInfo(), expectedServer, -1, RS_ZK_REQUEST_REGION_MERGE,
|
||||
RS_ZK_REQUEST_REGION_MERGE);
|
||||
}
|
||||
Thread.sleep(100);
|
||||
spins++;
|
||||
byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat);
|
||||
if (data == null) {
|
||||
throw new IOException("Data is null, merging node "
|
||||
+ node + " no longer exists");
|
||||
}
|
||||
RegionTransition rt = RegionTransition.parseFrom(data);
|
||||
EventType et = rt.getEventType();
|
||||
if (et == RS_ZK_REGION_MERGING) {
|
||||
ServerName serverName = rt.getServerName();
|
||||
if (!serverName.equals(expectedServer)) {
|
||||
throw new IOException("Merging node " + node + " is for "
|
||||
+ serverName + ", not us " + expectedServer);
|
||||
}
|
||||
byte [] payloadOfMerging = rt.getPayload();
|
||||
List<HRegionInfo> mergingRegions = HRegionInfo.parseDelimitedFrom(
|
||||
payloadOfMerging, 0, payloadOfMerging.length);
|
||||
assert mergingRegions.size() == 3;
|
||||
HRegionInfo a = mergingRegions.get(1);
|
||||
HRegionInfo b = mergingRegions.get(2);
|
||||
HRegionInfo hri_a = region_a.getRegionInfo();
|
||||
HRegionInfo hri_b = region_b.getRegionInfo();
|
||||
if (!(hri_a.equals(a) && hri_b.equals(b))) {
|
||||
throw new IOException("Merging node " + node + " is for " + a + ", "
|
||||
+ b + ", not expected regions: " + hri_a + ", " + hri_b);
|
||||
}
|
||||
// Master has processed it.
|
||||
return stat.getVersion();
|
||||
}
|
||||
if (et != RS_ZK_REQUEST_REGION_MERGE) {
|
||||
throw new IOException("Merging node " + node
|
||||
+ " moved out of merging to " + et);
|
||||
}
|
||||
}
|
||||
// Server is stopping/stopped
|
||||
throw new IOException("Server is "
|
||||
+ (services.isStopping() ? "stopping" : "stopped"));
|
||||
} catch (Exception e) {
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
throw new IOException("Failed getting MERGING znode on "
|
||||
+ mergedRegionInfo.getRegionNameAsString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create reference file(s) of merging regions under the region_a merges dir
|
||||
* @param hstoreFilesOfRegionA
|
||||
|
@ -769,14 +651,15 @@ public class RegionMergeTransaction {
|
|||
JournalEntry je = iterator.previous();
|
||||
switch (je) {
|
||||
|
||||
case SET_MERGING_IN_ZK:
|
||||
if (useZKAndZKIsSet(server)) {
|
||||
cleanZK(server, this.mergedRegionInfo);
|
||||
} else if (services != null && !useZKForAssignment
|
||||
case SET_MERGING:
|
||||
if (useCoordination(server)) {
|
||||
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getRegionMergeCoordination().clean(this.mergedRegionInfo);
|
||||
} else if (services != null && !useCoordinationForAssignment
|
||||
&& !services.reportRegionTransition(TransitionCode.MERGE_REVERTED,
|
||||
mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case CREATED_MERGE_DIR:
|
||||
|
@ -851,100 +734,12 @@ public class RegionMergeTransaction {
|
|||
return this.mergesdir;
|
||||
}
|
||||
|
||||
private boolean useZKAndZKIsSet(final Server server) {
|
||||
return server != null && useZKForAssignment && server.getZooKeeper() != null;
|
||||
private boolean useCoordination(final Server server) {
|
||||
return server != null && useCoordinationForAssignment
|
||||
&& server.getCoordinatedStateManager() != null;
|
||||
}
|
||||
|
||||
private static void cleanZK(final Server server, final HRegionInfo hri) {
|
||||
try {
|
||||
// Only delete if it's in the expected state; could have been hijacked.
|
||||
if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
|
||||
RS_ZK_REQUEST_REGION_MERGE, server.getServerName())) {
|
||||
ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
|
||||
RS_ZK_REGION_MERGING, server.getServerName());
|
||||
}
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
|
||||
} catch (KeeperException e) {
|
||||
server.abort("Failed cleanup zk node of " + hri.getRegionNameAsString(),e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new ephemeral node in the PENDING_MERGE state for the merged region.
|
||||
* Create it ephemeral in case regionserver dies mid-merge.
|
||||
*
|
||||
* <p>
|
||||
* Does not transition nodes from other states. If a node already exists for
|
||||
* this region, a {@link NodeExistsException} will be thrown.
|
||||
*
|
||||
* @param zkw zk reference
|
||||
* @param region merged region for which the PENDING_MERGE node is created
|
||||
* @param serverName server event originates from
|
||||
* @throws KeeperException
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void createNodeMerging(final ZooKeeperWatcher zkw, final HRegionInfo region,
|
||||
final ServerName serverName, final HRegionInfo a,
|
||||
final HRegionInfo b) throws KeeperException, IOException {
|
||||
LOG.debug(zkw.prefix("Creating ephemeral node for "
|
||||
+ region.getEncodedName() + " in PENDING_MERGE state"));
|
||||
byte [] payload = HRegionInfo.toDelimitedByteArray(region, a, b);
|
||||
RegionTransition rt = RegionTransition.createRegionTransition(
|
||||
RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(), serverName, payload);
|
||||
String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
|
||||
if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
|
||||
throw new IOException("Failed create of ephemeral " + node);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transitions an existing ephemeral node for the specified region which is
|
||||
* currently in the begin state to be in the end state. Master cleans up the
|
||||
* final MERGE znode when it reads it (or if we crash, zk will clean it up).
|
||||
*
|
||||
* <p>
|
||||
* Does not transition nodes from other states. If for some reason the node
|
||||
* could not be transitioned, the method returns -1. If the transition is
|
||||
* successful, the version of the node after transition is returned.
|
||||
*
|
||||
* <p>
|
||||
* This method can fail and return -1 for three different reasons:
|
||||
* <ul>
|
||||
* <li>Node for this region does not exist</li>
|
||||
* <li>Node for this region is not in the begin state</li>
|
||||
* <li>After verifying the begin state, update fails because of wrong version
|
||||
* (this should never actually happen since an RS only does this transition
|
||||
* following a transition to the begin state. If two RS are conflicting, one would
|
||||
* fail the original transition to the begin state and not this transition)</li>
|
||||
* </ul>
|
||||
*
|
||||
* <p>
|
||||
* Does not set any watches.
|
||||
*
|
||||
* <p>
|
||||
* This method should only be used by a RegionServer when merging two regions.
|
||||
*
|
||||
* @param zkw zk reference
|
||||
* @param merged the merged region whose node is to be transitioned
|
||||
* @param a merging region A
|
||||
* @param b merging region B
|
||||
* @param serverName server event originates from
|
||||
* @param znodeVersion expected version of data before modification
|
||||
* @param beginState the expected current state the znode should be
|
||||
* @param endState the state to be transition to
|
||||
* @return version of node after transition, -1 if unsuccessful transition
|
||||
* @throws KeeperException if unexpected zookeeper exception
|
||||
* @throws IOException
|
||||
*/
|
||||
public static int transitionMergingNode(ZooKeeperWatcher zkw,
|
||||
HRegionInfo merged, HRegionInfo a, HRegionInfo b, ServerName serverName,
|
||||
final int znodeVersion, final EventType beginState,
|
||||
final EventType endState) throws KeeperException, IOException {
|
||||
byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
|
||||
return ZKAssign.transitionNode(zkw, merged, serverName,
|
||||
beginState, endState, znodeVersion, payload);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given region has merge qualifier in hbase:meta
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableStateManager;
|
||||
import org.apache.hadoop.hbase.catalog.MetaEditor;
|
||||
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||
|
@ -389,8 +390,8 @@ public class TestMasterFailover {
|
|||
|
||||
// Regions of table of merging regions
|
||||
// Cause: Master was down while merging was going on
|
||||
RegionMergeTransaction.createNodeMerging(
|
||||
zkw, newRegion, mergingServer, a, b);
|
||||
((BaseCoordinatedStateManager) hrs.getCoordinatedStateManager())
|
||||
.getRegionMergeCoordination().startRegionMergeTransaction(newRegion, mergingServer, a, b);
|
||||
|
||||
/*
|
||||
* ZK = NONE
|
||||
|
|
Loading…
Reference in New Issue