HBASE-16992 The usage of mutation from CP is weird. (ChiaPing Tsai)
This commit is contained in:
parent
e60a7f6e7d
commit
2182ca34ac
@ -3202,10 +3202,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
// Acquire row locks. If not, the whole batch will fail.
|
||||
acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));
|
||||
|
||||
if (cpMutation.getDurability() == Durability.SKIP_WAL) {
|
||||
recordMutationWithoutWal(cpFamilyMap);
|
||||
}
|
||||
|
||||
// Returned mutations from coprocessor correspond to the Mutation at index i. We can
|
||||
// directly add the cells from those mutations to the familyMaps of this mutation.
|
||||
mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later
|
||||
@ -3227,6 +3223,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
if (tmpDur.ordinal() > durability.ordinal()) {
|
||||
durability = tmpDur;
|
||||
}
|
||||
// we use durability of the original mutation for the mutation passed by CP.
|
||||
if (tmpDur == Durability.SKIP_WAL) {
|
||||
recordMutationWithoutWal(m.getFamilyCellMap());
|
||||
continue;
|
||||
@ -3324,6 +3321,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
// We need to update the sequence id for following reasons.
|
||||
// 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
|
||||
// 2) If no WAL, FSWALEntry won't be used
|
||||
// we use durability of the original mutation for the mutation passed by CP.
|
||||
boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
|
||||
if (updateSeqId) {
|
||||
this.updateSequenceId(familyMaps[i].values(),
|
||||
@ -7978,8 +7976,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
int listSize = cells.size();
|
||||
for (int i=0; i < listSize; i++) {
|
||||
Cell cell = cells.get(i);
|
||||
// TODO we need include tags length also here.
|
||||
mutationSize += KeyValueUtil.keyLength(cell) + cell.getValueLength();
|
||||
mutationSize += KeyValueUtil.length(cell);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -110,7 +110,7 @@ public class MiniBatchOperationInProgress<T> {
|
||||
* Add more Mutations corresponding to the Mutation at the given index to be committed atomically
|
||||
* in the same batch. These mutations are applied to the WAL and applied to the memstore as well.
|
||||
* The timestamp of the cells in the given Mutations MUST be obtained from the original mutation.
|
||||
*
|
||||
* <b>Note:</b> The durability from CP will be replaced by the durability of corresponding mutation.
|
||||
* @param index the index that corresponds to the original mutation index in the batch
|
||||
* @param newOperations the Mutations to add
|
||||
*/
|
||||
|
@ -90,6 +90,7 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.MultithreadedTestUtil;
|
||||
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
|
||||
@ -2413,6 +2414,77 @@ public class TestHRegion {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDataInMemoryWithoutWAL() throws IOException {
|
||||
FileSystem fs = FileSystem.get(CONF);
|
||||
Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
|
||||
FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
|
||||
HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
|
||||
CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES);
|
||||
|
||||
Cell originalCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
|
||||
System.currentTimeMillis(), KeyValue.Type.Put.getCode(), value1);
|
||||
final long originalSize = KeyValueUtil.length(originalCell);
|
||||
|
||||
Cell addCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
|
||||
System.currentTimeMillis(), KeyValue.Type.Put.getCode(), Bytes.toBytes("xxxxxxxxxx"));
|
||||
final long addSize = KeyValueUtil.length(addCell);
|
||||
|
||||
LOG.info("originalSize:" + originalSize
|
||||
+ ", addSize:" + addSize);
|
||||
// start test. We expect that the addPut's durability will be replaced
|
||||
// by originalPut's durability.
|
||||
|
||||
// case 1:
|
||||
testDataInMemoryWithoutWAL(region,
|
||||
new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
|
||||
new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
|
||||
originalSize + addSize);
|
||||
|
||||
// case 2:
|
||||
testDataInMemoryWithoutWAL(region,
|
||||
new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
|
||||
new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
|
||||
originalSize + addSize);
|
||||
|
||||
// case 3:
|
||||
testDataInMemoryWithoutWAL(region,
|
||||
new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
|
||||
new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
|
||||
0);
|
||||
|
||||
// case 4:
|
||||
testDataInMemoryWithoutWAL(region,
|
||||
new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
|
||||
new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
|
||||
0);
|
||||
}
|
||||
|
||||
private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut,
|
||||
final Put addPut, long delta) throws IOException {
|
||||
final long initSize = region.getDataInMemoryWithoutWAL();
|
||||
// save normalCPHost and replaced by mockedCPHost
|
||||
RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
|
||||
RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
|
||||
Answer<Boolean> answer = new Answer<Boolean>() {
|
||||
@Override
|
||||
public Boolean answer(InvocationOnMock invocation) throws Throwable {
|
||||
MiniBatchOperationInProgress<Mutation> mb = invocation.getArgumentAt(0,
|
||||
MiniBatchOperationInProgress.class);
|
||||
mb.addOperationsFromCP(0, new Mutation[]{addPut});
|
||||
return false;
|
||||
}
|
||||
};
|
||||
when(mockedCPHost.preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class)))
|
||||
.then(answer);
|
||||
region.setCoprocessorHost(mockedCPHost);
|
||||
region.put(originalPut);
|
||||
region.setCoprocessorHost(normalCPHost);
|
||||
final long finalSize = region.getDataInMemoryWithoutWAL();
|
||||
assertEquals("finalSize:" + finalSize + ", initSize:"
|
||||
+ initSize + ", delta:" + delta,finalSize, initSize + delta);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteColumns_PostInsert() throws IOException, InterruptedException {
|
||||
Delete delete = new Delete(row);
|
||||
|
Loading…
x
Reference in New Issue
Block a user