HBASE-19384 Results returned by preAppend hook in a coprocessor are replaced with
null from other coprocessor even on bypass If 'bypass' is set by a Coprocessor, skip out on calling any subsequent Coprocessors that might be chained to a bypassable method. This patch restores some of the 'complete' behavior removed by HBASE-19123 only 'bypass' now triggers 'complete'.
This commit is contained in:
parent
e29685ed6d
commit
1856237e2d
|
@ -678,6 +678,11 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
|
|||
// Internal to shouldBypass, it checks if obeserverOperation#isBypassable().
|
||||
bypass |= observerOperation.shouldBypass();
|
||||
observerOperation.postEnvCall();
|
||||
if (bypass) {
|
||||
// If CP says bypass, skip out w/o calling any following CPs; they might ruin our response.
|
||||
// In hbase1, this used to be called 'complete'. In hbase2, we unite bypass and 'complete'.
|
||||
break;
|
||||
}
|
||||
}
|
||||
return bypass;
|
||||
}
|
||||
|
|
|
@ -62,9 +62,14 @@ public interface ObserverContext<E extends CoprocessorEnvironment> {
|
|||
* that the replacement for the bypassed code takes care of all necessary
|
||||
* skipped concerns. Because those concerns can change at any point, such an
|
||||
* assumption is never safe.</p>
|
||||
* <p>As of hbase2, when bypass has been set, we will NOT call any Coprocessors follow the
|
||||
* bypassing Coprocessor; we cut short the processing and return the bypassing Coprocessors
|
||||
* response (this used be a separate 'complete' option that has been folded into the
|
||||
* 'bypass' in hbase2.</p>
|
||||
*/
|
||||
void bypass();
|
||||
|
||||
|
||||
/**
|
||||
* Returns the active user for the coprocessor call. If an explicit {@code User} instance was
|
||||
* provided to the constructor, that will be returned, otherwise if we are in the context of an
|
||||
|
|
|
@ -220,6 +220,7 @@ public interface RegionObserver {
|
|||
* of candidates. If you remove all the candidates then the compaction will be canceled.
|
||||
* <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed
|
||||
* the passed in <code>candidates</code>.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* @param c the environment provided by the region server
|
||||
* @param store the store where compaction is being requested
|
||||
* @param candidates the store files currently available for compaction
|
||||
|
@ -309,7 +310,8 @@ public interface RegionObserver {
|
|||
/**
|
||||
* Called before the client performs a Get
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* @param c the environment provided by the region server
|
||||
* @param get the Get request
|
||||
* @param result The result to return to the client if default processing
|
||||
|
@ -334,7 +336,8 @@ public interface RegionObserver {
|
|||
/**
|
||||
* Called before the client tests for existence using a Get.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* @param c the environment provided by the region server
|
||||
* @param get the Get request
|
||||
* @param exists the result returned by the region server
|
||||
|
@ -360,7 +363,8 @@ public interface RegionObserver {
|
|||
/**
|
||||
* Called before the client stores a value.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
|
@ -388,7 +392,8 @@ public interface RegionObserver {
|
|||
/**
|
||||
* Called before the client deletes a value.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
|
@ -403,7 +408,8 @@ public interface RegionObserver {
|
|||
/**
|
||||
* Called before the server updates the timestamp for version delete with latest timestamp.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* @param c the environment provided by the region server
|
||||
* @param mutation - the parent mutation associated with this delete cell
|
||||
* @param cell - The deleteColumn with latest version cell
|
||||
|
@ -495,7 +501,8 @@ public interface RegionObserver {
|
|||
/**
|
||||
* Called before checkAndPut.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
|
@ -523,7 +530,8 @@ public interface RegionObserver {
|
|||
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
|
||||
* can lead to potential deadlock.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
|
@ -568,7 +576,8 @@ public interface RegionObserver {
|
|||
/**
|
||||
* Called before checkAndDelete.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
|
@ -595,7 +604,8 @@ public interface RegionObserver {
|
|||
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
|
||||
* can lead to potential deadlock.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
|
@ -639,7 +649,8 @@ public interface RegionObserver {
|
|||
/**
|
||||
* Called before Append.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'append' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
|
@ -659,7 +670,8 @@ public interface RegionObserver {
|
|||
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
|
||||
* can lead to potential deadlock.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'append' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
|
@ -690,7 +702,8 @@ public interface RegionObserver {
|
|||
/**
|
||||
* Called before Increment.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'increment' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
|
@ -710,7 +723,8 @@ public interface RegionObserver {
|
|||
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
|
||||
* can lead to potential deadlock.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'increment' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
|
@ -772,7 +786,8 @@ public interface RegionObserver {
|
|||
/**
|
||||
* Called before the client asks for the next row on a scanner.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells returned by scanner, beyond the life of this
|
||||
* invocation. If need a Cell reference for later use, copy the cell and use that.
|
||||
|
@ -836,7 +851,8 @@ public interface RegionObserver {
|
|||
/**
|
||||
* Called before the client closes a scanner.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* @param c the environment provided by the region server
|
||||
* @param s the scanner
|
||||
*/
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
|
@ -63,9 +64,12 @@ public class TestRegionObserverBypass {
|
|||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
// Stack up three coprocessors just so I can check bypass skips subsequent calls.
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
|
||||
TestCoprocessor.class.getName());
|
||||
new String [] {TestCoprocessor.class.getName(),
|
||||
TestCoprocessor2.class.getName(),
|
||||
TestCoprocessor3.class.getName()});
|
||||
util = new HBaseTestingUtility(conf);
|
||||
util.startMiniCluster();
|
||||
}
|
||||
|
@ -85,6 +89,8 @@ public class TestRegionObserverBypass {
|
|||
admin.deleteTable(tableName);
|
||||
}
|
||||
util.createTable(tableName, new byte[][] {dummy, test});
|
||||
TestCoprocessor.PREPUT_BYPASSES.set(0);
|
||||
TestCoprocessor.PREPUT_INVOCATIONS.set(0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -104,6 +110,7 @@ public class TestRegionObserverBypass {
|
|||
|
||||
/**
|
||||
* Test various multiput operations.
|
||||
* If the column family is 'test', then bypass is invoked.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
|
@ -205,7 +212,48 @@ public class TestRegionObserverBypass {
|
|||
t.delete(d);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that when bypass is called, we skip out calling any other coprocessors stacked up method,
|
||||
* in this case, a prePut.
|
||||
* If the column family is 'test', then bypass is invoked.
|
||||
*/
|
||||
@Test
|
||||
public void testBypassAlsoCompletes() throws IOException {
|
||||
//ensure that server time increments every time we do an operation, otherwise
|
||||
//previous deletes will eclipse successive puts having the same timestamp
|
||||
EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
|
||||
|
||||
Table t = util.getConnection().getTable(tableName);
|
||||
List<Put> puts = new ArrayList<>();
|
||||
Put p = new Put(row1);
|
||||
p.addColumn(dummy, dummy, dummy);
|
||||
puts.add(p);
|
||||
p = new Put(row2);
|
||||
p.addColumn(test, dummy, dummy);
|
||||
puts.add(p);
|
||||
p = new Put(row3);
|
||||
p.addColumn(test, dummy, dummy);
|
||||
puts.add(p);
|
||||
t.put(puts);
|
||||
// Ensure expected result.
|
||||
checkRowAndDelete(t,row1,1);
|
||||
checkRowAndDelete(t,row2,0);
|
||||
checkRowAndDelete(t,row3,0);
|
||||
// We have three Coprocessors stacked up on the prePut. See the beforeClass setup. We did three
|
||||
// puts above two of which bypassed. A bypass means do not call the other coprocessors in the
|
||||
// stack so for the two 'test' calls in the above, we should not have call through to all all
|
||||
// three coprocessors in the chain. So we should have:
|
||||
// 3 invocations for first put + 1 invocation + 1 bypass for second put + 1 invocation +
|
||||
// 1 bypass for the last put. Assert.
|
||||
assertEquals("Total CP invocation count", 5, TestCoprocessor.PREPUT_INVOCATIONS.get());
|
||||
assertEquals("Total CP bypasses", 2, TestCoprocessor.PREPUT_BYPASSES.get());
|
||||
}
|
||||
|
||||
|
||||
public static class TestCoprocessor implements RegionCoprocessor, RegionObserver {
|
||||
static AtomicInteger PREPUT_INVOCATIONS = new AtomicInteger(0);
|
||||
static AtomicInteger PREPUT_BYPASSES = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public Optional<RegionObserver> getRegionObserver() {
|
||||
return Optional.of(this);
|
||||
|
@ -215,10 +263,22 @@ public class TestRegionObserverBypass {
|
|||
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
final Put put, final WALEdit edit, final Durability durability)
|
||||
throws IOException {
|
||||
PREPUT_INVOCATIONS.incrementAndGet();
|
||||
Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
|
||||
if (familyMap.containsKey(test)) {
|
||||
PREPUT_BYPASSES.incrementAndGet();
|
||||
e.bypass();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls through to TestCoprocessor.
|
||||
*/
|
||||
public static class TestCoprocessor2 extends TestRegionObserverBypass.TestCoprocessor {}
|
||||
|
||||
/**
|
||||
* Calls through to TestCoprocessor.
|
||||
*/
|
||||
public static class TestCoprocessor3 extends TestRegionObserverBypass.TestCoprocessor {}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue