HBASE-9249 Add cp hook before setting PONR in split

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1524114 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
rajeshbabu 2013-09-17 16:13:26 +00:00
parent 63a3155fbb
commit f9db5c8943
7 changed files with 305 additions and 28 deletions

View File

@ -178,7 +178,7 @@ public class MetaEditor {
* @param mutations Puts and Deletes to execute on hbase:meta
* @throws IOException
*/
static void mutateMetaTable(final CatalogTracker ct, final List<Mutation> mutations)
public static void mutateMetaTable(final CatalogTracker ct, final List<Mutation> mutations)
throws IOException {
HTable t = MetaReader.getMetaHTable(ct);
try {

View File

@ -117,6 +117,16 @@ public abstract class BaseRegionObserver implements RegionObserver {
byte[] splitRow) throws IOException {
}
@Override
public void preSplitBeforePONR(ObserverContext<RegionCoprocessorEnvironment> ctx,
byte[] splitKey, List<Mutation> metaEntries) throws IOException {
}
@Override
public void preSplitAfterPONR(
ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException {
}
@Override
public void preRollBackSplit(ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException {

View File

@ -342,6 +342,27 @@ public interface RegionObserver extends Coprocessor {
void postSplit(final ObserverContext<RegionCoprocessorEnvironment> c, final HRegion l,
final HRegion r) throws IOException;
/**
* This will be called before PONR step as part of split transaction. Calling
* {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} rollback the split
* @param ctx
* @param splitKey
* @param metaEntries
* @throws IOException
*/
void preSplitBeforePONR(final ObserverContext<RegionCoprocessorEnvironment> ctx,
byte[] splitKey, List<Mutation> metaEntries) throws IOException;
/**
* This will be called after PONR step as part of split transaction
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
* effect in this hook.
* @param ctx
* @throws IOException
*/
void preSplitAfterPONR(final ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
/**
* This will be called before the roll back of the split region is completed
* @param ctx

View File

@ -663,6 +663,44 @@ public class RegionCoprocessorHost
}
}
public boolean preSplitBeforePONR(byte[] splitKey, List<Mutation> metaEntries) throws IOException {
boolean bypass = false;
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
((RegionObserver) env.getInstance()).preSplitBeforePONR(ctx,
splitKey, metaEntries);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
}
bypass |= ctx.shouldBypass();
if (ctx.shouldComplete()) {
break;
}
}
}
return bypass;
}
public void preSplitAfterPONR() throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
((RegionObserver) env.getInstance()).preSplitAfterPONR(ctx);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
}
if (ctx.shouldComplete()) {
break;
}
}
}
}
/**
* Invoked just before the rollback of a failed split is started
* @throws IOException

View File

@ -34,11 +34,15 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
@ -229,6 +233,64 @@ public class SplitTransaction {
server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
this.fileSplitTimeout);
PairOfSameType<HRegion> daughterRegions = stepsBeforePONR(server, services, testing);
List<Mutation> metaEntries = new ArrayList<Mutation>();
if (this.parent.getCoprocessorHost() != null) {
if (this.parent.getCoprocessorHost().
preSplitBeforePONR(this.splitrow, metaEntries)) {
throw new IOException("Coprocessor bypassing region "
+ this.parent.getRegionNameAsString() + " split.");
}
try {
for (Mutation p : metaEntries) {
HRegionInfo.parseRegionName(p.getRow());
}
} catch (IOException e) {
LOG.error("Row key of mutation from coprossor is not parsable as region name."
+ "Mutations from coprocessor should only for hbase:meta table.");
throw e;
}
}
// This is the point of no return. Adding subsequent edits to .META. as we
// do below when we do the daughter opens adding each to .META. can fail in
// various interesting ways the most interesting of which is a timeout
// BUT the edits all go through (See HBASE-3872). IF we reach the PONR
// then subsequent failures need to crash out this regionserver; the
// server shutdown processing should be able to fix-up the incomplete split.
// The offlined parent will have the daughters as extra columns. If
// we leave the daughter regions in place and do not remove them when we
// crash out, then they will have their references to the parent in place
// still and the server shutdown fixup of .META. will point to these
// regions.
// We should add PONR JournalEntry before offlineParentInMeta,so even if
// OfflineParentInMeta timeout,this will cause regionserver exit,and then
// master ServerShutdownHandler will fix daughter & avoid data loss. (See
// HBase-4562).
this.journal.add(JournalEntry.PONR);
// Edit parent in meta. Offlines parent region and adds splita and splitb
// as an atomic update. See HBASE-7721. This update to META makes the region
// will determine whether the region is split or not in case of failures.
// If it is successful, master will roll-forward, if not, master will rollback
// and assign the parent region.
if (!testing) {
if (metaEntries == null || metaEntries.isEmpty()) {
MetaEditor.splitRegion(server.getCatalogTracker(),
parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
daughterRegions.getSecond().getRegionInfo(), server.getServerName());
} else {
offlineParentInMetaAndputMetaEntries(server.getCatalogTracker(),
parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
.getSecond().getRegionInfo(), server.getServerName(), metaEntries);
}
}
return daughterRegions;
}
public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
final RegionServerServices services, boolean testing) throws IOException {
// Set ephemeral SPLITTING znode up in zk. Mocked servers sometimes don't
// have zookeeper so don't do zk stuff if server or zookeeper is null
if (server != null && server.getZooKeeper() != null) {
@ -305,33 +367,6 @@ public class SplitTransaction {
// Ditto
this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
// This is the point of no return. Adding subsequent edits to hbase:meta as we
// do below when we do the daughter opens adding each to hbase:meta can fail in
// various interesting ways the most interesting of which is a timeout
// BUT the edits all go through (See HBASE-3872). IF we reach the PONR
// then subsequent failures need to crash out this regionserver; the
// server shutdown processing should be able to fix-up the incomplete split.
// The offlined parent will have the daughters as extra columns. If
// we leave the daughter regions in place and do not remove them when we
// crash out, then they will have their references to the parent in place
// still and the server shutdown fixup of hbase:meta will point to these
// regions.
// We should add PONR JournalEntry before offlineParentInMeta,so even if
// OfflineParentInMeta timeout,this will cause regionserver exit,and then
// master ServerShutdownHandler will fix daughter & avoid data loss. (See
// HBase-4562).
this.journal.add(JournalEntry.PONR);
// Edit parent in meta. Offlines parent region and adds splita and splitb
// as an atomic update. See HBASE-7721. This update to hbase:meta makes the region
// will determine whether the region is split or not in case of failures.
// If it is successful, master will roll-forward, if not, master will rollback
// and assign the parent region.
if (!testing) {
MetaEditor.splitRegion(server.getCatalogTracker(), parent.getRegionInfo(),
a.getRegionInfo(), b.getRegionInfo(), server.getServerName());
}
return new PairOfSameType<HRegion>(a, b);
}
@ -463,11 +498,54 @@ public class SplitTransaction {
final RegionServerServices services)
throws IOException {
PairOfSameType<HRegion> regions = createDaughters(server, services);
if (this.parent.getCoprocessorHost() != null) {
this.parent.getCoprocessorHost().preSplitAfterPONR();
}
return stepsAfterPONR(server, services, regions);
}
public PairOfSameType<HRegion> stepsAfterPONR(final Server server,
final RegionServerServices services, PairOfSameType<HRegion> regions)
throws IOException {
openDaughters(server, services, regions.getFirst(), regions.getSecond());
transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
return regions;
}
private void offlineParentInMetaAndputMetaEntries(CatalogTracker catalogTracker,
HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
ServerName serverName, List<Mutation> metaEntries) throws IOException {
List<Mutation> mutations = metaEntries;
HRegionInfo copyOfParent = new HRegionInfo(parent);
copyOfParent.setOffline(true);
copyOfParent.setSplit(true);
//Put for parent
Put putParent = MetaEditor.makePutFromRegionInfo(copyOfParent);
MetaEditor.addDaughtersToPut(putParent, splitA, splitB);
mutations.add(putParent);
//Puts for daughters
Put putA = MetaEditor.makePutFromRegionInfo(splitA);
Put putB = MetaEditor.makePutFromRegionInfo(splitB);
addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
addLocation(putB, serverName, 1);
mutations.add(putA);
mutations.add(putB);
MetaEditor.mutateMetaTable(catalogTracker, mutations);
}
public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
Bytes.toBytes(sn.getHostAndPort()));
p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
Bytes.toBytes(sn.getStartcode()));
p.add(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
Bytes.toBytes(openSeqNum));
return p;
}
/*
* Open daughter region in its own thread.
* If we fail, abort this hosting server.

View File

@ -108,6 +108,8 @@ public class SimpleRegionObserver extends BaseRegionObserver {
final AtomicInteger ctPostBatchMutate = new AtomicInteger(0);
final AtomicInteger ctPreWALRestore = new AtomicInteger(0);
final AtomicInteger ctPostWALRestore = new AtomicInteger(0);
final AtomicInteger ctPreSplitBeforePONR = new AtomicInteger(0);
final AtomicInteger ctPreSplitAfterPONR = new AtomicInteger(0);
final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
@ -186,6 +188,19 @@ public class SimpleRegionObserver extends BaseRegionObserver {
ctPreSplit.incrementAndGet();
}
@Override
public void preSplitBeforePONR(
ObserverContext<RegionCoprocessorEnvironment> ctx, byte[] splitKey,
List<Mutation> metaEntries) throws IOException {
ctPreSplitBeforePONR.incrementAndGet();
}
@Override
public void preSplitAfterPONR(
ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException {
ctPreSplitAfterPONR.incrementAndGet();
}
@Override
public void postSplit(ObserverContext<RegionCoprocessorEnvironment> c, HRegion l, HRegion r) {
ctPostSplit.incrementAndGet();
@ -631,6 +646,14 @@ public class SimpleRegionObserver extends BaseRegionObserver {
return ctPreSplit.get();
}
public int getCtPreSplitBeforePONR() {
return ctPreSplitBeforePONR.get();
}
public int getCtPreSplitAfterPONR() {
return ctPreSplitAfterPONR.get();
}
public int getCtPostSplit() {
return ctPostSplit.get();
}

View File

@ -52,14 +52,19 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager;
@ -74,6 +79,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -865,6 +871,53 @@ public class TestSplitTransactionOnCluster {
}
}
@Test(timeout = 180000)
public void testSplitHooksBeforeAndAfterPONR() throws Exception {
String firstTable = "testSplitHooksBeforeAndAfterPONR_1";
String secondTable = "testSplitHooksBeforeAndAfterPONR_2";
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(firstTable));
desc.addCoprocessor(MockedRegionObserver.class.getName());
HColumnDescriptor hcd = new HColumnDescriptor("cf");
desc.addFamily(hcd);
admin.createTable(desc);
desc = new HTableDescriptor(TableName.valueOf(secondTable));
hcd = new HColumnDescriptor("cf");
desc.addFamily(hcd);
admin.createTable(desc);
List<HRegion> firstTableregions = cluster.getRegions(TableName.valueOf(firstTable));
List<HRegion> secondTableRegions = cluster.getRegions(TableName.valueOf(secondTable));
ServerName serverName =
cluster.getServerHoldingRegion(firstTableregions.get(0).getRegionName());
admin.move(secondTableRegions.get(0).getRegionInfo().getEncodedNameAsBytes(),
Bytes.toBytes(serverName.getServerName()));
HTable table1 = null;
HTable table2 = null;
try {
table1 = new HTable(TESTING_UTIL.getConfiguration(), firstTable);
table2 = new HTable(TESTING_UTIL.getConfiguration(), firstTable);
insertData(Bytes.toBytes(firstTable), admin, table1);
insertData(Bytes.toBytes(secondTable), admin, table2);
admin.split(Bytes.toBytes(firstTable), "row2".getBytes());
firstTableregions = cluster.getRegions(Bytes.toBytes(firstTable));
while (firstTableregions.size() != 2) {
Thread.sleep(1000);
firstTableregions = cluster.getRegions(Bytes.toBytes(firstTable));
}
assertEquals("Number of regions after split should be 2.", 2, firstTableregions.size());
secondTableRegions = cluster.getRegions(Bytes.toBytes(secondTable));
assertEquals("Number of regions after split should be 2.", 2, secondTableRegions.size());
} finally {
if (table1 != null) {
table1.close();
}
if (table2 != null) {
table2.close();
}
TESTING_UTIL.deleteTable(firstTable);
TESTING_UTIL.deleteTable(secondTable);
}
}
private void testSplitBeforeSettingSplittingInZKInternals() throws Exception {
final byte[] tableName = Bytes.toBytes("testSplitBeforeSettingSplittingInZK");
try {
@ -1148,5 +1201,59 @@ public class TestSplitTransactionOnCluster {
super();
}
}
public static class MockedRegionObserver extends BaseRegionObserver {
private SplitTransaction st = null;
private PairOfSameType<HRegion> daughterRegions = null;
@Override
public void preSplitBeforePONR(ObserverContext<RegionCoprocessorEnvironment> ctx,
byte[] splitKey, List<Mutation> metaEntries) throws IOException {
RegionCoprocessorEnvironment environment = ctx.getEnvironment();
HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
List<HRegion> onlineRegions =
rs.getOnlineRegions(TableName.valueOf("testSplitHooksBeforeAndAfterPONR_2"));
HRegion region = onlineRegions.get(0);
for (HRegion r : onlineRegions) {
if (r.getRegionInfo().containsRow(splitKey)) {
region = r;
break;
}
}
st = new SplitTransaction(region, splitKey);
if (!st.prepare()) {
LOG.error("Prepare for the table " + region.getTableDesc().getNameAsString()
+ " failed. So returning null. ");
ctx.bypass();
return;
}
region.forceSplit(splitKey);
daughterRegions = st.stepsBeforePONR(rs, rs, false);
HRegionInfo copyOfParent = new HRegionInfo(region.getRegionInfo());
copyOfParent.setOffline(true);
copyOfParent.setSplit(true);
// Put for parent
Put putParent = MetaEditor.makePutFromRegionInfo(copyOfParent);
MetaEditor.addDaughtersToPut(putParent, daughterRegions.getFirst().getRegionInfo(),
daughterRegions.getSecond().getRegionInfo());
metaEntries.add(putParent);
// Puts for daughters
Put putA = MetaEditor.makePutFromRegionInfo(daughterRegions.getFirst().getRegionInfo());
Put putB = MetaEditor.makePutFromRegionInfo(daughterRegions.getSecond().getRegionInfo());
st.addLocation(putA, rs.getServerName(), 1);
st.addLocation(putB, rs.getServerName(), 1);
metaEntries.add(putA);
metaEntries.add(putB);
}
@Override
public void preSplitAfterPONR(ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException {
RegionCoprocessorEnvironment environment = ctx.getEnvironment();
HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
st.stepsAfterPONR(rs, rs, daughterRegions);
}
}
}