HBASE-16989 RowProcess#postBatchMutate doesn’t be executed before the mvcc transaction completion. (ChiaPing Tsai)
This commit is contained in:
parent
fa7ed58e1b
commit
a3a56b6380
|
@ -682,7 +682,9 @@ public interface RegionObserver extends Coprocessor {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This will be called after applying a batch of Mutations on a region. The Mutations are added to
|
* This will be called after applying a batch of Mutations on a region. The Mutations are added to
|
||||||
* memstore and WAL.
|
* memstore and WAL. The difference of this one with {@link #postPut(ObserverContext, Put, WALEdit, Durability) }
|
||||||
|
* and {@link #postDelete(ObserverContext, Delete, WALEdit, Durability) } is
|
||||||
|
* this hook will be executed before the mvcc transaction completion.
|
||||||
* <p>
|
* <p>
|
||||||
* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
|
* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
|
||||||
* If need a Cell reference for later use, copy the cell and use that.
|
* If need a Cell reference for later use, copy the cell and use that.
|
||||||
|
|
|
@ -3330,6 +3330,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
applyFamilyMapToMemstore(familyMaps[i], memstoreSize);
|
applyFamilyMapToMemstore(familyMaps[i], memstoreSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// calling the post CP hook for batch mutation
|
||||||
|
if (!replay && coprocessorHost != null) {
|
||||||
|
MiniBatchOperationInProgress<Mutation> miniBatchOp =
|
||||||
|
new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
|
||||||
|
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
|
||||||
|
coprocessorHost.postBatchMutate(miniBatchOp);
|
||||||
|
}
|
||||||
|
|
||||||
// STEP 6. Complete mvcc.
|
// STEP 6. Complete mvcc.
|
||||||
if (replay) {
|
if (replay) {
|
||||||
this.mvcc.advanceTo(batchOp.getReplaySequenceId());
|
this.mvcc.advanceTo(batchOp.getReplaySequenceId());
|
||||||
|
@ -3346,14 +3354,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
releaseRowLocks(acquiredRowLocks);
|
releaseRowLocks(acquiredRowLocks);
|
||||||
|
|
||||||
// calling the post CP hook for batch mutation
|
|
||||||
if (!replay && coprocessorHost != null) {
|
|
||||||
MiniBatchOperationInProgress<Mutation> miniBatchOp =
|
|
||||||
new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
|
|
||||||
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
|
|
||||||
coprocessorHost.postBatchMutate(miniBatchOp);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = firstIndex; i < lastIndexExclusive; i ++) {
|
for (int i = firstIndex; i < lastIndexExclusive; i ++) {
|
||||||
if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
|
if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
|
||||||
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
|
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
|
||||||
|
@ -7098,21 +7098,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
applyToMemstore(getHStore(cell), cell, memstoreSize);
|
applyToMemstore(getHStore(cell), cell, memstoreSize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// STEP 8. Complete mvcc.
|
|
||||||
|
// STEP 8. call postBatchMutate hook
|
||||||
|
processor.postBatchMutate(this);
|
||||||
|
|
||||||
|
// STEP 9. Complete mvcc.
|
||||||
mvcc.completeAndWait(writeEntry);
|
mvcc.completeAndWait(writeEntry);
|
||||||
writeEntry = null;
|
writeEntry = null;
|
||||||
|
|
||||||
// STEP 9. Release region lock
|
// STEP 10. Release region lock
|
||||||
if (locked) {
|
if (locked) {
|
||||||
this.updatesLock.readLock().unlock();
|
this.updatesLock.readLock().unlock();
|
||||||
locked = false;
|
locked = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// STEP 10. Release row lock(s)
|
// STEP 11. Release row lock(s)
|
||||||
releaseRowLocks(acquiredRowLocks);
|
releaseRowLocks(acquiredRowLocks);
|
||||||
|
|
||||||
// STEP 11. call postBatchMutate hook
|
|
||||||
processor.postBatchMutate(this);
|
|
||||||
}
|
}
|
||||||
success = true;
|
success = true;
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -19,19 +19,28 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import java.io.IOException;
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.Coprocessor;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.Region;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
@ -44,6 +53,10 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -479,4 +492,101 @@ public class TestFromClientSide3 {
|
||||||
ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection();
|
ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection();
|
||||||
assertTrue(con.hasCellBlockSupport());
|
assertTrue(con.hasCellBlockSupport());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testPutWithPreBatchMutate ()throws Exception {
|
||||||
|
TableName tableName = TableName.valueOf("testPutWithPreBatchMutate");
|
||||||
|
testPreBatchMutate(tableName, () -> {
|
||||||
|
try {
|
||||||
|
Table t = TEST_UTIL.getConnection().getTable(tableName);
|
||||||
|
Put put = new Put(ROW);
|
||||||
|
put.addColumn(FAMILY, QUALIFIER, VALUE);
|
||||||
|
t.put(put);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testRowMutationsWithPreBatchMutate ()throws Exception {
|
||||||
|
TableName tableName = TableName.valueOf("testRowMutationsWithPreBatchMutate");
|
||||||
|
testPreBatchMutate(tableName, () -> {
|
||||||
|
try {
|
||||||
|
RowMutations rm = new RowMutations(ROW, 1);
|
||||||
|
Table t = TEST_UTIL.getConnection().getTable(tableName);
|
||||||
|
Put put = new Put(ROW);
|
||||||
|
put.addColumn(FAMILY, QUALIFIER, VALUE);
|
||||||
|
rm.add(put);
|
||||||
|
t.mutateRow(rm);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testPreBatchMutate (TableName tableName, Runnable rn)throws Exception {
|
||||||
|
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||||
|
desc.addCoprocessor(WatiingForScanObserver.class.getName());
|
||||||
|
desc.addFamily(new HColumnDescriptor(FAMILY));
|
||||||
|
TEST_UTIL.getAdmin().createTable(desc);
|
||||||
|
ExecutorService service = Executors.newFixedThreadPool(2);
|
||||||
|
service.execute(rn);
|
||||||
|
final List<Cell> cells = new ArrayList<>();
|
||||||
|
service.execute(() -> {
|
||||||
|
try {
|
||||||
|
// waiting for update.
|
||||||
|
TimeUnit.SECONDS.sleep(3);
|
||||||
|
Table t = TEST_UTIL.getConnection().getTable(tableName);
|
||||||
|
Scan scan = new Scan();
|
||||||
|
try (ResultScanner scanner = t.getScanner(scan)) {
|
||||||
|
for (Result r : scanner) {
|
||||||
|
cells.addAll(Arrays.asList(r.rawCells()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException | InterruptedException ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
service.shutdown();
|
||||||
|
service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
|
||||||
|
assertEquals("The write is blocking by RegionObserver#postBatchMutate"
|
||||||
|
+ ", so the data is invisible to reader", 0, cells.size());
|
||||||
|
TEST_UTIL.deleteTable(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <T extends RegionObserver> T find(final TableName tableName,
|
||||||
|
Class<T> clz) throws IOException, InterruptedException {
|
||||||
|
HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
|
||||||
|
List<Region> regions = rs.getOnlineRegions(tableName);
|
||||||
|
assertEquals(1, regions.size());
|
||||||
|
Region region = regions.get(0);
|
||||||
|
Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
|
||||||
|
assertTrue("The cp instance should be " + clz.getName()
|
||||||
|
+ ", current instance is " + cp.getClass().getName(), clz.isInstance(cp));
|
||||||
|
return clz.cast(cp);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class WatiingForScanObserver extends BaseRegionObserver {
|
||||||
|
|
||||||
|
private final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
|
||||||
|
final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
|
||||||
|
try {
|
||||||
|
// waiting for scanner
|
||||||
|
latch.await();
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
throw new IOException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
|
final Scan scan, final RegionScanner s) throws IOException {
|
||||||
|
latch.countDown();
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue