HBASE-19384 Results returned by preAppend hook in a coprocessor are replaced with
null from other coprocessor even on bypass If 'bypass' is set by a Coprocessor, skip out on calling any subsequent Coprocessors that might be chained to a bypassable method. This patch restores some of the 'complete' behavior removed by HBASE-19123 only 'bypass' now triggers 'complete'.
This commit is contained in:
parent
2ab0470b43
commit
db49391683
|
@ -678,6 +678,11 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
|
||||||
// Internal to shouldBypass, it checks if obeserverOperation#isBypassable().
|
// Internal to shouldBypass, it checks if obeserverOperation#isBypassable().
|
||||||
bypass |= observerOperation.shouldBypass();
|
bypass |= observerOperation.shouldBypass();
|
||||||
observerOperation.postEnvCall();
|
observerOperation.postEnvCall();
|
||||||
|
if (bypass) {
|
||||||
|
// If CP says bypass, skip out w/o calling any following CPs; they might ruin our response.
|
||||||
|
// In hbase1, this used to be called 'complete'. In hbase2, we unite bypass and 'complete'.
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return bypass;
|
return bypass;
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,9 +62,14 @@ public interface ObserverContext<E extends CoprocessorEnvironment> {
|
||||||
* that the replacement for the bypassed code takes care of all necessary
|
* that the replacement for the bypassed code takes care of all necessary
|
||||||
* skipped concerns. Because those concerns can change at any point, such an
|
* skipped concerns. Because those concerns can change at any point, such an
|
||||||
* assumption is never safe.</p>
|
* assumption is never safe.</p>
|
||||||
|
* <p>As of hbase2, when bypass has been set, we will NOT call any Coprocessors follow the
|
||||||
|
* bypassing Coprocessor; we cut short the processing and return the bypassing Coprocessors
|
||||||
|
* response (this used be a separate 'complete' option that has been folded into the
|
||||||
|
* 'bypass' in hbase2.</p>
|
||||||
*/
|
*/
|
||||||
void bypass();
|
void bypass();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the active user for the coprocessor call. If an explicit {@code User} instance was
|
* Returns the active user for the coprocessor call. If an explicit {@code User} instance was
|
||||||
* provided to the constructor, that will be returned, otherwise if we are in the context of an
|
* provided to the constructor, that will be returned, otherwise if we are in the context of an
|
||||||
|
|
|
@ -220,6 +220,7 @@ public interface RegionObserver {
|
||||||
* of candidates. If you remove all the candidates then the compaction will be canceled.
|
* of candidates. If you remove all the candidates then the compaction will be canceled.
|
||||||
* <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed
|
* <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed
|
||||||
* the passed in <code>candidates</code>.
|
* the passed in <code>candidates</code>.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* @param c the environment provided by the region server
|
* @param c the environment provided by the region server
|
||||||
* @param store the store where compaction is being requested
|
* @param store the store where compaction is being requested
|
||||||
* @param candidates the store files currently available for compaction
|
* @param candidates the store files currently available for compaction
|
||||||
|
@ -309,7 +310,8 @@ public interface RegionObserver {
|
||||||
/**
|
/**
|
||||||
* Called before the client performs a Get
|
* Called before the client performs a Get
|
||||||
* <p>
|
* <p>
|
||||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* @param c the environment provided by the region server
|
* @param c the environment provided by the region server
|
||||||
* @param get the Get request
|
* @param get the Get request
|
||||||
* @param result The result to return to the client if default processing
|
* @param result The result to return to the client if default processing
|
||||||
|
@ -334,7 +336,8 @@ public interface RegionObserver {
|
||||||
/**
|
/**
|
||||||
* Called before the client tests for existence using a Get.
|
* Called before the client tests for existence using a Get.
|
||||||
* <p>
|
* <p>
|
||||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* @param c the environment provided by the region server
|
* @param c the environment provided by the region server
|
||||||
* @param get the Get request
|
* @param get the Get request
|
||||||
* @param exists the result returned by the region server
|
* @param exists the result returned by the region server
|
||||||
|
@ -360,7 +363,8 @@ public interface RegionObserver {
|
||||||
/**
|
/**
|
||||||
* Called before the client stores a value.
|
* Called before the client stores a value.
|
||||||
* <p>
|
* <p>
|
||||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* <p>
|
* <p>
|
||||||
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
|
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
|
||||||
* If need a Cell reference for later use, copy the cell and use that.
|
* If need a Cell reference for later use, copy the cell and use that.
|
||||||
|
@ -388,7 +392,8 @@ public interface RegionObserver {
|
||||||
/**
|
/**
|
||||||
* Called before the client deletes a value.
|
* Called before the client deletes a value.
|
||||||
* <p>
|
* <p>
|
||||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* <p>
|
* <p>
|
||||||
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
|
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
|
||||||
* If need a Cell reference for later use, copy the cell and use that.
|
* If need a Cell reference for later use, copy the cell and use that.
|
||||||
|
@ -403,7 +408,8 @@ public interface RegionObserver {
|
||||||
/**
|
/**
|
||||||
* Called before the server updates the timestamp for version delete with latest timestamp.
|
* Called before the server updates the timestamp for version delete with latest timestamp.
|
||||||
* <p>
|
* <p>
|
||||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* @param c the environment provided by the region server
|
* @param c the environment provided by the region server
|
||||||
* @param mutation - the parent mutation associated with this delete cell
|
* @param mutation - the parent mutation associated with this delete cell
|
||||||
* @param cell - The deleteColumn with latest version cell
|
* @param cell - The deleteColumn with latest version cell
|
||||||
|
@ -495,7 +501,8 @@ public interface RegionObserver {
|
||||||
/**
|
/**
|
||||||
* Called before checkAndPut.
|
* Called before checkAndPut.
|
||||||
* <p>
|
* <p>
|
||||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* <p>
|
* <p>
|
||||||
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
|
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
|
||||||
* If need a Cell reference for later use, copy the cell and use that.
|
* If need a Cell reference for later use, copy the cell and use that.
|
||||||
|
@ -523,7 +530,8 @@ public interface RegionObserver {
|
||||||
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
|
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
|
||||||
* can lead to potential deadlock.
|
* can lead to potential deadlock.
|
||||||
* <p>
|
* <p>
|
||||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* <p>
|
* <p>
|
||||||
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
|
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
|
||||||
* If need a Cell reference for later use, copy the cell and use that.
|
* If need a Cell reference for later use, copy the cell and use that.
|
||||||
|
@ -568,7 +576,8 @@ public interface RegionObserver {
|
||||||
/**
|
/**
|
||||||
* Called before checkAndDelete.
|
* Called before checkAndDelete.
|
||||||
* <p>
|
* <p>
|
||||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* <p>
|
* <p>
|
||||||
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
|
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
|
||||||
* If need a Cell reference for later use, copy the cell and use that.
|
* If need a Cell reference for later use, copy the cell and use that.
|
||||||
|
@ -595,7 +604,8 @@ public interface RegionObserver {
|
||||||
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
|
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
|
||||||
* can lead to potential deadlock.
|
* can lead to potential deadlock.
|
||||||
* <p>
|
* <p>
|
||||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* <p>
|
* <p>
|
||||||
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
|
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
|
||||||
* If need a Cell reference for later use, copy the cell and use that.
|
* If need a Cell reference for later use, copy the cell and use that.
|
||||||
|
@ -639,7 +649,8 @@ public interface RegionObserver {
|
||||||
/**
|
/**
|
||||||
* Called before Append.
|
* Called before Append.
|
||||||
* <p>
|
* <p>
|
||||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* <p>
|
* <p>
|
||||||
* Note: Do not retain references to any Cells in 'append' beyond the life of this invocation.
|
* Note: Do not retain references to any Cells in 'append' beyond the life of this invocation.
|
||||||
* If need a Cell reference for later use, copy the cell and use that.
|
* If need a Cell reference for later use, copy the cell and use that.
|
||||||
|
@ -659,7 +670,8 @@ public interface RegionObserver {
|
||||||
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
|
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
|
||||||
* can lead to potential deadlock.
|
* can lead to potential deadlock.
|
||||||
* <p>
|
* <p>
|
||||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* <p>
|
* <p>
|
||||||
* Note: Do not retain references to any Cells in 'append' beyond the life of this invocation.
|
* Note: Do not retain references to any Cells in 'append' beyond the life of this invocation.
|
||||||
* If need a Cell reference for later use, copy the cell and use that.
|
* If need a Cell reference for later use, copy the cell and use that.
|
||||||
|
@ -690,7 +702,8 @@ public interface RegionObserver {
|
||||||
/**
|
/**
|
||||||
* Called before Increment.
|
* Called before Increment.
|
||||||
* <p>
|
* <p>
|
||||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* <p>
|
* <p>
|
||||||
* Note: Do not retain references to any Cells in 'increment' beyond the life of this invocation.
|
* Note: Do not retain references to any Cells in 'increment' beyond the life of this invocation.
|
||||||
* If need a Cell reference for later use, copy the cell and use that.
|
* If need a Cell reference for later use, copy the cell and use that.
|
||||||
|
@ -710,7 +723,8 @@ public interface RegionObserver {
|
||||||
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
|
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
|
||||||
* can lead to potential deadlock.
|
* can lead to potential deadlock.
|
||||||
* <p>
|
* <p>
|
||||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* <p>
|
* <p>
|
||||||
* Note: Do not retain references to any Cells in 'increment' beyond the life of this invocation.
|
* Note: Do not retain references to any Cells in 'increment' beyond the life of this invocation.
|
||||||
* If need a Cell reference for later use, copy the cell and use that.
|
* If need a Cell reference for later use, copy the cell and use that.
|
||||||
|
@ -772,7 +786,8 @@ public interface RegionObserver {
|
||||||
/**
|
/**
|
||||||
* Called before the client asks for the next row on a scanner.
|
* Called before the client asks for the next row on a scanner.
|
||||||
* <p>
|
* <p>
|
||||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* <p>
|
* <p>
|
||||||
* Note: Do not retain references to any Cells returned by scanner, beyond the life of this
|
* Note: Do not retain references to any Cells returned by scanner, beyond the life of this
|
||||||
* invocation. If need a Cell reference for later use, copy the cell and use that.
|
* invocation. If need a Cell reference for later use, copy the cell and use that.
|
||||||
|
@ -836,7 +851,8 @@ public interface RegionObserver {
|
||||||
/**
|
/**
|
||||||
* Called before the client closes a scanner.
|
* Called before the client closes a scanner.
|
||||||
* <p>
|
* <p>
|
||||||
* Call CoprocessorEnvironment#bypass to skip default actions
|
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||||
|
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||||
* @param c the environment provided by the region server
|
* @param c the environment provided by the region server
|
||||||
* @param s the scanner
|
* @param s the scanner
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
@ -63,9 +64,12 @@ public class TestRegionObserverBypass {
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
// Stack up three coprocessors just so I can check bypass skips subsequent calls.
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
|
conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
|
||||||
TestCoprocessor.class.getName());
|
new String [] {TestCoprocessor.class.getName(),
|
||||||
|
TestCoprocessor2.class.getName(),
|
||||||
|
TestCoprocessor3.class.getName()});
|
||||||
util = new HBaseTestingUtility(conf);
|
util = new HBaseTestingUtility(conf);
|
||||||
util.startMiniCluster();
|
util.startMiniCluster();
|
||||||
}
|
}
|
||||||
|
@ -85,6 +89,8 @@ public class TestRegionObserverBypass {
|
||||||
admin.deleteTable(tableName);
|
admin.deleteTable(tableName);
|
||||||
}
|
}
|
||||||
util.createTable(tableName, new byte[][] {dummy, test});
|
util.createTable(tableName, new byte[][] {dummy, test});
|
||||||
|
TestCoprocessor.PREPUT_BYPASSES.set(0);
|
||||||
|
TestCoprocessor.PREPUT_INVOCATIONS.set(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -104,6 +110,7 @@ public class TestRegionObserverBypass {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test various multiput operations.
|
* Test various multiput operations.
|
||||||
|
* If the column family is 'test', then bypass is invoked.
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
|
@ -205,7 +212,48 @@ public class TestRegionObserverBypass {
|
||||||
t.delete(d);
|
t.delete(d);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that when bypass is called, we skip out calling any other coprocessors stacked up method,
|
||||||
|
* in this case, a prePut.
|
||||||
|
* If the column family is 'test', then bypass is invoked.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testBypassAlsoCompletes() throws IOException {
|
||||||
|
//ensure that server time increments every time we do an operation, otherwise
|
||||||
|
//previous deletes will eclipse successive puts having the same timestamp
|
||||||
|
EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
|
||||||
|
|
||||||
|
Table t = util.getConnection().getTable(tableName);
|
||||||
|
List<Put> puts = new ArrayList<>();
|
||||||
|
Put p = new Put(row1);
|
||||||
|
p.addColumn(dummy, dummy, dummy);
|
||||||
|
puts.add(p);
|
||||||
|
p = new Put(row2);
|
||||||
|
p.addColumn(test, dummy, dummy);
|
||||||
|
puts.add(p);
|
||||||
|
p = new Put(row3);
|
||||||
|
p.addColumn(test, dummy, dummy);
|
||||||
|
puts.add(p);
|
||||||
|
t.put(puts);
|
||||||
|
// Ensure expected result.
|
||||||
|
checkRowAndDelete(t,row1,1);
|
||||||
|
checkRowAndDelete(t,row2,0);
|
||||||
|
checkRowAndDelete(t,row3,0);
|
||||||
|
// We have three Coprocessors stacked up on the prePut. See the beforeClass setup. We did three
|
||||||
|
// puts above two of which bypassed. A bypass means do not call the other coprocessors in the
|
||||||
|
// stack so for the two 'test' calls in the above, we should not have call through to all all
|
||||||
|
// three coprocessors in the chain. So we should have:
|
||||||
|
// 3 invocations for first put + 1 invocation + 1 bypass for second put + 1 invocation +
|
||||||
|
// 1 bypass for the last put. Assert.
|
||||||
|
assertEquals("Total CP invocation count", 5, TestCoprocessor.PREPUT_INVOCATIONS.get());
|
||||||
|
assertEquals("Total CP bypasses", 2, TestCoprocessor.PREPUT_BYPASSES.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static class TestCoprocessor implements RegionCoprocessor, RegionObserver {
|
public static class TestCoprocessor implements RegionCoprocessor, RegionObserver {
|
||||||
|
static AtomicInteger PREPUT_INVOCATIONS = new AtomicInteger(0);
|
||||||
|
static AtomicInteger PREPUT_BYPASSES = new AtomicInteger(0);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<RegionObserver> getRegionObserver() {
|
public Optional<RegionObserver> getRegionObserver() {
|
||||||
return Optional.of(this);
|
return Optional.of(this);
|
||||||
|
@ -215,10 +263,22 @@ public class TestRegionObserverBypass {
|
||||||
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
|
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
final Put put, final WALEdit edit, final Durability durability)
|
final Put put, final WALEdit edit, final Durability durability)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
PREPUT_INVOCATIONS.incrementAndGet();
|
||||||
Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
|
Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
|
||||||
if (familyMap.containsKey(test)) {
|
if (familyMap.containsKey(test)) {
|
||||||
|
PREPUT_BYPASSES.incrementAndGet();
|
||||||
e.bypass();
|
e.bypass();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calls through to TestCoprocessor.
|
||||||
|
*/
|
||||||
|
public static class TestCoprocessor2 extends TestRegionObserverBypass.TestCoprocessor {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calls through to TestCoprocessor.
|
||||||
|
*/
|
||||||
|
public static class TestCoprocessor3 extends TestRegionObserverBypass.TestCoprocessor {}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue