HBASE-18770 Remove bypass method in ObserverContext and implement the
'bypass' logic case by case Changes Coprocessor ObserverContext 'bypass' semantic. We flip the default so bypass is NOT supported on Observer invocations; only a couple of preXXX methods in RegionObserver allow it: e.g. preGet and prePut but not preFlush, etc. Everywhere else, we throw a DoesNotSupportBypassException if a Coprocessor Observer tries to invoke bypass. Master Observers can no longer stop or change move, split, assign, create table, etc. Ditto on complete, the mechanism that allowed a Coprocessor rule that all subsequent Coprocessors are skipped in an invocation chain; now, complete is only available to bypassable methods (and Coprocessors will get an exception if they try to 'complete' when it is not allowed). See javadoc for whether a Coprocessor Observer method supports 'bypass'. If no mention, 'bypass' is NOT supported. M hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Added passing of 'bypassable' (and 'completable') and default 'result' argument to the Operation constructors rather than pass the excecution engine as parameters. Makes it so can clean up RegionObserverHost and make the calling clearer regards what is going on. Methods that support 'bypass' must set this flag on the Observer. M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Refactoring in here is minor. A few methods that used support bypass no longer do so removed the check and the need of an if/else meant a left-shift in some code. M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java Ditto M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java In here label explicitly those methods that are bypassable. Some changes to make sure we call the corresponding execOperation. TestMasterObserver had a bunch of test of bypass method. All removed or disabled. TODO: What to do w/ the Scanner methods.
This commit is contained in:
parent
63ad16af0c
commit
8237fdbd1b
|
@ -1,42 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Thrown if a coprocessor rules we should bypass an operation
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
public class BypassCoprocessorException extends CoprocessorException {
|
||||
private static final long serialVersionUID = 5943889011582357043L;
|
||||
|
||||
/** Default Constructor */
|
||||
public BypassCoprocessorException() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs the exception and supplies a string as the message
|
||||
* @param s - message
|
||||
*/
|
||||
public BypassCoprocessorException(String s) {
|
||||
super(s);
|
||||
}
|
||||
}
|
|
@ -407,9 +407,7 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
|||
s.close();
|
||||
return;
|
||||
}
|
||||
if (region.getCoprocessorHost().preScannerClose(s)) {
|
||||
return;
|
||||
}
|
||||
region.getCoprocessorHost().preScannerClose(s);
|
||||
try {
|
||||
s.close();
|
||||
} finally {
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -36,14 +35,12 @@ import java.util.function.Function;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
|
@ -548,11 +545,20 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
|
|||
ObserverGetter<C, O> observerGetter;
|
||||
|
||||
ObserverOperation(ObserverGetter<C, O> observerGetter) {
|
||||
this(observerGetter, RpcServer.getRequestUser().orElse(null));
|
||||
this(observerGetter, null);
|
||||
}
|
||||
|
||||
ObserverOperation(ObserverGetter<C, O> observerGetter, User user) {
|
||||
super(user);
|
||||
this(observerGetter, user, false);
|
||||
}
|
||||
|
||||
ObserverOperation(ObserverGetter<C, O> observerGetter, boolean bypassable) {
|
||||
this(observerGetter, null, bypassable);
|
||||
}
|
||||
|
||||
ObserverOperation(ObserverGetter<C, O> observerGetter, User user, boolean bypassable) {
|
||||
super(user != null? user: RpcServer.getRequestUser().orElse(null),
|
||||
bypassable, bypassable/*'completable': make completable same as bypassable*/);
|
||||
this.observerGetter = observerGetter;
|
||||
}
|
||||
|
||||
|
@ -574,6 +580,11 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
|
|||
super(observerGetter, user);
|
||||
}
|
||||
|
||||
public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user,
|
||||
boolean bypassable) {
|
||||
super(observerGetter, user, bypassable);
|
||||
}
|
||||
|
||||
/**
|
||||
* In case of coprocessors which have many kinds of observers (for eg, {@link RegionCoprocessor}
|
||||
* has BulkLoadObserver, RegionObserver, etc), some implementations may not need all
|
||||
|
@ -594,15 +605,23 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
|
|||
|
||||
private R result;
|
||||
|
||||
public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter) {
|
||||
super(observerGetter);
|
||||
public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result) {
|
||||
this(observerGetter, result, false);
|
||||
}
|
||||
|
||||
public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, User user) {
|
||||
super(observerGetter, user);
|
||||
public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
|
||||
boolean bypassable) {
|
||||
this(observerGetter, result, null, bypassable);
|
||||
}
|
||||
|
||||
void setResult(final R result) {
|
||||
public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
|
||||
User user) {
|
||||
this(observerGetter, result, user, false);
|
||||
}
|
||||
|
||||
private ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user,
|
||||
boolean bypassable) {
|
||||
super(observerGetter, user, bypassable);
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
|
@ -621,38 +640,27 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
|
|||
//////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Functions to execute observer hooks and handle results (if any)
|
||||
//////////////////////////////////////////////////////////////////////////////////////////
|
||||
protected <O, R> R execOperationWithResult(final R defaultValue,
|
||||
|
||||
/**
|
||||
* Do not call with an observerOperation that is null! Have the caller check.
|
||||
*/
|
||||
protected <O, R> R execOperationWithResult(
|
||||
final ObserverOperationWithResult<O, R> observerOperation) throws IOException {
|
||||
if (observerOperation == null) {
|
||||
return defaultValue;
|
||||
}
|
||||
observerOperation.setResult(defaultValue);
|
||||
execOperation(observerOperation);
|
||||
return observerOperation.getResult();
|
||||
}
|
||||
|
||||
// what does bypass mean?
|
||||
protected <O, R> R execOperationWithResult(final boolean ifBypass, final R defaultValue,
|
||||
final ObserverOperationWithResult<O, R> observerOperation) throws IOException {
|
||||
if (observerOperation == null) {
|
||||
return ifBypass ? null : defaultValue;
|
||||
} else {
|
||||
observerOperation.setResult(defaultValue);
|
||||
boolean bypass = execOperation(true, observerOperation);
|
||||
R result = observerOperation.getResult();
|
||||
return bypass == ifBypass ? result : null;
|
||||
}
|
||||
boolean bypass = execOperation(observerOperation);
|
||||
R result = observerOperation.getResult();
|
||||
return bypass == observerOperation.isBypassable()? result: null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True if we are to bypass (Can only be <code>true</code> if
|
||||
* ObserverOperation#isBypassable().
|
||||
*/
|
||||
protected <O> boolean execOperation(final ObserverOperation<O> observerOperation)
|
||||
throws IOException {
|
||||
return execOperation(true, observerOperation);
|
||||
}
|
||||
|
||||
protected <O> boolean execOperation(final boolean earlyExit,
|
||||
final ObserverOperation<O> observerOperation) throws IOException {
|
||||
if (observerOperation == null) return false;
|
||||
boolean bypass = false;
|
||||
if (observerOperation == null) {
|
||||
return bypass;
|
||||
}
|
||||
List<E> envs = coprocEnvironments.get();
|
||||
for (E env : envs) {
|
||||
observerOperation.prepare(env);
|
||||
|
@ -666,8 +674,10 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
|
|||
} finally {
|
||||
currentThread.setContextClassLoader(cl);
|
||||
}
|
||||
// Internal to shouldBypass, it checks if obeserverOperation#isBypassable().
|
||||
bypass |= observerOperation.shouldBypass();
|
||||
if (earlyExit && observerOperation.shouldComplete()) {
|
||||
// Internal to shouldComplete, it checks if obeserverOperation#isCompletable().
|
||||
if (observerOperation.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
observerOperation.postEnvCall();
|
||||
|
@ -675,7 +685,6 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
|
|||
return bypass;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Coprocessor classes can be configured in any order, based on that priority is set and
|
||||
* chained in a sorted order. Should be used preStop*() hooks i.e. when master/regionserver is
|
||||
|
@ -719,5 +728,4 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
|
|||
}
|
||||
return bypass;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -73,7 +73,6 @@ public interface MasterObserver {
|
|||
* Called before a new table is created by
|
||||
* {@link org.apache.hadoop.hbase.master.HMaster}. Called as part of create
|
||||
* table RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param desc the TableDescriptor for the table
|
||||
* @param regions the initial regions created for the table
|
||||
|
@ -95,7 +94,6 @@ public interface MasterObserver {
|
|||
* Called before a new table is created by
|
||||
* {@link org.apache.hadoop.hbase.master.HMaster}. Called as part of create
|
||||
* table procedure and it is async to the create RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
*
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param desc the TableDescriptor for the table
|
||||
|
@ -123,7 +121,6 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called before {@link org.apache.hadoop.hbase.master.HMaster} deletes a
|
||||
* table. Called as part of delete table RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param tableName the name of the table
|
||||
*/
|
||||
|
@ -143,7 +140,6 @@ public interface MasterObserver {
|
|||
* Called before {@link org.apache.hadoop.hbase.master.HMaster} deletes a
|
||||
* table. Called as part of delete table procedure and
|
||||
* it is async to the delete RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
*
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param tableName the name of the table
|
||||
|
@ -156,7 +152,6 @@ public interface MasterObserver {
|
|||
* Called after {@link org.apache.hadoop.hbase.master.HMaster} deletes a
|
||||
* table. Called as part of delete table procedure and it is async to the
|
||||
* delete RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
*
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param tableName the name of the table
|
||||
|
@ -168,7 +163,6 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called before {@link org.apache.hadoop.hbase.master.HMaster} truncates a
|
||||
* table. Called as part of truncate table RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param tableName the name of the table
|
||||
*/
|
||||
|
@ -190,7 +184,6 @@ public interface MasterObserver {
|
|||
* Called before {@link org.apache.hadoop.hbase.master.HMaster} truncates a
|
||||
* table. Called as part of truncate table procedure and it is async
|
||||
* to the truncate RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
*
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param tableName the name of the table
|
||||
|
@ -203,7 +196,6 @@ public interface MasterObserver {
|
|||
* Called after {@link org.apache.hadoop.hbase.master.HMaster} truncates a
|
||||
* table. Called as part of truncate table procedure and it is async to the
|
||||
* truncate RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
*
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param tableName the name of the table
|
||||
|
@ -215,7 +207,6 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called prior to modifying a table's properties. Called as part of modify
|
||||
* table RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param tableName the name of the table
|
||||
* @param htd the TableDescriptor
|
||||
|
@ -236,7 +227,6 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called prior to modifying a table's properties. Called as part of modify
|
||||
* table procedure and it is async to the modify table RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
*
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param tableName the name of the table
|
||||
|
@ -250,7 +240,6 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called after to modifying a table's properties. Called as part of modify
|
||||
* table procedure and it is async to the modify table RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
*
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param tableName the name of the table
|
||||
|
@ -263,7 +252,6 @@ public interface MasterObserver {
|
|||
|
||||
/**
|
||||
* Called prior to enabling a table. Called as part of enable table RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param tableName the name of the table
|
||||
*/
|
||||
|
@ -282,7 +270,6 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called prior to enabling a table. Called as part of enable table procedure
|
||||
* and it is async to the enable table RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
*
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param tableName the name of the table
|
||||
|
@ -305,7 +292,6 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called prior to disabling a table. Called as part of disable table RPC
|
||||
* call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param tableName the name of the table
|
||||
*/
|
||||
|
@ -324,7 +310,6 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called prior to disabling a table. Called as part of disable table procedure
|
||||
* and it is asyn to the disable table RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
*
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param tableName the name of the table
|
||||
|
@ -448,8 +433,7 @@ public interface MasterObserver {
|
|||
final RegionInfo regionInfo, final boolean force) throws IOException {}
|
||||
|
||||
/**
|
||||
* Called prior to marking a given region as offline. <code>ctx.bypass()</code> will not have any
|
||||
* impact on this hook.
|
||||
* Called prior to marking a given region as offline.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param regionInfo
|
||||
*/
|
||||
|
@ -484,14 +468,13 @@ public interface MasterObserver {
|
|||
|
||||
/**
|
||||
* Called prior to setting split / merge switch
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param ctx the coprocessor instance's environment
|
||||
* @param newValue the new value submitted in the call
|
||||
* @param switchType type of switch
|
||||
*/
|
||||
default boolean preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final boolean newValue, final MasterSwitchType switchType) throws IOException {
|
||||
return false;
|
||||
}
|
||||
default void preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final boolean newValue, final MasterSwitchType switchType) throws IOException {}
|
||||
|
||||
/**
|
||||
* Called after setting split / merge switch
|
||||
|
@ -538,8 +521,7 @@ public interface MasterObserver {
|
|||
final RegionInfo regionInfoB) throws IOException {}
|
||||
|
||||
/**
|
||||
* This will be called before update META step as part of split transaction. Calling
|
||||
* {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} rollback the split
|
||||
* This will be called before update META step as part of split transaction.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param splitKey
|
||||
* @param metaEntries
|
||||
|
@ -552,8 +534,6 @@ public interface MasterObserver {
|
|||
|
||||
/**
|
||||
* This will be called after update META step as part of split transaction
|
||||
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
|
||||
* effect in this hook.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
*/
|
||||
default void preSplitRegionAfterMETAAction(
|
||||
|
@ -570,7 +550,6 @@ public interface MasterObserver {
|
|||
|
||||
/**
|
||||
* Called before the regions merge.
|
||||
* Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} to skip the merge.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
*/
|
||||
default void preMergeRegionsAction(
|
||||
|
@ -587,8 +566,7 @@ public interface MasterObserver {
|
|||
final RegionInfo mergedRegion) throws IOException {}
|
||||
|
||||
/**
|
||||
* This will be called before update META step as part of regions merge transaction. Calling
|
||||
* {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} rollback the merge
|
||||
* This will be called before update META step as part of regions merge transaction.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param metaEntries mutations to execute on hbase:meta atomically with regions merge updates.
|
||||
* Any puts or deletes to execute on hbase:meta can be added to the mutations.
|
||||
|
@ -618,12 +596,9 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called prior to modifying the flag used to enable/disable region balancing.
|
||||
* @param ctx the coprocessor instance's environment
|
||||
* @param newValue the new flag value submitted in the call
|
||||
*/
|
||||
default boolean preBalanceSwitch(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final boolean newValue) throws IOException {
|
||||
return newValue;
|
||||
}
|
||||
default void preBalanceSwitch(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final boolean newValue) throws IOException {}
|
||||
|
||||
/**
|
||||
* Called after the flag to enable/disable balancing has changed.
|
||||
|
@ -667,7 +642,6 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called before a new snapshot is taken.
|
||||
* Called as part of snapshot RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param snapshot the SnapshotDescriptor for the snapshot
|
||||
* @param tableDescriptor the TableDescriptor of the table to snapshot
|
||||
|
@ -689,7 +663,6 @@ public interface MasterObserver {
|
|||
|
||||
/**
|
||||
* Called before listSnapshots request has been processed.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param snapshot the SnapshotDescriptor of the snapshot to list
|
||||
*/
|
||||
|
@ -698,7 +671,6 @@ public interface MasterObserver {
|
|||
|
||||
/**
|
||||
* Called after listSnapshots request has been processed.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param snapshot the SnapshotDescriptor of the snapshot to list
|
||||
*/
|
||||
|
@ -708,7 +680,6 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called before a snapshot is cloned.
|
||||
* Called as part of restoreSnapshot RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param snapshot the SnapshotDescriptor for the snapshot
|
||||
* @param tableDescriptor the TableDescriptor of the table to create
|
||||
|
@ -731,7 +702,6 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called before a snapshot is restored.
|
||||
* Called as part of restoreSnapshot RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param snapshot the SnapshotDescriptor for the snapshot
|
||||
* @param tableDescriptor the TableDescriptor of the table to restore
|
||||
|
@ -754,7 +724,6 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called before a snapshot is deleted.
|
||||
* Called as part of deleteSnapshot RPC call.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param snapshot the SnapshotDescriptor of the snapshot to delete
|
||||
*/
|
||||
|
@ -774,7 +743,7 @@ public interface MasterObserver {
|
|||
* Called before a getTableDescriptors request has been processed.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param tableNamesList the list of table names, or null if querying for all
|
||||
* @param descriptors an empty list, can be filled with what to return if bypassing
|
||||
* @param descriptors an empty list, can be filled with what to return in coprocessor
|
||||
* @param regex regular expression used for filtering the table names
|
||||
*/
|
||||
default void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
|
@ -795,7 +764,7 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called before a getTableNames request has been processed.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param descriptors an empty list, can be filled with what to return if bypassing
|
||||
* @param descriptors an empty list, can be filled with what to return by coprocessor
|
||||
* @param regex regular expression used for filtering the table names
|
||||
*/
|
||||
default void preGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
|
@ -815,7 +784,6 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called before a new namespace is created by
|
||||
* {@link org.apache.hadoop.hbase.master.HMaster}.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param ns the NamespaceDescriptor for the table
|
||||
*/
|
||||
|
@ -832,7 +800,6 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called before {@link org.apache.hadoop.hbase.master.HMaster} deletes a
|
||||
* namespace
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param namespace the name of the namespace
|
||||
*/
|
||||
|
@ -849,7 +816,6 @@ public interface MasterObserver {
|
|||
|
||||
/**
|
||||
* Called prior to modifying a namespace's properties.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param ns the NamespaceDescriptor
|
||||
*/
|
||||
|
@ -883,7 +849,7 @@ public interface MasterObserver {
|
|||
/**
|
||||
* Called before a listNamespaceDescriptors request has been processed.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param descriptors an empty list, can be filled with what to return if bypassing
|
||||
* @param descriptors an empty list, can be filled with what to return by coprocessor
|
||||
*/
|
||||
default void preListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
List<NamespaceDescriptor> descriptors) throws IOException {}
|
||||
|
@ -1013,7 +979,6 @@ public interface MasterObserver {
|
|||
|
||||
/**
|
||||
* Called before merge regions request.
|
||||
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
|
||||
* @param ctx coprocessor environment
|
||||
* @param regionsToMerge regions to be merged
|
||||
*/
|
||||
|
|
|
@ -28,10 +28,10 @@ import java.util.Optional;
|
|||
/**
|
||||
* Carries the execution state for a given invocation of an Observer coprocessor
|
||||
* ({@link RegionObserver}, {@link MasterObserver}, or {@link WALObserver})
|
||||
* method. The same ObserverContext instance is passed sequentially to all loaded
|
||||
* method. The same ObserverContext instance is passed sequentially to all loaded
|
||||
* coprocessors for a given Observer method trigger, with the
|
||||
* <code>CoprocessorEnvironment</code> reference swapped out for each
|
||||
* coprocessor.
|
||||
* <code>CoprocessorEnvironment</code> reference set appropriately for each Coprocessor type:
|
||||
* e.g. the RegionCoprocessorEnvironment is passed to RegionCoprocessors, and so on.
|
||||
* @param <E> The {@link CoprocessorEnvironment} subclass applicable to the
|
||||
* revelant Observer interface.
|
||||
*/
|
||||
|
@ -41,15 +41,39 @@ public interface ObserverContext<E extends CoprocessorEnvironment> {
|
|||
E getEnvironment();
|
||||
|
||||
/**
|
||||
* Call to indicate that the current coprocessor's return value should be
|
||||
* used in place of the normal HBase obtained value.
|
||||
* Call to indicate that the current coprocessor's return value (or parameter -- depends on the
|
||||
* call-type) should be used in place of the value that would be obtained via normal processing;
|
||||
* i.e. bypass the core call and return the Coprocessor's result instead. DOES NOT work for all
|
||||
* Coprocessor invocations, only on a small subset of methods, mostly preXXX calls in
|
||||
* RegionObserver. Check javadoc on the pertinent Coprocessor Observer to see if
|
||||
* <code>bypass</code> is supported.
|
||||
* <p>This behavior of honoring only a subset of methods is new since hbase-2.0.0.
|
||||
* <p>Where bypass is supported what is being bypassed is all of the core code
|
||||
* implementing the remainder of the operation. In order to understand what
|
||||
* calling bypass() will skip, a coprocessor implementer should read and
|
||||
* understand all of the remaining code and its nuances. Although this
|
||||
* is good practice for coprocessor developers in general, it demands a lot.
|
||||
* What is skipped is extremely version dependent. The core code will vary, perhaps significantly,
|
||||
* even between point releases. We do not provide the promise of consistent behavior even between
|
||||
* point releases for the bypass semantic. To achieve
|
||||
* that we could not change any code between hook points. Therefore the
|
||||
* coprocessor implementer becomes an HBase core developer in practice as soon
|
||||
* as they rely on bypass(). Every release of HBase may break the assumption
|
||||
* that the replacement for the bypassed code takes care of all necessary
|
||||
* skipped concerns. Because those concerns can change at any point, such an
|
||||
* assumption is never safe.</p>
|
||||
* @see #complete()
|
||||
*/
|
||||
void bypass();
|
||||
|
||||
/**
|
||||
* Call to indicate that additional coprocessors further down the execution
|
||||
* chain do not need to be invoked. Implies that this coprocessor's response
|
||||
* Call to skip out on calling remaining coprocessors in current execution chain (there may be
|
||||
* more than one coprocessor chained to a method call). Implies that this coprocessor's response
|
||||
* is definitive.
|
||||
* <p>Since hbase-2.0.0, only <code>complete</code> of 'bypassable' methods has an effect. See
|
||||
* javadoc on the Coprocessor Observer method as to whether bypass (and thereby 'complete') is
|
||||
* supported. This behavior of honoring only a subset of methods is new since hbase-2.0.0.
|
||||
* @see #bypass()
|
||||
*/
|
||||
void complete();
|
||||
|
||||
|
@ -60,5 +84,4 @@ public interface ObserverContext<E extends CoprocessorEnvironment> {
|
|||
* context.
|
||||
*/
|
||||
Optional<User> getCaller();
|
||||
|
||||
}
|
||||
|
|
|
@ -35,11 +35,25 @@ import org.apache.yetus.audience.InterfaceStability;
|
|||
public class ObserverContextImpl<E extends CoprocessorEnvironment> implements ObserverContext<E> {
|
||||
private E env;
|
||||
private boolean bypass;
|
||||
/**
|
||||
* Is this operation bypassable?
|
||||
*/
|
||||
private final boolean bypassable;
|
||||
/**
|
||||
* Is this operation completable?
|
||||
*/
|
||||
private boolean complete;
|
||||
private final boolean completable;
|
||||
private final User caller;
|
||||
|
||||
public ObserverContextImpl(User caller) {
|
||||
this(caller, false, false);
|
||||
}
|
||||
|
||||
public ObserverContextImpl(User caller, boolean bypassable, boolean completable) {
|
||||
this.caller = caller;
|
||||
this.bypassable = bypassable;
|
||||
this.completable = completable;
|
||||
}
|
||||
|
||||
public E getEnvironment() {
|
||||
|
@ -50,11 +64,25 @@ public class ObserverContextImpl<E extends CoprocessorEnvironment> implements Ob
|
|||
this.env = env;
|
||||
}
|
||||
|
||||
public boolean isBypassable() {
|
||||
return this.bypassable;
|
||||
};
|
||||
|
||||
public void bypass() {
|
||||
if (!this.bypassable) {
|
||||
throw new UnsupportedOperationException("This method does not support 'bypass'.");
|
||||
}
|
||||
bypass = true;
|
||||
}
|
||||
|
||||
public boolean isCompleable() {
|
||||
return this.completable;
|
||||
};
|
||||
|
||||
public void complete() {
|
||||
if (!this.completable) {
|
||||
throw new UnsupportedOperationException("This method does not support 'complete'.");
|
||||
}
|
||||
complete = true;
|
||||
}
|
||||
|
||||
|
@ -63,6 +91,9 @@ public class ObserverContextImpl<E extends CoprocessorEnvironment> implements Ob
|
|||
* coprocessors, {@code false} otherwise.
|
||||
*/
|
||||
public boolean shouldBypass() {
|
||||
if (!isBypassable()) {
|
||||
return false;
|
||||
}
|
||||
if (bypass) {
|
||||
bypass = false;
|
||||
return true;
|
||||
|
@ -75,6 +106,9 @@ public class ObserverContextImpl<E extends CoprocessorEnvironment> implements Ob
|
|||
* coprocessors, {@code false} otherwise.
|
||||
*/
|
||||
public boolean shouldComplete() {
|
||||
if (!isCompleable()) {
|
||||
return false;
|
||||
}
|
||||
if (complete) {
|
||||
complete = false;
|
||||
return true;
|
||||
|
|
|
@ -177,6 +177,8 @@ public interface RegionObserver {
|
|||
* Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of
|
||||
* available candidates. To alter the files used for compaction, you may mutate the passed in list
|
||||
* of candidates. If you remove all the candidates then the compaction will be canceled.
|
||||
* <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed
|
||||
* the passed in <code>candidates</code>.
|
||||
* @param c the environment provided by the region server
|
||||
* @param store the store where compaction is being requested
|
||||
* @param candidates the store files currently available for compaction
|
||||
|
@ -390,7 +392,10 @@ public interface RegionObserver {
|
|||
* @param byteNow - timestamp bytes
|
||||
* @param get - the get formed using the current cell's row. Note that the get does not specify
|
||||
* the family and qualifier
|
||||
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
|
||||
* with something that doesn't expose IntefaceAudience.Private classes.
|
||||
*/
|
||||
@Deprecated
|
||||
default void prePrepareTimeStampForDeleteVersion(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
Mutation mutation, Cell cell, byte[] byteNow, Get get) throws IOException {}
|
||||
|
||||
|
@ -435,8 +440,10 @@ public interface RegionObserver {
|
|||
* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
* @param c the environment provided by the region server
|
||||
* @param miniBatchOp batch of Mutations applied to region.
|
||||
* @param miniBatchOp batch of Mutations applied to region. Coprocessors are discouraged from
|
||||
* manipulating its state.
|
||||
*/
|
||||
// Coprocessors can do a form of bypass by changing state in miniBatchOp.
|
||||
default void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {}
|
||||
|
||||
|
@ -926,6 +933,8 @@ public interface RegionObserver {
|
|||
/**
|
||||
* Called before a {@link WALEdit}
|
||||
* replayed for this region.
|
||||
* Do not amend the WALKey. It is InterfaceAudience.Private. Changing the WALKey will cause
|
||||
* damage.
|
||||
* @param ctx the environment provided by the region server
|
||||
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
|
||||
* with something that doesn't expose IntefaceAudience.Private classes.
|
||||
|
@ -937,6 +946,8 @@ public interface RegionObserver {
|
|||
/**
|
||||
* Called after a {@link WALEdit}
|
||||
* replayed for this region.
|
||||
* Do not amend the WALKey. It is InterfaceAudience.Private. Changing the WALKey will cause
|
||||
* damage.
|
||||
* @param ctx the environment provided by the region server
|
||||
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
|
||||
* with something that doesn't expose IntefaceAudience.Private classes.
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.coprocessor;
|
|||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
|
@ -70,21 +69,20 @@ public interface WALObserver {
|
|||
/**
|
||||
* Called before a {@link WALEdit}
|
||||
* is writen to WAL.
|
||||
*
|
||||
* @return true if default behavior should be bypassed, false otherwise
|
||||
* Do not amend the WALKey. It is InterfaceAudience.Private. Changing the WALKey will cause
|
||||
* damage.
|
||||
* @deprecated Since hbase-2.0.0. To be replaced with an alternative that does not expose
|
||||
* InterfaceAudience classes such as WALKey and WALEdit. Will be removed in hbase-3.0.0.
|
||||
*/
|
||||
// TODO: return value is not used
|
||||
@Deprecated
|
||||
default boolean preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
|
||||
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
|
||||
return false;
|
||||
}
|
||||
default void preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
|
||||
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {}
|
||||
|
||||
/**
|
||||
* Called after a {@link WALEdit}
|
||||
* is writen to WAL.
|
||||
* Do not amend the WALKey. It is InterfaceAudience.Private. Changing the WALKey will cause
|
||||
* damage.
|
||||
* @deprecated Since hbase-2.0.0. To be replaced with an alternative that does not expose
|
||||
* InterfaceAudience classes such as WALKey and WALEdit. Will be removed in hbase-3.0.0.
|
||||
*/
|
||||
|
|
|
@ -58,7 +58,6 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.ClusterStatus;
|
||||
import org.apache.hadoop.hbase.ClusterStatus.Option;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateException;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
|
@ -83,7 +82,6 @@ import org.apache.hadoop.hbase.client.Result;
|
|||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.TableState;
|
||||
import org.apache.hadoop.hbase.coprocessor.BypassCoprocessorException;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
|
||||
|
@ -1704,9 +1702,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
try {
|
||||
checkInitialized();
|
||||
if (this.cpHost != null) {
|
||||
if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
|
||||
return;
|
||||
}
|
||||
this.cpHost.preMove(hri, rp.getSource(), rp.getDestination());
|
||||
}
|
||||
// Warmup the region on the destination before initiating the move. this call
|
||||
// is synchronous and takes some time. doing it before the source region gets
|
||||
|
@ -2895,13 +2891,11 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
|
||||
TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
|
||||
|
||||
return MasterProcedureUtil.submitProcedure(
|
||||
new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
|
||||
return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
|
||||
nonceGroup, nonce) {
|
||||
@Override
|
||||
protected void run() throws IOException {
|
||||
if (getMaster().getMasterCoprocessorHost().preCreateNamespace(namespaceDescriptor)) {
|
||||
throw new BypassCoprocessorException();
|
||||
}
|
||||
getMaster().getMasterCoprocessorHost().preCreateNamespace(namespaceDescriptor);
|
||||
LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor);
|
||||
// Execute the operation synchronously - wait for the operation to complete before
|
||||
// continuing.
|
||||
|
@ -2929,13 +2923,11 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
|
||||
TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
|
||||
|
||||
return MasterProcedureUtil.submitProcedure(
|
||||
new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
|
||||
return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
|
||||
nonceGroup, nonce) {
|
||||
@Override
|
||||
protected void run() throws IOException {
|
||||
if (getMaster().getMasterCoprocessorHost().preModifyNamespace(namespaceDescriptor)) {
|
||||
throw new BypassCoprocessorException();
|
||||
}
|
||||
getMaster().getMasterCoprocessorHost().preModifyNamespace(namespaceDescriptor);
|
||||
LOG.info(getClientIdAuditPrefix() + " modify " + namespaceDescriptor);
|
||||
// Execute the operation synchronously - wait for the operation to complete before
|
||||
// continuing.
|
||||
|
@ -2961,13 +2953,11 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
throws IOException {
|
||||
checkInitialized();
|
||||
|
||||
return MasterProcedureUtil.submitProcedure(
|
||||
new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
|
||||
return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
|
||||
nonceGroup, nonce) {
|
||||
@Override
|
||||
protected void run() throws IOException {
|
||||
if (getMaster().getMasterCoprocessorHost().preDeleteNamespace(name)) {
|
||||
throw new BypassCoprocessorException();
|
||||
}
|
||||
getMaster().getMasterCoprocessorHost().preDeleteNamespace(name);
|
||||
LOG.info(getClientIdAuditPrefix() + " delete " + name);
|
||||
// Execute the operation synchronously - wait for the operation to complete before
|
||||
// continuing.
|
||||
|
@ -3002,13 +2992,12 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
List<NamespaceDescriptor> getNamespaces() throws IOException {
|
||||
checkInitialized();
|
||||
final List<NamespaceDescriptor> nsds = new ArrayList<>();
|
||||
boolean bypass = false;
|
||||
if (cpHost != null) {
|
||||
bypass = cpHost.preListNamespaceDescriptors(nsds);
|
||||
cpHost.preListNamespaceDescriptors(nsds);
|
||||
}
|
||||
if (!bypass) {
|
||||
nsds.addAll(this.clusterSchemaService.getNamespaces());
|
||||
if (this.cpHost != null) this.cpHost.postListNamespaceDescriptors(nsds);
|
||||
nsds.addAll(this.clusterSchemaService.getNamespaces());
|
||||
if (this.cpHost != null) {
|
||||
this.cpHost.postListNamespaceDescriptors(nsds);
|
||||
}
|
||||
return nsds;
|
||||
}
|
||||
|
@ -3085,13 +3074,12 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
final List<TableName> tableNameList, final boolean includeSysTables)
|
||||
throws IOException {
|
||||
List<TableDescriptor> htds = new ArrayList<>();
|
||||
boolean bypass = cpHost != null?
|
||||
cpHost.preGetTableDescriptors(tableNameList, htds, regex): false;
|
||||
if (!bypass) {
|
||||
htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables);
|
||||
if (cpHost != null) {
|
||||
cpHost.postGetTableDescriptors(tableNameList, htds, regex);
|
||||
}
|
||||
if (cpHost != null) {
|
||||
cpHost.preGetTableDescriptors(tableNameList, htds, regex);
|
||||
}
|
||||
htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables);
|
||||
if (cpHost != null) {
|
||||
cpHost.postGetTableDescriptors(tableNameList, htds, regex);
|
||||
}
|
||||
return htds;
|
||||
}
|
||||
|
@ -3106,10 +3094,12 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
public List<TableName> listTableNames(final String namespace, final String regex,
|
||||
final boolean includeSysTables) throws IOException {
|
||||
List<TableDescriptor> htds = new ArrayList<>();
|
||||
boolean bypass = cpHost != null? cpHost.preGetTableNames(htds, regex): false;
|
||||
if (!bypass) {
|
||||
htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables);
|
||||
if (cpHost != null) cpHost.postGetTableNames(htds, regex);
|
||||
if (cpHost != null) {
|
||||
cpHost.preGetTableNames(htds, regex);
|
||||
}
|
||||
htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables);
|
||||
if (cpHost != null) {
|
||||
cpHost.postGetTableNames(htds, regex);
|
||||
}
|
||||
List<TableName> result = new ArrayList<>(htds.size());
|
||||
for (TableDescriptor htd: htds) result.add(htd.getTableName());
|
||||
|
|
|
@ -192,9 +192,17 @@ public class MasterCoprocessorHost
|
|||
super(masterObserverGetter);
|
||||
}
|
||||
|
||||
public MasterObserverOperation(boolean bypassable) {
|
||||
this(null, bypassable);
|
||||
}
|
||||
|
||||
public MasterObserverOperation(User user) {
|
||||
super(masterObserverGetter, user);
|
||||
}
|
||||
|
||||
public MasterObserverOperation(User user, boolean bypassable) {
|
||||
super(masterObserverGetter, user, bypassable);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -203,8 +211,8 @@ public class MasterCoprocessorHost
|
|||
//////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
|
||||
public boolean preCreateNamespace(final NamespaceDescriptor ns) throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
public void preCreateNamespace(final NamespaceDescriptor ns) throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preCreateNamespace(this, ns);
|
||||
|
@ -221,8 +229,8 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public boolean preDeleteNamespace(final String namespaceName) throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
public void preDeleteNamespace(final String namespaceName) throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preDeleteNamespace(this, namespaceName);
|
||||
|
@ -239,8 +247,8 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public boolean preModifyNamespace(final NamespaceDescriptor ns) throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
public void preModifyNamespace(final NamespaceDescriptor ns) throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preModifyNamespace(this, ns);
|
||||
|
@ -277,9 +285,9 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public boolean preListNamespaceDescriptors(final List<NamespaceDescriptor> descriptors)
|
||||
public void preListNamespaceDescriptors(final List<NamespaceDescriptor> descriptors)
|
||||
throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preListNamespaceDescriptors(this, descriptors);
|
||||
|
@ -528,10 +536,10 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public boolean preAbortProcedure(
|
||||
public void preAbortProcedure(
|
||||
final ProcedureExecutor<MasterProcedureEnv> procEnv,
|
||||
final long procId) throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preAbortProcedure(this, procId);
|
||||
|
@ -548,8 +556,8 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public boolean preGetProcedures() throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
public void preGetProcedures() throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preGetProcedures(this);
|
||||
|
@ -566,8 +574,8 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public boolean preGetLocks() throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
public void preGetLocks() throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preGetLocks(this);
|
||||
|
@ -584,9 +592,9 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public boolean preMove(final RegionInfo region, final ServerName srcServer,
|
||||
public void preMove(final RegionInfo region, final ServerName srcServer,
|
||||
final ServerName destServer) throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preMove(this, region, srcServer, destServer);
|
||||
|
@ -604,8 +612,8 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public boolean preAssign(final RegionInfo regionInfo) throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
public void preAssign(final RegionInfo regionInfo) throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preAssign(this, regionInfo);
|
||||
|
@ -622,9 +630,9 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public boolean preUnassign(final RegionInfo regionInfo, final boolean force)
|
||||
public void preUnassign(final RegionInfo regionInfo, final boolean force)
|
||||
throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preUnassign(this, regionInfo, force);
|
||||
|
@ -697,9 +705,9 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public boolean preSetSplitOrMergeEnabled(final boolean newValue,
|
||||
public void preSetSplitOrMergeEnabled(final boolean newValue,
|
||||
final MasterSwitchType switchType) throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
execOperation(coprocEnvironments.isEmpty()? null: new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preSetSplitOrMergeEnabled(this, newValue, switchType);
|
||||
|
@ -779,11 +787,11 @@ public class MasterCoprocessorHost
|
|||
* @param user the user
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean preSplitBeforeMETAAction(
|
||||
public void preSplitBeforeMETAAction(
|
||||
final byte[] splitKey,
|
||||
final List<Mutation> metaEntries,
|
||||
final User user) throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preSplitRegionBeforeMETAAction(this, splitKey, metaEntries);
|
||||
|
@ -825,9 +833,9 @@ public class MasterCoprocessorHost
|
|||
* @param user the user
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean preMergeRegionsAction(
|
||||
public void preMergeRegionsAction(
|
||||
final RegionInfo[] regionsToMerge, final User user) throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preMergeRegionsAction(this, regionsToMerge);
|
||||
|
@ -861,11 +869,11 @@ public class MasterCoprocessorHost
|
|||
* @param user the user
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean preMergeRegionsCommit(
|
||||
public void preMergeRegionsCommit(
|
||||
final RegionInfo[] regionsToMerge,
|
||||
final @MetaMutationAnnotation List<Mutation> metaEntries,
|
||||
final User user) throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preMergeRegionsCommitAction(this, regionsToMerge, metaEntries);
|
||||
|
@ -908,14 +916,17 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public boolean preBalanceSwitch(final boolean b) throws IOException {
|
||||
return execOperationWithResult(b, coprocEnvironments.isEmpty() ? null :
|
||||
new ObserverOperationWithResult<MasterObserver, Boolean>(masterObserverGetter) {
|
||||
@Override
|
||||
public Boolean call(MasterObserver observer) throws IOException {
|
||||
return observer.preBalanceSwitch(this, getResult());
|
||||
}
|
||||
});
|
||||
// This hook allows Coprocessor change value of balance switch.
|
||||
public void preBalanceSwitch(final boolean b) throws IOException {
|
||||
if (this.coprocEnvironments.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
execOperation(new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preBalanceSwitch(this, b);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void postBalanceSwitch(final boolean oldValue, final boolean newValue)
|
||||
|
@ -931,7 +942,10 @@ public class MasterCoprocessorHost
|
|||
public void preShutdown() throws IOException {
|
||||
// While stopping the cluster all coprocessors method should be executed first then the
|
||||
// coprocessor should be cleaned up.
|
||||
execShutdown(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
execShutdown(new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preShutdown(this);
|
||||
|
@ -947,7 +961,10 @@ public class MasterCoprocessorHost
|
|||
public void preStopMaster() throws IOException {
|
||||
// While stopping master all coprocessors method should be executed first then the coprocessor
|
||||
// environment should be cleaned up.
|
||||
execShutdown(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
execShutdown(new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preStopMaster(this);
|
||||
|
@ -1074,9 +1091,9 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public boolean preGetTableDescriptors(final List<TableName> tableNamesList,
|
||||
public void preGetTableDescriptors(final List<TableName> tableNamesList,
|
||||
final List<TableDescriptor> descriptors, final String regex) throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preGetTableDescriptors(this, tableNamesList, descriptors, regex);
|
||||
|
@ -1094,9 +1111,9 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public boolean preGetTableNames(final List<TableDescriptor> descriptors,
|
||||
public void preGetTableNames(final List<TableDescriptor> descriptors,
|
||||
final String regex) throws IOException {
|
||||
return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preGetTableNames(this, descriptors, regex);
|
||||
|
|
|
@ -331,7 +331,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
boolean newValue = b;
|
||||
try {
|
||||
if (master.cpHost != null) {
|
||||
newValue = master.cpHost.preBalanceSwitch(newValue);
|
||||
master.cpHost.preBalanceSwitch(newValue);
|
||||
}
|
||||
try {
|
||||
if (mode == BalanceSwitchMode.SYNC) {
|
||||
|
@ -509,9 +509,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
|
||||
final AssignRegionResponse arr = AssignRegionResponse.newBuilder().build();
|
||||
if (master.cpHost != null) {
|
||||
if (master.cpHost.preAssign(regionInfo)) {
|
||||
return arr;
|
||||
}
|
||||
master.cpHost.preAssign(regionInfo);
|
||||
}
|
||||
LOG.info(master.getClientIdAuditPrefix() + " assign " + regionInfo.getRegionNameAsString());
|
||||
master.getAssignmentManager().assign(regionInfo, true);
|
||||
|
@ -1517,9 +1515,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
|
||||
RegionInfo hri = pair.getFirst();
|
||||
if (master.cpHost != null) {
|
||||
if (master.cpHost.preUnassign(hri, force)) {
|
||||
return urr;
|
||||
}
|
||||
master.cpHost.preUnassign(hri, force);
|
||||
}
|
||||
LOG.debug(master.getClientIdAuditPrefix() + " unassign " + hri.getRegionNameAsString()
|
||||
+ " in current location if it is online and reassign.force=" + force);
|
||||
|
@ -1704,13 +1700,10 @@ public class MasterRpcServices extends RSRpcServices
|
|||
MasterSwitchType switchType = convert(masterSwitchType);
|
||||
boolean oldValue = master.isSplitOrMergeEnabled(switchType);
|
||||
response.addPrevValue(oldValue);
|
||||
boolean bypass = false;
|
||||
if (master.cpHost != null) {
|
||||
bypass = master.cpHost.preSetSplitOrMergeEnabled(newValue, switchType);
|
||||
}
|
||||
if (!bypass) {
|
||||
master.getSplitOrMergeTracker().setSplitOrMergeEnabled(newValue, switchType);
|
||||
master.cpHost.preSetSplitOrMergeEnabled(newValue, switchType);
|
||||
}
|
||||
master.getSplitOrMergeTracker().setSplitOrMergeEnabled(newValue, switchType);
|
||||
if (master.cpHost != null) {
|
||||
master.cpHost.postSetSplitOrMergeEnabled(newValue, switchType);
|
||||
}
|
||||
|
@ -2155,18 +2148,10 @@ public class MasterRpcServices extends RSRpcServices
|
|||
ListDeadServersResponse.Builder response = ListDeadServersResponse.newBuilder();
|
||||
try {
|
||||
master.checkInitialized();
|
||||
if (master.cpHost != null) {
|
||||
master.cpHost.preListDeadServers();
|
||||
}
|
||||
|
||||
Set<ServerName> servers = master.getServerManager().getDeadServers().copyServerNames();
|
||||
for (ServerName server : servers) {
|
||||
response.addServerName(ProtobufUtil.toServerName(server));
|
||||
}
|
||||
|
||||
if (master.cpHost != null) {
|
||||
master.cpHost.postListDeadServers();
|
||||
}
|
||||
} catch (IOException io) {
|
||||
throw new ServiceException(io);
|
||||
}
|
||||
|
|
|
@ -530,12 +530,7 @@ public class MergeTableRegionsProcedure
|
|||
private void preMergeRegions(final MasterProcedureEnv env) throws IOException {
|
||||
final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
boolean ret = cpHost.preMergeRegionsAction(regionsToMerge, getUser());
|
||||
if (ret) {
|
||||
throw new IOException(
|
||||
"Coprocessor bypassing regions " + RegionInfo.getShortNameToLog(regionsToMerge) +
|
||||
" merge.");
|
||||
}
|
||||
cpHost.preMergeRegionsAction(regionsToMerge, getUser());
|
||||
}
|
||||
// TODO: Clean up split and merge. Currently all over the place.
|
||||
try {
|
||||
|
@ -702,13 +697,7 @@ public class MergeTableRegionsProcedure
|
|||
if (cpHost != null) {
|
||||
@MetaMutationAnnotation
|
||||
final List<Mutation> metaEntries = new ArrayList<Mutation>();
|
||||
boolean ret = cpHost.preMergeRegionsCommit(regionsToMerge, metaEntries, getUser());
|
||||
|
||||
if (ret) {
|
||||
throw new IOException(
|
||||
"Coprocessor bypassing regions " + RegionInfo.getShortNameToLog(regionsToMerge) +
|
||||
" merge.");
|
||||
}
|
||||
cpHost.preMergeRegionsCommit(regionsToMerge, metaEntries, getUser());
|
||||
try {
|
||||
for (Mutation p : metaEntries) {
|
||||
RegionInfo.parseRegionName(p.getRow());
|
||||
|
|
|
@ -709,10 +709,7 @@ public class SplitTableRegionProcedure
|
|||
final List<Mutation> metaEntries = new ArrayList<Mutation>();
|
||||
final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
if (cpHost.preSplitBeforeMETAAction(getSplitRow(), metaEntries, getUser())) {
|
||||
throw new IOException("Coprocessor bypassing region " +
|
||||
getParentRegion().getRegionNameAsString() + " split.");
|
||||
}
|
||||
cpHost.preSplitBeforeMETAAction(getSplitRow(), metaEntries, getUser());
|
||||
try {
|
||||
for (Mutation p : metaEntries) {
|
||||
RegionInfo.parseRegionName(p.getRow());
|
||||
|
|
|
@ -2408,7 +2408,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
/**
|
||||
* Flushing all stores.
|
||||
* @see #internalFlushcache(Collection, MonitoredTask, boolean)
|
||||
* @see #internalFlushcache(Collection, MonitoredTask, boolean, FlushLifeCycleTracker)
|
||||
*/
|
||||
private FlushResult internalFlushcache(MonitoredTask status) throws IOException {
|
||||
return internalFlushcache(stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
|
||||
|
@ -2416,7 +2416,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
/**
|
||||
* Flushing given stores.
|
||||
* @see #internalFlushcache(WAL, long, Collection, MonitoredTask, boolean)
|
||||
* @see #internalFlushcache(WAL, long, Collection, MonitoredTask, boolean, FlushLifeCycleTracker)
|
||||
*/
|
||||
private FlushResultImpl internalFlushcache(Collection<HStore> storesToFlush, MonitoredTask status,
|
||||
boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException {
|
||||
|
@ -3280,39 +3280,58 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* @param batchOp
|
||||
*/
|
||||
private void callPreMutateCPHooks(BatchOperation<?> batchOp) throws IOException {
|
||||
if (coprocessorHost == null) {
|
||||
return;
|
||||
}
|
||||
/* Run coprocessor pre hook outside of locks to avoid deadlock */
|
||||
WALEdit walEdit = new WALEdit();
|
||||
if (coprocessorHost != null) {
|
||||
for (int i = 0 ; i < batchOp.operations.length; i++) {
|
||||
Mutation m = batchOp.getMutation(i);
|
||||
if (m instanceof Put) {
|
||||
if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
|
||||
// pre hook says skip this Put
|
||||
// mark as success and skip in doMiniBatchMutation
|
||||
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
|
||||
}
|
||||
} else if (m instanceof Delete) {
|
||||
Delete curDel = (Delete) m;
|
||||
if (curDel.getFamilyCellMap().isEmpty()) {
|
||||
// handle deleting a row case
|
||||
prepareDelete(curDel);
|
||||
}
|
||||
if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
|
||||
// pre hook says skip this Delete
|
||||
// mark as success and skip in doMiniBatchMutation
|
||||
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
|
||||
}
|
||||
} else {
|
||||
// In case of passing Append mutations along with the Puts and Deletes in batchMutate
|
||||
// mark the operation return code as failure so that it will not be considered in
|
||||
// the doMiniBatchMutation
|
||||
batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
|
||||
"Put/Delete mutations only supported in batchMutate() now");
|
||||
int noOfPuts = 0;
|
||||
int noOfDeletes = 0;
|
||||
for (int i = 0 ; i < batchOp.operations.length; i++) {
|
||||
Mutation m = batchOp.getMutation(i);
|
||||
if (m instanceof Put) {
|
||||
if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
|
||||
// pre hook says skip this Put
|
||||
// mark as success and skip in doMiniBatchMutation
|
||||
noOfPuts++;
|
||||
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
|
||||
}
|
||||
if (!walEdit.isEmpty()) {
|
||||
batchOp.walEditsFromCoprocessors[i] = walEdit;
|
||||
walEdit = new WALEdit();
|
||||
} else if (m instanceof Delete) {
|
||||
Delete curDel = (Delete) m;
|
||||
if (curDel.getFamilyCellMap().isEmpty()) {
|
||||
// handle deleting a row case
|
||||
prepareDelete(curDel);
|
||||
}
|
||||
if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
|
||||
// pre hook says skip this Delete
|
||||
// mark as success and skip in doMiniBatchMutation
|
||||
noOfDeletes++;
|
||||
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
|
||||
}
|
||||
} else {
|
||||
// In case of passing Append mutations along with the Puts and Deletes in batchMutate
|
||||
// mark the operation return code as failure so that it will not be considered in
|
||||
// the doMiniBatchMutation
|
||||
batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
|
||||
"Put/Delete mutations only supported in batchMutate() now");
|
||||
}
|
||||
if (!walEdit.isEmpty()) {
|
||||
batchOp.walEditsFromCoprocessors[i] = walEdit;
|
||||
walEdit = new WALEdit();
|
||||
}
|
||||
}
|
||||
// Update metrics in same way as it is done when we go the normal processing route (we now
|
||||
// update general metrics though a Coprocessor did the work).
|
||||
if (noOfPuts > 0) {
|
||||
// There were some Puts in the batch.
|
||||
if (this.metricsRegion != null) {
|
||||
this.metricsRegion.updatePut();
|
||||
}
|
||||
}
|
||||
if (noOfDeletes > 0) {
|
||||
// There were some Deletes in the batch.
|
||||
if (this.metricsRegion != null) {
|
||||
this.metricsRegion.updateDelete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3333,7 +3352,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
int firstIndex = batchOp.nextIndexToProcess;
|
||||
int lastIndexExclusive = firstIndex;
|
||||
boolean success = false;
|
||||
boolean doneByCoprocessor = false;
|
||||
int noOfPuts = 0;
|
||||
int noOfDeletes = 0;
|
||||
WriteEntry writeEntry = null;
|
||||
|
@ -3417,43 +3435,39 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
MiniBatchOperationInProgress<Mutation> miniBatchOp =
|
||||
new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
|
||||
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
|
||||
if (coprocessorHost.preBatchMutate(miniBatchOp)) {
|
||||
doneByCoprocessor = true;
|
||||
return;
|
||||
} else {
|
||||
for (int i = firstIndex; i < lastIndexExclusive; i++) {
|
||||
if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
|
||||
// lastIndexExclusive was incremented above.
|
||||
continue;
|
||||
}
|
||||
// we pass (i - firstIndex) below since the call expects a relative index
|
||||
Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex);
|
||||
if (cpMutations == null) {
|
||||
continue;
|
||||
}
|
||||
Mutation mutation = batchOp.getMutation(i);
|
||||
boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL;
|
||||
// Else Coprocessor added more Mutations corresponding to the Mutation at this index.
|
||||
for (int j = 0; j < cpMutations.length; j++) {
|
||||
Mutation cpMutation = cpMutations[j];
|
||||
checkAndPrepareMutation(cpMutation, replay, now);
|
||||
coprocessorHost.preBatchMutate(miniBatchOp);
|
||||
for (int i = firstIndex; i < lastIndexExclusive; i++) {
|
||||
if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
|
||||
// lastIndexExclusive was incremented above.
|
||||
continue;
|
||||
}
|
||||
// we pass (i - firstIndex) below since the call expects a relative index
|
||||
Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex);
|
||||
if (cpMutations == null) {
|
||||
continue;
|
||||
}
|
||||
Mutation mutation = batchOp.getMutation(i);
|
||||
boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL;
|
||||
// Else Coprocessor added more Mutations corresponding to the Mutation at this index.
|
||||
for (int j = 0; j < cpMutations.length; j++) {
|
||||
Mutation cpMutation = cpMutations[j];
|
||||
checkAndPrepareMutation(cpMutation, replay, now);
|
||||
|
||||
// Acquire row locks. If not, the whole batch will fail.
|
||||
acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));
|
||||
// Acquire row locks. If not, the whole batch will fail.
|
||||
acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));
|
||||
|
||||
// Returned mutations from coprocessor correspond to the Mutation at index i. We can
|
||||
// directly add the cells from those mutations to the familyMaps of this mutation.
|
||||
Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
|
||||
// will get added to the memStore later
|
||||
mergeFamilyMaps(batchOp.familyCellMaps[i], cpFamilyMap);
|
||||
// Returned mutations from coprocessor correspond to the Mutation at index i. We can
|
||||
// directly add the cells from those mutations to the familyMaps of this mutation.
|
||||
Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
|
||||
// will get added to the memStore later
|
||||
mergeFamilyMaps(batchOp.familyCellMaps[i], cpFamilyMap);
|
||||
|
||||
// The durability of returned mutation is replaced by the corresponding mutation.
|
||||
// If the corresponding mutation contains the SKIP_WAL, we shouldn't count the
|
||||
// cells of returned mutation.
|
||||
if (!skipWal) {
|
||||
for (List<Cell> cells : cpFamilyMap.values()) {
|
||||
cellCount += cells.size();
|
||||
}
|
||||
// The durability of returned mutation is replaced by the corresponding mutation.
|
||||
// If the corresponding mutation contains the SKIP_WAL, we shouldn't count the
|
||||
// cells of returned mutation.
|
||||
if (!skipWal) {
|
||||
for (List<Cell> cells : cpFamilyMap.values()) {
|
||||
cellCount += cells.size();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3558,13 +3572,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
for (int i = firstIndex; i < lastIndexExclusive; i++) {
|
||||
if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
|
||||
batchOp.retCodeDetails[i] =
|
||||
success || doneByCoprocessor ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
|
||||
batchOp.retCodeDetails[i] = success? OperationStatus.SUCCESS : OperationStatus.FAILURE;
|
||||
}
|
||||
}
|
||||
|
||||
// synced so that the coprocessor contract is adhered to.
|
||||
if (!replay && coprocessorHost != null && !doneByCoprocessor) {
|
||||
if (!replay && coprocessorHost != null) {
|
||||
for (int i = firstIndex; i < lastIndexExclusive; i++) {
|
||||
// only for successful puts
|
||||
if (batchOp.retCodeDetails[i].getOperationStatusCode()
|
||||
|
@ -6969,14 +6982,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
public List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
|
||||
throws IOException {
|
||||
List<Cell> results = new ArrayList<>();
|
||||
long before = EnvironmentEdgeManager.currentTime();
|
||||
|
||||
// pre-get CP hook
|
||||
if (withCoprocessor && (coprocessorHost != null)) {
|
||||
if (coprocessorHost.preGet(get, results)) {
|
||||
metricsUpdateForGet(results, before);
|
||||
return results;
|
||||
}
|
||||
}
|
||||
long before = EnvironmentEdgeManager.currentTime();
|
||||
Scan scan = new Scan(get);
|
||||
if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
|
||||
scan.setLoadColumnFamiliesOnDemand(isLoadingCfsOnDemandDefault());
|
||||
|
@ -7303,6 +7317,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
try {
|
||||
Result cpResult = doCoprocessorPreCall(op, mutation);
|
||||
if (cpResult != null) {
|
||||
// Metrics updated below in the finally block.
|
||||
return returnResults? cpResult: null;
|
||||
}
|
||||
Durability effectiveDurability = getEffectiveDurability(mutation.getDurability());
|
||||
|
|
|
@ -936,7 +936,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
|
||||
/**
|
||||
* Snapshot this stores memstore. Call before running
|
||||
* {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController)}
|
||||
* {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController,
|
||||
* FlushLifeCycleTracker)}
|
||||
* so it has some work to do.
|
||||
*/
|
||||
void snapshot() {
|
||||
|
@ -1670,10 +1671,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
// First, see if coprocessor would want to override selection.
|
||||
if (this.getCoprocessorHost() != null) {
|
||||
final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
|
||||
boolean override = false;
|
||||
//TODO: is it correct way to get CompactionRequest?
|
||||
override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
|
||||
tracker, user);
|
||||
boolean override = getCoprocessorHost().preCompactSelection(this,
|
||||
candidatesForCoproc, tracker, user);
|
||||
if (override) {
|
||||
// Coprocessor is overriding normal file selection.
|
||||
compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
|
||||
|
|
|
@ -108,6 +108,8 @@ MultiRowMutationProcessorResponse> {
|
|||
if (m instanceof Put) {
|
||||
if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
|
||||
// by pass everything
|
||||
// Is this right? Bypass everything and not just this individual put?
|
||||
// This class is going away in hbase2 so lets not sweat it.
|
||||
return;
|
||||
}
|
||||
} else if (m instanceof Delete) {
|
||||
|
@ -115,6 +117,8 @@ MultiRowMutationProcessorResponse> {
|
|||
region.prepareDelete(d);
|
||||
if (coprocessorHost.preDelete(d, walEdit, d.getDurability())) {
|
||||
// by pass everything
|
||||
// Is this right? Bypass everything and not just this individual put?
|
||||
// This class is going away in hbase2 so lets not sweat it.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -680,7 +680,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
} else {
|
||||
// convert duplicate append to get
|
||||
List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false,
|
||||
nonceGroup, nonce);
|
||||
nonceGroup, nonce);
|
||||
r = Result.create(results);
|
||||
}
|
||||
success = true;
|
||||
|
@ -731,7 +731,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
} else {
|
||||
// convert duplicate increment to get
|
||||
List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup,
|
||||
nonce);
|
||||
nonce);
|
||||
r = Result.create(results);
|
||||
}
|
||||
success = true;
|
||||
|
@ -2251,7 +2251,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
checkOpen();
|
||||
requestCount.increment();
|
||||
HRegion region = getRegion(request.getRegion());
|
||||
boolean bypass = false;
|
||||
boolean loaded = false;
|
||||
Map<byte[], List<Path>> map = null;
|
||||
|
||||
|
@ -2278,15 +2277,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath()));
|
||||
}
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
|
||||
region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
|
||||
}
|
||||
try {
|
||||
if (!bypass) {
|
||||
map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
|
||||
request.getCopyFile());
|
||||
if (map != null) {
|
||||
loaded = true;
|
||||
}
|
||||
map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
|
||||
request.getCopyFile());
|
||||
if (map != null) {
|
||||
loaded = true;
|
||||
}
|
||||
} finally {
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
|
@ -2457,16 +2454,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
|
||||
RpcCallContext context) throws IOException {
|
||||
region.prepareGet(get);
|
||||
List<Cell> results = new ArrayList<>();
|
||||
boolean stale = region.getRegionInfo().getReplicaId() != 0;
|
||||
|
||||
// This method is almost the same as HRegion#get.
|
||||
List<Cell> results = new ArrayList<>();
|
||||
long before = EnvironmentEdgeManager.currentTime();
|
||||
// pre-get CP hook
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
if (region.getCoprocessorHost().preGet(get, results)) {
|
||||
region.metricsUpdateForGet(results, before);
|
||||
return Result
|
||||
.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
|
||||
}
|
||||
}
|
||||
long before = EnvironmentEdgeManager.currentTime();
|
||||
Scan scan = new Scan(get);
|
||||
if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
|
||||
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
|
||||
|
@ -2498,6 +2498,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
region.getCoprocessorHost().postGet(get, results);
|
||||
}
|
||||
region.metricsUpdateForGet(results, before);
|
||||
|
||||
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
|
||||
}
|
||||
|
||||
|
@ -2729,11 +2730,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
byte[] qualifier = condition.getQualifier().toByteArray();
|
||||
CompareOperator compareOp =
|
||||
CompareOperator.valueOf(condition.getCompareType().name());
|
||||
ByteArrayComparable comparator =
|
||||
ProtobufUtil.toComparator(condition.getComparator());
|
||||
ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
processed = region.getCoprocessorHost().preCheckAndPut(
|
||||
row, family, qualifier, compareOp, comparator, put);
|
||||
processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
|
||||
compareOp, comparator, put);
|
||||
}
|
||||
if (processed == null) {
|
||||
boolean result = region.checkAndMutate(row, family,
|
||||
|
@ -2760,11 +2760,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
byte[] family = condition.getFamily().toByteArray();
|
||||
byte[] qualifier = condition.getQualifier().toByteArray();
|
||||
CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name());
|
||||
ByteArrayComparable comparator =
|
||||
ProtobufUtil.toComparator(condition.getComparator());
|
||||
ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
processed = region.getCoprocessorHost().preCheckAndDelete(
|
||||
row, family, qualifier, op, comparator, delete);
|
||||
processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op,
|
||||
comparator, delete);
|
||||
}
|
||||
if (processed == null) {
|
||||
boolean result = region.checkAndMutate(row, family,
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -115,7 +115,10 @@ public class RegionServerCoprocessorHost extends
|
|||
public void preStop(String message, User user) throws IOException {
|
||||
// While stopping the region server all coprocessors method should be executed first then the
|
||||
// coprocessor should be cleaned up.
|
||||
execShutdown(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation(user) {
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
execShutdown(new RegionServerObserverOperation(user) {
|
||||
@Override
|
||||
public void call(RegionServerObserver observer) throws IOException {
|
||||
observer.preStopRegionServer(this);
|
||||
|
@ -169,9 +172,12 @@ public class RegionServerCoprocessorHost extends
|
|||
|
||||
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
|
||||
throws IOException {
|
||||
return execOperationWithResult(endpoint, coprocEnvironments.isEmpty() ? null :
|
||||
if (this.coprocEnvironments.isEmpty()) {
|
||||
return endpoint;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(
|
||||
rsObserverGetter) {
|
||||
rsObserverGetter, endpoint) {
|
||||
@Override
|
||||
public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
|
||||
return observer.postCreateReplicationEndPoint(this, getResult());
|
||||
|
|
|
@ -192,57 +192,54 @@ public class SecureBulkLoadManager {
|
|||
throw new DoNotRetryIOException("User token cannot be null");
|
||||
}
|
||||
|
||||
boolean bypass = false;
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
|
||||
region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
|
||||
}
|
||||
boolean loaded = false;
|
||||
Map<byte[], List<Path>> map = null;
|
||||
|
||||
try {
|
||||
if (!bypass) {
|
||||
// Get the target fs (HBase region server fs) delegation token
|
||||
// Since we have checked the permission via 'preBulkLoadHFile', now let's give
|
||||
// the 'request user' necessary token to operate on the target fs.
|
||||
// After this point the 'doAs' user will hold two tokens, one for the source fs
|
||||
// ('request user'), another for the target fs (HBase region server principal).
|
||||
if (userProvider.isHadoopSecurityEnabled()) {
|
||||
FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider,"renewer");
|
||||
targetfsDelegationToken.acquireDelegationToken(fs);
|
||||
// Get the target fs (HBase region server fs) delegation token
|
||||
// Since we have checked the permission via 'preBulkLoadHFile', now let's give
|
||||
// the 'request user' necessary token to operate on the target fs.
|
||||
// After this point the 'doAs' user will hold two tokens, one for the source fs
|
||||
// ('request user'), another for the target fs (HBase region server principal).
|
||||
if (userProvider.isHadoopSecurityEnabled()) {
|
||||
FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider,"renewer");
|
||||
targetfsDelegationToken.acquireDelegationToken(fs);
|
||||
|
||||
Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
|
||||
if (targetFsToken != null
|
||||
&& (userToken == null || !targetFsToken.getService().equals(userToken.getService()))){
|
||||
ugi.addToken(targetFsToken);
|
||||
}
|
||||
Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
|
||||
if (targetFsToken != null
|
||||
&& (userToken == null || !targetFsToken.getService().equals(userToken.getService()))){
|
||||
ugi.addToken(targetFsToken);
|
||||
}
|
||||
}
|
||||
|
||||
map = ugi.doAs(new PrivilegedAction<Map<byte[], List<Path>>>() {
|
||||
@Override
|
||||
public Map<byte[], List<Path>> run() {
|
||||
FileSystem fs = null;
|
||||
try {
|
||||
fs = FileSystem.get(conf);
|
||||
for(Pair<byte[], String> el: familyPaths) {
|
||||
Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
|
||||
if(!fs.exists(stageFamily)) {
|
||||
fs.mkdirs(stageFamily);
|
||||
fs.setPermission(stageFamily, PERM_ALL_ACCESS);
|
||||
}
|
||||
map = ugi.doAs(new PrivilegedAction<Map<byte[], List<Path>>>() {
|
||||
@Override
|
||||
public Map<byte[], List<Path>> run() {
|
||||
FileSystem fs = null;
|
||||
try {
|
||||
fs = FileSystem.get(conf);
|
||||
for(Pair<byte[], String> el: familyPaths) {
|
||||
Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
|
||||
if(!fs.exists(stageFamily)) {
|
||||
fs.mkdirs(stageFamily);
|
||||
fs.setPermission(stageFamily, PERM_ALL_ACCESS);
|
||||
}
|
||||
//We call bulkLoadHFiles as requesting user
|
||||
//To enable access prior to staging
|
||||
return region.bulkLoadHFiles(familyPaths, true,
|
||||
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to complete bulk load", e);
|
||||
}
|
||||
return null;
|
||||
//We call bulkLoadHFiles as requesting user
|
||||
//To enable access prior to staging
|
||||
return region.bulkLoadHFiles(familyPaths, true,
|
||||
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to complete bulk load", e);
|
||||
}
|
||||
});
|
||||
if (map != null) {
|
||||
loaded = true;
|
||||
return null;
|
||||
}
|
||||
});
|
||||
if (map != null) {
|
||||
loaded = true;
|
||||
}
|
||||
} finally {
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
|
|
|
@ -909,12 +909,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
}
|
||||
|
||||
// Coprocessor hook.
|
||||
if (!coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit())) {
|
||||
if (entry.getEdit().isReplay()) {
|
||||
// Set replication scope null so that this won't be replicated
|
||||
entry.getKey().serializeReplicationScope(false);
|
||||
}
|
||||
}
|
||||
coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
|
||||
if (!listeners.isEmpty()) {
|
||||
for (WALActionsListener i : listeners) {
|
||||
i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit());
|
||||
|
|
|
@ -139,31 +139,29 @@ public class WALCoprocessorHost
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param info
|
||||
* @param logKey
|
||||
* @param logEdit
|
||||
* @return true if default behavior should be bypassed, false otherwise
|
||||
* @throws IOException
|
||||
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
|
||||
* with something that doesn't expose IntefaceAudience.Private classes.
|
||||
*/
|
||||
public boolean preWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
|
||||
@Deprecated
|
||||
public void preWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
|
||||
throws IOException {
|
||||
return execOperationWithResult(false, coprocEnvironments.isEmpty() ? null :
|
||||
new ObserverOperationWithResult<WALObserver, Boolean>(walObserverGetter) {
|
||||
// Not bypassable.
|
||||
if (this.coprocEnvironments.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
execOperation(new WALObserverOperation() {
|
||||
@Override
|
||||
public Boolean call(WALObserver oserver) throws IOException {
|
||||
return oserver.preWALWrite(this, info, logKey, logEdit);
|
||||
public void call(WALObserver oserver) throws IOException {
|
||||
oserver.preWALWrite(this, info, logKey, logEdit);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param info
|
||||
* @param logKey
|
||||
* @param logEdit
|
||||
* @throws IOException
|
||||
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
|
||||
* with something that doesn't expose IntefaceAudience.Private classes.
|
||||
*/
|
||||
@Deprecated
|
||||
public void postWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
|
||||
throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
|
||||
|
|
|
@ -1264,10 +1264,9 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
public void preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final boolean newValue, final MasterSwitchType switchType) throws IOException {
|
||||
requirePermission(getActiveUser(ctx), "setSplitOrMergeEnabled", Action.ADMIN);
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1282,10 +1281,9 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
|
||||
public void preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
|
||||
boolean newValue) throws IOException {
|
||||
requirePermission(getActiveUser(c), "balanceSwitch", Action.ADMIN);
|
||||
return newValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -294,12 +294,6 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final boolean newValue, final MasterSwitchType switchType) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final boolean newValue, final MasterSwitchType switchType) throws IOException {
|
||||
|
|
|
@ -77,6 +77,5 @@ public class TestHRegionLocation {
|
|||
int compare2 = hsl2.compareTo(hsl1);
|
||||
assertTrue((compare1 > 0)? compare2 < 0: compare2 > 0);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -99,12 +99,11 @@ public class SampleRegionWALCoprocessor implements WALCoprocessor, RegionCoproce
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env,
|
||||
public void preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env,
|
||||
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
|
||||
boolean bypass = false;
|
||||
// check table name matches or not.
|
||||
if (!Bytes.equals(info.getTable().toBytes(), this.tableName)) {
|
||||
return bypass;
|
||||
return;
|
||||
}
|
||||
preWALWriteCalled = true;
|
||||
// here we're going to remove one keyvalue from the WALEdit, and add
|
||||
|
@ -134,7 +133,6 @@ public class SampleRegionWALCoprocessor implements WALCoprocessor, RegionCoproce
|
|||
LOG.debug("About to delete a KeyValue from WALEdit.");
|
||||
cells.remove(deletedCell);
|
||||
}
|
||||
return bypass;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -90,7 +90,6 @@ public class TestMasterObserver {
|
|||
|
||||
public static class CPMasterObserver implements MasterCoprocessor, MasterObserver {
|
||||
|
||||
private boolean bypass = false;
|
||||
private boolean preCreateTableCalled;
|
||||
private boolean postCreateTableCalled;
|
||||
private boolean preDeleteTableCalled;
|
||||
|
@ -182,10 +181,6 @@ public class TestMasterObserver {
|
|||
private boolean preLockHeartbeatCalled;
|
||||
private boolean postLockHeartbeatCalled;
|
||||
|
||||
public void enableBypass(boolean bypass) {
|
||||
this.bypass = bypass;
|
||||
}
|
||||
|
||||
public void resetStates() {
|
||||
preCreateTableCalled = false;
|
||||
postCreateTableCalled = false;
|
||||
|
@ -301,9 +296,6 @@ public class TestMasterObserver {
|
|||
@Override
|
||||
public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
TableDescriptor desc, RegionInfo[] regions) throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preCreateTableCalled = true;
|
||||
}
|
||||
|
||||
|
@ -324,9 +316,6 @@ public class TestMasterObserver {
|
|||
@Override
|
||||
public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
TableName tableName) throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preDeleteTableCalled = true;
|
||||
}
|
||||
|
||||
|
@ -347,9 +336,6 @@ public class TestMasterObserver {
|
|||
@Override
|
||||
public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
TableName tableName) throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preTruncateTableCalled = true;
|
||||
}
|
||||
|
||||
|
@ -367,12 +353,6 @@ public class TestMasterObserver {
|
|||
return preTruncateTableCalled && !postTruncateTableCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final boolean newValue, final MasterSwitchType switchType) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final boolean newValue, final MasterSwitchType switchType) throws IOException {
|
||||
|
@ -381,9 +361,6 @@ public class TestMasterObserver {
|
|||
@Override
|
||||
public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
TableName tableName, TableDescriptor htd) throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preModifyTableCalled = true;
|
||||
}
|
||||
|
||||
|
@ -404,9 +381,6 @@ public class TestMasterObserver {
|
|||
@Override
|
||||
public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
NamespaceDescriptor ns) throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preCreateNamespaceCalled = true;
|
||||
}
|
||||
|
||||
|
@ -427,9 +401,6 @@ public class TestMasterObserver {
|
|||
@Override
|
||||
public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
String name) throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preDeleteNamespaceCalled = true;
|
||||
}
|
||||
|
||||
|
@ -450,9 +421,6 @@ public class TestMasterObserver {
|
|||
@Override
|
||||
public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
NamespaceDescriptor ns) throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preModifyNamespaceCalled = true;
|
||||
}
|
||||
|
||||
|
@ -490,9 +458,6 @@ public class TestMasterObserver {
|
|||
@Override
|
||||
public void preListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
List<NamespaceDescriptor> descriptors) throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preListNamespaceDescriptorsCalled = true;
|
||||
}
|
||||
|
||||
|
@ -513,9 +478,6 @@ public class TestMasterObserver {
|
|||
@Override
|
||||
public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
TableName tableName) throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preEnableTableCalled = true;
|
||||
}
|
||||
|
||||
|
@ -536,9 +498,6 @@ public class TestMasterObserver {
|
|||
@Override
|
||||
public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
TableName tableName) throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preDisableTableCalled = true;
|
||||
}
|
||||
|
||||
|
@ -619,9 +578,6 @@ public class TestMasterObserver {
|
|||
public void preMove(ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
RegionInfo region, ServerName srcServer, ServerName destServer)
|
||||
throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preMoveCalled = true;
|
||||
}
|
||||
|
||||
|
@ -643,9 +599,6 @@ public class TestMasterObserver {
|
|||
@Override
|
||||
public void preAssign(ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
final RegionInfo regionInfo) throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preAssignCalled = true;
|
||||
}
|
||||
|
||||
|
@ -666,9 +619,6 @@ public class TestMasterObserver {
|
|||
@Override
|
||||
public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
final RegionInfo regionInfo, final boolean force) throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preUnassignCalled = true;
|
||||
}
|
||||
|
||||
|
@ -709,9 +659,6 @@ public class TestMasterObserver {
|
|||
@Override
|
||||
public void preBalance(ObserverContext<MasterCoprocessorEnvironment> env)
|
||||
throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preBalanceCalled = true;
|
||||
}
|
||||
|
||||
|
@ -730,13 +677,9 @@ public class TestMasterObserver {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> env, boolean b)
|
||||
public void preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> env, boolean b)
|
||||
throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preBalanceSwitchCalled = true;
|
||||
return b;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -898,9 +841,6 @@ public class TestMasterObserver {
|
|||
final ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
final TableDescriptor desc,
|
||||
final RegionInfo[] regions) throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preCreateTableActionCalled = true;
|
||||
}
|
||||
|
||||
|
@ -928,9 +868,6 @@ public class TestMasterObserver {
|
|||
public void preDeleteTableAction(
|
||||
final ObserverContext<MasterCoprocessorEnvironment> env, final TableName tableName)
|
||||
throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preDeleteTableActionCalled = true;
|
||||
}
|
||||
|
||||
|
@ -954,9 +891,6 @@ public class TestMasterObserver {
|
|||
public void preTruncateTableAction(
|
||||
final ObserverContext<MasterCoprocessorEnvironment> env, final TableName tableName)
|
||||
throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preTruncateTableActionCalled = true;
|
||||
}
|
||||
|
||||
|
@ -980,9 +914,6 @@ public class TestMasterObserver {
|
|||
final ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
final TableName tableName,
|
||||
final TableDescriptor htd) throws IOException {
|
||||
if (bypass) {
|
||||
env.bypass();
|
||||
}
|
||||
preModifyTableActionCalled = true;
|
||||
}
|
||||
|
||||
|
@ -1005,9 +936,6 @@ public class TestMasterObserver {
|
|||
public void preEnableTableAction(
|
||||
final ObserverContext<MasterCoprocessorEnvironment> ctx, final TableName tableName)
|
||||
throws IOException {
|
||||
if (bypass) {
|
||||
ctx.bypass();
|
||||
}
|
||||
preEnableTableActionCalled = true;
|
||||
}
|
||||
|
||||
|
@ -1030,9 +958,6 @@ public class TestMasterObserver {
|
|||
public void preDisableTableAction(
|
||||
final ObserverContext<MasterCoprocessorEnvironment> ctx, final TableName tableName)
|
||||
throws IOException {
|
||||
if (bypass) {
|
||||
ctx.bypass();
|
||||
}
|
||||
preDisableTableActionCalled = true;
|
||||
}
|
||||
|
||||
|
@ -1357,7 +1282,6 @@ public class TestMasterObserver {
|
|||
HMaster master = cluster.getMaster();
|
||||
MasterCoprocessorHost host = master.getMasterCoprocessorHost();
|
||||
CPMasterObserver cp = host.findCoprocessor(CPMasterObserver.class);
|
||||
cp.enableBypass(true);
|
||||
cp.resetStates();
|
||||
assertFalse("No table created yet", cp.wasCreateTableCalled());
|
||||
|
||||
|
@ -1370,7 +1294,6 @@ public class TestMasterObserver {
|
|||
admin.createTable(htd, Arrays.copyOfRange(HBaseTestingUtility.KEYS,
|
||||
1, HBaseTestingUtility.KEYS.length));
|
||||
|
||||
// preCreateTable can't bypass default action.
|
||||
assertTrue("Test table should be created", cp.wasCreateTableCalled());
|
||||
tableCreationLatch.await();
|
||||
assertTrue("Table pre create handler called.", cp
|
||||
|
@ -1389,7 +1312,6 @@ public class TestMasterObserver {
|
|||
tableCreationLatch = new CountDownLatch(1);
|
||||
admin.disableTable(tableName);
|
||||
assertTrue(admin.isTableDisabled(tableName));
|
||||
// preDisableTable can't bypass default action.
|
||||
assertTrue("Coprocessor should have been called on table disable",
|
||||
cp.wasDisableTableCalled());
|
||||
assertTrue("Disable table handler should be called.",
|
||||
|
@ -1399,7 +1321,6 @@ public class TestMasterObserver {
|
|||
assertFalse(cp.wasEnableTableCalled());
|
||||
admin.enableTable(tableName);
|
||||
assertTrue(admin.isTableEnabled(tableName));
|
||||
// preEnableTable can't bypass default action.
|
||||
assertTrue("Coprocessor should have been called on table enable",
|
||||
cp.wasEnableTableCalled());
|
||||
assertTrue("Enable table handler should be called.",
|
||||
|
@ -1411,7 +1332,6 @@ public class TestMasterObserver {
|
|||
// modify table
|
||||
htd.setMaxFileSize(512 * 1024 * 1024);
|
||||
modifyTableSync(admin, tableName, htd);
|
||||
// preModifyTable can't bypass default action.
|
||||
assertTrue("Test table should have been modified",
|
||||
cp.wasModifyTableCalled());
|
||||
|
||||
|
@ -1424,14 +1344,12 @@ public class TestMasterObserver {
|
|||
deleteTable(admin, tableName);
|
||||
assertFalse("Test table should have been deleted",
|
||||
admin.tableExists(tableName));
|
||||
// preDeleteTable can't bypass default action.
|
||||
assertTrue("Coprocessor should have been called on table delete",
|
||||
cp.wasDeleteTableCalled());
|
||||
assertTrue("Delete table handler should be called.",
|
||||
cp.wasDeleteTableActionCalled());
|
||||
|
||||
// turn off bypass, run the tests again
|
||||
cp.enableBypass(false);
|
||||
// When bypass was supported, we'd turn off bypass and rerun tests. Leaving rerun in place.
|
||||
cp.resetStates();
|
||||
|
||||
admin.createTable(htd);
|
||||
|
@ -1555,10 +1473,6 @@ public class TestMasterObserver {
|
|||
MasterCoprocessorHost host = master.getMasterCoprocessorHost();
|
||||
CPMasterObserver cp = host.findCoprocessor(CPMasterObserver.class);
|
||||
|
||||
cp.enableBypass(false);
|
||||
cp.resetStates();
|
||||
|
||||
|
||||
// create a table
|
||||
Admin admin = UTIL.getAdmin();
|
||||
admin.createNamespace(NamespaceDescriptor.create(testNamespace).build());
|
||||
|
@ -1567,75 +1481,8 @@ public class TestMasterObserver {
|
|||
assertNotNull(admin.getNamespaceDescriptor(testNamespace));
|
||||
assertTrue("Test namespace descriptor should have been called",
|
||||
cp.wasGetNamespaceDescriptorCalled());
|
||||
|
||||
// turn off bypass, run the tests again
|
||||
cp.enableBypass(true);
|
||||
cp.resetStates();
|
||||
|
||||
boolean expected = false;
|
||||
try {
|
||||
admin.modifyNamespace(NamespaceDescriptor.create(testNamespace).build());
|
||||
} catch (BypassCoprocessorException ce) {
|
||||
expected = true;
|
||||
}
|
||||
assertTrue(expected);
|
||||
assertTrue("Test namespace should not have been modified",
|
||||
cp.preModifyNamespaceCalledOnly());
|
||||
|
||||
assertNotNull(admin.getNamespaceDescriptor(testNamespace));
|
||||
assertTrue("Test namespace descriptor should have been called",
|
||||
cp.wasGetNamespaceDescriptorCalled());
|
||||
|
||||
expected = false;
|
||||
try {
|
||||
admin.deleteNamespace(testNamespace);
|
||||
} catch (BypassCoprocessorException ce) {
|
||||
expected = true;
|
||||
}
|
||||
assertTrue(expected);
|
||||
assertTrue("Test namespace should not have been deleted", cp.preDeleteNamespaceCalledOnly());
|
||||
|
||||
assertNotNull(admin.getNamespaceDescriptor(testNamespace));
|
||||
assertTrue("Test namespace descriptor should have been called",
|
||||
cp.wasGetNamespaceDescriptorCalled());
|
||||
|
||||
cp.enableBypass(false);
|
||||
cp.resetStates();
|
||||
|
||||
// delete table
|
||||
admin.modifyNamespace(NamespaceDescriptor.create(testNamespace).build());
|
||||
assertTrue("Test namespace should have been modified", cp.wasModifyNamespaceCalled());
|
||||
|
||||
admin.deleteNamespace(testNamespace);
|
||||
assertTrue("Test namespace should have been deleted", cp.wasDeleteNamespaceCalled());
|
||||
|
||||
cp.enableBypass(true);
|
||||
cp.resetStates();
|
||||
|
||||
expected = false;
|
||||
try {
|
||||
admin.createNamespace(NamespaceDescriptor.create(testNamespace).build());
|
||||
} catch (BypassCoprocessorException ce) {
|
||||
expected = true;
|
||||
}
|
||||
assertTrue(expected);
|
||||
assertTrue("Test namespace should not be created", cp.preCreateNamespaceCalledOnly());
|
||||
|
||||
// turn on bypass, run the test
|
||||
cp.enableBypass(true);
|
||||
cp.resetStates();
|
||||
|
||||
admin.listNamespaceDescriptors();
|
||||
assertTrue("post listNamespace should not have been called",
|
||||
cp.preListNamespaceDescriptorsCalledOnly());
|
||||
|
||||
// turn off bypass, run the tests again
|
||||
cp.enableBypass(false);
|
||||
cp.resetStates();
|
||||
|
||||
admin.listNamespaceDescriptors();
|
||||
assertTrue("post listNamespace should have been called",
|
||||
cp.wasListNamespaceDescriptorsCalled());
|
||||
// This test used to do a bunch w/ bypass but bypass of these table and namespace stuff has
|
||||
// been removed so the testing code was removed.
|
||||
}
|
||||
|
||||
private void modifyTableSync(Admin admin, TableName tableName, HTableDescriptor htd)
|
||||
|
@ -1659,7 +1506,6 @@ public class TestMasterObserver {
|
|||
HMaster master = cluster.getMaster();
|
||||
MasterCoprocessorHost host = master.getMasterCoprocessorHost();
|
||||
CPMasterObserver cp = host.findCoprocessor(CPMasterObserver.class);
|
||||
cp.enableBypass(false);
|
||||
cp.resetStates();
|
||||
|
||||
Table table = UTIL.createMultiRegionTable(tableName, TEST_FAMILY);
|
||||
|
|
|
@ -714,6 +714,9 @@ public class TestMobCompactor {
|
|||
while (fileList.length != num) {
|
||||
Thread.sleep(50);
|
||||
fileList = fs.listStatus(path);
|
||||
for (FileStatus fileStatus: fileList) {
|
||||
LOG.info(fileStatus);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -738,6 +741,7 @@ public class TestMobCompactor {
|
|||
candidates.remove(0);
|
||||
}
|
||||
c.bypass();
|
||||
c.complete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -359,7 +359,7 @@ public class TestHRegion {
|
|||
|
||||
/**
|
||||
* Create a WAL outside of the usual helper in
|
||||
* {@link HBaseTestingUtility#createWal(Configuration, Path, HRegionInfo)} because that method
|
||||
* {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method
|
||||
* doesn't play nicely with FaultyFileSystem. Call this method before overriding
|
||||
* {@code fs.file.impl}.
|
||||
* @param callingMethod a unique component for the path, probably the name of the test method.
|
||||
|
@ -2386,6 +2386,9 @@ public class TestHRegion {
|
|||
FileSystem fs = FileSystem.get(CONF);
|
||||
Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
|
||||
FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
|
||||
// This chunk creation is done throughout the code base. Do we want to move it into core?
|
||||
// It is missing from this test. W/o it we NPE.
|
||||
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
|
||||
HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
|
||||
COLUMN_FAMILY_BYTES);
|
||||
|
||||
|
@ -2433,17 +2436,17 @@ public class TestHRegion {
|
|||
// save normalCPHost and replaced by mockedCPHost
|
||||
RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
|
||||
RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
|
||||
Answer<Boolean> answer = new Answer<Boolean>() {
|
||||
// Because the preBatchMutate returns void, we can't do usual Mockito when...then form. Must
|
||||
// do below format (from Mockito doc).
|
||||
Mockito.doAnswer(new Answer() {
|
||||
@Override
|
||||
public Boolean answer(InvocationOnMock invocation) throws Throwable {
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
MiniBatchOperationInProgress<Mutation> mb = invocation.getArgumentAt(0,
|
||||
MiniBatchOperationInProgress.class);
|
||||
mb.addOperationsFromCP(0, new Mutation[]{addPut});
|
||||
return false;
|
||||
return null;
|
||||
}
|
||||
};
|
||||
when(mockedCPHost.preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class)))
|
||||
.then(answer);
|
||||
}).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class));
|
||||
region.setCoprocessorHost(mockedCPHost);
|
||||
region.put(originalPut);
|
||||
region.setCoprocessorHost(normalCPHost);
|
||||
|
|
Loading…
Reference in New Issue