HBASE-3559 Move report of split to master OFF the heartbeat channel
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1075600 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 05864ff9b0
commit 5e25568812
@@ -88,6 +88,9 @@ Release 0.91.0 - Unreleased
   HBASE-3560 the hbase-default entry of "hbase.defaults.for.version"
              causes tests not to run via not-maven

+TASK
+  HBASE-3559 Move report of split to master OFF the heartbeat channel
+
 NEW FEATURES
   HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via
@@ -47,11 +47,6 @@ public class HMsg implements Writable {
    */
  STOP_REGIONSERVER,

-  /**
-   * Region server split the region associated with this message.
-   */
-  REGION_SPLIT,
-
  /**
   * When RegionServer receives this message, it goes into a sleep that only
   * an exit will cure. This message is sent by unit tests simulating
@@ -229,10 +224,6 @@ public class HMsg implements Writable {
       out.writeBoolean(true);
       Bytes.writeByteArray(out, this.message);
     }
-    if (this.type.equals(Type.REGION_SPLIT)) {
-      this.daughterA.write(out);
-      this.daughterB.write(out);
-    }
   }

   /**
@@ -246,11 +237,5 @@ public class HMsg implements Writable {
     if (hasMessage) {
       this.message = Bytes.readByteArray(in);
     }
-    if (this.type.equals(Type.REGION_SPLIT)) {
-      this.daughterA = new HRegionInfo();
-      this.daughterB = new HRegionInfo();
-      this.daughterA.readFields(in);
-      this.daughterB.readFields(in);
-    }
   }
 }
@@ -104,6 +104,8 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
     RS_ZK_REGION_CLOSED (2),    // RS has finished closing a region
     RS_ZK_REGION_OPENING (3),   // RS is in process of opening a region
     RS_ZK_REGION_OPENED (4),    // RS has finished opening a region
+    RS_ZK_REGION_SPLITTING (5), // RS has started a region split
+    RS_ZK_REGION_SPLIT (6),     // RS split has completed.

     // Messages originating from Master to RS
     M_RS_OPEN_REGION (20),      // Master asking RS to open a region
@@ -112,6 +112,7 @@ public class ExecutorService {
       case RS_ZK_REGION_OPENED:
         return ExecutorType.MASTER_OPEN_REGION;

+      case RS_ZK_REGION_SPLIT:
       case M_SERVER_SHUTDOWN:
         return ExecutorType.MASTER_SERVER_OPERATIONS;

@@ -47,6 +47,8 @@ public class RegionTransitionData implements Writable {
   /** Time the event was created. Required but automatically set. */
   private long stamp;

+  private byte [] payload;
+
   /**
    * Writable constructor. Do not use directly.
    */
@@ -82,6 +84,7 @@ public class RegionTransitionData implements Writable {
    *
    * <p>Valid types for this constructor are {@link EventType#RS_ZK_REGION_CLOSING},
    * {@link EventType#RS_ZK_REGION_CLOSED}, {@link EventType#RS_ZK_REGION_OPENING},
+   * {@link EventType#RS_ZK_REGION_SPLITTING},
    * and {@link EventType#RS_ZK_REGION_OPENED}.
    *
    * @param eventType type of event
@@ -90,10 +93,31 @@ public class RegionTransitionData implements Writable {
    */
   public RegionTransitionData(EventType eventType, byte [] regionName,
       String serverName) {
+    this(eventType, regionName, serverName, null);
+  }
+
+  /**
+   * Construct data for a new region transition event with the specified event
+   * type, region name, and server name.
+   *
+   * <p>Used when the server name is known (a regionserver is setting it).
+   *
+   * <p>Valid types for this constructor are {@link EventType#RS_ZK_REGION_SPLIT}
+   * since SPLIT is only type that currently carries a payload.
+   *
+   * @param eventType type of event
+   * @param regionName name of region as per <code>HRegionInfo#getRegionName()</code>
+   * @param serverName name of server setting data
+   * @param payload Payload examples include the daughters involved in a
+   * {@link EventType#RS_ZK_REGION_SPLIT}. Can be null
+   */
+  public RegionTransitionData(EventType eventType, byte [] regionName,
+      String serverName, final byte [] payload) {
     this.eventType = eventType;
     this.stamp = System.currentTimeMillis();
     this.regionName = regionName;
     this.serverName = serverName;
+    this.payload = payload;
   }

   /**
@@ -106,6 +130,8 @@ public class RegionTransitionData implements Writable {
    * <li>{@link EventType#RS_ZK_REGION_CLOSED}
    * <li>{@link EventType#RS_ZK_REGION_OPENING}
    * <li>{@link EventType#RS_ZK_REGION_OPENED}
+   * <li>{@link EventType#RS_ZK_REGION_SPLITTING}
+   * <li>{@link EventType#RS_ZK_REGION_SPLIT}
    * </ul>
    * @return type of region transition event
    */
@@ -142,6 +168,13 @@ public class RegionTransitionData implements Writable {
     return stamp;
   }

+  /**
+   * @return Payload if any.
+   */
+  public byte [] getPayload() {
+    return this.payload;
+  }
+
   @Override
   public void readFields(DataInput in) throws IOException {
     // the event type byte
@@ -157,6 +190,9 @@ public class RegionTransitionData implements Writable {
     } else {
       serverName = null;
     }
+    if (in.readBoolean()) {
+      this.payload = Bytes.readByteArray(in);
+    }
   }

   @Override
@@ -169,6 +205,10 @@ public class RegionTransitionData implements Writable {
     if(serverName != null) {
       out.writeUTF(serverName);
     }
+    out.writeBoolean(this.payload != null);
+    if (this.payload != null) {
+      Bytes.writeByteArray(out, this.payload);
+    }
   }

   /**
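For illustration only (not part of this commit): the payload above is written as a presence flag followed by the raw bytes, so a reader that finds the flag false simply skips it. A self-contained sketch of the same wire pattern, with plain java.io streams standing in for the Bytes.writeByteArray/readByteArray helpers used in the diff:

    import java.io.*;

    public class OptionalPayloadDemo {
      // Write: boolean presence flag, then length-prefixed bytes (only if present).
      static void writePayload(DataOutput out, byte[] payload) throws IOException {
        out.writeBoolean(payload != null);
        if (payload != null) {
          out.writeInt(payload.length);   // HBase's Bytes helpers handle the length encoding themselves
          out.write(payload);
        }
      }

      // Read: consult the flag before attempting to read any payload bytes.
      static byte[] readPayload(DataInput in) throws IOException {
        if (!in.readBoolean()) return null;
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        return payload;
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        writePayload(new DataOutputStream(bos), "daughter regions".getBytes("UTF-8"));
        byte[] roundTripped =
          readPayload(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(new String(roundTripped, "UTF-8")); // prints: daughter regions
      }
    }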
@@ -176,7 +176,8 @@ public interface HMasterInterface extends VersionedProtocol {
    * @param regionName Region to unassign. Will clear any existing RegionPlan
    * if one found.
    * @param force If true, force unassign (Will remove region from
-   * regions-in-transition too if present).
+   * regions-in-transition too if present as well as from assigned regions --
+   * radical!).
    * @throws IOException
    */
   public void unassign(final byte [] regionName, final boolean force)
@@ -53,22 +53,24 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.catalog.RootLocationEditor;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.RegionTransitionData;
-import org.apache.hadoop.hbase.executor.EventHandler.EventType;
 import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
 import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
 import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
+import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZKTable;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.AsyncCallback;
@@ -365,6 +367,43 @@ public class AssignmentManager extends ZooKeeperListener {
         // Nothing to do.
         break;

+      case RS_ZK_REGION_SPLITTING:
+        if (!isInStateForSplitting(regionState)) break;
+        addSplittingToRIT(data.getServerName(), encodedName);
+        break;
+
+      case RS_ZK_REGION_SPLIT:
+        // RegionState must be null, or SPLITTING or PENDING_CLOSE.
+        if (!isInStateForSplitting(regionState)) break;
+        // If null, add SPLITTING state before going to SPLIT
+        if (regionState == null) {
+          LOG.info("Received SPLIT for region " + prettyPrintedRegionName +
+            " from server " + data.getServerName() +
+            " but region was not first in SPLITTING state; continuing");
+          addSplittingToRIT(data.getServerName(), encodedName);
+        }
+        // Check it has daughters.
+        byte [] payload = data.getPayload();
+        List<HRegionInfo> daughters = null;
+        try {
+          daughters = Writables.getHRegionInfos(payload, 0, payload.length);
+        } catch (IOException e) {
+          LOG.error("Dropped split! Failed reading split payload for " +
+            prettyPrintedRegionName);
+          break;
+        }
+        assert daughters.size() == 2;
+        // Assert that we can get a serverinfo for this server.
+        HServerInfo hsi = getAndCheckHServerInfo(data.getServerName());
+        if (hsi == null) {
+          LOG.error("Dropped split! No HServerInfo for " + data.getServerName());
+          break;
+        }
+        // Run handler to do the rest of the SPLIT handling.
+        this.executorService.submit(new SplitRegionHandler(master, this,
+          regionState.getRegion(), hsi, daughters));
+        break;
+
       case RS_ZK_REGION_CLOSING:
         // Should see CLOSING after we have asked it to CLOSE or additional
         // times after already being in state of CLOSING
@@ -416,7 +455,7 @@ public class AssignmentManager extends ZooKeeperListener {

       case RS_ZK_REGION_OPENED:
         // Should see OPENED after OPENING but possible after PENDING_OPEN
-        if(regionState == null ||
+        if (regionState == null ||
             (!regionState.isPendingOpen() && !regionState.isOpening())) {
           LOG.warn("Received OPENED for region " +
             prettyPrintedRegionName +
@@ -435,6 +474,99 @@ public class AssignmentManager extends ZooKeeperListener {
       }
     }

+  /**
+   * @return Returns true if this RegionState is splittable; i.e. the
+   * RegionState is currently in splitting state or pending_close or
+   * null (Anything else will return false).
+   */
+  private boolean isInStateForSplitting(final RegionState rs) {
+    if (rs == null) return true;
+    if (rs.isSplitting()) return true;
+    if (convertPendingCloseToSplitting(rs)) return true;
+    LOG.warn("Dropped region split! Not in state good for SPLITTING; rs=" + rs);
+    return false;
+  }
+
+  /**
+   * If the passed regionState is in PENDING_CLOSE, clean up PENDING_CLOSE
+   * state and convert it to SPLITTING instead.
+   * This can happen in case where master wants to close a region at same time
+   * a regionserver starts a split. The split won. Clean out old PENDING_CLOSE
+   * state.
+   * @param rs
+   * @return True if we converted from PENDING_CLOSE to SPLITTING
+   */
+  private boolean convertPendingCloseToSplitting(final RegionState rs) {
+    if (!rs.isPendingClose()) return false;
+    LOG.debug("Converting PENDING_CLOSE to SPLITING; rs=" + rs);
+    rs.update(RegionState.State.SPLITTING);
+    // Clean up existing state. Clear from region plans seems all we
+    // have to do here by way of clean up of PENDING_CLOSE.
+    clearRegionPlan(rs.getRegion());
+    return true;
+  }
+
+  private HServerInfo getAndCheckHServerInfo(final String serverName) {
+    HServerInfo hsi = this.serverManager.getServerInfo(serverName);
+    if (hsi == null) LOG.debug("No serverinfo for " + serverName);
+    return hsi;
+  }
+
+  /**
+   * @param serverName
+   * @param encodedName
+   * @return The SPLITTING RegionState we added to RIT for the passed region
+   * <code>encodedName</code>
+   */
+  private RegionState addSplittingToRIT(final String serverName,
+      final String encodedName) {
+    RegionState regionState = null;
+    synchronized (this.regions) {
+      regionState = findHRegionInfoThenAddToRIT(serverName, encodedName);
+      regionState.update(RegionState.State.SPLITTING);
+    }
+    return regionState;
+  }
+
+  /**
+   * Caller must hold lock on <code>this.regions</code>.
+   * @param serverName
+   * @param encodedName
+   * @return The instance of RegionState that was added to RIT or null if error.
+   */
+  private RegionState findHRegionInfoThenAddToRIT(final String serverName,
+      final String encodedName) {
+    HRegionInfo hri = findHRegionInfo(serverName, encodedName);
+    if (hri == null) {
+      LOG.warn("Region " + encodedName + " not found on server " + serverName +
+        "; failed processing");
+      return null;
+    }
+    // Add to regions in transition, then update state to SPLITTING.
+    return addToRegionsInTransition(hri);
+  }
+
+  /**
+   * Caller must hold lock on <code>this.regions</code>.
+   * @param serverName
+   * @param encodedName
+   * @return Found HRegionInfo or null.
+   */
+  private HRegionInfo findHRegionInfo(final String serverName,
+      final String encodedName) {
+    HServerInfo hsi = getAndCheckHServerInfo(serverName);
+    if (hsi == null) return null;
+    List<HRegionInfo> hris = this.servers.get(hsi);
+    HRegionInfo foundHri = null;
+    for (HRegionInfo hri: hris) {
+      if (hri.getEncodedName().equals(encodedName)) {
+        foundHri = hri;
+        break;
+      }
+    }
+    return foundHri;
+  }
+
   /**
    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
    * <p>
@@ -523,7 +655,7 @@ public class AssignmentManager extends ZooKeeperListener {
     synchronized(regionsInTransition) {
       try {
         RegionTransitionData data = ZKAssign.getData(watcher, path);
-        if(data == null) {
+        if (data == null) {
           return;
         }
         handleRegion(data);
@@ -534,11 +666,31 @@ public class AssignmentManager extends ZooKeeperListener {
       }
     }

+  @Override
+  public void nodeDeleted(String path) {
+    // Added so we notice when ephemeral nodes go away; in particular,
+    // SPLITTING or SPLIT nodes added by a regionserver splitting.
+    if (path.startsWith(this.watcher.assignmentZNode)) {
+      String regionName =
+        ZKAssign.getRegionName(this.master.getZooKeeper(), path);
+      RegionState rs = this.regionsInTransition.get(regionName);
+      if (rs != null) {
+        if (rs.isSplitting() || rs.isSplit()) {
+          LOG.debug("Ephemeral node deleted, regionserver crashed?, " +
+            "clearing from RIT; rs=" + rs);
+          clearRegionFromTransition(rs.getRegion());
+        } else {
+          LOG.warn("Node deleted but still in RIT: " + rs);
+        }
+      }
+    }
+  }
+
   /**
    * New unassigned node has been created.
    *
-   * <p>This happens when an RS begins the OPENING or CLOSING of a region by
-   * creating an unassigned node.
+   * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
+   * region by creating a znode.
    *
    * <p>When this happens we must:
    * <ol>
@@ -849,7 +1001,7 @@ public class AssignmentManager extends ZooKeeperListener {

   /**
    * @param region
-   * @return
+   * @return The current RegionState
    */
   private RegionState addToRegionsInTransition(final HRegionInfo region) {
     synchronized (regionsInTransition) {
@@ -1464,9 +1616,7 @@ public class AssignmentManager extends ZooKeeperListener {

   /**
    * Clears the specified region from being in transition.
-   * <p>
-   * Used only by HBCK tool.
-   * @param hri
+   * @param hri Region to remove.
    */
   public void clearRegionFromTransition(HRegionInfo hri) {
     synchronized (this.regionsInTransition) {
@@ -1747,24 +1897,6 @@ public class AssignmentManager extends ZooKeeperListener {
   public void handleSplitReport(final HServerInfo hsi, final HRegionInfo parent,
       final HRegionInfo a, final HRegionInfo b) {
     regionOffline(parent);
-    // Remove any CLOSING node, if exists, due to race between master & rs
-    // for close & split. Not putting into regionOffline method because it is
-    // called from various locations.
-    try {
-      RegionTransitionData node = ZKAssign.getDataNoWatch(this.watcher,
-        parent.getEncodedName(), null);
-      if (node != null) {
-        if (node.getEventType().equals(EventType.RS_ZK_REGION_CLOSING)) {
-          ZKAssign.deleteClosingNode(this.watcher, parent);
-        } else {
-          LOG.warn("Split report has RIT node (shouldnt have one): " +
-            parent + " node: " + node);
-        }
-      }
-    } catch (KeeperException e) {
-      LOG.warn("Exception while validating RIT during split report", e);
-    }
-
     regionOnline(a, hsi);
     regionOnline(b, hsi);

@@ -1844,7 +1976,9 @@ public class AssignmentManager extends ZooKeeperListener {
     OPEN,          // server opened region and updated meta
     PENDING_CLOSE, // sent rpc to server to close but has not begun
     CLOSING,       // server has begun to close but not yet done
-    CLOSED         // server closed region and updated meta
+    CLOSED,        // server closed region and updated meta
+    SPLITTING,     // server started split of a region
+    SPLIT          // server completed split of a region
   }

   private State state;
@@ -1912,6 +2046,14 @@ public class AssignmentManager extends ZooKeeperListener {
       return state == State.OFFLINE;
     }

+    public boolean isSplitting() {
+      return state == State.SPLITTING;
+    }
+
+    public boolean isSplit() {
+      return state == State.SPLIT;
+    }
+
     @Override
     public String toString() {
       return region.getRegionNameAsString() + " state=" + state +
@@ -277,11 +277,6 @@ public class ServerManager {
     for (HMsg msg: msgs) {
       LOG.info("Received " + msg + " from " + serverInfo.getServerName());
       switch (msg.getType()) {
-      case REGION_SPLIT:
-        this.services.getAssignmentManager().handleSplitReport(serverInfo,
-          msg.getRegionInfo(), msg.getDaughterA(), msg.getDaughterB());
-        break;
-
       default:
         LOG.error("Unhandled msg type " + msg);
       }
@@ -117,6 +117,18 @@ public class ServerShutdownHandler extends EventHandler {
     // Wait on meta to come online; we need it to progress.
     // TODO: Best way to hold strictly here?  We should build this retry logic
     // into the MetaReader operations themselves.
+    // TODO: Is the reading of .META. necessary when the Master has state of
+    // cluster in its head?  It should be possible to do without reading .META.
+    // in all but one case. On split, the RS updates the .META.
+    // table and THEN informs the master of the split via zk nodes in
+    // 'unassigned' dir.  Currently the RS puts ephemeral nodes into zk so if
+    // the regionserver dies, these nodes do not stick around and this server
+    // shutdown processing does fixup (see the fixupDaughters method below).
+    // If we wanted to skip the .META. scan, we'd have to change at least the
+    // final SPLIT message to be permanent in zk so in here we'd know a SPLIT
+    // completed (zk is updated after edits to .META. have gone in).  See
+    // {@link SplitTransaction}.  We'd also have to be figure another way for
+    // doing the below .META. daughters fixup.
     NavigableMap<HRegionInfo, Result> hris = null;
     while (!this.server.isStopped()) {
       try {
@@ -154,27 +154,22 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
     if (!st.prepare()) return;
     try {
       st.execute(this.server, this.server);
-    } catch (IOException ioe) {
+    } catch (Exception e) {
       try {
         LOG.info("Running rollback of failed split of " +
-          parent.getRegionNameAsString() + "; " + ioe.getMessage());
-        st.rollback(this.server);
+          parent.getRegionNameAsString() + "; " + e.getMessage());
+        st.rollback(this.server, this.server);
         LOG.info("Successful rollback of failed split of " +
           parent.getRegionNameAsString());
-      } catch (RuntimeException e) {
+      } catch (RuntimeException ee) {
         // If failed rollback, kill this server to avoid having a hole in table.
         LOG.info("Failed rollback of failed split of " +
-          parent.getRegionNameAsString() + " -- aborting server", e);
+          parent.getRegionNameAsString() + " -- aborting server", ee);
         this.server.abort("Failed split");
       }
       return;
     }

-    // Now tell the master about the new regions.  If we fail here, its OK.
-    // Basescanner will do fix up.  And reporting split to master is going away.
-    // TODO: Verify this still holds in new master rewrite.
-    this.server.reportSplit(parent.getRegionInfo(), st.getFirstDaughter(),
-      st.getSecondDaughter());
     LOG.info("Region split, META updated, and report to master. Parent=" +
       parent.getRegionInfo().getRegionNameAsString() + ", new regions: " +
       st.getFirstDaughter().getRegionNameAsString() + ", " +
@@ -1534,25 +1534,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     return result;
   }

-  /**
-   * Add to the outbound message buffer
-   *
-   * When a region splits, we need to tell the master that there are two new
-   * regions that need to be assigned.
-   *
-   * We do not need to inform the master about the old region, because we've
-   * updated the meta or root regions, and the master will pick that up on its
-   * next rescan of the root or meta tables.
-   */
-  void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA,
-      HRegionInfo newRegionB) {
-    this.outboundMsgs.add(new HMsg(
-      HMsg.Type.REGION_SPLIT, oldRegion, newRegionA,
-      newRegionB, Bytes.toBytes("Daughters; "
-      + newRegionA.getRegionNameAsString() + ", "
-      + newRegionB.getRegionNameAsString())));
-  }
-
   /**
    * Closes all regions.  Called on our way out.
    * Assumes that its not possible for new regions to be added to onlineRegions
@@ -40,13 +40,20 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.executor.RegionTransitionData;
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
 import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;

 import com.google.common.util.concurrent.ThreadFactoryBuilder;

@@ -85,6 +92,7 @@ public class SplitTransaction {
   private HRegionInfo hri_b;
   private Path splitdir;
   private long fileSplitTimeout = 30000;
+  private int znodeVersion = -1;

   /*
    * Row to split around
@@ -95,6 +103,10 @@ public class SplitTransaction {
    * Types to add to the transaction journal
    */
   enum JournalEntry {
+    /**
+     * Set region as in transition, set it into SPLITTING state.
+     */
+    SET_SPLITTING_IN_ZK,
     /**
      * We created the temporary split data directory.
      */
@@ -185,6 +197,8 @@ public class SplitTransaction {
    * @param services Used to online/offline regions.
    * @throws IOException If thrown, transaction failed. Call {@link #rollback(OnlineRegions)}
    * @return Regions created
+   * @throws KeeperException
+   * @throws NodeExistsException
    * @see #rollback(OnlineRegions)
    */
   public PairOfSameType<HRegion> execute(final Server server,
@@ -195,19 +209,32 @@ public class SplitTransaction {
         (services != null && services.isStopping())) {
       throw new IOException("Server is stopped or stopping");
     }
-    assert !this.parent.lock.writeLock().isHeldByCurrentThread() : "Unsafe to hold write lock while performing RPCs";
+    assert !this.parent.lock.writeLock().isHeldByCurrentThread(): "Unsafe to hold write lock while performing RPCs";

     // Coprocessor callback
     if (this.parent.getCoprocessorHost() != null) {
       this.parent.getCoprocessorHost().preSplit();
     }

-    // If true, no cluster to write meta edits into.
+    // If true, no cluster to write meta edits to or to update znodes in.
     boolean testing = server == null? true:
       server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
     this.fileSplitTimeout = testing ? this.fileSplitTimeout :
-      server.getConfiguration().getLong(
-        "hbase.regionserver.fileSplitTimeout", this.fileSplitTimeout);
+      server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
        this.fileSplitTimeout);
+
+    // Set ephemeral SPLITTING znode up in zk.  Mocked servers sometimes don't
+    // have zookeeper so don't do zk stuff if zookeeper is null
+    if (server != null && server.getZooKeeper() != null) {
+      try {
+        this.znodeVersion = createNodeSplitting(server.getZooKeeper(),
+          this.parent.getRegionInfo(), server.getServerName());
+      } catch (KeeperException e) {
+        throw new IOException("Failed setting SPLITTING znode on " +
+          this.parent.getRegionNameAsString(), e);
+      }
+    }
+    this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);

     createSplitDir(this.parent.getFilesystem(), this.splitdir);
     this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
@@ -254,14 +281,9 @@ public class SplitTransaction {
       this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo());
     }

-    // The is the point of no return.  We are committed to the split now.  We
+    // This is the point of no return.  We are committed to the split now.  We
     // have still the daughter regions to open but meta has been changed.
-    // If we fail from here on out, we can not rollback so, we'll just abort.
-    // The meta has been changed though so there will need to be a fixup run
-    // during processing of the crashed server by master (TODO: Verify this in place).
-
-    // TODO: Could we be smarter about the sequence in which we do these steps?
-
+    // If we fail from here on out, we cannot rollback so, we'll just abort.
     if (!testing) {
       // Open daughters in parallel.
       DaughterOpener aOpener = new DaughterOpener(server, services, a);
@@ -276,6 +298,17 @@ public class SplitTransaction {
       }
     }

+    // Tell master about split by updating zk.  If we fail, abort.
+    if (server != null && server.getZooKeeper() != null) {
+      try {
+        transitionNodeSplit(server.getZooKeeper(), parent.getRegionInfo(),
+          a.getRegionInfo(), b.getRegionInfo(), server.getServerName(),
+          this.znodeVersion);
+      } catch (Exception e) {
+        server.abort("Failed telling master about split", e);
+      }
+    }
+
     // Coprocessor callback
     if (this.parent.getCoprocessorHost() != null) {
       this.parent.getCoprocessorHost().postSplit(a,b);
@@ -543,13 +576,20 @@ public class SplitTransaction {
    * @return The region we were splitting
    * @throws IOException If thrown, rollback failed. Take drastic action.
    */
-  public void rollback(final OnlineRegions or) throws IOException {
+  public void rollback(final Server server, final OnlineRegions or)
+  throws IOException {
     FileSystem fs = this.parent.getFilesystem();
     ListIterator<JournalEntry> iterator =
       this.journal.listIterator(this.journal.size());
     while (iterator.hasPrevious()) {
       JournalEntry je = iterator.previous();
       switch(je) {
+      case SET_SPLITTING_IN_ZK:
+        if (server != null && server.getZooKeeper() != null) {
+          cleanZK(server, this.parent.getRegionInfo());
+        }
+        break;
+
       case CREATE_SPLIT_DIR:
         cleanupSplitDir(fs, this.splitdir);
         break;
@@ -623,4 +663,99 @@ public class SplitTransaction {
     cleanupSplitDir(r.getFilesystem(), splitdir);
     LOG.info("Cleaned up old failed split transaction detritus: " + splitdir);
   }
+
+  private static void cleanZK(final Server server, final HRegionInfo hri) {
+    try {
+      // Only delete if its in expected state; could have been hijacked.
+      ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
+        EventType.RS_ZK_REGION_SPLITTING);
+    } catch (KeeperException e) {
+      server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
+    }
+  }
+
+  /**
+   * Creates a new ephemeral node in the SPLITTING state for the specified region.
+   * Create it ephemeral in case regionserver dies mid-split.
+   *
+   * <p>Does not transition nodes from other states.  If a node already exists
+   * for this region, a {@link NodeExistsException} will be thrown.
+   *
+   * @param zkw zk reference
+   * @param region region to be created as offline
+   * @param serverName server event originates from
+   * @return Version of znode created.
+   * @throws IOException
+   */
+  private static int createNodeSplitting(final ZooKeeperWatcher zkw,
+      final HRegionInfo region, final String serverName)
+  throws KeeperException, IOException {
+    LOG.debug(zkw.prefix("Creating ephemeral node for " +
+      region.getEncodedName() + " in SPLITTING state"));
+    RegionTransitionData data =
+      new RegionTransitionData(EventType.RS_ZK_REGION_SPLITTING,
+        region.getRegionName(), serverName);
+    // This synchronization is copied from ZKAssign.
+    synchronized(zkw.getNodes()) {
+      String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
+      zkw.getNodes().add(node);
+      if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, data.getBytes())) {
+        throw new IOException("Failed create of ephemeral " + node);
+      }
+    }
+    // Transition node from SPLITTING to SPLITTING and pick up version so we
+    // can be sure this znode is ours; version is needed deleting.
+    return transitionNodeSplitting(zkw, region, serverName, -1);
+  }
+
+  /**
+   * Transitions an existing node for the specified region which is
+   * currently in the SPLITTING state to be in the SPLIT state.  Converts the
+   * ephemeral SPLITTING znode to an ephemeral SPLIT node.  Master cleans up
+   * SPLIT znode when it reads it (or if we crash, zk will clean it up).
+   *
+   * <p>Does not transition nodes from other states.  If for some reason the
+   * node could not be transitioned, the method returns -1.  If the transition
+   * is successful, the version of the node after transition is returned.
+   *
+   * <p>This method can fail and return false for three different reasons:
+   * <ul><li>Node for this region does not exist</li>
+   * <li>Node for this region is not in SPLITTING state</li>
+   * <li>After verifying SPLITTING state, update fails because of wrong version
+   * (this should never actually happen since an RS only does this transition
+   * following a transition to SPLITTING.  if two RS are conflicting, one would
+   * fail the original transition to SPLITTING and not this transition)</li>
+   * </ul>
+   *
+   * <p>Does not set any watches.
+   *
+   * <p>This method should only be used by a RegionServer when completing the
+   * open of a region.
+   *
+   * @param zkw zk reference
+   * @param parent region to be transitioned to opened
+   * @param a Daughter a of split
+   * @param b Daughter b of split
+   * @param serverName server event originates from
+   * @return version of node after transition, -1 if unsuccessful transition
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws IOException
+   */
+  private static int transitionNodeSplit(ZooKeeperWatcher zkw,
+      HRegionInfo parent, HRegionInfo a, HRegionInfo b, String serverName,
+      final int znodeVersion)
+  throws KeeperException, IOException {
+    byte [] payload = Writables.getBytes(a, b);
+    return ZKAssign.transitionNode(zkw, parent, serverName,
+      EventType.RS_ZK_REGION_SPLITTING, EventType.RS_ZK_REGION_SPLIT,
+      znodeVersion, payload);
+  }
+
+  private static int transitionNodeSplitting(final ZooKeeperWatcher zkw,
+      final HRegionInfo parent,
+      final String serverName, final int version)
+  throws KeeperException, IOException {
+    return ZKAssign.transitionNode(zkw, parent, serverName,
+      EventType.RS_ZK_REGION_SPLITTING, EventType.RS_ZK_REGION_SPLITTING, version);
+  }
 }
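For illustration only (not part of this commit): the SPLITTING and SPLIT znodes are created ephemeral so that a regionserver crash removes them and the master's new nodeDeleted() callback can clear the region from regions-in-transition. A small stand-alone sketch of that ephemeral-node behaviour against a plain ZooKeeper client (the connect string and path below are placeholders, and the demo assumes a ZooKeeper server is running there):

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class EphemeralNodeDemo {
      public static void main(String[] args) throws Exception {
        // Placeholder quorum address; any reachable ZooKeeper works for the demo.
        ZooKeeper owner = new ZooKeeper("localhost:2181", 30000, event -> { });
        ZooKeeper observer = new ZooKeeper("localhost:2181", 30000, event -> { });
        Thread.sleep(1000);  // crude wait for both sessions to establish

        // The "regionserver" creates an ephemeral node, as SplitTransaction does
        // for its SPLITTING znode.
        String path = owner.create("/demo-splitting", "payload".getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        System.out.println("exists while session lives: " +
            (observer.exists(path, false) != null));   // true

        // Closing the owner's session simulates a regionserver crash: the
        // ephemeral node is removed and watchers see a NodeDeleted event.
        owner.close();
        Thread.sleep(1000);  // give the deletion a moment to propagate
        System.out.println("exists after session death: " +
            (observer.exists(path, false) != null));   // false

        observer.close();
      }
    }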
@@ -28,6 +28,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;

 /**
  * Utility class with methods for manipulating Writable objects
@@ -58,6 +60,31 @@ public class Writables {
     }
   }

+  /**
+   * Put a bunch of Writables as bytes all into the one byte array.
+   * @param w writable
+   * @return The bytes of <code>w</code> gotten by running its
+   * {@link Writable#write(java.io.DataOutput)} method.
+   * @throws IOException e
+   * @see #getHRegionInfos(byte[], int, int)
+   */
+  public static byte [] getBytes(final Writable... ws) throws IOException {
+    List<byte []> bytes = new ArrayList<byte []>();
+    int size = 0;
+    for (Writable w: ws) {
+      byte [] b = getBytes(w);
+      size += b.length;
+      bytes.add(b);
+    }
+    byte [] result = new byte[size];
+    int offset = 0;
+    for (byte [] b: bytes) {
+      System.arraycopy(b, 0, result, offset, b.length);
+      offset += b.length;
+    }
+    return result;
+  }
+
   /**
    * Set bytes into the passed Writable by calling its
    * {@link Writable#readFields(java.io.DataInput)}.
@@ -119,6 +146,34 @@ public class Writables {
     return (HRegionInfo)getWritable(bytes, new HRegionInfo());
   }

+  /**
+   * @param bytes serialized bytes
+   * @return All the hregioninfos that are in the byte array.  Keeps reading
+   * till we hit the end.
+   * @throws IOException e
+   */
+  public static List<HRegionInfo> getHRegionInfos(final byte [] bytes,
+      final int offset, final int length)
+  throws IOException {
+    if (bytes == null) {
+      throw new IllegalArgumentException("Can't build a writable with empty " +
+        "bytes array");
+    }
+    DataInputBuffer in = new DataInputBuffer();
+    List<HRegionInfo> hris = new ArrayList<HRegionInfo>();
+    try {
+      in.reset(bytes, offset, length);
+      while (in.available() > 0) {
+        HRegionInfo hri = new HRegionInfo();
+        hri.readFields(in);
+        hris.add(hri);
+      }
+    } finally {
+      in.close();
+    }
+    return hris;
+  }
+
   /**
    * @param bytes serialized bytes
    * @return A HRegionInfo instance built out of passed <code>bytes</code>
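For illustration only (not part of this commit): getBytes(Writable...) just concatenates each Writable's serialized form, and getHRegionInfos() replays the buffer until it is exhausted, which is how both daughter HRegionInfos ride in a single znode payload. The same pack-and-replay pattern in a self-contained form, using stock Hadoop IntWritables instead of HRegionInfos:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Writable;

    public class PackedWritablesDemo {
      // Concatenate each Writable's serialized bytes into one array.
      static byte[] pack(Writable... ws) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        for (Writable w : ws) {
          w.write(out);
        }
        out.flush();
        return bos.toByteArray();
      }

      // Replay the array, reading one IntWritable at a time until it is exhausted.
      static List<IntWritable> unpack(byte[] bytes) throws IOException {
        DataInputBuffer in = new DataInputBuffer();
        in.reset(bytes, 0, bytes.length);
        List<IntWritable> result = new ArrayList<>();
        while (in.available() > 0) {
          IntWritable w = new IntWritable();
          w.readFields(in);
          result.add(w);
        }
        in.close();
        return result;
      }

      public static void main(String[] args) throws IOException {
        byte[] packed = pack(new IntWritable(1), new IntWritable(2), new IntWritable(3));
        System.out.println(unpack(packed)); // [1, 2, 3]
      }
    }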
@@ -387,7 +387,7 @@ public class ZKAssign {
    * @throws KeeperException if unexpected zookeeper exception
    * @throws KeeperException.NoNodeException if node does not exist
    */
-  private static boolean deleteNode(ZooKeeperWatcher zkw, String regionName,
+  public static boolean deleteNode(ZooKeeperWatcher zkw, String regionName,
       EventType expectedState)
   throws KeeperException, KeeperException.NoNodeException {
     LOG.debug(zkw.prefix("Deleting existing unassigned " +
@@ -412,8 +412,7 @@ public class ZKAssign {
     if(!ZKUtil.deleteNode(zkw, node, stat.getVersion())) {
       LOG.warn(zkw.prefix("Attempting to delete " +
         "unassigned node in " + expectedState +
-        " state but " +
-        "after verifying it was in OPENED state, we got a version mismatch"));
+        " state but after verifying state, we got a version mismatch"));
       return false;
     }
     LOG.debug(zkw.prefix("Successfully deleted unassigned node for region " +
@@ -624,7 +623,7 @@ public class ZKAssign {
   }

   /**
-   * Private method that actually performs unassigned node transitions.
+   * Method that actually performs unassigned node transitions.
    *
    * <p>Attempts to transition the unassigned node for the specified region
    * from the expected state to the state in the specified transition data.
@@ -654,6 +653,14 @@ public class ZKAssign {
   public static int transitionNode(ZooKeeperWatcher zkw, HRegionInfo region,
       String serverName, EventType beginState, EventType endState,
       int expectedVersion)
+  throws KeeperException {
+    return transitionNode(zkw, region, serverName, beginState, endState,
+      expectedVersion, null);
+  }
+
+  public static int transitionNode(ZooKeeperWatcher zkw, HRegionInfo region,
+      String serverName, EventType beginState, EventType endState,
+      int expectedVersion, final byte [] payload)
   throws KeeperException {
     String encoded = region.getEncodedName();
     if(LOG.isDebugEnabled()) {
@@ -694,7 +701,7 @@ public class ZKAssign {
     // Write new data, ensuring data has not changed since we last read it
     try {
       RegionTransitionData data = new RegionTransitionData(endState,
-        region.getRegionName(), serverName);
+        region.getRegionName(), serverName, payload);
       if(!ZKUtil.setData(zkw, node, data.getBytes(), stat.getVersion())) {
         LOG.warn(zkw.prefix("Attempt to transition the " +
           "unassigned node for " + encoded +
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase;

 import java.io.IOException;
 import java.security.PrivilegedAction;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -39,7 +38,6 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.zookeeper.KeeperException;

@@ -163,6 +161,7 @@ public class MiniHBaseCluster {
   public static class MiniHBaseClusterRegionServer extends HRegionServer {
     private Thread shutdownThread = null;
     private User user = null;
+    public static boolean TEST_SKIP_CLOSE = false;

     public MiniHBaseClusterRegionServer(Configuration conf)
         throws IOException, InterruptedException {
@@ -170,6 +169,13 @@ public class MiniHBaseCluster {
       this.user = User.getCurrent();
     }

+    @Override
+    public boolean closeRegion(HRegionInfo region)
+    throws NotServingRegionException {
+      if (TEST_SKIP_CLOSE) return true;
+      return super.closeRegion(region);
+    }
+
     public void setHServerInfo(final HServerInfo hsi) {
       this.serverInfo = hsi;
     }
@@ -56,9 +56,6 @@ public class TestHMsg extends TestCase {
   }

   public void testSerialization() throws IOException {
-    // Check out new HMsg that carries two daughter split regions.
-    byte [] abytes = Bytes.toBytes("a");
-    byte [] bbytes = Bytes.toBytes("b");
     byte [] parentbytes = Bytes.toBytes("parent");
     HRegionInfo parent =
       new HRegionInfo(new HTableDescriptor(Bytes.toBytes("parent")),
@@ -68,15 +65,5 @@ public class TestHMsg extends TestCase {
     byte [] bytes = Writables.getBytes(hmsg);
     HMsg close = (HMsg)Writables.getWritable(bytes, new HMsg());
     assertTrue(close.equals(hmsg));
-    // Assert split serializes
-    HRegionInfo daughtera =
-      new HRegionInfo(new HTableDescriptor(Bytes.toBytes("a")), abytes, abytes);
-    HRegionInfo daughterb =
-      new HRegionInfo(new HTableDescriptor(Bytes.toBytes("b")), bbytes, bbytes);
-    HMsg splithmsg = new HMsg(HMsg.Type.REGION_SPLIT,
-      parent, daughtera, daughterb, Bytes.toBytes("REGION_SPLIT"));
-    bytes = Writables.getBytes(splithmsg);
-    hmsg = (HMsg)Writables.getWritable(bytes, new HMsg());
-    assertTrue(splithmsg.equals(hmsg));
   }
 }
@@ -113,14 +113,7 @@ public class TestSerialization {
    * @throws Exception
    */
   @Test public void testRegionInfo() throws Exception {
-    final String name = "testRegionInfo";
-    HTableDescriptor htd = new HTableDescriptor(name);
-    String [] families = new String [] {"info", "anchor"};
-    for (int i = 0; i < families.length; i++) {
-      htd.addFamily(new HColumnDescriptor(families[i]));
-    }
-    HRegionInfo hri = new HRegionInfo(htd,
-      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    HRegionInfo hri = createRandomRegion("testRegionInfo");
     byte [] hrib = Writables.getBytes(hri);
     HRegionInfo deserializedHri =
       (HRegionInfo)Writables.getWritable(hrib, new HRegionInfo());
@@ -129,6 +122,29 @@ public class TestSerialization {
      deserializedHri.getTableDesc().getFamilies().size());
  }

+  @Test public void testRegionInfos() throws Exception {
+    HRegionInfo hri = createRandomRegion("testRegionInfos");
+    byte [] hrib = Writables.getBytes(hri);
+    byte [] triple = new byte [3 * hrib.length];
+    System.arraycopy(hrib, 0, triple, 0, hrib.length);
+    System.arraycopy(hrib, 0, triple, hrib.length, hrib.length);
+    System.arraycopy(hrib, 0, triple, hrib.length * 2, hrib.length);
+    List<HRegionInfo> regions = Writables.getHRegionInfos(triple, 0, triple.length);
+    assertTrue(regions.size() == 3);
+    assertTrue(regions.get(0).equals(regions.get(1)));
+    assertTrue(regions.get(0).equals(regions.get(2)));
+  }
+
+  private HRegionInfo createRandomRegion(final String name) {
+    HTableDescriptor htd = new HTableDescriptor(name);
+    String [] families = new String [] {"info", "anchor"};
+    for (int i = 0; i < families.length; i++) {
+      htd.addFamily(new HColumnDescriptor(families[i]));
+    }
+    return new HRegionInfo(htd, HConstants.EMPTY_START_ROW,
+      HConstants.EMPTY_END_ROW);
+  }
+
  /**
   * Test ServerInfo serialization
   * @throws Exception
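The testRegionInfos addition above exercises a Writables.getHRegionInfos(byte[], int, int) helper that reads several serialized HRegionInfo instances out of a single buffer. A short usage sketch follows; the two-region concatenation and the table name are illustrative assumptions, not part of the patch.

// Sketch: serialize one HRegionInfo twice, back to back, and read both out again.
HRegionInfo info = new HRegionInfo(new HTableDescriptor("example"),
  HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
byte [] one = Writables.getBytes(info);
byte [] two = new byte [2 * one.length];
System.arraycopy(one, 0, two, 0, one.length);
System.arraycopy(one, 0, two, one.length, one.length);
List<HRegionInfo> back = Writables.getHRegionInfos(two, 0, two.length);
assertEquals(2, back.size());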
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;

@@ -271,13 +271,11 @@ public class TestCoprocessorInterface extends HBaseTestCase {
        regions[i] = each_daughter;
        i++;
      }
-    }
-    catch (IOException ioe) {
+    } catch (IOException ioe) {
      LOG.info("Split transaction of " + r.getRegionNameAsString() +
          " failed:" + ioe.getMessage());
      assertTrue(false);
-    }
-    catch (RuntimeException e) {
+    } catch (RuntimeException e) {
      LOG.info("Failed rollback of failed split of " +
          r.getRegionNameAsString() + e.getMessage());
    }
@@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
@@ -1339,7 +1338,7 @@ public class TestHRegion extends HBaseTestCase {
      try {
        LOG.info("Running rollback of failed split of " +
          parent.getRegionNameAsString() + "; " + ioe.getMessage());
-        st.rollback(null);
+        st.rollback(null, null);
        LOG.info("Successful rollback of failed split of " +
          parent.getRegionNameAsString());
        return null;
@@ -190,7 +190,7 @@ public class TestSplitTransaction {
    }
    assertTrue(expectedException);
    // Run rollback
-    spiedUponSt.rollback(null);
+    spiedUponSt.rollback(null, null);

    // Assert I can scan parent.
    int parentRowCount2 = countRows(this.parent);
@@ -38,8 +38,15 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
+import org.apache.hadoop.hbase.executor.RegionTransitionData;
+import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -74,6 +81,200 @@ public class TestSplitTransactionOnCluster {
    this.cluster = TESTING_UTIL.getMiniHBaseCluster();
  }

+  private HRegionInfo getAndCheckSingleTableRegion(final List<HRegion> regions) {
+    assertEquals(1, regions.size());
+    return regions.get(0).getRegionInfo();
+  }
+
+  /**
+   * Test what happens if master goes to balance a region just as regionserver
+   * goes to split it. The PENDING_CLOSE state is the strange one since its
+   * in the Master's head only, not out in zk. Test this case.
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws NodeExistsException
+   * @throws KeeperException
+   */
+  @Test (timeout = 600000) public void testPendingCloseAndSplit()
+  throws IOException, InterruptedException, NodeExistsException, KeeperException {
+    final byte [] tableName =
+      Bytes.toBytes("pendingClose");
+
+    // Create table then get the single region for our new table.
+    HTable t = TESTING_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
+
+    List<HRegion> regions = cluster.getRegions(tableName);
+    HRegionInfo hri = getAndCheckSingleTableRegion(regions);
+
+    int tableRegionIndex = ensureTableRegionNotOnSameServerAsMeta(admin, hri);
+
+    // Turn off balancer so it doesn't cut in and mess up our placements.
+    this.admin.balanceSwitch(false);
+    // Turn off the meta scanner so it don't remove parent on us.
+    this.cluster.getMaster().setCatalogJanitorEnabled(false);
+    try {
+      // Add a bit of load up into the table so splittable.
+      TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY);
+      // Get region pre-split.
+      HRegionServer server = cluster.getRegionServer(tableRegionIndex);
+      printOutRegions(server, "Initial regions: ");
+      int regionCount = server.getOnlineRegions().size();
+      // Now send in a close of a region but first make the close on the regionserver
+      // a NOOP. This way the master has all the state of it going to close
+      // but then a SPLITTING arrives. This is what we want to test.
+      // Here is how we turn CLOSE into NOOP in test.
+      MiniHBaseCluster.MiniHBaseClusterRegionServer.TEST_SKIP_CLOSE = true;
+      this.cluster.getMaster().unassign(hri.getRegionName(), false);
+      // Now try splitting and it should work.
+      split(hri, server, regionCount);
+      // Get daughters
+      List<HRegion> daughters = this.cluster.getRegions(tableName);
+      assertTrue(daughters.size() >= 2);
+      // Assert the ephemeral node is gone in zk.
+      String path = ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(),
+        hri.getEncodedName());
+      Stat stats = t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false);
+      assertTrue(stats == null);
+    } finally {
+      // Set this flag back.
+      MiniHBaseCluster.MiniHBaseClusterRegionServer.TEST_SKIP_CLOSE = false;
+      admin.balanceSwitch(true);
+      cluster.getMaster().setCatalogJanitorEnabled(true);
+    }
+  }
+
+  /**
+   * Test that intentionally has master fail the processing of the split message.
+   * Tests that the regionserver split ephemeral node gets cleaned up if it
+   * crashes and that after we process server shutdown, the daughters are up on
+   * line.
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws NodeExistsException
+   * @throws KeeperException
+   */
+  @Test (timeout = 600000) public void testRSSplitEphemeralsDisappearButDaughtersAreOnlinedAfterShutdownHandling()
+  throws IOException, InterruptedException, NodeExistsException, KeeperException {
+    final byte [] tableName =
+      Bytes.toBytes("ephemeral");
+
+    // Create table then get the single region for our new table.
+    HTable t = TESTING_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
+
+    List<HRegion> regions = cluster.getRegions(tableName);
+    HRegionInfo hri = getAndCheckSingleTableRegion(regions);
+
+    int tableRegionIndex = ensureTableRegionNotOnSameServerAsMeta(admin, hri);
+
+    // Turn off balancer so it doesn't cut in and mess up our placements.
+    this.admin.balanceSwitch(false);
+    // Turn off the meta scanner so it don't remove parent on us.
+    cluster.getMaster().setCatalogJanitorEnabled(false);
+    try {
+      // Add a bit of load up into the table so splittable.
+      TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY);
+      // Get region pre-split.
+      HRegionServer server = cluster.getRegionServer(tableRegionIndex);
+      printOutRegions(server, "Initial regions: ");
+      int regionCount = server.getOnlineRegions().size();
+      // Now, before we split, set special flag in master, a flag that has
+      // it FAIL the processing of split.
+      SplitRegionHandler.TEST_SKIP = true;
+      // Now try splitting and it should work.
+      split(hri, server, regionCount);
+      // Get daughters
+      List<HRegion> daughters = cluster.getRegions(tableName);
+      assertTrue(daughters.size() >= 2);
+      // Assert the ephemeral node is up in zk.
+      String path = ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(),
+        hri.getEncodedName());
+      Stat stats =
+        t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false);
+      LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats);
+      RegionTransitionData rtd =
+        ZKAssign.getData(t.getConnection().getZooKeeperWatcher(),
+          hri.getEncodedName());
+      assertTrue(rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLIT));
+      // Now crash the server
+      cluster.abortRegionServer(tableRegionIndex);
+      while(server.getOnlineRegions().size() > 0) {
+        LOG.info("Waiting on server to go down");
+        Thread.sleep(100);
+      }
+      // Wait till regions are back on line again.
+      while(cluster.getRegions(tableName).size() < daughters.size()) {
+        LOG.info("Waiting for repair to happen");
+        Thread.sleep(1000);
+      }
+      // Assert daughters are online.
+      regions = cluster.getRegions(tableName);
+      for (HRegion r: regions) {
+        assertTrue(daughters.contains(r));
+      }
+      // Finally assert that the ephemeral SPLIT znode was cleaned up.
+      stats = t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false);
+      LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats);
+      assertTrue(stats == null);
+    } finally {
+      // Set this flag back.
+      SplitRegionHandler.TEST_SKIP = false;
+      admin.balanceSwitch(true);
+      cluster.getMaster().setCatalogJanitorEnabled(true);
+    }
+  }
+
+  @Test (timeout = 600000) public void testExistingZnodeBlocksSplitAndWeRollback()
+  throws IOException, InterruptedException, NodeExistsException, KeeperException {
+    final byte [] tableName =
+      Bytes.toBytes("testExistingZnodeBlocksSplitAndWeRollback");
+
+    // Create table then get the single region for our new table.
+    HTable t = TESTING_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
+
+    List<HRegion> regions = cluster.getRegions(tableName);
+    HRegionInfo hri = getAndCheckSingleTableRegion(regions);
+
+    int tableRegionIndex = ensureTableRegionNotOnSameServerAsMeta(admin, hri);
+
+    // Turn off balancer so it doesn't cut in and mess up our placements.
+    this.admin.balanceSwitch(false);
+    // Turn off the meta scanner so it don't remove parent on us.
+    cluster.getMaster().setCatalogJanitorEnabled(false);
+    try {
+      // Add a bit of load up into the table so splittable.
+      TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY);
+      // Get region pre-split.
+      HRegionServer server = cluster.getRegionServer(tableRegionIndex);
+      printOutRegions(server, "Initial regions: ");
+      int regionCount = server.getOnlineRegions().size();
+      // Insert into zk a blocking znode, a znode of same name as region
+      // so it gets in way of our splitting.
+      ZKAssign.createNodeClosing(t.getConnection().getZooKeeperWatcher(),
+        hri, "anyOldServer");
+      // Now try splitting.... should fail. And each should successfully
+      // rollback.
+      this.admin.split(hri.getRegionNameAsString());
+      this.admin.split(hri.getRegionNameAsString());
+      this.admin.split(hri.getRegionNameAsString());
+      // Wait around a while and assert count of regions remains constant.
+      for (int i = 0; i < 10; i++) {
+        Thread.sleep(100);
+        assertEquals(regionCount, server.getOnlineRegions().size());
+      }
+      // Now clear the zknode
+      ZKAssign.deleteClosingNode(t.getConnection().getZooKeeperWatcher(), hri);
+      // Now try splitting and it should work.
+      split(hri, server, regionCount);
+      // Get daughters
+      List<HRegion> daughters = cluster.getRegions(tableName);
+      assertTrue(daughters.size() >= 2);
+      // OK, so split happened after we cleared the blocking node.
+    } finally {
+      admin.balanceSwitch(true);
+      cluster.getMaster().setCatalogJanitorEnabled(true);
+    }
+  }
+
  /**
   * Messy test that simulates case where SplitTransactions fails to add one
   * of the daughters up into the .META. table before crash. We're testing
@@ -90,8 +291,7 @@ public class TestSplitTransactionOnCluster {
    HTable t = TESTING_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);

    List<HRegion> regions = cluster.getRegions(tableName);
-    assertEquals(1, regions.size());
-    HRegionInfo hri = regions.get(0).getRegionInfo();
+    HRegionInfo hri = getAndCheckSingleTableRegion(regions);

    int tableRegionIndex = ensureTableRegionNotOnSameServerAsMeta(admin, hri);

@@ -151,8 +351,7 @@ public class TestSplitTransactionOnCluster {
    HTable t = TESTING_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);

    List<HRegion> regions = cluster.getRegions(tableName);
-    assertEquals(1, regions.size());
-    HRegionInfo hri = regions.get(0).getRegionInfo();
+    HRegionInfo hri = getAndCheckSingleTableRegion(regions);

    int tableRegionIndex = ensureTableRegionNotOnSameServerAsMeta(admin, hri);

@@ -205,7 +404,7 @@ public class TestSplitTransactionOnCluster {
      final int regionCount)
  throws IOException, InterruptedException {
    this.admin.split(hri.getRegionNameAsString());
-    while(server.getOnlineRegions().size() <= regionCount) {
+    while (server.getOnlineRegions().size() <= regionCount) {
      LOG.debug("Waiting on region to split");
      Thread.sleep(100);
    }
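With the REGION_SPLIT heartbeat message removed, the tests above confirm a split by inspecting the parent region's unassigned znode. A minimal sketch of that check, using only the ZKAssign and RegionTransitionData calls that appear in the test code above (the helper name and its boolean return are assumptions for illustration):

// Read the region-in-transition data for the parent and confirm the split was
// reported through zookeeper rather than a heartbeat HMsg.
static boolean splitReportedInZk(final HTable t, final HRegionInfo hri)
throws KeeperException, IOException {
  RegionTransitionData rtd =
    ZKAssign.getData(t.getConnection().getZooKeeperWatcher(), hri.getEncodedName());
  return rtd != null && rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLIT);
}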