HBASE-18770 Remove bypass method in ObserverContext and implement the

'bypass' logic case by case

Changes Coprocessor ObserverContext 'bypass' semantic. We flip the
default so bypass is NOT supported on Observer invocations; only a
couple of preXXX methods in RegionObserver allow it: e.g.  preGet
and prePut but not preFlush, etc. Everywhere else, we throw
a DoesNotSupportBypassException if a Coprocessor Observer
tries to invoke bypass. Master Observers can no longer stop
or change move, split, assign, create table, etc.

Ditto on complete, the mechanism that allowed a Coprocessor
rule that all subsequent Coprocessors are skipped in an
invocation chain; now, complete is only available to
bypassable methods (and Coprocessors will get an exception if
they try to 'complete' when it is not allowed).

See javadoc for whether a Coprocessor Observer method supports
'bypass'. If no mention, 'bypass' is NOT supported.

M hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
 Added passing of 'bypassable' (and 'completable') and default 'result' argument to
 the Operation constructors rather than pass the excecution engine as parameters.
 Makes it so can clean up RegionObserverHost and make the calling
 clearer regards what is going on.
 Methods that support 'bypass' must set this flag on the Observer.

M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
 Refactoring in here is minor. A few methods that used support bypass
 no longer do so removed the check and the need of an if/else meant a
 left-shift in some code.

M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 Ditto

M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
 In here label explicitly those methods that are bypassable.
 Some changes to make sure we call the corresponding execOperation.

TestMasterObserver had a bunch of test of bypass method. All removed or
disabled.

TODO: What to do w/ the Scanner methods.
This commit is contained in:
Michael Stack 2017-10-26 14:55:53 -07:00
parent 9dfd77595f
commit 16012f93a6
29 changed files with 788 additions and 771 deletions

View File

@ -1,42 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Thrown if a coprocessor rules we should bypass an operation
*/
@InterfaceAudience.Public
public class BypassCoprocessorException extends CoprocessorException {
private static final long serialVersionUID = 5943889011582357043L;
/** Default Constructor */
public BypassCoprocessorException() {
super();
}
/**
* Constructs the exception and supplies a string as the message
* @param s - message
*/
public BypassCoprocessorException(String s) {
super(s);
}
}

View File

@ -407,9 +407,7 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
s.close(); s.close();
return; return;
} }
if (region.getCoprocessorHost().preScannerClose(s)) { region.getCoprocessorHost().preScannerClose(s);
return;
}
try { try {
s.close(); s.close();
} finally { } finally {

View File

@ -1,5 +1,4 @@
/* /*
*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -36,14 +35,12 @@ import java.util.function.Function;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
@ -548,11 +545,20 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
ObserverGetter<C, O> observerGetter; ObserverGetter<C, O> observerGetter;
ObserverOperation(ObserverGetter<C, O> observerGetter) { ObserverOperation(ObserverGetter<C, O> observerGetter) {
this(observerGetter, RpcServer.getRequestUser().orElse(null)); this(observerGetter, null);
} }
ObserverOperation(ObserverGetter<C, O> observerGetter, User user) { ObserverOperation(ObserverGetter<C, O> observerGetter, User user) {
super(user); this(observerGetter, user, false);
}
ObserverOperation(ObserverGetter<C, O> observerGetter, boolean bypassable) {
this(observerGetter, null, bypassable);
}
ObserverOperation(ObserverGetter<C, O> observerGetter, User user, boolean bypassable) {
super(user != null? user: RpcServer.getRequestUser().orElse(null),
bypassable, bypassable/*'completable': make completable same as bypassable*/);
this.observerGetter = observerGetter; this.observerGetter = observerGetter;
} }
@ -574,6 +580,11 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
super(observerGetter, user); super(observerGetter, user);
} }
public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user,
boolean bypassable) {
super(observerGetter, user, bypassable);
}
/** /**
* In case of coprocessors which have many kinds of observers (for eg, {@link RegionCoprocessor} * In case of coprocessors which have many kinds of observers (for eg, {@link RegionCoprocessor}
* has BulkLoadObserver, RegionObserver, etc), some implementations may not need all * has BulkLoadObserver, RegionObserver, etc), some implementations may not need all
@ -594,15 +605,23 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
private R result; private R result;
public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter) { public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result) {
super(observerGetter); this(observerGetter, result, false);
} }
public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, User user) { public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
super(observerGetter, user); boolean bypassable) {
this(observerGetter, result, null, bypassable);
} }
void setResult(final R result) { public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
User user) {
this(observerGetter, result, user, false);
}
private ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user,
boolean bypassable) {
super(observerGetter, user, bypassable);
this.result = result; this.result = result;
} }
@ -621,38 +640,27 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////
// Functions to execute observer hooks and handle results (if any) // Functions to execute observer hooks and handle results (if any)
////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////
protected <O, R> R execOperationWithResult(final R defaultValue,
/**
* Do not call with an observerOperation that is null! Have the caller check.
*/
protected <O, R> R execOperationWithResult(
final ObserverOperationWithResult<O, R> observerOperation) throws IOException { final ObserverOperationWithResult<O, R> observerOperation) throws IOException {
if (observerOperation == null) { boolean bypass = execOperation(observerOperation);
return defaultValue; R result = observerOperation.getResult();
} return bypass == observerOperation.isBypassable()? result: null;
observerOperation.setResult(defaultValue);
execOperation(observerOperation);
return observerOperation.getResult();
}
// what does bypass mean?
protected <O, R> R execOperationWithResult(final boolean ifBypass, final R defaultValue,
final ObserverOperationWithResult<O, R> observerOperation) throws IOException {
if (observerOperation == null) {
return ifBypass ? null : defaultValue;
} else {
observerOperation.setResult(defaultValue);
boolean bypass = execOperation(true, observerOperation);
R result = observerOperation.getResult();
return bypass == ifBypass ? result : null;
}
} }
/**
* @return True if we are to bypass (Can only be <code>true</code> if
* ObserverOperation#isBypassable().
*/
protected <O> boolean execOperation(final ObserverOperation<O> observerOperation) protected <O> boolean execOperation(final ObserverOperation<O> observerOperation)
throws IOException { throws IOException {
return execOperation(true, observerOperation);
}
protected <O> boolean execOperation(final boolean earlyExit,
final ObserverOperation<O> observerOperation) throws IOException {
if (observerOperation == null) return false;
boolean bypass = false; boolean bypass = false;
if (observerOperation == null) {
return bypass;
}
List<E> envs = coprocEnvironments.get(); List<E> envs = coprocEnvironments.get();
for (E env : envs) { for (E env : envs) {
observerOperation.prepare(env); observerOperation.prepare(env);
@ -666,8 +674,10 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
} finally { } finally {
currentThread.setContextClassLoader(cl); currentThread.setContextClassLoader(cl);
} }
// Internal to shouldBypass, it checks if obeserverOperation#isBypassable().
bypass |= observerOperation.shouldBypass(); bypass |= observerOperation.shouldBypass();
if (earlyExit && observerOperation.shouldComplete()) { // Internal to shouldComplete, it checks if obeserverOperation#isCompletable().
if (observerOperation.shouldComplete()) {
break; break;
} }
observerOperation.postEnvCall(); observerOperation.postEnvCall();
@ -675,7 +685,6 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
return bypass; return bypass;
} }
/** /**
* Coprocessor classes can be configured in any order, based on that priority is set and * Coprocessor classes can be configured in any order, based on that priority is set and
* chained in a sorted order. Should be used preStop*() hooks i.e. when master/regionserver is * chained in a sorted order. Should be used preStop*() hooks i.e. when master/regionserver is
@ -719,5 +728,4 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
} }
return bypass; return bypass;
} }
} }

View File

@ -73,7 +73,6 @@ public interface MasterObserver {
* Called before a new table is created by * Called before a new table is created by
* {@link org.apache.hadoop.hbase.master.HMaster}. Called as part of create * {@link org.apache.hadoop.hbase.master.HMaster}. Called as part of create
* table RPC call. * table RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param desc the TableDescriptor for the table * @param desc the TableDescriptor for the table
* @param regions the initial regions created for the table * @param regions the initial regions created for the table
@ -95,7 +94,6 @@ public interface MasterObserver {
* Called before a new table is created by * Called before a new table is created by
* {@link org.apache.hadoop.hbase.master.HMaster}. Called as part of create * {@link org.apache.hadoop.hbase.master.HMaster}. Called as part of create
* table procedure and it is async to the create RPC call. * table procedure and it is async to the create RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* *
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param desc the TableDescriptor for the table * @param desc the TableDescriptor for the table
@ -123,7 +121,6 @@ public interface MasterObserver {
/** /**
* Called before {@link org.apache.hadoop.hbase.master.HMaster} deletes a * Called before {@link org.apache.hadoop.hbase.master.HMaster} deletes a
* table. Called as part of delete table RPC call. * table. Called as part of delete table RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param tableName the name of the table * @param tableName the name of the table
*/ */
@ -143,7 +140,6 @@ public interface MasterObserver {
* Called before {@link org.apache.hadoop.hbase.master.HMaster} deletes a * Called before {@link org.apache.hadoop.hbase.master.HMaster} deletes a
* table. Called as part of delete table procedure and * table. Called as part of delete table procedure and
* it is async to the delete RPC call. * it is async to the delete RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* *
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param tableName the name of the table * @param tableName the name of the table
@ -156,7 +152,6 @@ public interface MasterObserver {
* Called after {@link org.apache.hadoop.hbase.master.HMaster} deletes a * Called after {@link org.apache.hadoop.hbase.master.HMaster} deletes a
* table. Called as part of delete table procedure and it is async to the * table. Called as part of delete table procedure and it is async to the
* delete RPC call. * delete RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* *
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param tableName the name of the table * @param tableName the name of the table
@ -168,7 +163,6 @@ public interface MasterObserver {
/** /**
* Called before {@link org.apache.hadoop.hbase.master.HMaster} truncates a * Called before {@link org.apache.hadoop.hbase.master.HMaster} truncates a
* table. Called as part of truncate table RPC call. * table. Called as part of truncate table RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param tableName the name of the table * @param tableName the name of the table
*/ */
@ -190,7 +184,6 @@ public interface MasterObserver {
* Called before {@link org.apache.hadoop.hbase.master.HMaster} truncates a * Called before {@link org.apache.hadoop.hbase.master.HMaster} truncates a
* table. Called as part of truncate table procedure and it is async * table. Called as part of truncate table procedure and it is async
* to the truncate RPC call. * to the truncate RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* *
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param tableName the name of the table * @param tableName the name of the table
@ -203,7 +196,6 @@ public interface MasterObserver {
* Called after {@link org.apache.hadoop.hbase.master.HMaster} truncates a * Called after {@link org.apache.hadoop.hbase.master.HMaster} truncates a
* table. Called as part of truncate table procedure and it is async to the * table. Called as part of truncate table procedure and it is async to the
* truncate RPC call. * truncate RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* *
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param tableName the name of the table * @param tableName the name of the table
@ -215,7 +207,6 @@ public interface MasterObserver {
/** /**
* Called prior to modifying a table's properties. Called as part of modify * Called prior to modifying a table's properties. Called as part of modify
* table RPC call. * table RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param tableName the name of the table * @param tableName the name of the table
* @param htd the TableDescriptor * @param htd the TableDescriptor
@ -236,7 +227,6 @@ public interface MasterObserver {
/** /**
* Called prior to modifying a table's properties. Called as part of modify * Called prior to modifying a table's properties. Called as part of modify
* table procedure and it is async to the modify table RPC call. * table procedure and it is async to the modify table RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* *
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param tableName the name of the table * @param tableName the name of the table
@ -250,7 +240,6 @@ public interface MasterObserver {
/** /**
* Called after to modifying a table's properties. Called as part of modify * Called after to modifying a table's properties. Called as part of modify
* table procedure and it is async to the modify table RPC call. * table procedure and it is async to the modify table RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* *
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param tableName the name of the table * @param tableName the name of the table
@ -263,7 +252,6 @@ public interface MasterObserver {
/** /**
* Called prior to enabling a table. Called as part of enable table RPC call. * Called prior to enabling a table. Called as part of enable table RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param tableName the name of the table * @param tableName the name of the table
*/ */
@ -282,7 +270,6 @@ public interface MasterObserver {
/** /**
* Called prior to enabling a table. Called as part of enable table procedure * Called prior to enabling a table. Called as part of enable table procedure
* and it is async to the enable table RPC call. * and it is async to the enable table RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* *
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param tableName the name of the table * @param tableName the name of the table
@ -305,7 +292,6 @@ public interface MasterObserver {
/** /**
* Called prior to disabling a table. Called as part of disable table RPC * Called prior to disabling a table. Called as part of disable table RPC
* call. * call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param tableName the name of the table * @param tableName the name of the table
*/ */
@ -324,7 +310,6 @@ public interface MasterObserver {
/** /**
* Called prior to disabling a table. Called as part of disable table procedure * Called prior to disabling a table. Called as part of disable table procedure
* and it is asyn to the disable table RPC call. * and it is asyn to the disable table RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* *
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param tableName the name of the table * @param tableName the name of the table
@ -448,8 +433,7 @@ public interface MasterObserver {
final RegionInfo regionInfo, final boolean force) throws IOException {} final RegionInfo regionInfo, final boolean force) throws IOException {}
/** /**
* Called prior to marking a given region as offline. <code>ctx.bypass()</code> will not have any * Called prior to marking a given region as offline.
* impact on this hook.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param regionInfo * @param regionInfo
*/ */
@ -484,14 +468,13 @@ public interface MasterObserver {
/** /**
* Called prior to setting split / merge switch * Called prior to setting split / merge switch
* Supports Coprocessor 'bypass'.
* @param ctx the coprocessor instance's environment * @param ctx the coprocessor instance's environment
* @param newValue the new value submitted in the call * @param newValue the new value submitted in the call
* @param switchType type of switch * @param switchType type of switch
*/ */
default boolean preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx, default void preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final boolean newValue, final MasterSwitchType switchType) throws IOException { final boolean newValue, final MasterSwitchType switchType) throws IOException {}
return false;
}
/** /**
* Called after setting split / merge switch * Called after setting split / merge switch
@ -538,8 +521,7 @@ public interface MasterObserver {
final RegionInfo regionInfoB) throws IOException {} final RegionInfo regionInfoB) throws IOException {}
/** /**
* This will be called before update META step as part of split transaction. Calling * This will be called before update META step as part of split transaction.
* {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} rollback the split
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param splitKey * @param splitKey
* @param metaEntries * @param metaEntries
@ -552,8 +534,6 @@ public interface MasterObserver {
/** /**
* This will be called after update META step as part of split transaction * This will be called after update META step as part of split transaction
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
* effect in this hook.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
*/ */
default void preSplitRegionAfterMETAAction( default void preSplitRegionAfterMETAAction(
@ -570,7 +550,6 @@ public interface MasterObserver {
/** /**
* Called before the regions merge. * Called before the regions merge.
* Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} to skip the merge.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
*/ */
default void preMergeRegionsAction( default void preMergeRegionsAction(
@ -587,8 +566,7 @@ public interface MasterObserver {
final RegionInfo mergedRegion) throws IOException {} final RegionInfo mergedRegion) throws IOException {}
/** /**
* This will be called before update META step as part of regions merge transaction. Calling * This will be called before update META step as part of regions merge transaction.
* {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} rollback the merge
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param metaEntries mutations to execute on hbase:meta atomically with regions merge updates. * @param metaEntries mutations to execute on hbase:meta atomically with regions merge updates.
* Any puts or deletes to execute on hbase:meta can be added to the mutations. * Any puts or deletes to execute on hbase:meta can be added to the mutations.
@ -618,12 +596,9 @@ public interface MasterObserver {
/** /**
* Called prior to modifying the flag used to enable/disable region balancing. * Called prior to modifying the flag used to enable/disable region balancing.
* @param ctx the coprocessor instance's environment * @param ctx the coprocessor instance's environment
* @param newValue the new flag value submitted in the call
*/ */
default boolean preBalanceSwitch(final ObserverContext<MasterCoprocessorEnvironment> ctx, default void preBalanceSwitch(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final boolean newValue) throws IOException { final boolean newValue) throws IOException {}
return newValue;
}
/** /**
* Called after the flag to enable/disable balancing has changed. * Called after the flag to enable/disable balancing has changed.
@ -667,7 +642,6 @@ public interface MasterObserver {
/** /**
* Called before a new snapshot is taken. * Called before a new snapshot is taken.
* Called as part of snapshot RPC call. * Called as part of snapshot RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param snapshot the SnapshotDescriptor for the snapshot * @param snapshot the SnapshotDescriptor for the snapshot
* @param tableDescriptor the TableDescriptor of the table to snapshot * @param tableDescriptor the TableDescriptor of the table to snapshot
@ -689,7 +663,6 @@ public interface MasterObserver {
/** /**
* Called before listSnapshots request has been processed. * Called before listSnapshots request has been processed.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param snapshot the SnapshotDescriptor of the snapshot to list * @param snapshot the SnapshotDescriptor of the snapshot to list
*/ */
@ -698,7 +671,6 @@ public interface MasterObserver {
/** /**
* Called after listSnapshots request has been processed. * Called after listSnapshots request has been processed.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param snapshot the SnapshotDescriptor of the snapshot to list * @param snapshot the SnapshotDescriptor of the snapshot to list
*/ */
@ -708,7 +680,6 @@ public interface MasterObserver {
/** /**
* Called before a snapshot is cloned. * Called before a snapshot is cloned.
* Called as part of restoreSnapshot RPC call. * Called as part of restoreSnapshot RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param snapshot the SnapshotDescriptor for the snapshot * @param snapshot the SnapshotDescriptor for the snapshot
* @param tableDescriptor the TableDescriptor of the table to create * @param tableDescriptor the TableDescriptor of the table to create
@ -731,7 +702,6 @@ public interface MasterObserver {
/** /**
* Called before a snapshot is restored. * Called before a snapshot is restored.
* Called as part of restoreSnapshot RPC call. * Called as part of restoreSnapshot RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param snapshot the SnapshotDescriptor for the snapshot * @param snapshot the SnapshotDescriptor for the snapshot
* @param tableDescriptor the TableDescriptor of the table to restore * @param tableDescriptor the TableDescriptor of the table to restore
@ -754,7 +724,6 @@ public interface MasterObserver {
/** /**
* Called before a snapshot is deleted. * Called before a snapshot is deleted.
* Called as part of deleteSnapshot RPC call. * Called as part of deleteSnapshot RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param snapshot the SnapshotDescriptor of the snapshot to delete * @param snapshot the SnapshotDescriptor of the snapshot to delete
*/ */
@ -774,7 +743,7 @@ public interface MasterObserver {
* Called before a getTableDescriptors request has been processed. * Called before a getTableDescriptors request has been processed.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param tableNamesList the list of table names, or null if querying for all * @param tableNamesList the list of table names, or null if querying for all
* @param descriptors an empty list, can be filled with what to return if bypassing * @param descriptors an empty list, can be filled with what to return in coprocessor
* @param regex regular expression used for filtering the table names * @param regex regular expression used for filtering the table names
*/ */
default void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx, default void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
@ -795,7 +764,7 @@ public interface MasterObserver {
/** /**
* Called before a getTableNames request has been processed. * Called before a getTableNames request has been processed.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param descriptors an empty list, can be filled with what to return if bypassing * @param descriptors an empty list, can be filled with what to return by coprocessor
* @param regex regular expression used for filtering the table names * @param regex regular expression used for filtering the table names
*/ */
default void preGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx, default void preGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
@ -815,7 +784,6 @@ public interface MasterObserver {
/** /**
* Called before a new namespace is created by * Called before a new namespace is created by
* {@link org.apache.hadoop.hbase.master.HMaster}. * {@link org.apache.hadoop.hbase.master.HMaster}.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param ns the NamespaceDescriptor for the table * @param ns the NamespaceDescriptor for the table
*/ */
@ -832,7 +800,6 @@ public interface MasterObserver {
/** /**
* Called before {@link org.apache.hadoop.hbase.master.HMaster} deletes a * Called before {@link org.apache.hadoop.hbase.master.HMaster} deletes a
* namespace * namespace
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param namespace the name of the namespace * @param namespace the name of the namespace
*/ */
@ -849,7 +816,6 @@ public interface MasterObserver {
/** /**
* Called prior to modifying a namespace's properties. * Called prior to modifying a namespace's properties.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param ns the NamespaceDescriptor * @param ns the NamespaceDescriptor
*/ */
@ -883,7 +849,7 @@ public interface MasterObserver {
/** /**
* Called before a listNamespaceDescriptors request has been processed. * Called before a listNamespaceDescriptors request has been processed.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master
* @param descriptors an empty list, can be filled with what to return if bypassing * @param descriptors an empty list, can be filled with what to return by coprocessor
*/ */
default void preListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx, default void preListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
List<NamespaceDescriptor> descriptors) throws IOException {} List<NamespaceDescriptor> descriptors) throws IOException {}
@ -1013,7 +979,6 @@ public interface MasterObserver {
/** /**
* Called before merge regions request. * Called before merge regions request.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx coprocessor environment * @param ctx coprocessor environment
* @param regionsToMerge regions to be merged * @param regionsToMerge regions to be merged
*/ */

View File

@ -28,10 +28,10 @@ import java.util.Optional;
/** /**
* Carries the execution state for a given invocation of an Observer coprocessor * Carries the execution state for a given invocation of an Observer coprocessor
* ({@link RegionObserver}, {@link MasterObserver}, or {@link WALObserver}) * ({@link RegionObserver}, {@link MasterObserver}, or {@link WALObserver})
* method. The same ObserverContext instance is passed sequentially to all loaded * method. The same ObserverContext instance is passed sequentially to all loaded
* coprocessors for a given Observer method trigger, with the * coprocessors for a given Observer method trigger, with the
* <code>CoprocessorEnvironment</code> reference swapped out for each * <code>CoprocessorEnvironment</code> reference set appropriately for each Coprocessor type:
* coprocessor. * e.g. the RegionCoprocessorEnvironment is passed to RegionCoprocessors, and so on.
* @param <E> The {@link CoprocessorEnvironment} subclass applicable to the * @param <E> The {@link CoprocessorEnvironment} subclass applicable to the
* revelant Observer interface. * revelant Observer interface.
*/ */
@ -41,15 +41,39 @@ public interface ObserverContext<E extends CoprocessorEnvironment> {
E getEnvironment(); E getEnvironment();
/** /**
* Call to indicate that the current coprocessor's return value should be * Call to indicate that the current coprocessor's return value (or parameter -- depends on the
* used in place of the normal HBase obtained value. * call-type) should be used in place of the value that would be obtained via normal processing;
* i.e. bypass the core call and return the Coprocessor's result instead. DOES NOT work for all
* Coprocessor invocations, only on a small subset of methods, mostly preXXX calls in
* RegionObserver. Check javadoc on the pertinent Coprocessor Observer to see if
* <code>bypass</code> is supported.
* <p>This behavior of honoring only a subset of methods is new since hbase-2.0.0.
* <p>Where bypass is supported what is being bypassed is all of the core code
* implementing the remainder of the operation. In order to understand what
* calling bypass() will skip, a coprocessor implementer should read and
* understand all of the remaining code and its nuances. Although this
* is good practice for coprocessor developers in general, it demands a lot.
* What is skipped is extremely version dependent. The core code will vary, perhaps significantly,
* even between point releases. We do not provide the promise of consistent behavior even between
* point releases for the bypass semantic. To achieve
* that we could not change any code between hook points. Therefore the
* coprocessor implementer becomes an HBase core developer in practice as soon
* as they rely on bypass(). Every release of HBase may break the assumption
* that the replacement for the bypassed code takes care of all necessary
* skipped concerns. Because those concerns can change at any point, such an
* assumption is never safe.</p>
* @see #complete()
*/ */
void bypass(); void bypass();
/** /**
* Call to indicate that additional coprocessors further down the execution * Call to skip out on calling remaining coprocessors in current execution chain (there may be
* chain do not need to be invoked. Implies that this coprocessor's response * more than one coprocessor chained to a method call). Implies that this coprocessor's response
* is definitive. * is definitive.
* <p>Since hbase-2.0.0, only <code>complete</code> of 'bypassable' methods has an effect. See
* javadoc on the Coprocessor Observer method as to whether bypass (and thereby 'complete') is
* supported. This behavior of honoring only a subset of methods is new since hbase-2.0.0.
* @see #bypass()
*/ */
void complete(); void complete();
@ -60,5 +84,4 @@ public interface ObserverContext<E extends CoprocessorEnvironment> {
* context. * context.
*/ */
Optional<User> getCaller(); Optional<User> getCaller();
} }

View File

@ -35,11 +35,25 @@ import org.apache.yetus.audience.InterfaceStability;
public class ObserverContextImpl<E extends CoprocessorEnvironment> implements ObserverContext<E> { public class ObserverContextImpl<E extends CoprocessorEnvironment> implements ObserverContext<E> {
private E env; private E env;
private boolean bypass; private boolean bypass;
/**
* Is this operation bypassable?
*/
private final boolean bypassable;
/**
* Is this operation completable?
*/
private boolean complete; private boolean complete;
private final boolean completable;
private final User caller; private final User caller;
public ObserverContextImpl(User caller) { public ObserverContextImpl(User caller) {
this(caller, false, false);
}
public ObserverContextImpl(User caller, boolean bypassable, boolean completable) {
this.caller = caller; this.caller = caller;
this.bypassable = bypassable;
this.completable = completable;
} }
public E getEnvironment() { public E getEnvironment() {
@ -50,11 +64,25 @@ public class ObserverContextImpl<E extends CoprocessorEnvironment> implements Ob
this.env = env; this.env = env;
} }
public boolean isBypassable() {
return this.bypassable;
};
public void bypass() { public void bypass() {
if (!this.bypassable) {
throw new UnsupportedOperationException("This method does not support 'bypass'.");
}
bypass = true; bypass = true;
} }
public boolean isCompleable() {
return this.completable;
};
public void complete() { public void complete() {
if (!this.completable) {
throw new UnsupportedOperationException("This method does not support 'complete'.");
}
complete = true; complete = true;
} }
@ -63,6 +91,9 @@ public class ObserverContextImpl<E extends CoprocessorEnvironment> implements Ob
* coprocessors, {@code false} otherwise. * coprocessors, {@code false} otherwise.
*/ */
public boolean shouldBypass() { public boolean shouldBypass() {
if (!isBypassable()) {
return false;
}
if (bypass) { if (bypass) {
bypass = false; bypass = false;
return true; return true;
@ -75,6 +106,9 @@ public class ObserverContextImpl<E extends CoprocessorEnvironment> implements Ob
* coprocessors, {@code false} otherwise. * coprocessors, {@code false} otherwise.
*/ */
public boolean shouldComplete() { public boolean shouldComplete() {
if (!isCompleable()) {
return false;
}
if (complete) { if (complete) {
complete = false; complete = false;
return true; return true;

View File

@ -177,6 +177,8 @@ public interface RegionObserver {
* Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of * Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of
* available candidates. To alter the files used for compaction, you may mutate the passed in list * available candidates. To alter the files used for compaction, you may mutate the passed in list
* of candidates. If you remove all the candidates then the compaction will be canceled. * of candidates. If you remove all the candidates then the compaction will be canceled.
* <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed
* the passed in <code>candidates</code>.
* @param c the environment provided by the region server * @param c the environment provided by the region server
* @param store the store where compaction is being requested * @param store the store where compaction is being requested
* @param candidates the store files currently available for compaction * @param candidates the store files currently available for compaction
@ -390,7 +392,10 @@ public interface RegionObserver {
* @param byteNow - timestamp bytes * @param byteNow - timestamp bytes
* @param get - the get formed using the current cell's row. Note that the get does not specify * @param get - the get formed using the current cell's row. Note that the get does not specify
* the family and qualifier * the family and qualifier
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
* with something that doesn't expose IntefaceAudience.Private classes.
*/ */
@Deprecated
default void prePrepareTimeStampForDeleteVersion(ObserverContext<RegionCoprocessorEnvironment> c, default void prePrepareTimeStampForDeleteVersion(ObserverContext<RegionCoprocessorEnvironment> c,
Mutation mutation, Cell cell, byte[] byteNow, Get get) throws IOException {} Mutation mutation, Cell cell, byte[] byteNow, Get get) throws IOException {}
@ -435,8 +440,10 @@ public interface RegionObserver {
* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation. * Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that. * If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server * @param c the environment provided by the region server
* @param miniBatchOp batch of Mutations applied to region. * @param miniBatchOp batch of Mutations applied to region. Coprocessors are discouraged from
* manipulating its state.
*/ */
// Coprocessors can do a form of bypass by changing state in miniBatchOp.
default void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, default void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {} MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {}
@ -926,6 +933,8 @@ public interface RegionObserver {
/** /**
* Called before a {@link WALEdit} * Called before a {@link WALEdit}
* replayed for this region. * replayed for this region.
* Do not amend the WALKey. It is InterfaceAudience.Private. Changing the WALKey will cause
* damage.
* @param ctx the environment provided by the region server * @param ctx the environment provided by the region server
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
* with something that doesn't expose IntefaceAudience.Private classes. * with something that doesn't expose IntefaceAudience.Private classes.
@ -937,6 +946,8 @@ public interface RegionObserver {
/** /**
* Called after a {@link WALEdit} * Called after a {@link WALEdit}
* replayed for this region. * replayed for this region.
* Do not amend the WALKey. It is InterfaceAudience.Private. Changing the WALKey will cause
* damage.
* @param ctx the environment provided by the region server * @param ctx the environment provided by the region server
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
* with something that doesn't expose IntefaceAudience.Private classes. * with something that doesn't expose IntefaceAudience.Private classes.

View File

@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
@ -70,21 +69,20 @@ public interface WALObserver {
/** /**
* Called before a {@link WALEdit} * Called before a {@link WALEdit}
* is writen to WAL. * is writen to WAL.
* * Do not amend the WALKey. It is InterfaceAudience.Private. Changing the WALKey will cause
* @return true if default behavior should be bypassed, false otherwise * damage.
* @deprecated Since hbase-2.0.0. To be replaced with an alternative that does not expose * @deprecated Since hbase-2.0.0. To be replaced with an alternative that does not expose
* InterfaceAudience classes such as WALKey and WALEdit. Will be removed in hbase-3.0.0. * InterfaceAudience classes such as WALKey and WALEdit. Will be removed in hbase-3.0.0.
*/ */
// TODO: return value is not used
@Deprecated @Deprecated
default boolean preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx, default void preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {}
return false;
}
/** /**
* Called after a {@link WALEdit} * Called after a {@link WALEdit}
* is writen to WAL. * is writen to WAL.
* Do not amend the WALKey. It is InterfaceAudience.Private. Changing the WALKey will cause
* damage.
* @deprecated Since hbase-2.0.0. To be replaced with an alternative that does not expose * @deprecated Since hbase-2.0.0. To be replaced with an alternative that does not expose
* InterfaceAudience classes such as WALKey and WALEdit. Will be removed in hbase-3.0.0. * InterfaceAudience classes such as WALKey and WALEdit. Will be removed in hbase-3.0.0.
*/ */

View File

@ -58,7 +58,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.ClusterStatus.Option; import org.apache.hadoop.hbase.ClusterStatus.Option;
import org.apache.hadoop.hbase.CoordinatedStateException; import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -83,7 +82,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coprocessor.BypassCoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.MergeRegionException; import org.apache.hadoop.hbase.exceptions.MergeRegionException;
@ -1704,9 +1702,7 @@ public class HMaster extends HRegionServer implements MasterServices {
try { try {
checkInitialized(); checkInitialized();
if (this.cpHost != null) { if (this.cpHost != null) {
if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) { this.cpHost.preMove(hri, rp.getSource(), rp.getDestination());
return;
}
} }
// Warmup the region on the destination before initiating the move. this call // Warmup the region on the destination before initiating the move. this call
// is synchronous and takes some time. doing it before the source region gets // is synchronous and takes some time. doing it before the source region gets
@ -2895,13 +2891,11 @@ public class HMaster extends HRegionServer implements MasterServices {
TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName())); TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
return MasterProcedureUtil.submitProcedure( return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { nonceGroup, nonce) {
@Override @Override
protected void run() throws IOException { protected void run() throws IOException {
if (getMaster().getMasterCoprocessorHost().preCreateNamespace(namespaceDescriptor)) { getMaster().getMasterCoprocessorHost().preCreateNamespace(namespaceDescriptor);
throw new BypassCoprocessorException();
}
LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor); LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor);
// Execute the operation synchronously - wait for the operation to complete before // Execute the operation synchronously - wait for the operation to complete before
// continuing. // continuing.
@ -2929,13 +2923,11 @@ public class HMaster extends HRegionServer implements MasterServices {
TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName())); TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
return MasterProcedureUtil.submitProcedure( return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { nonceGroup, nonce) {
@Override @Override
protected void run() throws IOException { protected void run() throws IOException {
if (getMaster().getMasterCoprocessorHost().preModifyNamespace(namespaceDescriptor)) { getMaster().getMasterCoprocessorHost().preModifyNamespace(namespaceDescriptor);
throw new BypassCoprocessorException();
}
LOG.info(getClientIdAuditPrefix() + " modify " + namespaceDescriptor); LOG.info(getClientIdAuditPrefix() + " modify " + namespaceDescriptor);
// Execute the operation synchronously - wait for the operation to complete before // Execute the operation synchronously - wait for the operation to complete before
// continuing. // continuing.
@ -2961,13 +2953,11 @@ public class HMaster extends HRegionServer implements MasterServices {
throws IOException { throws IOException {
checkInitialized(); checkInitialized();
return MasterProcedureUtil.submitProcedure( return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { nonceGroup, nonce) {
@Override @Override
protected void run() throws IOException { protected void run() throws IOException {
if (getMaster().getMasterCoprocessorHost().preDeleteNamespace(name)) { getMaster().getMasterCoprocessorHost().preDeleteNamespace(name);
throw new BypassCoprocessorException();
}
LOG.info(getClientIdAuditPrefix() + " delete " + name); LOG.info(getClientIdAuditPrefix() + " delete " + name);
// Execute the operation synchronously - wait for the operation to complete before // Execute the operation synchronously - wait for the operation to complete before
// continuing. // continuing.
@ -3002,13 +2992,12 @@ public class HMaster extends HRegionServer implements MasterServices {
List<NamespaceDescriptor> getNamespaces() throws IOException { List<NamespaceDescriptor> getNamespaces() throws IOException {
checkInitialized(); checkInitialized();
final List<NamespaceDescriptor> nsds = new ArrayList<>(); final List<NamespaceDescriptor> nsds = new ArrayList<>();
boolean bypass = false;
if (cpHost != null) { if (cpHost != null) {
bypass = cpHost.preListNamespaceDescriptors(nsds); cpHost.preListNamespaceDescriptors(nsds);
} }
if (!bypass) { nsds.addAll(this.clusterSchemaService.getNamespaces());
nsds.addAll(this.clusterSchemaService.getNamespaces()); if (this.cpHost != null) {
if (this.cpHost != null) this.cpHost.postListNamespaceDescriptors(nsds); this.cpHost.postListNamespaceDescriptors(nsds);
} }
return nsds; return nsds;
} }
@ -3085,13 +3074,12 @@ public class HMaster extends HRegionServer implements MasterServices {
final List<TableName> tableNameList, final boolean includeSysTables) final List<TableName> tableNameList, final boolean includeSysTables)
throws IOException { throws IOException {
List<TableDescriptor> htds = new ArrayList<>(); List<TableDescriptor> htds = new ArrayList<>();
boolean bypass = cpHost != null? if (cpHost != null) {
cpHost.preGetTableDescriptors(tableNameList, htds, regex): false; cpHost.preGetTableDescriptors(tableNameList, htds, regex);
if (!bypass) { }
htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables); htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables);
if (cpHost != null) { if (cpHost != null) {
cpHost.postGetTableDescriptors(tableNameList, htds, regex); cpHost.postGetTableDescriptors(tableNameList, htds, regex);
}
} }
return htds; return htds;
} }
@ -3106,10 +3094,12 @@ public class HMaster extends HRegionServer implements MasterServices {
public List<TableName> listTableNames(final String namespace, final String regex, public List<TableName> listTableNames(final String namespace, final String regex,
final boolean includeSysTables) throws IOException { final boolean includeSysTables) throws IOException {
List<TableDescriptor> htds = new ArrayList<>(); List<TableDescriptor> htds = new ArrayList<>();
boolean bypass = cpHost != null? cpHost.preGetTableNames(htds, regex): false; if (cpHost != null) {
if (!bypass) { cpHost.preGetTableNames(htds, regex);
htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables); }
if (cpHost != null) cpHost.postGetTableNames(htds, regex); htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables);
if (cpHost != null) {
cpHost.postGetTableNames(htds, regex);
} }
List<TableName> result = new ArrayList<>(htds.size()); List<TableName> result = new ArrayList<>(htds.size());
for (TableDescriptor htd: htds) result.add(htd.getTableName()); for (TableDescriptor htd: htds) result.add(htd.getTableName());

View File

@ -192,9 +192,17 @@ public class MasterCoprocessorHost
super(masterObserverGetter); super(masterObserverGetter);
} }
public MasterObserverOperation(boolean bypassable) {
this(null, bypassable);
}
public MasterObserverOperation(User user) { public MasterObserverOperation(User user) {
super(masterObserverGetter, user); super(masterObserverGetter, user);
} }
public MasterObserverOperation(User user, boolean bypassable) {
super(masterObserverGetter, user, bypassable);
}
} }
@ -203,8 +211,8 @@ public class MasterCoprocessorHost
////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////
public boolean preCreateNamespace(final NamespaceDescriptor ns) throws IOException { public void preCreateNamespace(final NamespaceDescriptor ns) throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preCreateNamespace(this, ns); observer.preCreateNamespace(this, ns);
@ -221,8 +229,8 @@ public class MasterCoprocessorHost
}); });
} }
public boolean preDeleteNamespace(final String namespaceName) throws IOException { public void preDeleteNamespace(final String namespaceName) throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preDeleteNamespace(this, namespaceName); observer.preDeleteNamespace(this, namespaceName);
@ -239,8 +247,8 @@ public class MasterCoprocessorHost
}); });
} }
public boolean preModifyNamespace(final NamespaceDescriptor ns) throws IOException { public void preModifyNamespace(final NamespaceDescriptor ns) throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preModifyNamespace(this, ns); observer.preModifyNamespace(this, ns);
@ -277,9 +285,9 @@ public class MasterCoprocessorHost
}); });
} }
public boolean preListNamespaceDescriptors(final List<NamespaceDescriptor> descriptors) public void preListNamespaceDescriptors(final List<NamespaceDescriptor> descriptors)
throws IOException { throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preListNamespaceDescriptors(this, descriptors); observer.preListNamespaceDescriptors(this, descriptors);
@ -528,10 +536,10 @@ public class MasterCoprocessorHost
}); });
} }
public boolean preAbortProcedure( public void preAbortProcedure(
final ProcedureExecutor<MasterProcedureEnv> procEnv, final ProcedureExecutor<MasterProcedureEnv> procEnv,
final long procId) throws IOException { final long procId) throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preAbortProcedure(this, procId); observer.preAbortProcedure(this, procId);
@ -548,8 +556,8 @@ public class MasterCoprocessorHost
}); });
} }
public boolean preGetProcedures() throws IOException { public void preGetProcedures() throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preGetProcedures(this); observer.preGetProcedures(this);
@ -566,8 +574,8 @@ public class MasterCoprocessorHost
}); });
} }
public boolean preGetLocks() throws IOException { public void preGetLocks() throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preGetLocks(this); observer.preGetLocks(this);
@ -584,9 +592,9 @@ public class MasterCoprocessorHost
}); });
} }
public boolean preMove(final RegionInfo region, final ServerName srcServer, public void preMove(final RegionInfo region, final ServerName srcServer,
final ServerName destServer) throws IOException { final ServerName destServer) throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preMove(this, region, srcServer, destServer); observer.preMove(this, region, srcServer, destServer);
@ -604,8 +612,8 @@ public class MasterCoprocessorHost
}); });
} }
public boolean preAssign(final RegionInfo regionInfo) throws IOException { public void preAssign(final RegionInfo regionInfo) throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preAssign(this, regionInfo); observer.preAssign(this, regionInfo);
@ -622,9 +630,9 @@ public class MasterCoprocessorHost
}); });
} }
public boolean preUnassign(final RegionInfo regionInfo, final boolean force) public void preUnassign(final RegionInfo regionInfo, final boolean force)
throws IOException { throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preUnassign(this, regionInfo, force); observer.preUnassign(this, regionInfo, force);
@ -697,9 +705,9 @@ public class MasterCoprocessorHost
}); });
} }
public boolean preSetSplitOrMergeEnabled(final boolean newValue, public void preSetSplitOrMergeEnabled(final boolean newValue,
final MasterSwitchType switchType) throws IOException { final MasterSwitchType switchType) throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { execOperation(coprocEnvironments.isEmpty()? null: new MasterObserverOperation() {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preSetSplitOrMergeEnabled(this, newValue, switchType); observer.preSetSplitOrMergeEnabled(this, newValue, switchType);
@ -779,11 +787,11 @@ public class MasterCoprocessorHost
* @param user the user * @param user the user
* @throws IOException * @throws IOException
*/ */
public boolean preSplitBeforeMETAAction( public void preSplitBeforeMETAAction(
final byte[] splitKey, final byte[] splitKey,
final List<Mutation> metaEntries, final List<Mutation> metaEntries,
final User user) throws IOException { final User user) throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preSplitRegionBeforeMETAAction(this, splitKey, metaEntries); observer.preSplitRegionBeforeMETAAction(this, splitKey, metaEntries);
@ -825,9 +833,9 @@ public class MasterCoprocessorHost
* @param user the user * @param user the user
* @throws IOException * @throws IOException
*/ */
public boolean preMergeRegionsAction( public void preMergeRegionsAction(
final RegionInfo[] regionsToMerge, final User user) throws IOException { final RegionInfo[] regionsToMerge, final User user) throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preMergeRegionsAction(this, regionsToMerge); observer.preMergeRegionsAction(this, regionsToMerge);
@ -861,11 +869,11 @@ public class MasterCoprocessorHost
* @param user the user * @param user the user
* @throws IOException * @throws IOException
*/ */
public boolean preMergeRegionsCommit( public void preMergeRegionsCommit(
final RegionInfo[] regionsToMerge, final RegionInfo[] regionsToMerge,
final @MetaMutationAnnotation List<Mutation> metaEntries, final @MetaMutationAnnotation List<Mutation> metaEntries,
final User user) throws IOException { final User user) throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preMergeRegionsCommitAction(this, regionsToMerge, metaEntries); observer.preMergeRegionsCommitAction(this, regionsToMerge, metaEntries);
@ -908,14 +916,17 @@ public class MasterCoprocessorHost
}); });
} }
public boolean preBalanceSwitch(final boolean b) throws IOException { // This hook allows Coprocessor change value of balance switch.
return execOperationWithResult(b, coprocEnvironments.isEmpty() ? null : public void preBalanceSwitch(final boolean b) throws IOException {
new ObserverOperationWithResult<MasterObserver, Boolean>(masterObserverGetter) { if (this.coprocEnvironments.isEmpty()) {
@Override return;
public Boolean call(MasterObserver observer) throws IOException { }
return observer.preBalanceSwitch(this, getResult()); execOperation(new MasterObserverOperation() {
} @Override
}); public void call(MasterObserver observer) throws IOException {
observer.preBalanceSwitch(this, b);
}
});
} }
public void postBalanceSwitch(final boolean oldValue, final boolean newValue) public void postBalanceSwitch(final boolean oldValue, final boolean newValue)
@ -931,7 +942,10 @@ public class MasterCoprocessorHost
public void preShutdown() throws IOException { public void preShutdown() throws IOException {
// While stopping the cluster all coprocessors method should be executed first then the // While stopping the cluster all coprocessors method should be executed first then the
// coprocessor should be cleaned up. // coprocessor should be cleaned up.
execShutdown(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { if (coprocEnvironments.isEmpty()) {
return;
}
execShutdown(new MasterObserverOperation() {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preShutdown(this); observer.preShutdown(this);
@ -947,7 +961,10 @@ public class MasterCoprocessorHost
public void preStopMaster() throws IOException { public void preStopMaster() throws IOException {
// While stopping master all coprocessors method should be executed first then the coprocessor // While stopping master all coprocessors method should be executed first then the coprocessor
// environment should be cleaned up. // environment should be cleaned up.
execShutdown(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { if (coprocEnvironments.isEmpty()) {
return;
}
execShutdown(new MasterObserverOperation() {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preStopMaster(this); observer.preStopMaster(this);
@ -1074,9 +1091,9 @@ public class MasterCoprocessorHost
}); });
} }
public boolean preGetTableDescriptors(final List<TableName> tableNamesList, public void preGetTableDescriptors(final List<TableName> tableNamesList,
final List<TableDescriptor> descriptors, final String regex) throws IOException { final List<TableDescriptor> descriptors, final String regex) throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preGetTableDescriptors(this, tableNamesList, descriptors, regex); observer.preGetTableDescriptors(this, tableNamesList, descriptors, regex);
@ -1094,9 +1111,9 @@ public class MasterCoprocessorHost
}); });
} }
public boolean preGetTableNames(final List<TableDescriptor> descriptors, public void preGetTableNames(final List<TableDescriptor> descriptors,
final String regex) throws IOException { final String regex) throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override @Override
public void call(MasterObserver observer) throws IOException { public void call(MasterObserver observer) throws IOException {
observer.preGetTableNames(this, descriptors, regex); observer.preGetTableNames(this, descriptors, regex);

View File

@ -331,7 +331,7 @@ public class MasterRpcServices extends RSRpcServices
boolean newValue = b; boolean newValue = b;
try { try {
if (master.cpHost != null) { if (master.cpHost != null) {
newValue = master.cpHost.preBalanceSwitch(newValue); master.cpHost.preBalanceSwitch(newValue);
} }
try { try {
if (mode == BalanceSwitchMode.SYNC) { if (mode == BalanceSwitchMode.SYNC) {
@ -509,9 +509,7 @@ public class MasterRpcServices extends RSRpcServices
final AssignRegionResponse arr = AssignRegionResponse.newBuilder().build(); final AssignRegionResponse arr = AssignRegionResponse.newBuilder().build();
if (master.cpHost != null) { if (master.cpHost != null) {
if (master.cpHost.preAssign(regionInfo)) { master.cpHost.preAssign(regionInfo);
return arr;
}
} }
LOG.info(master.getClientIdAuditPrefix() + " assign " + regionInfo.getRegionNameAsString()); LOG.info(master.getClientIdAuditPrefix() + " assign " + regionInfo.getRegionNameAsString());
master.getAssignmentManager().assign(regionInfo, true); master.getAssignmentManager().assign(regionInfo, true);
@ -1517,9 +1515,7 @@ public class MasterRpcServices extends RSRpcServices
RegionInfo hri = pair.getFirst(); RegionInfo hri = pair.getFirst();
if (master.cpHost != null) { if (master.cpHost != null) {
if (master.cpHost.preUnassign(hri, force)) { master.cpHost.preUnassign(hri, force);
return urr;
}
} }
LOG.debug(master.getClientIdAuditPrefix() + " unassign " + hri.getRegionNameAsString() LOG.debug(master.getClientIdAuditPrefix() + " unassign " + hri.getRegionNameAsString()
+ " in current location if it is online and reassign.force=" + force); + " in current location if it is online and reassign.force=" + force);
@ -1704,13 +1700,10 @@ public class MasterRpcServices extends RSRpcServices
MasterSwitchType switchType = convert(masterSwitchType); MasterSwitchType switchType = convert(masterSwitchType);
boolean oldValue = master.isSplitOrMergeEnabled(switchType); boolean oldValue = master.isSplitOrMergeEnabled(switchType);
response.addPrevValue(oldValue); response.addPrevValue(oldValue);
boolean bypass = false;
if (master.cpHost != null) { if (master.cpHost != null) {
bypass = master.cpHost.preSetSplitOrMergeEnabled(newValue, switchType); master.cpHost.preSetSplitOrMergeEnabled(newValue, switchType);
}
if (!bypass) {
master.getSplitOrMergeTracker().setSplitOrMergeEnabled(newValue, switchType);
} }
master.getSplitOrMergeTracker().setSplitOrMergeEnabled(newValue, switchType);
if (master.cpHost != null) { if (master.cpHost != null) {
master.cpHost.postSetSplitOrMergeEnabled(newValue, switchType); master.cpHost.postSetSplitOrMergeEnabled(newValue, switchType);
} }
@ -2155,18 +2148,10 @@ public class MasterRpcServices extends RSRpcServices
ListDeadServersResponse.Builder response = ListDeadServersResponse.newBuilder(); ListDeadServersResponse.Builder response = ListDeadServersResponse.newBuilder();
try { try {
master.checkInitialized(); master.checkInitialized();
if (master.cpHost != null) {
master.cpHost.preListDeadServers();
}
Set<ServerName> servers = master.getServerManager().getDeadServers().copyServerNames(); Set<ServerName> servers = master.getServerManager().getDeadServers().copyServerNames();
for (ServerName server : servers) { for (ServerName server : servers) {
response.addServerName(ProtobufUtil.toServerName(server)); response.addServerName(ProtobufUtil.toServerName(server));
} }
if (master.cpHost != null) {
master.cpHost.postListDeadServers();
}
} catch (IOException io) { } catch (IOException io) {
throw new ServiceException(io); throw new ServiceException(io);
} }

View File

@ -530,12 +530,7 @@ public class MergeTableRegionsProcedure
private void preMergeRegions(final MasterProcedureEnv env) throws IOException { private void preMergeRegions(final MasterProcedureEnv env) throws IOException {
final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) { if (cpHost != null) {
boolean ret = cpHost.preMergeRegionsAction(regionsToMerge, getUser()); cpHost.preMergeRegionsAction(regionsToMerge, getUser());
if (ret) {
throw new IOException(
"Coprocessor bypassing regions " + RegionInfo.getShortNameToLog(regionsToMerge) +
" merge.");
}
} }
// TODO: Clean up split and merge. Currently all over the place. // TODO: Clean up split and merge. Currently all over the place.
try { try {
@ -702,13 +697,7 @@ public class MergeTableRegionsProcedure
if (cpHost != null) { if (cpHost != null) {
@MetaMutationAnnotation @MetaMutationAnnotation
final List<Mutation> metaEntries = new ArrayList<Mutation>(); final List<Mutation> metaEntries = new ArrayList<Mutation>();
boolean ret = cpHost.preMergeRegionsCommit(regionsToMerge, metaEntries, getUser()); cpHost.preMergeRegionsCommit(regionsToMerge, metaEntries, getUser());
if (ret) {
throw new IOException(
"Coprocessor bypassing regions " + RegionInfo.getShortNameToLog(regionsToMerge) +
" merge.");
}
try { try {
for (Mutation p : metaEntries) { for (Mutation p : metaEntries) {
RegionInfo.parseRegionName(p.getRow()); RegionInfo.parseRegionName(p.getRow());

View File

@ -709,10 +709,7 @@ public class SplitTableRegionProcedure
final List<Mutation> metaEntries = new ArrayList<Mutation>(); final List<Mutation> metaEntries = new ArrayList<Mutation>();
final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) { if (cpHost != null) {
if (cpHost.preSplitBeforeMETAAction(getSplitRow(), metaEntries, getUser())) { cpHost.preSplitBeforeMETAAction(getSplitRow(), metaEntries, getUser());
throw new IOException("Coprocessor bypassing region " +
getParentRegion().getRegionNameAsString() + " split.");
}
try { try {
for (Mutation p : metaEntries) { for (Mutation p : metaEntries) {
RegionInfo.parseRegionName(p.getRow()); RegionInfo.parseRegionName(p.getRow());

View File

@ -2407,7 +2407,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/** /**
* Flushing all stores. * Flushing all stores.
* @see #internalFlushcache(Collection, MonitoredTask, boolean) * @see #internalFlushcache(Collection, MonitoredTask, boolean, FlushLifeCycleTracker)
*/ */
private FlushResult internalFlushcache(MonitoredTask status) throws IOException { private FlushResult internalFlushcache(MonitoredTask status) throws IOException {
return internalFlushcache(stores.values(), status, false, FlushLifeCycleTracker.DUMMY); return internalFlushcache(stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
@ -2415,7 +2415,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/** /**
* Flushing given stores. * Flushing given stores.
* @see #internalFlushcache(WAL, long, Collection, MonitoredTask, boolean) * @see #internalFlushcache(WAL, long, Collection, MonitoredTask, boolean, FlushLifeCycleTracker)
*/ */
private FlushResultImpl internalFlushcache(Collection<HStore> storesToFlush, MonitoredTask status, private FlushResultImpl internalFlushcache(Collection<HStore> storesToFlush, MonitoredTask status,
boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException { boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException {
@ -3279,39 +3279,58 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param batchOp * @param batchOp
*/ */
private void callPreMutateCPHooks(BatchOperation<?> batchOp) throws IOException { private void callPreMutateCPHooks(BatchOperation<?> batchOp) throws IOException {
if (coprocessorHost == null) {
return;
}
/* Run coprocessor pre hook outside of locks to avoid deadlock */ /* Run coprocessor pre hook outside of locks to avoid deadlock */
WALEdit walEdit = new WALEdit(); WALEdit walEdit = new WALEdit();
if (coprocessorHost != null) { int noOfPuts = 0;
for (int i = 0 ; i < batchOp.operations.length; i++) { int noOfDeletes = 0;
Mutation m = batchOp.getMutation(i); for (int i = 0 ; i < batchOp.operations.length; i++) {
if (m instanceof Put) { Mutation m = batchOp.getMutation(i);
if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { if (m instanceof Put) {
// pre hook says skip this Put if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
// mark as success and skip in doMiniBatchMutation // pre hook says skip this Put
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; // mark as success and skip in doMiniBatchMutation
} noOfPuts++;
} else if (m instanceof Delete) { batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
Delete curDel = (Delete) m;
if (curDel.getFamilyCellMap().isEmpty()) {
// handle deleting a row case
prepareDelete(curDel);
}
if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
// pre hook says skip this Delete
// mark as success and skip in doMiniBatchMutation
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
}
} else {
// In case of passing Append mutations along with the Puts and Deletes in batchMutate
// mark the operation return code as failure so that it will not be considered in
// the doMiniBatchMutation
batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
"Put/Delete mutations only supported in batchMutate() now");
} }
if (!walEdit.isEmpty()) { } else if (m instanceof Delete) {
batchOp.walEditsFromCoprocessors[i] = walEdit; Delete curDel = (Delete) m;
walEdit = new WALEdit(); if (curDel.getFamilyCellMap().isEmpty()) {
// handle deleting a row case
prepareDelete(curDel);
} }
if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
// pre hook says skip this Delete
// mark as success and skip in doMiniBatchMutation
noOfDeletes++;
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
}
} else {
// In case of passing Append mutations along with the Puts and Deletes in batchMutate
// mark the operation return code as failure so that it will not be considered in
// the doMiniBatchMutation
batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
"Put/Delete mutations only supported in batchMutate() now");
}
if (!walEdit.isEmpty()) {
batchOp.walEditsFromCoprocessors[i] = walEdit;
walEdit = new WALEdit();
}
}
// Update metrics in same way as it is done when we go the normal processing route (we now
// update general metrics though a Coprocessor did the work).
if (noOfPuts > 0) {
// There were some Puts in the batch.
if (this.metricsRegion != null) {
this.metricsRegion.updatePut();
}
}
if (noOfDeletes > 0) {
// There were some Deletes in the batch.
if (this.metricsRegion != null) {
this.metricsRegion.updateDelete();
} }
} }
} }
@ -3332,7 +3351,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
int firstIndex = batchOp.nextIndexToProcess; int firstIndex = batchOp.nextIndexToProcess;
int lastIndexExclusive = firstIndex; int lastIndexExclusive = firstIndex;
boolean success = false; boolean success = false;
boolean doneByCoprocessor = false;
int noOfPuts = 0; int noOfPuts = 0;
int noOfDeletes = 0; int noOfDeletes = 0;
WriteEntry writeEntry = null; WriteEntry writeEntry = null;
@ -3416,43 +3434,39 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
MiniBatchOperationInProgress<Mutation> miniBatchOp = MiniBatchOperationInProgress<Mutation> miniBatchOp =
new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
if (coprocessorHost.preBatchMutate(miniBatchOp)) { coprocessorHost.preBatchMutate(miniBatchOp);
doneByCoprocessor = true; for (int i = firstIndex; i < lastIndexExclusive; i++) {
return; if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
} else { // lastIndexExclusive was incremented above.
for (int i = firstIndex; i < lastIndexExclusive; i++) { continue;
if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { }
// lastIndexExclusive was incremented above. // we pass (i - firstIndex) below since the call expects a relative index
continue; Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex);
} if (cpMutations == null) {
// we pass (i - firstIndex) below since the call expects a relative index continue;
Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex); }
if (cpMutations == null) { Mutation mutation = batchOp.getMutation(i);
continue; boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL;
} // Else Coprocessor added more Mutations corresponding to the Mutation at this index.
Mutation mutation = batchOp.getMutation(i); for (int j = 0; j < cpMutations.length; j++) {
boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL; Mutation cpMutation = cpMutations[j];
// Else Coprocessor added more Mutations corresponding to the Mutation at this index. checkAndPrepareMutation(cpMutation, replay, now);
for (int j = 0; j < cpMutations.length; j++) {
Mutation cpMutation = cpMutations[j];
checkAndPrepareMutation(cpMutation, replay, now);
// Acquire row locks. If not, the whole batch will fail. // Acquire row locks. If not, the whole batch will fail.
acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));
// Returned mutations from coprocessor correspond to the Mutation at index i. We can // Returned mutations from coprocessor correspond to the Mutation at index i. We can
// directly add the cells from those mutations to the familyMaps of this mutation. // directly add the cells from those mutations to the familyMaps of this mutation.
Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap(); Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
// will get added to the memStore later // will get added to the memStore later
mergeFamilyMaps(batchOp.familyCellMaps[i], cpFamilyMap); mergeFamilyMaps(batchOp.familyCellMaps[i], cpFamilyMap);
// The durability of returned mutation is replaced by the corresponding mutation. // The durability of returned mutation is replaced by the corresponding mutation.
// If the corresponding mutation contains the SKIP_WAL, we shouldn't count the // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the
// cells of returned mutation. // cells of returned mutation.
if (!skipWal) { if (!skipWal) {
for (List<Cell> cells : cpFamilyMap.values()) { for (List<Cell> cells : cpFamilyMap.values()) {
cellCount += cells.size(); cellCount += cells.size();
}
} }
} }
} }
@ -3557,13 +3571,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (int i = firstIndex; i < lastIndexExclusive; i++) { for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
batchOp.retCodeDetails[i] = batchOp.retCodeDetails[i] = success? OperationStatus.SUCCESS : OperationStatus.FAILURE;
success || doneByCoprocessor ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
} }
} }
// synced so that the coprocessor contract is adhered to. // synced so that the coprocessor contract is adhered to.
if (!replay && coprocessorHost != null && !doneByCoprocessor) { if (!replay && coprocessorHost != null) {
for (int i = firstIndex; i < lastIndexExclusive; i++) { for (int i = firstIndex; i < lastIndexExclusive; i++) {
// only for successful puts // only for successful puts
if (batchOp.retCodeDetails[i].getOperationStatusCode() if (batchOp.retCodeDetails[i].getOperationStatusCode()
@ -6964,14 +6977,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce) public List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
throws IOException { throws IOException {
List<Cell> results = new ArrayList<>(); List<Cell> results = new ArrayList<>();
long before = EnvironmentEdgeManager.currentTime();
// pre-get CP hook // pre-get CP hook
if (withCoprocessor && (coprocessorHost != null)) { if (withCoprocessor && (coprocessorHost != null)) {
if (coprocessorHost.preGet(get, results)) { if (coprocessorHost.preGet(get, results)) {
metricsUpdateForGet(results, before);
return results; return results;
} }
} }
long before = EnvironmentEdgeManager.currentTime();
Scan scan = new Scan(get); Scan scan = new Scan(get);
if (scan.getLoadColumnFamiliesOnDemandValue() == null) { if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
scan.setLoadColumnFamiliesOnDemand(isLoadingCfsOnDemandDefault()); scan.setLoadColumnFamiliesOnDemand(isLoadingCfsOnDemandDefault());
@ -7298,6 +7312,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try { try {
Result cpResult = doCoprocessorPreCall(op, mutation); Result cpResult = doCoprocessorPreCall(op, mutation);
if (cpResult != null) { if (cpResult != null) {
// Metrics updated below in the finally block.
return returnResults? cpResult: null; return returnResults? cpResult: null;
} }
Durability effectiveDurability = getEffectiveDurability(mutation.getDurability()); Durability effectiveDurability = getEffectiveDurability(mutation.getDurability());

View File

@ -936,7 +936,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
/** /**
* Snapshot this stores memstore. Call before running * Snapshot this stores memstore. Call before running
* {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController)} * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController,
* FlushLifeCycleTracker)}
* so it has some work to do. * so it has some work to do.
*/ */
void snapshot() { void snapshot() {
@ -1670,10 +1671,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
// First, see if coprocessor would want to override selection. // First, see if coprocessor would want to override selection.
if (this.getCoprocessorHost() != null) { if (this.getCoprocessorHost() != null) {
final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting); final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
boolean override = false; boolean override = getCoprocessorHost().preCompactSelection(this,
//TODO: is it correct way to get CompactionRequest? candidatesForCoproc, tracker, user);
override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
tracker, user);
if (override) { if (override) {
// Coprocessor is overriding normal file selection. // Coprocessor is overriding normal file selection.
compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc)); compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));

View File

@ -108,6 +108,8 @@ MultiRowMutationProcessorResponse> {
if (m instanceof Put) { if (m instanceof Put) {
if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
// by pass everything // by pass everything
// Is this right? Bypass everything and not just this individual put?
// This class is going away in hbase2 so lets not sweat it.
return; return;
} }
} else if (m instanceof Delete) { } else if (m instanceof Delete) {
@ -115,6 +117,8 @@ MultiRowMutationProcessorResponse> {
region.prepareDelete(d); region.prepareDelete(d);
if (coprocessorHost.preDelete(d, walEdit, d.getDurability())) { if (coprocessorHost.preDelete(d, walEdit, d.getDurability())) {
// by pass everything // by pass everything
// Is this right? Bypass everything and not just this individual put?
// This class is going away in hbase2 so lets not sweat it.
return; return;
} }
} }

View File

@ -680,7 +680,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} else { } else {
// convert duplicate append to get // convert duplicate append to get
List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false, List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false,
nonceGroup, nonce); nonceGroup, nonce);
r = Result.create(results); r = Result.create(results);
} }
success = true; success = true;
@ -731,7 +731,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} else { } else {
// convert duplicate increment to get // convert duplicate increment to get
List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup, List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup,
nonce); nonce);
r = Result.create(results); r = Result.create(results);
} }
success = true; success = true;
@ -2251,7 +2251,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
checkOpen(); checkOpen();
requestCount.increment(); requestCount.increment();
HRegion region = getRegion(request.getRegion()); HRegion region = getRegion(request.getRegion());
boolean bypass = false;
boolean loaded = false; boolean loaded = false;
Map<byte[], List<Path>> map = null; Map<byte[], List<Path>> map = null;
@ -2278,15 +2277,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath())); familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath()));
} }
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
} }
try { try {
if (!bypass) { map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, request.getCopyFile());
request.getCopyFile()); if (map != null) {
if (map != null) { loaded = true;
loaded = true;
}
} }
} finally { } finally {
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
@ -2457,16 +2454,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack, private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
RpcCallContext context) throws IOException { RpcCallContext context) throws IOException {
region.prepareGet(get); region.prepareGet(get);
List<Cell> results = new ArrayList<>();
boolean stale = region.getRegionInfo().getReplicaId() != 0; boolean stale = region.getRegionInfo().getReplicaId() != 0;
// This method is almost the same as HRegion#get.
List<Cell> results = new ArrayList<>();
long before = EnvironmentEdgeManager.currentTime();
// pre-get CP hook // pre-get CP hook
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
if (region.getCoprocessorHost().preGet(get, results)) { if (region.getCoprocessorHost().preGet(get, results)) {
region.metricsUpdateForGet(results, before);
return Result return Result
.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); .create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
} }
} }
long before = EnvironmentEdgeManager.currentTime();
Scan scan = new Scan(get); Scan scan = new Scan(get);
if (scan.getLoadColumnFamiliesOnDemandValue() == null) { if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
@ -2498,6 +2498,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
region.getCoprocessorHost().postGet(get, results); region.getCoprocessorHost().postGet(get, results);
} }
region.metricsUpdateForGet(results, before); region.metricsUpdateForGet(results, before);
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
} }
@ -2729,11 +2730,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
byte[] qualifier = condition.getQualifier().toByteArray(); byte[] qualifier = condition.getQualifier().toByteArray();
CompareOperator compareOp = CompareOperator compareOp =
CompareOperator.valueOf(condition.getCompareType().name()); CompareOperator.valueOf(condition.getCompareType().name());
ByteArrayComparable comparator = ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
ProtobufUtil.toComparator(condition.getComparator());
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
processed = region.getCoprocessorHost().preCheckAndPut( processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
row, family, qualifier, compareOp, comparator, put); compareOp, comparator, put);
} }
if (processed == null) { if (processed == null) {
boolean result = region.checkAndMutate(row, family, boolean result = region.checkAndMutate(row, family,
@ -2760,11 +2760,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
byte[] family = condition.getFamily().toByteArray(); byte[] family = condition.getFamily().toByteArray();
byte[] qualifier = condition.getQualifier().toByteArray(); byte[] qualifier = condition.getQualifier().toByteArray();
CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name()); CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name());
ByteArrayComparable comparator = ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
ProtobufUtil.toComparator(condition.getComparator());
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
processed = region.getCoprocessorHost().preCheckAndDelete( processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op,
row, family, qualifier, op, comparator, delete); comparator, delete);
} }
if (processed == null) { if (processed == null) {
boolean result = region.checkAndMutate(row, family, boolean result = region.checkAndMutate(row, family,

View File

@ -115,7 +115,10 @@ public class RegionServerCoprocessorHost extends
public void preStop(String message, User user) throws IOException { public void preStop(String message, User user) throws IOException {
// While stopping the region server all coprocessors method should be executed first then the // While stopping the region server all coprocessors method should be executed first then the
// coprocessor should be cleaned up. // coprocessor should be cleaned up.
execShutdown(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation(user) { if (coprocEnvironments.isEmpty()) {
return;
}
execShutdown(new RegionServerObserverOperation(user) {
@Override @Override
public void call(RegionServerObserver observer) throws IOException { public void call(RegionServerObserver observer) throws IOException {
observer.preStopRegionServer(this); observer.preStopRegionServer(this);
@ -169,9 +172,12 @@ public class RegionServerCoprocessorHost extends
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint) public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
throws IOException { throws IOException {
return execOperationWithResult(endpoint, coprocEnvironments.isEmpty() ? null : if (this.coprocEnvironments.isEmpty()) {
return endpoint;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>( new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(
rsObserverGetter) { rsObserverGetter, endpoint) {
@Override @Override
public ReplicationEndpoint call(RegionServerObserver observer) throws IOException { public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
return observer.postCreateReplicationEndPoint(this, getResult()); return observer.postCreateReplicationEndPoint(this, getResult());

View File

@ -192,57 +192,54 @@ public class SecureBulkLoadManager {
throw new DoNotRetryIOException("User token cannot be null"); throw new DoNotRetryIOException("User token cannot be null");
} }
boolean bypass = false;
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
} }
boolean loaded = false; boolean loaded = false;
Map<byte[], List<Path>> map = null; Map<byte[], List<Path>> map = null;
try { try {
if (!bypass) { // Get the target fs (HBase region server fs) delegation token
// Get the target fs (HBase region server fs) delegation token // Since we have checked the permission via 'preBulkLoadHFile', now let's give
// Since we have checked the permission via 'preBulkLoadHFile', now let's give // the 'request user' necessary token to operate on the target fs.
// the 'request user' necessary token to operate on the target fs. // After this point the 'doAs' user will hold two tokens, one for the source fs
// After this point the 'doAs' user will hold two tokens, one for the source fs // ('request user'), another for the target fs (HBase region server principal).
// ('request user'), another for the target fs (HBase region server principal). if (userProvider.isHadoopSecurityEnabled()) {
if (userProvider.isHadoopSecurityEnabled()) { FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider,"renewer");
FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider,"renewer"); targetfsDelegationToken.acquireDelegationToken(fs);
targetfsDelegationToken.acquireDelegationToken(fs);
Token<?> targetFsToken = targetfsDelegationToken.getUserToken(); Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
if (targetFsToken != null if (targetFsToken != null
&& (userToken == null || !targetFsToken.getService().equals(userToken.getService()))){ && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))){
ugi.addToken(targetFsToken); ugi.addToken(targetFsToken);
}
} }
}
map = ugi.doAs(new PrivilegedAction<Map<byte[], List<Path>>>() { map = ugi.doAs(new PrivilegedAction<Map<byte[], List<Path>>>() {
@Override @Override
public Map<byte[], List<Path>> run() { public Map<byte[], List<Path>> run() {
FileSystem fs = null; FileSystem fs = null;
try { try {
fs = FileSystem.get(conf); fs = FileSystem.get(conf);
for(Pair<byte[], String> el: familyPaths) { for(Pair<byte[], String> el: familyPaths) {
Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst())); Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
if(!fs.exists(stageFamily)) { if(!fs.exists(stageFamily)) {
fs.mkdirs(stageFamily); fs.mkdirs(stageFamily);
fs.setPermission(stageFamily, PERM_ALL_ACCESS); fs.setPermission(stageFamily, PERM_ALL_ACCESS);
}
} }
//We call bulkLoadHFiles as requesting user
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
} }
return null; //We call bulkLoadHFiles as requesting user
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
} }
}); return null;
if (map != null) {
loaded = true;
} }
});
if (map != null) {
loaded = true;
} }
} finally { } finally {
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {

View File

@ -909,12 +909,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
} }
// Coprocessor hook. // Coprocessor hook.
if (!coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit())) { coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
if (entry.getEdit().isReplay()) {
// Set replication scope null so that this won't be replicated
entry.getKey().serializeReplicationScope(false);
}
}
if (!listeners.isEmpty()) { if (!listeners.isEmpty()) {
for (WALActionsListener i : listeners) { for (WALActionsListener i : listeners) {
i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit()); i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit());

View File

@ -139,31 +139,29 @@ public class WALCoprocessorHost
} }
} }
/** /**
* @param info * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
* @param logKey * with something that doesn't expose IntefaceAudience.Private classes.
* @param logEdit
* @return true if default behavior should be bypassed, false otherwise
* @throws IOException
*/ */
public boolean preWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) @Deprecated
public void preWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
throws IOException { throws IOException {
return execOperationWithResult(false, coprocEnvironments.isEmpty() ? null : // Not bypassable.
new ObserverOperationWithResult<WALObserver, Boolean>(walObserverGetter) { if (this.coprocEnvironments.isEmpty()) {
return;
}
execOperation(new WALObserverOperation() {
@Override @Override
public Boolean call(WALObserver oserver) throws IOException { public void call(WALObserver oserver) throws IOException {
return oserver.preWALWrite(this, info, logKey, logEdit); oserver.preWALWrite(this, info, logKey, logEdit);
} }
}); });
} }
/** /**
* @param info * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
* @param logKey * with something that doesn't expose IntefaceAudience.Private classes.
* @param logEdit
* @throws IOException
*/ */
@Deprecated
public void postWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) public void postWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
throws IOException { throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {

View File

@ -1264,10 +1264,9 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
} }
@Override @Override
public boolean preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx, public void preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final boolean newValue, final MasterSwitchType switchType) throws IOException { final boolean newValue, final MasterSwitchType switchType) throws IOException {
requirePermission(getActiveUser(ctx), "setSplitOrMergeEnabled", Action.ADMIN); requirePermission(getActiveUser(ctx), "setSplitOrMergeEnabled", Action.ADMIN);
return false;
} }
@Override @Override
@ -1282,10 +1281,9 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
} }
@Override @Override
public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c, public void preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
boolean newValue) throws IOException { boolean newValue) throws IOException {
requirePermission(getActiveUser(c), "balanceSwitch", Action.ADMIN); requirePermission(getActiveUser(c), "balanceSwitch", Action.ADMIN);
return newValue;
} }
@Override @Override

View File

@ -294,12 +294,6 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
} }
} }
@Override
public boolean preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final boolean newValue, final MasterSwitchType switchType) throws IOException {
return false;
}
@Override @Override
public void postSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx, public void postSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final boolean newValue, final MasterSwitchType switchType) throws IOException { final boolean newValue, final MasterSwitchType switchType) throws IOException {

View File

@ -77,6 +77,5 @@ public class TestHRegionLocation {
int compare2 = hsl2.compareTo(hsl1); int compare2 = hsl2.compareTo(hsl1);
assertTrue((compare1 > 0)? compare2 < 0: compare2 > 0); assertTrue((compare1 > 0)? compare2 < 0: compare2 > 0);
} }
} }

View File

@ -99,12 +99,11 @@ public class SampleRegionWALCoprocessor implements WALCoprocessor, RegionCoproce
} }
@Override @Override
public boolean preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env, public void preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
boolean bypass = false;
// check table name matches or not. // check table name matches or not.
if (!Bytes.equals(info.getTable().toBytes(), this.tableName)) { if (!Bytes.equals(info.getTable().toBytes(), this.tableName)) {
return bypass; return;
} }
preWALWriteCalled = true; preWALWriteCalled = true;
// here we're going to remove one keyvalue from the WALEdit, and add // here we're going to remove one keyvalue from the WALEdit, and add
@ -134,7 +133,6 @@ public class SampleRegionWALCoprocessor implements WALCoprocessor, RegionCoproce
LOG.debug("About to delete a KeyValue from WALEdit."); LOG.debug("About to delete a KeyValue from WALEdit.");
cells.remove(deletedCell); cells.remove(deletedCell);
} }
return bypass;
} }
/** /**

View File

@ -90,7 +90,6 @@ public class TestMasterObserver {
public static class CPMasterObserver implements MasterCoprocessor, MasterObserver { public static class CPMasterObserver implements MasterCoprocessor, MasterObserver {
private boolean bypass = false;
private boolean preCreateTableCalled; private boolean preCreateTableCalled;
private boolean postCreateTableCalled; private boolean postCreateTableCalled;
private boolean preDeleteTableCalled; private boolean preDeleteTableCalled;
@ -182,10 +181,6 @@ public class TestMasterObserver {
private boolean preLockHeartbeatCalled; private boolean preLockHeartbeatCalled;
private boolean postLockHeartbeatCalled; private boolean postLockHeartbeatCalled;
public void enableBypass(boolean bypass) {
this.bypass = bypass;
}
public void resetStates() { public void resetStates() {
preCreateTableCalled = false; preCreateTableCalled = false;
postCreateTableCalled = false; postCreateTableCalled = false;
@ -301,9 +296,6 @@ public class TestMasterObserver {
@Override @Override
public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> env, public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> env,
TableDescriptor desc, RegionInfo[] regions) throws IOException { TableDescriptor desc, RegionInfo[] regions) throws IOException {
if (bypass) {
env.bypass();
}
preCreateTableCalled = true; preCreateTableCalled = true;
} }
@ -324,9 +316,6 @@ public class TestMasterObserver {
@Override @Override
public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> env, public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> env,
TableName tableName) throws IOException { TableName tableName) throws IOException {
if (bypass) {
env.bypass();
}
preDeleteTableCalled = true; preDeleteTableCalled = true;
} }
@ -347,9 +336,6 @@ public class TestMasterObserver {
@Override @Override
public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> env, public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> env,
TableName tableName) throws IOException { TableName tableName) throws IOException {
if (bypass) {
env.bypass();
}
preTruncateTableCalled = true; preTruncateTableCalled = true;
} }
@ -367,12 +353,6 @@ public class TestMasterObserver {
return preTruncateTableCalled && !postTruncateTableCalled; return preTruncateTableCalled && !postTruncateTableCalled;
} }
@Override
public boolean preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final boolean newValue, final MasterSwitchType switchType) throws IOException {
return false;
}
@Override @Override
public void postSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx, public void postSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final boolean newValue, final MasterSwitchType switchType) throws IOException { final boolean newValue, final MasterSwitchType switchType) throws IOException {
@ -381,9 +361,6 @@ public class TestMasterObserver {
@Override @Override
public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> env, public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> env,
TableName tableName, TableDescriptor htd) throws IOException { TableName tableName, TableDescriptor htd) throws IOException {
if (bypass) {
env.bypass();
}
preModifyTableCalled = true; preModifyTableCalled = true;
} }
@ -404,9 +381,6 @@ public class TestMasterObserver {
@Override @Override
public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> env, public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> env,
NamespaceDescriptor ns) throws IOException { NamespaceDescriptor ns) throws IOException {
if (bypass) {
env.bypass();
}
preCreateNamespaceCalled = true; preCreateNamespaceCalled = true;
} }
@ -427,9 +401,6 @@ public class TestMasterObserver {
@Override @Override
public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> env, public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> env,
String name) throws IOException { String name) throws IOException {
if (bypass) {
env.bypass();
}
preDeleteNamespaceCalled = true; preDeleteNamespaceCalled = true;
} }
@ -450,9 +421,6 @@ public class TestMasterObserver {
@Override @Override
public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> env, public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> env,
NamespaceDescriptor ns) throws IOException { NamespaceDescriptor ns) throws IOException {
if (bypass) {
env.bypass();
}
preModifyNamespaceCalled = true; preModifyNamespaceCalled = true;
} }
@ -490,9 +458,6 @@ public class TestMasterObserver {
@Override @Override
public void preListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> env, public void preListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> env,
List<NamespaceDescriptor> descriptors) throws IOException { List<NamespaceDescriptor> descriptors) throws IOException {
if (bypass) {
env.bypass();
}
preListNamespaceDescriptorsCalled = true; preListNamespaceDescriptorsCalled = true;
} }
@ -513,9 +478,6 @@ public class TestMasterObserver {
@Override @Override
public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> env, public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> env,
TableName tableName) throws IOException { TableName tableName) throws IOException {
if (bypass) {
env.bypass();
}
preEnableTableCalled = true; preEnableTableCalled = true;
} }
@ -536,9 +498,6 @@ public class TestMasterObserver {
@Override @Override
public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> env, public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> env,
TableName tableName) throws IOException { TableName tableName) throws IOException {
if (bypass) {
env.bypass();
}
preDisableTableCalled = true; preDisableTableCalled = true;
} }
@ -619,9 +578,6 @@ public class TestMasterObserver {
public void preMove(ObserverContext<MasterCoprocessorEnvironment> env, public void preMove(ObserverContext<MasterCoprocessorEnvironment> env,
RegionInfo region, ServerName srcServer, ServerName destServer) RegionInfo region, ServerName srcServer, ServerName destServer)
throws IOException { throws IOException {
if (bypass) {
env.bypass();
}
preMoveCalled = true; preMoveCalled = true;
} }
@ -643,9 +599,6 @@ public class TestMasterObserver {
@Override @Override
public void preAssign(ObserverContext<MasterCoprocessorEnvironment> env, public void preAssign(ObserverContext<MasterCoprocessorEnvironment> env,
final RegionInfo regionInfo) throws IOException { final RegionInfo regionInfo) throws IOException {
if (bypass) {
env.bypass();
}
preAssignCalled = true; preAssignCalled = true;
} }
@ -666,9 +619,6 @@ public class TestMasterObserver {
@Override @Override
public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> env, public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> env,
final RegionInfo regionInfo, final boolean force) throws IOException { final RegionInfo regionInfo, final boolean force) throws IOException {
if (bypass) {
env.bypass();
}
preUnassignCalled = true; preUnassignCalled = true;
} }
@ -709,9 +659,6 @@ public class TestMasterObserver {
@Override @Override
public void preBalance(ObserverContext<MasterCoprocessorEnvironment> env) public void preBalance(ObserverContext<MasterCoprocessorEnvironment> env)
throws IOException { throws IOException {
if (bypass) {
env.bypass();
}
preBalanceCalled = true; preBalanceCalled = true;
} }
@ -730,13 +677,9 @@ public class TestMasterObserver {
} }
@Override @Override
public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> env, boolean b) public void preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> env, boolean b)
throws IOException { throws IOException {
if (bypass) {
env.bypass();
}
preBalanceSwitchCalled = true; preBalanceSwitchCalled = true;
return b;
} }
@Override @Override
@ -898,9 +841,6 @@ public class TestMasterObserver {
final ObserverContext<MasterCoprocessorEnvironment> env, final ObserverContext<MasterCoprocessorEnvironment> env,
final TableDescriptor desc, final TableDescriptor desc,
final RegionInfo[] regions) throws IOException { final RegionInfo[] regions) throws IOException {
if (bypass) {
env.bypass();
}
preCreateTableActionCalled = true; preCreateTableActionCalled = true;
} }
@ -928,9 +868,6 @@ public class TestMasterObserver {
public void preDeleteTableAction( public void preDeleteTableAction(
final ObserverContext<MasterCoprocessorEnvironment> env, final TableName tableName) final ObserverContext<MasterCoprocessorEnvironment> env, final TableName tableName)
throws IOException { throws IOException {
if (bypass) {
env.bypass();
}
preDeleteTableActionCalled = true; preDeleteTableActionCalled = true;
} }
@ -954,9 +891,6 @@ public class TestMasterObserver {
public void preTruncateTableAction( public void preTruncateTableAction(
final ObserverContext<MasterCoprocessorEnvironment> env, final TableName tableName) final ObserverContext<MasterCoprocessorEnvironment> env, final TableName tableName)
throws IOException { throws IOException {
if (bypass) {
env.bypass();
}
preTruncateTableActionCalled = true; preTruncateTableActionCalled = true;
} }
@ -980,9 +914,6 @@ public class TestMasterObserver {
final ObserverContext<MasterCoprocessorEnvironment> env, final ObserverContext<MasterCoprocessorEnvironment> env,
final TableName tableName, final TableName tableName,
final TableDescriptor htd) throws IOException { final TableDescriptor htd) throws IOException {
if (bypass) {
env.bypass();
}
preModifyTableActionCalled = true; preModifyTableActionCalled = true;
} }
@ -1005,9 +936,6 @@ public class TestMasterObserver {
public void preEnableTableAction( public void preEnableTableAction(
final ObserverContext<MasterCoprocessorEnvironment> ctx, final TableName tableName) final ObserverContext<MasterCoprocessorEnvironment> ctx, final TableName tableName)
throws IOException { throws IOException {
if (bypass) {
ctx.bypass();
}
preEnableTableActionCalled = true; preEnableTableActionCalled = true;
} }
@ -1030,9 +958,6 @@ public class TestMasterObserver {
public void preDisableTableAction( public void preDisableTableAction(
final ObserverContext<MasterCoprocessorEnvironment> ctx, final TableName tableName) final ObserverContext<MasterCoprocessorEnvironment> ctx, final TableName tableName)
throws IOException { throws IOException {
if (bypass) {
ctx.bypass();
}
preDisableTableActionCalled = true; preDisableTableActionCalled = true;
} }
@ -1357,7 +1282,6 @@ public class TestMasterObserver {
HMaster master = cluster.getMaster(); HMaster master = cluster.getMaster();
MasterCoprocessorHost host = master.getMasterCoprocessorHost(); MasterCoprocessorHost host = master.getMasterCoprocessorHost();
CPMasterObserver cp = host.findCoprocessor(CPMasterObserver.class); CPMasterObserver cp = host.findCoprocessor(CPMasterObserver.class);
cp.enableBypass(true);
cp.resetStates(); cp.resetStates();
assertFalse("No table created yet", cp.wasCreateTableCalled()); assertFalse("No table created yet", cp.wasCreateTableCalled());
@ -1370,7 +1294,6 @@ public class TestMasterObserver {
admin.createTable(htd, Arrays.copyOfRange(HBaseTestingUtility.KEYS, admin.createTable(htd, Arrays.copyOfRange(HBaseTestingUtility.KEYS,
1, HBaseTestingUtility.KEYS.length)); 1, HBaseTestingUtility.KEYS.length));
// preCreateTable can't bypass default action.
assertTrue("Test table should be created", cp.wasCreateTableCalled()); assertTrue("Test table should be created", cp.wasCreateTableCalled());
tableCreationLatch.await(); tableCreationLatch.await();
assertTrue("Table pre create handler called.", cp assertTrue("Table pre create handler called.", cp
@ -1389,7 +1312,6 @@ public class TestMasterObserver {
tableCreationLatch = new CountDownLatch(1); tableCreationLatch = new CountDownLatch(1);
admin.disableTable(tableName); admin.disableTable(tableName);
assertTrue(admin.isTableDisabled(tableName)); assertTrue(admin.isTableDisabled(tableName));
// preDisableTable can't bypass default action.
assertTrue("Coprocessor should have been called on table disable", assertTrue("Coprocessor should have been called on table disable",
cp.wasDisableTableCalled()); cp.wasDisableTableCalled());
assertTrue("Disable table handler should be called.", assertTrue("Disable table handler should be called.",
@ -1399,7 +1321,6 @@ public class TestMasterObserver {
assertFalse(cp.wasEnableTableCalled()); assertFalse(cp.wasEnableTableCalled());
admin.enableTable(tableName); admin.enableTable(tableName);
assertTrue(admin.isTableEnabled(tableName)); assertTrue(admin.isTableEnabled(tableName));
// preEnableTable can't bypass default action.
assertTrue("Coprocessor should have been called on table enable", assertTrue("Coprocessor should have been called on table enable",
cp.wasEnableTableCalled()); cp.wasEnableTableCalled());
assertTrue("Enable table handler should be called.", assertTrue("Enable table handler should be called.",
@ -1411,7 +1332,6 @@ public class TestMasterObserver {
// modify table // modify table
htd.setMaxFileSize(512 * 1024 * 1024); htd.setMaxFileSize(512 * 1024 * 1024);
modifyTableSync(admin, tableName, htd); modifyTableSync(admin, tableName, htd);
// preModifyTable can't bypass default action.
assertTrue("Test table should have been modified", assertTrue("Test table should have been modified",
cp.wasModifyTableCalled()); cp.wasModifyTableCalled());
@ -1424,14 +1344,12 @@ public class TestMasterObserver {
deleteTable(admin, tableName); deleteTable(admin, tableName);
assertFalse("Test table should have been deleted", assertFalse("Test table should have been deleted",
admin.tableExists(tableName)); admin.tableExists(tableName));
// preDeleteTable can't bypass default action.
assertTrue("Coprocessor should have been called on table delete", assertTrue("Coprocessor should have been called on table delete",
cp.wasDeleteTableCalled()); cp.wasDeleteTableCalled());
assertTrue("Delete table handler should be called.", assertTrue("Delete table handler should be called.",
cp.wasDeleteTableActionCalled()); cp.wasDeleteTableActionCalled());
// turn off bypass, run the tests again // When bypass was supported, we'd turn off bypass and rerun tests. Leaving rerun in place.
cp.enableBypass(false);
cp.resetStates(); cp.resetStates();
admin.createTable(htd); admin.createTable(htd);
@ -1555,10 +1473,6 @@ public class TestMasterObserver {
MasterCoprocessorHost host = master.getMasterCoprocessorHost(); MasterCoprocessorHost host = master.getMasterCoprocessorHost();
CPMasterObserver cp = host.findCoprocessor(CPMasterObserver.class); CPMasterObserver cp = host.findCoprocessor(CPMasterObserver.class);
cp.enableBypass(false);
cp.resetStates();
// create a table // create a table
Admin admin = UTIL.getAdmin(); Admin admin = UTIL.getAdmin();
admin.createNamespace(NamespaceDescriptor.create(testNamespace).build()); admin.createNamespace(NamespaceDescriptor.create(testNamespace).build());
@ -1567,75 +1481,8 @@ public class TestMasterObserver {
assertNotNull(admin.getNamespaceDescriptor(testNamespace)); assertNotNull(admin.getNamespaceDescriptor(testNamespace));
assertTrue("Test namespace descriptor should have been called", assertTrue("Test namespace descriptor should have been called",
cp.wasGetNamespaceDescriptorCalled()); cp.wasGetNamespaceDescriptorCalled());
// This test used to do a bunch w/ bypass but bypass of these table and namespace stuff has
// turn off bypass, run the tests again // been removed so the testing code was removed.
cp.enableBypass(true);
cp.resetStates();
boolean expected = false;
try {
admin.modifyNamespace(NamespaceDescriptor.create(testNamespace).build());
} catch (BypassCoprocessorException ce) {
expected = true;
}
assertTrue(expected);
assertTrue("Test namespace should not have been modified",
cp.preModifyNamespaceCalledOnly());
assertNotNull(admin.getNamespaceDescriptor(testNamespace));
assertTrue("Test namespace descriptor should have been called",
cp.wasGetNamespaceDescriptorCalled());
expected = false;
try {
admin.deleteNamespace(testNamespace);
} catch (BypassCoprocessorException ce) {
expected = true;
}
assertTrue(expected);
assertTrue("Test namespace should not have been deleted", cp.preDeleteNamespaceCalledOnly());
assertNotNull(admin.getNamespaceDescriptor(testNamespace));
assertTrue("Test namespace descriptor should have been called",
cp.wasGetNamespaceDescriptorCalled());
cp.enableBypass(false);
cp.resetStates();
// delete table
admin.modifyNamespace(NamespaceDescriptor.create(testNamespace).build());
assertTrue("Test namespace should have been modified", cp.wasModifyNamespaceCalled());
admin.deleteNamespace(testNamespace);
assertTrue("Test namespace should have been deleted", cp.wasDeleteNamespaceCalled());
cp.enableBypass(true);
cp.resetStates();
expected = false;
try {
admin.createNamespace(NamespaceDescriptor.create(testNamespace).build());
} catch (BypassCoprocessorException ce) {
expected = true;
}
assertTrue(expected);
assertTrue("Test namespace should not be created", cp.preCreateNamespaceCalledOnly());
// turn on bypass, run the test
cp.enableBypass(true);
cp.resetStates();
admin.listNamespaceDescriptors();
assertTrue("post listNamespace should not have been called",
cp.preListNamespaceDescriptorsCalledOnly());
// turn off bypass, run the tests again
cp.enableBypass(false);
cp.resetStates();
admin.listNamespaceDescriptors();
assertTrue("post listNamespace should have been called",
cp.wasListNamespaceDescriptorsCalled());
} }
private void modifyTableSync(Admin admin, TableName tableName, HTableDescriptor htd) private void modifyTableSync(Admin admin, TableName tableName, HTableDescriptor htd)
@ -1659,7 +1506,6 @@ public class TestMasterObserver {
HMaster master = cluster.getMaster(); HMaster master = cluster.getMaster();
MasterCoprocessorHost host = master.getMasterCoprocessorHost(); MasterCoprocessorHost host = master.getMasterCoprocessorHost();
CPMasterObserver cp = host.findCoprocessor(CPMasterObserver.class); CPMasterObserver cp = host.findCoprocessor(CPMasterObserver.class);
cp.enableBypass(false);
cp.resetStates(); cp.resetStates();
Table table = UTIL.createMultiRegionTable(tableName, TEST_FAMILY); Table table = UTIL.createMultiRegionTable(tableName, TEST_FAMILY);

View File

@ -714,6 +714,9 @@ public class TestMobCompactor {
while (fileList.length != num) { while (fileList.length != num) {
Thread.sleep(50); Thread.sleep(50);
fileList = fs.listStatus(path); fileList = fs.listStatus(path);
for (FileStatus fileStatus: fileList) {
LOG.info(fileStatus);
}
} }
} }
@ -738,6 +741,7 @@ public class TestMobCompactor {
candidates.remove(0); candidates.remove(0);
} }
c.bypass(); c.bypass();
c.complete();
} }
} }
} }

View File

@ -359,7 +359,7 @@ public class TestHRegion {
/** /**
* Create a WAL outside of the usual helper in * Create a WAL outside of the usual helper in
* {@link HBaseTestingUtility#createWal(Configuration, Path, HRegionInfo)} because that method * {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method
* doesn't play nicely with FaultyFileSystem. Call this method before overriding * doesn't play nicely with FaultyFileSystem. Call this method before overriding
* {@code fs.file.impl}. * {@code fs.file.impl}.
* @param callingMethod a unique component for the path, probably the name of the test method. * @param callingMethod a unique component for the path, probably the name of the test method.
@ -2386,6 +2386,9 @@ public class TestHRegion {
FileSystem fs = FileSystem.get(CONF); FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL"); Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF); FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
// This chunk creation is done throughout the code base. Do we want to move it into core?
// It is missing from this test. W/o it we NPE.
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog, HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
COLUMN_FAMILY_BYTES); COLUMN_FAMILY_BYTES);
@ -2433,17 +2436,17 @@ public class TestHRegion {
// save normalCPHost and replaced by mockedCPHost // save normalCPHost and replaced by mockedCPHost
RegionCoprocessorHost normalCPHost = region.getCoprocessorHost(); RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
Answer<Boolean> answer = new Answer<Boolean>() { // Because the preBatchMutate returns void, we can't do usual Mockito when...then form. Must
// do below format (from Mockito doc).
Mockito.doAnswer(new Answer() {
@Override @Override
public Boolean answer(InvocationOnMock invocation) throws Throwable { public Object answer(InvocationOnMock invocation) throws Throwable {
MiniBatchOperationInProgress<Mutation> mb = invocation.getArgumentAt(0, MiniBatchOperationInProgress<Mutation> mb = invocation.getArgumentAt(0,
MiniBatchOperationInProgress.class); MiniBatchOperationInProgress.class);
mb.addOperationsFromCP(0, new Mutation[]{addPut}); mb.addOperationsFromCP(0, new Mutation[]{addPut});
return false; return null;
} }
}; }).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class));
when(mockedCPHost.preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class)))
.then(answer);
region.setCoprocessorHost(mockedCPHost); region.setCoprocessorHost(mockedCPHost);
region.put(originalPut); region.put(originalPut);
region.setCoprocessorHost(normalCPHost); region.setCoprocessorHost(normalCPHost);