HBASE-7103 Need to fail split if SPLIT znode is deleted even before the split is completed. (Ram)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1408418 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6223b09dd0
commit
b91df27d20
|
@ -106,10 +106,6 @@ public class SplitTransaction {
|
|||
* we need to rollback.
|
||||
*/
|
||||
enum JournalEntry {
|
||||
/**
|
||||
* Before creating node in splitting state.
|
||||
*/
|
||||
STARTED_SPLITTING,
|
||||
/**
|
||||
* Set region as in transition, set it into SPLITTING state.
|
||||
*/
|
||||
|
@ -240,19 +236,32 @@ public class SplitTransaction {
|
|||
server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
|
||||
this.fileSplitTimeout);
|
||||
|
||||
this.journal.add(JournalEntry.STARTED_SPLITTING);
|
||||
// Set ephemeral SPLITTING znode up in zk. Mocked servers sometimes don't
|
||||
// have zookeeper so don't do zk stuff if server or zookeeper is null
|
||||
if (server != null && server.getZooKeeper() != null) {
|
||||
try {
|
||||
this.znodeVersion = createNodeSplitting(server.getZooKeeper(),
|
||||
createNodeSplitting(server.getZooKeeper(),
|
||||
this.parent.getRegionInfo(), server.getServerName());
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed setting SPLITTING znode on " +
|
||||
throw new IOException("Failed creating SPLITTING znode on " +
|
||||
this.parent.getRegionNameAsString(), e);
|
||||
}
|
||||
}
|
||||
this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
|
||||
if (server != null && server.getZooKeeper() != null) {
|
||||
try {
|
||||
// Transition node from SPLITTING to SPLITTING after creating the split node.
|
||||
// Master will get the callback for node change only if the transition is successful.
|
||||
// Note that if the transition fails then the rollback will delete the created znode
|
||||
// as the journal entry SET_SPLITTING_IN_ZK is added.
|
||||
// TODO : May be we can add some new state to znode and handle the new state incase of success/failure
|
||||
this.znodeVersion = transitionNodeSplitting(server.getZooKeeper(),
|
||||
this.parent.getRegionInfo(), server.getServerName(), -1);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed setting SPLITTING znode on "
|
||||
+ this.parent.getRegionNameAsString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
createSplitDir(this.parent.getFilesystem(), this.splitdir);
|
||||
this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
|
||||
|
@ -750,15 +759,9 @@ public class SplitTransaction {
|
|||
JournalEntry je = iterator.previous();
|
||||
switch(je) {
|
||||
|
||||
case STARTED_SPLITTING:
|
||||
if (server != null && server.getZooKeeper() != null) {
|
||||
cleanZK(server, this.parent.getRegionInfo(), false);
|
||||
}
|
||||
break;
|
||||
|
||||
case SET_SPLITTING_IN_ZK:
|
||||
if (server != null && server.getZooKeeper() != null) {
|
||||
cleanZK(server, this.parent.getRegionInfo(), true);
|
||||
cleanZK(server, this.parent.getRegionInfo());
|
||||
}
|
||||
break;
|
||||
|
||||
|
@ -855,15 +858,11 @@ public class SplitTransaction {
|
|||
LOG.info("Cleaned up old failed split transaction detritus: " + splitdir);
|
||||
}
|
||||
|
||||
private static void cleanZK(final Server server, final HRegionInfo hri, boolean abort) {
|
||||
private static void cleanZK(final Server server, final HRegionInfo hri) {
|
||||
try {
|
||||
// Only delete if its in expected state; could have been hijacked.
|
||||
ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
|
||||
EventType.RS_ZK_REGION_SPLITTING);
|
||||
} catch (KeeperException.NoNodeException nn) {
|
||||
if (abort) {
|
||||
server.abort("Failed cleanup of " + hri.getRegionNameAsString(), nn);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
|
||||
}
|
||||
|
|
|
@ -19,23 +19,41 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.DeserializationException;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.LargeTests;
|
||||
import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.RegionTransition;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.UnknownRegionException;
|
||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.RegionStates;
|
||||
import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -67,6 +85,10 @@ public class TestSplitTransactionOnCluster {
|
|||
private HBaseAdmin admin = null;
|
||||
private MiniHBaseCluster cluster = null;
|
||||
private static final int NB_SERVERS = 2;
|
||||
private static CountDownLatch latch = new CountDownLatch(1);
|
||||
private static boolean secondSplit = false;
|
||||
private static boolean callRollBack = false;
|
||||
private static boolean firstSplitCompleted = false;
|
||||
|
||||
private static final HBaseTestingUtility TESTING_UTIL =
|
||||
new HBaseTestingUtility();
|
||||
|
@ -538,10 +560,107 @@ public class TestSplitTransactionOnCluster {
|
|||
@Test
|
||||
public void testSplitBeforeSettingSplittingInZK() throws IOException,
|
||||
InterruptedException, KeeperException {
|
||||
testSplitBeforeSettingSplittingInZK(true);
|
||||
testSplitBeforeSettingSplittingInZK(false);
|
||||
testSplitBeforeSettingSplittingInZKInternals();
|
||||
}
|
||||
|
||||
@Test(timeout = 20000)
|
||||
public void testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack() throws Exception {
|
||||
final byte[] tableName = Bytes
|
||||
.toBytes("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack");
|
||||
HBaseAdmin admin = new HBaseAdmin(TESTING_UTIL.getConfiguration());
|
||||
try {
|
||||
// Create table then get the single region for our new table.
|
||||
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
htd.addFamily(new HColumnDescriptor("cf"));
|
||||
admin.createTable(htd);
|
||||
HTable t = new HTable(cluster.getConfiguration(), tableName);
|
||||
while (!(cluster.getRegions(tableName).size() == 1)) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
final List<HRegion> regions = cluster.getRegions(tableName);
|
||||
HRegionInfo hri = getAndCheckSingleTableRegion(regions);
|
||||
int regionServerIndex = cluster.getServerWith(regions.get(0).getRegionName());
|
||||
final HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
|
||||
insertData(tableName, admin, t);
|
||||
// Turn off balancer so it doesn't cut in and mess up our placements.
|
||||
this.admin.setBalancerRunning(false, false);
|
||||
// Turn off the meta scanner so it don't remove parent on us.
|
||||
cluster.getMaster().setCatalogJanitorEnabled(false);
|
||||
|
||||
new Thread() {
|
||||
public void run() {
|
||||
SplitTransaction st = null;
|
||||
st = new MockedSplitTransaction(regions.get(0), Bytes.toBytes("row2"));
|
||||
try {
|
||||
st.prepare();
|
||||
st.execute(regionServer, regionServer);
|
||||
} catch (IOException e) {
|
||||
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
while (!callRollBack) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
SplitTransaction st = null;
|
||||
st = new MockedSplitTransaction(regions.get(0), Bytes.toBytes("row2"));
|
||||
try {
|
||||
secondSplit = true;
|
||||
st.prepare();
|
||||
st.execute(regionServer, regionServer);
|
||||
} catch (IOException e) {
|
||||
LOG.debug("Rollback started :"+ e.getMessage());
|
||||
st.rollback(regionServer, regionServer);
|
||||
}
|
||||
while (!firstSplitCompleted) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
RegionStates regionStates = cluster.getMaster().getAssignmentManager().getRegionStates();
|
||||
Map<String, RegionState> rit = regionStates.getRegionsInTransition();
|
||||
|
||||
while (rit.containsKey(hri.getTableNameAsString())) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
List<HRegion> onlineRegions = regionServer.getOnlineRegions(tableName);
|
||||
// Region server side split is successful.
|
||||
assertEquals("The parent region should be splitted", 2, onlineRegions.size());
|
||||
//Should be present in RIT
|
||||
List<HRegionInfo> regionsOfTable = cluster.getMaster().getAssignmentManager()
|
||||
.getRegionStates().getRegionsOfTable(tableName);
|
||||
// Master side should also reflect the same
|
||||
assertEquals("No of regions in master", 2, regionsOfTable.size());
|
||||
} finally {
|
||||
admin.setBalancerRunning(true, false);
|
||||
secondSplit = false;
|
||||
firstSplitCompleted = false;
|
||||
callRollBack = false;
|
||||
cluster.getMaster().setCatalogJanitorEnabled(true);
|
||||
if (admin.isTableAvailable(tableName) && admin.isTableEnabled(tableName)) {
|
||||
admin.disableTable(tableName);
|
||||
admin.deleteTable(tableName);
|
||||
admin.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void insertData(final byte[] tableName, HBaseAdmin admin, HTable t) throws IOException,
|
||||
InterruptedException {
|
||||
Put p = new Put(Bytes.toBytes("row1"));
|
||||
p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
|
||||
t.put(p);
|
||||
p = new Put(Bytes.toBytes("row2"));
|
||||
p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("2"));
|
||||
t.put(p);
|
||||
p = new Put(Bytes.toBytes("row3"));
|
||||
p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("3"));
|
||||
t.put(p);
|
||||
p = new Put(Bytes.toBytes("row4"));
|
||||
p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("4"));
|
||||
t.put(p);
|
||||
admin.flush(tableName);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testShouldThrowIOExceptionIfStoreFileSizeIsEmptyAndSHouldSuccessfullyExecuteRollback()
|
||||
throws Exception {
|
||||
|
@ -582,7 +701,7 @@ public class TestSplitTransactionOnCluster {
|
|||
|
||||
}
|
||||
|
||||
private void testSplitBeforeSettingSplittingInZK(boolean nodeCreated) throws IOException,
|
||||
private void testSplitBeforeSettingSplittingInZKInternals() throws IOException,
|
||||
KeeperException {
|
||||
final byte[] tableName = Bytes.toBytes("testSplitBeforeSettingSplittingInZK");
|
||||
HBaseAdmin admin = TESTING_UTIL.getHBaseAdmin();
|
||||
|
@ -596,15 +715,7 @@ public class TestSplitTransactionOnCluster {
|
|||
int regionServerIndex = cluster.getServerWith(regions.get(0).getRegionName());
|
||||
HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
|
||||
SplitTransaction st = null;
|
||||
if (nodeCreated) {
|
||||
st = new MockedSplitTransaction(regions.get(0), null) {
|
||||
@Override
|
||||
int transitionNodeSplitting(ZooKeeperWatcher zkw, HRegionInfo parent,
|
||||
ServerName serverName, int version) throws KeeperException, IOException {
|
||||
throw new IOException();
|
||||
}
|
||||
};
|
||||
} else {
|
||||
{
|
||||
st = new MockedSplitTransaction(regions.get(0), null) {
|
||||
@Override
|
||||
int createNodeSplitting(ZooKeeperWatcher zkw, HRegionInfo region, ServerName serverName)
|
||||
|
@ -618,9 +729,7 @@ public class TestSplitTransactionOnCluster {
|
|||
} catch (IOException e) {
|
||||
String node = ZKAssign.getNodeName(regionServer.getZooKeeper(), regions.get(0)
|
||||
.getRegionInfo().getEncodedName());
|
||||
if (nodeCreated) {
|
||||
assertFalse(ZKUtil.checkExists(regionServer.getZooKeeper(), node) == -1);
|
||||
} else {
|
||||
{
|
||||
assertTrue(ZKUtil.checkExists(regionServer.getZooKeeper(), node) == -1);
|
||||
}
|
||||
assertTrue(st.rollback(regionServer, regionServer));
|
||||
|
@ -636,8 +745,43 @@ public class TestSplitTransactionOnCluster {
|
|||
|
||||
public static class MockedSplitTransaction extends SplitTransaction {
|
||||
|
||||
private HRegion currentRegion;
|
||||
public MockedSplitTransaction(HRegion r, byte[] splitrow) {
|
||||
super(r, splitrow);
|
||||
this.currentRegion = r;
|
||||
}
|
||||
|
||||
@Override
|
||||
void transitionZKNode(Server server, RegionServerServices services, HRegion a, HRegion b)
|
||||
throws IOException {
|
||||
if (this.currentRegion.getRegionInfo().getTableNameAsString()
|
||||
.equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) {
|
||||
try {
|
||||
if (!secondSplit){
|
||||
callRollBack = true;
|
||||
latch.await();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
|
||||
}
|
||||
super.transitionZKNode(server, services, a, b);
|
||||
if (this.currentRegion.getRegionInfo().getTableNameAsString()
|
||||
.equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) {
|
||||
firstSplitCompleted = true;
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public boolean rollback(Server server, RegionServerServices services) throws IOException {
|
||||
if (this.currentRegion.getRegionInfo().getTableNameAsString()
|
||||
.equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) {
|
||||
if(secondSplit){
|
||||
super.rollback(server, services);
|
||||
latch.countDown();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return super.rollback(server, services);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue