HBASE-9489 Add cp hooks in online merge before and after setting PONR
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1547996 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a4045ddcb8
commit
5fbbef29f3
|
@ -0,0 +1,37 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.lang.annotation.Documented;
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
/**
|
||||
* The field or the parameter to which this annotation can be applied only when it
|
||||
* holds mutations for hbase:meta table.
|
||||
*/
|
||||
@Documented
|
||||
@Target( { ElementType.LOCAL_VARIABLE, ElementType.PARAMETER })
|
||||
@Retention(RetentionPolicy.CLASS)
|
||||
public @interface MetaMutationAnnotation {
|
||||
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
|
||||
/**
|
||||
* An abstract class that implements RegionServerObserver.
|
||||
* By extending it, you can create your own region server observer without
|
||||
* overriding all abstract methods of RegionServerObserver.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
|
||||
@InterfaceStability.Evolving
|
||||
public class BaseRegionServerObserver implements RegionServerObserver {
|
||||
|
||||
@Override
|
||||
public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> env)
|
||||
throws IOException { }
|
||||
|
||||
@Override
|
||||
public void start(CoprocessorEnvironment env) throws IOException { }
|
||||
|
||||
@Override
|
||||
public void stop(CoprocessorEnvironment env) throws IOException { }
|
||||
|
||||
@Override
|
||||
public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
|
||||
HRegion regionB) throws IOException { }
|
||||
|
||||
@Override
|
||||
public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
|
||||
HRegion regionB, HRegion mergedRegion) throws IOException { }
|
||||
|
||||
@Override
|
||||
public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { }
|
||||
|
||||
@Override
|
||||
public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
|
||||
|
||||
@Override
|
||||
public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
HRegion regionA, HRegion regionB) throws IOException { }
|
||||
|
||||
@Override
|
||||
public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
HRegion regionA, HRegion regionB) throws IOException { }
|
||||
|
||||
}
|
|
@ -19,8 +19,12 @@
|
|||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.MetaMutationAnnotation;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
|
||||
public interface RegionServerObserver extends Coprocessor {
|
||||
|
||||
|
@ -32,4 +36,73 @@ public interface RegionServerObserver extends Coprocessor {
|
|||
void preStopRegionServer(
|
||||
final ObserverContext<RegionServerCoprocessorEnvironment> env)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Called before the regions merge.
|
||||
* Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} to skip the merge.
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
* @param ctx
|
||||
* @param regionA
|
||||
* @param regionB
|
||||
* @throws IOException
|
||||
*/
|
||||
void preMerge(final ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
final HRegion regionA, final HRegion regionB) throws IOException;
|
||||
|
||||
/**
|
||||
* called after the regions merge.
|
||||
* @param c
|
||||
* @param regionA
|
||||
* @param regionB
|
||||
* @param mergedRegion
|
||||
* @throws IOException
|
||||
*/
|
||||
void postMerge(final ObserverContext<RegionServerCoprocessorEnvironment> c,
|
||||
final HRegion regionA, final HRegion regionB, final HRegion mergedRegion) throws IOException;
|
||||
|
||||
/**
|
||||
* This will be called before PONR step as part of regions merge transaction. Calling
|
||||
* {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} rollback the merge
|
||||
* @param ctx
|
||||
* @param regionA
|
||||
* @param regionB
|
||||
* @param metaEntries mutations to execute on hbase:meta atomically with regions merge updates.
|
||||
* Any puts or deletes to execute on hbase:meta can be added to the mutations.
|
||||
* @throws IOException
|
||||
*/
|
||||
void preMergeCommit(final ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
final HRegion regionA, final HRegion regionB,
|
||||
@MetaMutationAnnotation List<Mutation> metaEntries) throws IOException;
|
||||
|
||||
/**
|
||||
* This will be called after PONR step as part of regions merge transaction.
|
||||
* @param ctx
|
||||
* @param regionA
|
||||
* @param regionB
|
||||
* @param mergedRegion
|
||||
* @throws IOException
|
||||
*/
|
||||
void postMergeCommit(final ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
final HRegion regionA, final HRegion regionB, final HRegion mergedRegion) throws IOException;
|
||||
|
||||
/**
|
||||
* This will be called before the roll back of the regions merge.
|
||||
* @param ctx
|
||||
* @param regionA
|
||||
* @param regionB
|
||||
* @throws IOException
|
||||
*/
|
||||
void preRollBackMerge(final ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
final HRegion regionA, final HRegion regionB) throws IOException;
|
||||
|
||||
/**
|
||||
* This will be called after the roll back of the regions merge.
|
||||
* @param ctx
|
||||
* @param regionA
|
||||
* @param regionB
|
||||
* @throws IOException
|
||||
*/
|
||||
void postRollBackMerge(final ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
final HRegion regionA, final HRegion regionB) throws IOException;
|
||||
|
||||
}
|
||||
|
|
|
@ -34,11 +34,17 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.MetaMutationAnnotation;
|
||||
import org.apache.hadoop.hbase.RegionTransition;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.catalog.MetaEditor;
|
||||
import org.apache.hadoop.hbase.catalog.MetaReader;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -142,6 +148,8 @@ public class RegionMergeTransaction {
|
|||
private static IOException closedByOtherException = new IOException(
|
||||
"Failed to close region: already closed by another thread");
|
||||
|
||||
private RegionServerCoprocessorHost rsCoprocessorHost = null;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param a region a to merge
|
||||
|
@ -231,9 +239,20 @@ public class RegionMergeTransaction {
|
|||
*/
|
||||
public HRegion execute(final Server server,
|
||||
final RegionServerServices services) throws IOException {
|
||||
if (rsCoprocessorHost == null) {
|
||||
rsCoprocessorHost = server != null ? ((HRegionServer) server).getCoprocessorHost() : null;
|
||||
}
|
||||
HRegion mergedRegion = createMergedRegion(server, services);
|
||||
if (rsCoprocessorHost != null) {
|
||||
rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
|
||||
}
|
||||
return stepsAfterPONR(server, services, mergedRegion);
|
||||
}
|
||||
|
||||
public HRegion stepsAfterPONR(final Server server, final RegionServerServices services,
|
||||
HRegion mergedRegion) throws IOException {
|
||||
openMergedRegion(server, services, mergedRegion);
|
||||
transitionZKNode(server, services);
|
||||
transitionZKNode(server, services, mergedRegion);
|
||||
return mergedRegion;
|
||||
}
|
||||
|
||||
|
@ -255,10 +274,95 @@ public class RegionMergeTransaction {
|
|||
throw new IOException("Server is stopped or stopping");
|
||||
}
|
||||
|
||||
if (rsCoprocessorHost != null) {
|
||||
if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) {
|
||||
throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
|
||||
+ this.region_b + " merge.");
|
||||
}
|
||||
}
|
||||
|
||||
// If true, no cluster to write meta edits to or to update znodes in.
|
||||
boolean testing = server == null ? true : server.getConfiguration()
|
||||
.getBoolean("hbase.testing.nocluster", false);
|
||||
|
||||
HRegion mergedRegion = stepsBeforePONR(server, services, testing);
|
||||
|
||||
@MetaMutationAnnotation
|
||||
List<Mutation> metaEntries = new ArrayList<Mutation>();
|
||||
if (rsCoprocessorHost != null) {
|
||||
if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) {
|
||||
throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
|
||||
+ this.region_b + " merge.");
|
||||
}
|
||||
try {
|
||||
for (Mutation p : metaEntries) {
|
||||
HRegionInfo.parseRegionName(p.getRow());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Row key of mutation from coprocessor is not parsable as region name."
|
||||
+ "Mutations from coprocessor should only be for hbase:meta table.", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
// This is the point of no return. Similar with SplitTransaction.
|
||||
// IF we reach the PONR then subsequent failures need to crash out this
|
||||
// regionserver
|
||||
this.journal.add(JournalEntry.PONR);
|
||||
|
||||
// Add merged region and delete region_a and region_b
|
||||
// as an atomic update. See HBASE-7721. This update to hbase:meta makes the region
|
||||
// will determine whether the region is merged or not in case of failures.
|
||||
// If it is successful, master will roll-forward, if not, master will
|
||||
// rollback
|
||||
if (!testing) {
|
||||
if (metaEntries.isEmpty()) {
|
||||
MetaEditor.mergeRegions(server.getCatalogTracker(), mergedRegion.getRegionInfo(), region_a
|
||||
.getRegionInfo(), region_b.getRegionInfo(), server.getServerName());
|
||||
} else {
|
||||
mergeRegionsAndPutMetaEntries(server.getCatalogTracker(), mergedRegion.getRegionInfo(),
|
||||
region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), metaEntries);
|
||||
}
|
||||
}
|
||||
return mergedRegion;
|
||||
}
|
||||
|
||||
private void mergeRegionsAndPutMetaEntries(CatalogTracker catalogTracker,
|
||||
HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB, ServerName serverName,
|
||||
List<Mutation> metaEntries) throws IOException {
|
||||
prepareMutationsForMerge(mergedRegion, regionA, regionB, serverName, metaEntries);
|
||||
MetaEditor.mutateMetaTable(catalogTracker, metaEntries);
|
||||
}
|
||||
|
||||
public void prepareMutationsForMerge(HRegionInfo mergedRegion, HRegionInfo regionA,
|
||||
HRegionInfo regionB, ServerName serverName, List<Mutation> mutations) throws IOException {
|
||||
HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion);
|
||||
|
||||
// Put for parent
|
||||
Put putOfMerged = MetaEditor.makePutFromRegionInfo(copyOfMerged);
|
||||
putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, regionA.toByteArray());
|
||||
putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, regionB.toByteArray());
|
||||
mutations.add(putOfMerged);
|
||||
// Deletes for merging regions
|
||||
Delete deleteA = MetaEditor.makeDeleteFromRegionInfo(regionA);
|
||||
Delete deleteB = MetaEditor.makeDeleteFromRegionInfo(regionB);
|
||||
mutations.add(deleteA);
|
||||
mutations.add(deleteB);
|
||||
// The merged is a new region, openSeqNum = 1 is fine.
|
||||
addLocation(putOfMerged, serverName, 1);
|
||||
}
|
||||
|
||||
public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
|
||||
p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes
|
||||
.toBytes(sn.getHostAndPort()));
|
||||
p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn
|
||||
.getStartcode()));
|
||||
p.add(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum));
|
||||
return p;
|
||||
}
|
||||
|
||||
public HRegion stepsBeforePONR(final Server server, final RegionServerServices services,
|
||||
boolean testing) throws IOException {
|
||||
// Set ephemeral MERGING znode up in zk. Mocked servers sometimes don't
|
||||
// have zookeeper so don't do zk stuff if server or zookeeper is null
|
||||
if (server != null && server.getZooKeeper() != null) {
|
||||
|
@ -316,23 +420,6 @@ public class RegionMergeTransaction {
|
|||
this.journal.add(JournalEntry.STARTED_MERGED_REGION_CREATION);
|
||||
HRegion mergedRegion = createMergedRegionFromMerges(this.region_a,
|
||||
this.region_b, this.mergedRegionInfo);
|
||||
|
||||
|
||||
// This is the point of no return. Similar with SplitTransaction.
|
||||
// IF we reach the PONR then subsequent failures need to crash out this
|
||||
// regionserver
|
||||
this.journal.add(JournalEntry.PONR);
|
||||
|
||||
// Add merged region and delete region_a and region_b
|
||||
// as an atomic update. See HBASE-7721. This update to hbase:meta makes the region
|
||||
// will determine whether the region is merged or not in case of failures.
|
||||
// If it is successful, master will roll-forward, if not, master will
|
||||
// rollback
|
||||
if (!testing) {
|
||||
MetaEditor.mergeRegions(server.getCatalogTracker(),
|
||||
mergedRegion.getRegionInfo(), region_a.getRegionInfo(),
|
||||
region_b.getRegionInfo(), server.getServerName());
|
||||
}
|
||||
return mergedRegion;
|
||||
}
|
||||
|
||||
|
@ -478,8 +565,8 @@ public class RegionMergeTransaction {
|
|||
* @throws IOException If thrown, transaction failed. Call
|
||||
* {@link #rollback(Server, RegionServerServices)}
|
||||
*/
|
||||
void transitionZKNode(final Server server, final RegionServerServices services)
|
||||
throws IOException {
|
||||
void transitionZKNode(final Server server, final RegionServerServices services,
|
||||
HRegion mergedRegion) throws IOException {
|
||||
if (server == null || server.getZooKeeper() == null) {
|
||||
return;
|
||||
}
|
||||
|
@ -519,6 +606,10 @@ public class RegionMergeTransaction {
|
|||
+ mergedRegionInfo.getEncodedName(), e);
|
||||
}
|
||||
|
||||
if (rsCoprocessorHost != null) {
|
||||
rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
|
||||
}
|
||||
|
||||
// Leaving here, the mergedir with its dross will be in place but since the
|
||||
// merge was successful, just leave it; it'll be cleaned when region_a is
|
||||
// cleaned up by CatalogJanitor on master
|
||||
|
@ -640,6 +731,11 @@ public class RegionMergeTransaction {
|
|||
public boolean rollback(final Server server,
|
||||
final RegionServerServices services) throws IOException {
|
||||
assert this.mergedRegionInfo != null;
|
||||
// Coprocessor callback
|
||||
if (rsCoprocessorHost != null) {
|
||||
rsCoprocessorHost.preRollBackMerge(this.region_a, this.region_b);
|
||||
}
|
||||
|
||||
boolean result = true;
|
||||
ListIterator<JournalEntry> iterator = this.journal
|
||||
.listIterator(this.journal.size());
|
||||
|
@ -709,6 +805,11 @@ public class RegionMergeTransaction {
|
|||
throw new RuntimeException("Unhandled journal entry: " + je);
|
||||
}
|
||||
}
|
||||
// Coprocessor callback
|
||||
if (rsCoprocessorHost != null) {
|
||||
rsCoprocessorHost.postRollBackMerge(this.region_a, this.region_b);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,10 +20,13 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.MetaMutationAnnotation;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
|
||||
|
@ -62,6 +65,119 @@ public class RegionServerCoprocessorHost extends
|
|||
}
|
||||
}
|
||||
|
||||
public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
|
||||
boolean bypass = false;
|
||||
ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
|
||||
for (RegionServerEnvironment env : coprocessors) {
|
||||
if (env.getInstance() instanceof RegionServerObserver) {
|
||||
ctx = ObserverContext.createAndPrepare(env, ctx);
|
||||
try {
|
||||
((RegionServerObserver) env.getInstance()).preMerge(ctx, regionA, regionB);
|
||||
} catch (Throwable e) {
|
||||
handleCoprocessorThrowable(env, e);
|
||||
}
|
||||
bypass |= ctx.shouldBypass();
|
||||
if (ctx.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return bypass;
|
||||
}
|
||||
|
||||
public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
|
||||
throws IOException {
|
||||
ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
|
||||
for (RegionServerEnvironment env : coprocessors) {
|
||||
if (env.getInstance() instanceof RegionServerObserver) {
|
||||
ctx = ObserverContext.createAndPrepare(env, ctx);
|
||||
try {
|
||||
((RegionServerObserver) env.getInstance()).postMerge(ctx, regionA, regionB, mergedRegion);
|
||||
} catch (Throwable e) {
|
||||
handleCoprocessorThrowable(env, e);
|
||||
}
|
||||
if (ctx.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
|
||||
final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
|
||||
boolean bypass = false;
|
||||
ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
|
||||
for (RegionServerEnvironment env : coprocessors) {
|
||||
if (env.getInstance() instanceof RegionServerObserver) {
|
||||
ctx = ObserverContext.createAndPrepare(env, ctx);
|
||||
try {
|
||||
((RegionServerObserver) env.getInstance()).preMergeCommit(ctx, regionA, regionB,
|
||||
metaEntries);
|
||||
} catch (Throwable e) {
|
||||
handleCoprocessorThrowable(env, e);
|
||||
}
|
||||
bypass |= ctx.shouldBypass();
|
||||
if (ctx.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return bypass;
|
||||
}
|
||||
|
||||
public void postMergeCommit(final HRegion regionA, final HRegion regionB,
|
||||
final HRegion mergedRegion) throws IOException {
|
||||
ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
|
||||
for (RegionServerEnvironment env : coprocessors) {
|
||||
if (env.getInstance() instanceof RegionServerObserver) {
|
||||
ctx = ObserverContext.createAndPrepare(env, ctx);
|
||||
try {
|
||||
((RegionServerObserver) env.getInstance()).postMergeCommit(ctx, regionA, regionB,
|
||||
mergedRegion);
|
||||
} catch (Throwable e) {
|
||||
handleCoprocessorThrowable(env, e);
|
||||
}
|
||||
if (ctx.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
|
||||
ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
|
||||
for (RegionServerEnvironment env : coprocessors) {
|
||||
if (env.getInstance() instanceof RegionServerObserver) {
|
||||
ctx = ObserverContext.createAndPrepare(env, ctx);
|
||||
try {
|
||||
((RegionServerObserver) env.getInstance()).preRollBackMerge(ctx, regionA, regionB);
|
||||
} catch (Throwable e) {
|
||||
handleCoprocessorThrowable(env, e);
|
||||
}
|
||||
if (ctx.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
|
||||
ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
|
||||
for (RegionServerEnvironment env : coprocessors) {
|
||||
if (env.getInstance() instanceof RegionServerObserver) {
|
||||
ctx = ObserverContext.createAndPrepare(env, ctx);
|
||||
try {
|
||||
((RegionServerObserver) env.getInstance()).postRollBackMerge(ctx, regionA, regionB);
|
||||
} catch (Throwable e) {
|
||||
handleCoprocessorThrowable(env, e);
|
||||
}
|
||||
if (ctx.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Coprocessor environment extension providing access to region server
|
||||
* related services.
|
||||
|
@ -105,4 +221,4 @@ public class RegionServerCoprocessorHost extends
|
|||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1686,4 +1686,31 @@ public class AccessController extends BaseRegionObserver
|
|||
List<HTableDescriptor> descriptors) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
|
||||
HRegion regionB) throws IOException {
|
||||
requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
|
||||
Action.ADMIN);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
|
||||
HRegion regionB, HRegion mergedRegion) throws IOException { }
|
||||
|
||||
@Override
|
||||
public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { }
|
||||
|
||||
@Override
|
||||
public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
|
||||
|
||||
@Override
|
||||
public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
HRegion regionA, HRegion regionB) throws IOException { }
|
||||
|
||||
@Override
|
||||
public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
HRegion regionA, HRegion regionB) throws IOException { }
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,224 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MediumTests;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.catalog.MetaEditor;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* Tests invocation of the {@link org.apache.hadoop.hbase.coprocessor.RegionServerObserver}
|
||||
* interface hooks at all appropriate times during normal HMaster operations.
|
||||
*/
|
||||
@Category(MediumTests.class)
|
||||
public class TestRegionServerObserver {
|
||||
private static final Log LOG = LogFactory.getLog(TestRegionServerObserver.class);
|
||||
|
||||
/**
|
||||
* Test verifies the hooks in regions merge.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testCoprocessorHooksInRegionsMerge() throws Exception {
|
||||
final int NUM_MASTERS = 1;
|
||||
final int NUM_RS = 1;
|
||||
final String TABLENAME = "testRegionServerObserver";
|
||||
final String TABLENAME2 = "testRegionServerObserver_2";
|
||||
final byte[] FAM = Bytes.toBytes("fam");
|
||||
|
||||
// Create config to use for this cluster
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setClass("hbase.coprocessor.regionserver.classes", CPRegionServerObserver.class,
|
||||
RegionServerObserver.class);
|
||||
|
||||
// Start the cluster
|
||||
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
|
||||
TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
|
||||
HBaseAdmin admin = new HBaseAdmin(conf);
|
||||
try {
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
HRegionServer regionServer = cluster.getRegionServer(0);
|
||||
RegionServerCoprocessorHost cpHost = regionServer.getCoprocessorHost();
|
||||
Coprocessor coprocessor = cpHost.findCoprocessor(CPRegionServerObserver.class.getName());
|
||||
CPRegionServerObserver regionServerObserver = (CPRegionServerObserver) coprocessor;
|
||||
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TABLENAME));
|
||||
desc.addFamily(new HColumnDescriptor(FAM));
|
||||
admin.createTable(desc, new byte[][] { Bytes.toBytes("row") });
|
||||
desc = new HTableDescriptor(TableName.valueOf(TABLENAME2));
|
||||
desc.addFamily(new HColumnDescriptor(FAM));
|
||||
admin.createTable(desc, new byte[][] { Bytes.toBytes("row") });
|
||||
assertFalse(regionServerObserver.wasRegionMergeCalled());
|
||||
List<HRegion> regions = regionServer.getOnlineRegions(TableName.valueOf(TABLENAME));
|
||||
admin.mergeRegions(regions.get(0).getRegionInfo().getEncodedNameAsBytes(), regions.get(1)
|
||||
.getRegionInfo().getEncodedNameAsBytes(), true);
|
||||
int regionsCount = regionServer.getOnlineRegions(TableName.valueOf(TABLENAME)).size();
|
||||
while (regionsCount != 1) {
|
||||
regionsCount = regionServer.getOnlineRegions(TableName.valueOf(TABLENAME)).size();
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
assertTrue(regionServerObserver.wasRegionMergeCalled());
|
||||
assertTrue(regionServerObserver.wasPreMergeCommit());
|
||||
assertTrue(regionServerObserver.wasPostMergeCommit());
|
||||
assertEquals(regionsCount, 1);
|
||||
assertEquals(regionServer.getOnlineRegions(TableName.valueOf(TABLENAME2)).size(), 1);
|
||||
} finally {
|
||||
if (admin != null) admin.close();
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
}
|
||||
|
||||
public static class CPRegionServerObserver extends BaseRegionServerObserver {
|
||||
private RegionMergeTransaction rmt = null;
|
||||
private HRegion mergedRegion = null;
|
||||
|
||||
private boolean bypass = false;
|
||||
private boolean preMergeCalled;
|
||||
private boolean preMergeBeforePONRCalled;
|
||||
private boolean preMergeAfterPONRCalled;
|
||||
private boolean preRollBackMergeCalled;
|
||||
private boolean postRollBackMergeCalled;
|
||||
private boolean postMergeCalled;
|
||||
|
||||
public void enableBypass(boolean bypass) {
|
||||
this.bypass = bypass;
|
||||
}
|
||||
|
||||
public void resetStates() {
|
||||
preMergeCalled = false;
|
||||
preMergeBeforePONRCalled = false;
|
||||
preMergeAfterPONRCalled = false;
|
||||
preRollBackMergeCalled = false;
|
||||
postRollBackMergeCalled = false;
|
||||
postMergeCalled = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
|
||||
HRegion regionB) throws IOException {
|
||||
preMergeCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException {
|
||||
preMergeBeforePONRCalled = true;
|
||||
RegionServerCoprocessorEnvironment environment = ctx.getEnvironment();
|
||||
HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
|
||||
List<HRegion> onlineRegions =
|
||||
rs.getOnlineRegions(TableName.valueOf("testRegionServerObserver_2"));
|
||||
rmt = new RegionMergeTransaction(onlineRegions.get(0), onlineRegions.get(1), true);
|
||||
if (!rmt.prepare(rs)) {
|
||||
LOG.error("Prepare for the region merge of table "
|
||||
+ onlineRegions.get(0).getTableDesc().getNameAsString()
|
||||
+ " failed. So returning null. ");
|
||||
ctx.bypass();
|
||||
return;
|
||||
}
|
||||
mergedRegion = rmt.stepsBeforePONR(rs, rs, false);
|
||||
rmt.prepareMutationsForMerge(mergedRegion.getRegionInfo(), regionA.getRegionInfo(),
|
||||
regionB.getRegionInfo(), rs.getServerName(), metaEntries);
|
||||
MetaEditor.mutateMetaTable(rs.getCatalogTracker(), metaEntries);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
HRegion regionA, HRegion regionB, HRegion mr) throws IOException {
|
||||
preMergeAfterPONRCalled = true;
|
||||
RegionServerCoprocessorEnvironment environment = ctx.getEnvironment();
|
||||
HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
|
||||
rmt.stepsAfterPONR(rs, rs, this.mergedRegion);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
HRegion regionA, HRegion regionB) throws IOException {
|
||||
preRollBackMergeCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
|
||||
HRegion regionA, HRegion regionB) throws IOException {
|
||||
postRollBackMergeCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
|
||||
HRegion regionB, HRegion mergedRegion) throws IOException {
|
||||
postMergeCalled = true;
|
||||
}
|
||||
|
||||
public boolean wasPreMergeCalled() {
|
||||
return this.preMergeCalled;
|
||||
}
|
||||
|
||||
public boolean wasPostMergeCalled() {
|
||||
return this.postMergeCalled;
|
||||
}
|
||||
|
||||
public boolean wasPreMergeCommit() {
|
||||
return this.preMergeBeforePONRCalled;
|
||||
}
|
||||
|
||||
public boolean wasPostMergeCommit() {
|
||||
return this.preMergeAfterPONRCalled;
|
||||
}
|
||||
|
||||
public boolean wasPreRollBackMerge() {
|
||||
return this.preRollBackMergeCalled;
|
||||
}
|
||||
|
||||
public boolean wasPostRollBackMerge() {
|
||||
return this.postRollBackMergeCalled;
|
||||
}
|
||||
|
||||
public boolean wasRegionMergeCalled() {
|
||||
return this.preMergeCalled && this.postMergeCalled;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -206,7 +206,7 @@ public class TestRegionMergeTransaction {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testWholesomeMerge() throws IOException {
|
||||
public void testWholesomeMerge() throws IOException, InterruptedException {
|
||||
final int rowCountOfRegionA = loadRegion(this.region_a, CF, true);
|
||||
final int rowCountOfRegionB = loadRegion(this.region_b, CF, true);
|
||||
assertTrue(rowCountOfRegionA > 0 && rowCountOfRegionB > 0);
|
||||
|
@ -217,9 +217,8 @@ public class TestRegionMergeTransaction {
|
|||
RegionMergeTransaction mt = prepareOnGoodRegions();
|
||||
|
||||
// Run the execute. Look at what it returns.
|
||||
Server mockServer = Mockito.mock(Server.class);
|
||||
when(mockServer.getConfiguration())
|
||||
.thenReturn(TEST_UTIL.getConfiguration());
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.REGIONSERVER_PORT, 0);
|
||||
Server mockServer = new HRegionServer(TEST_UTIL.getConfiguration());
|
||||
HRegion mergedRegion = mt.execute(mockServer, null);
|
||||
// Do some assertions about execution.
|
||||
assertTrue(this.fs.exists(mt.getMergesDir()));
|
||||
|
@ -249,7 +248,7 @@ public class TestRegionMergeTransaction {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testRollback() throws IOException {
|
||||
public void testRollback() throws IOException, InterruptedException {
|
||||
final int rowCountOfRegionA = loadRegion(this.region_a, CF, true);
|
||||
final int rowCountOfRegionB = loadRegion(this.region_b, CF, true);
|
||||
assertTrue(rowCountOfRegionA > 0 && rowCountOfRegionB > 0);
|
||||
|
@ -265,9 +264,8 @@ public class TestRegionMergeTransaction {
|
|||
|
||||
// Run the execute. Look at what it returns.
|
||||
boolean expectedException = false;
|
||||
Server mockServer = Mockito.mock(Server.class);
|
||||
when(mockServer.getConfiguration())
|
||||
.thenReturn(TEST_UTIL.getConfiguration());
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.REGIONSERVER_PORT, 0);
|
||||
Server mockServer = new HRegionServer(TEST_UTIL.getConfiguration());
|
||||
try {
|
||||
mt.execute(mockServer, null);
|
||||
} catch (MockedFailedMergedRegionCreation e) {
|
||||
|
@ -308,7 +306,7 @@ public class TestRegionMergeTransaction {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testFailAfterPONR() throws IOException, KeeperException {
|
||||
public void testFailAfterPONR() throws IOException, KeeperException, InterruptedException {
|
||||
final int rowCountOfRegionA = loadRegion(this.region_a, CF, true);
|
||||
final int rowCountOfRegionB = loadRegion(this.region_b, CF, true);
|
||||
assertTrue(rowCountOfRegionA > 0 && rowCountOfRegionB > 0);
|
||||
|
@ -325,9 +323,8 @@ public class TestRegionMergeTransaction {
|
|||
|
||||
// Run the execute. Look at what it returns.
|
||||
boolean expectedException = false;
|
||||
Server mockServer = Mockito.mock(Server.class);
|
||||
when(mockServer.getConfiguration())
|
||||
.thenReturn(TEST_UTIL.getConfiguration());
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.REGIONSERVER_PORT, 0);
|
||||
Server mockServer = new HRegionServer(TEST_UTIL.getConfiguration());
|
||||
try {
|
||||
mt.execute(mockServer, null);
|
||||
} catch (MockedFailedMergedRegionOpen e) {
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.io.IOException;
|
|||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.security.PrivilegedActionException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
|
@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.client.Get;
|
|||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
|
@ -192,7 +194,7 @@ public class TestAccessController extends SecureTestUtil {
|
|||
HTableDescriptor htd = new HTableDescriptor(TEST_TABLE.getTableName());
|
||||
htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
|
||||
htd.setOwner(USER_OWNER);
|
||||
admin.createTable(htd);
|
||||
admin.createTable(htd, new byte[][] { Bytes.toBytes("s") });
|
||||
TEST_UTIL.waitTableEnabled(TEST_TABLE.getTableName().getName());
|
||||
|
||||
HRegion region = TEST_UTIL.getHBaseCluster().getRegions(TEST_TABLE.getTableName()).get(0);
|
||||
|
@ -670,6 +672,24 @@ public class TestAccessController extends SecureTestUtil {
|
|||
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeRegions() throws Exception {
|
||||
|
||||
final List<HRegion> regions = TEST_UTIL.getHBaseCluster().findRegionsForTable(TEST_TABLE.getTableName());
|
||||
|
||||
PrivilegedExceptionAction action = new PrivilegedExceptionAction() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
ACCESS_CONTROLLER.preMerge(
|
||||
ObserverContext.createAndPrepare(RSCP_ENV, null),
|
||||
regions.get(0),regions.get(1));
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_OWNER);
|
||||
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlush() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue