HBASE-16768 Inconsistent results from the Append/Increment (ChiaPing Tsai)

This commit is contained in:
tedyu 2016-10-07 00:59:27 -07:00
parent 2c7211ec4b
commit 96d34f2a79
6 changed files with 121 additions and 24 deletions

View File

@ -97,6 +97,15 @@ public abstract class AbstractMemStore implements MemStore {
*/
public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);
@Override
public long add(Iterable<Cell> cells) {
long size = 0;
for (Cell cell : cells) {
size += add(cell);
}
return size;
}
/**
* Write an update
* @param cell the cell to be added

View File

@ -3256,8 +3256,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
continue;
}
addedSize += applyFamilyMapToMemstore(familyMaps[i], replay,
// We need to update the sequence id for following reasons.
// 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
// 2) If no WAL, FSWALEntry won't be used
boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
if (updateSeqId) {
this.updateSequenceId(familyMaps[i].values(),
replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
}
addedSize += applyFamilyMapToMemstore(familyMaps[i]);
}
// STEP 6. Complete mvcc.
@ -3673,6 +3680,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
private void updateSequenceId(final Iterable<List<Cell>> cellItr, final long sequenceId)
throws IOException {
for (List<Cell> cells: cellItr) {
if (cells == null) return;
for (Cell cell : cells) {
CellUtil.setSequenceId(cell, sequenceId);
}
}
}
@Override
public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
throws IOException {
@ -3783,15 +3800,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param familyMap Map of Cells by family
* @return the additional memory usage of the memstore caused by the new entries.
*/
private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap, boolean replay,
long sequenceId)
private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap)
throws IOException {
long size = 0;
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
List<Cell> cells = e.getValue();
assert cells instanceof RandomAccess;
size += applyToMemstore(getStore(family), cells, false, replay, sequenceId);
size += applyToMemstore(getStore(family), cells, false);
}
return size;
}
@ -3803,34 +3819,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return Memstore change in size on insert of these Cells.
* @see #applyToMemstore(Store, Cell, long)
*/
private long applyToMemstore(final Store store, final List<Cell> cells,
final boolean delta, boolean replay, long sequenceId)
private long applyToMemstore(final Store store, final List<Cell> cells, final boolean delta)
throws IOException {
// Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
long size = 0;
boolean upsert = delta && store.getFamily().getMaxVersions() == 1;
int count = cells.size();
if (upsert) {
size += store.upsert(cells, getSmallestReadPoint());
return store.upsert(cells, getSmallestReadPoint());
} else {
for (int i = 0; i < count; i++) {
Cell cell = cells.get(i);
// TODO: This looks wrong.. checking for sequenceid of zero is expensive!!!!! St.Ack
// When is it zero anyways? When replay? Then just rely on that flag.
if (cell.getSequenceId() == 0 || replay) {
CellUtil.setSequenceId(cell, sequenceId);
}
size += store.add(cell);
}
return store.add(cells);
}
return size;
}
/**
* @return Memstore change in size on insert of these Cells.
* @see #applyToMemstore(Store, List, boolean, boolean, long)
*/
private long applyToMemstore(final Store store, final Cell cell, long sequenceId)
private long applyToMemstore(final Store store, final Cell cell)
throws IOException {
// Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
if (store == null) {
@ -7045,7 +7049,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
CellUtil.setSequenceId(cell, sequenceId);
}
Store store = getStore(cell);
addedSize += applyToMemstore(store, cell, sequenceId);
addedSize += applyToMemstore(store, cell);
}
}
// STEP 8. Complete mvcc.
@ -7231,12 +7235,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// transaction.
recordMutationWithoutWal(mutation.getFamilyCellMap());
writeEntry = mvcc.begin();
updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber());
}
// Now write to MemStore. Do it a column family at a time.
long sequenceId = writeEntry.getWriteNumber();
for (Map.Entry<Store, List<Cell>> e: forMemStore.entrySet()) {
accumulatedResultSize +=
applyToMemstore(e.getKey(), e.getValue(), true, false, sequenceId);
accumulatedResultSize += applyToMemstore(e.getKey(), e.getValue(), true);
}
mvcc.completeAndWait(writeEntry);
if (rsServices != null && rsServices.getNonceManager() != null) {

View File

@ -634,6 +634,16 @@ public class HStore implements Store {
}
}
@Override
public long add(final Iterable<Cell> cells) {
lock.readLock().lock();
try {
return memstore.add(cells);
} finally {
lock.readLock().unlock();
}
}
@Override
public long timeOfOldestEdit() {
return memstore.timeOfOldestEdit();

View File

@ -73,6 +73,13 @@ public interface MemStore extends HeapSize {
*/
long add(final Cell cell);
/**
* Write the updates
* @param cells
* @return approximate size of the passed cell.
*/
long add(Iterable<Cell> cells);
/**
* @return Oldest timestamp of all the Cells in the MemStore
*/

View File

@ -150,6 +150,13 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
*/
long add(Cell cell);
/**
* Adds the specified value to the memstore
* @param cells
* @return memstore size delta
*/
long add(Iterable<Cell> cells);
/**
* When was the last edit done in the memstore
*/

View File

@ -33,6 +33,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@ -4477,6 +4478,66 @@ public class TestFromClientSide {
assertEquals(r.getColumnLatestCell(FAMILY, QUALIFIERS[0]).getTimestamp(),
r.getColumnLatestCell(FAMILY, QUALIFIERS[2]).getTimestamp());
}
private List<Result> doAppend(final boolean walUsed) throws IOException {
LOG.info("Starting testAppend, walUsed is " + walUsed);
final TableName TABLENAME = TableName.valueOf(walUsed ? "testAppendWithWAL" : "testAppendWithoutWAL");
Table t = TEST_UTIL.createTable(TABLENAME, FAMILY);
final byte[] row1 = Bytes.toBytes("c");
final byte[] row2 = Bytes.toBytes("b");
final byte[] row3 = Bytes.toBytes("a");
final byte[] qual = Bytes.toBytes("qual");
Put put_0 = new Put(row2);
put_0.addColumn(FAMILY, qual, Bytes.toBytes("put"));
Put put_1 = new Put(row3);
put_1.addColumn(FAMILY, qual, Bytes.toBytes("put"));
Append append_0 = new Append(row1);
append_0.add(FAMILY, qual, Bytes.toBytes("i"));
Append append_1 = new Append(row1);
append_1.add(FAMILY, qual, Bytes.toBytes("k"));
Append append_2 = new Append(row1);
append_2.add(FAMILY, qual, Bytes.toBytes("e"));
if (!walUsed) {
append_2.setDurability(Durability.SKIP_WAL);
}
Append append_3 = new Append(row1);
append_3.add(FAMILY, qual, Bytes.toBytes("a"));
Scan s = new Scan();
s.setCaching(1);
t.append(append_0);
t.put(put_0);
t.put(put_1);
List<Result> results = new LinkedList<>();
try (ResultScanner scanner = t.getScanner(s)) {
t.append(append_1);
t.append(append_2);
t.append(append_3);
for (Result r : scanner) {
results.add(r);
}
}
TEST_UTIL.deleteTable(TABLENAME);
return results;
}
@Test
public void testAppendWithoutWAL() throws Exception {
List<Result> resultsWithWal = doAppend(true);
List<Result> resultsWithoutWal = doAppend(false);
assertEquals(resultsWithWal.size(), resultsWithoutWal.size());
for (int i = 0; i != resultsWithWal.size(); ++i) {
Result resultWithWal = resultsWithWal.get(i);
Result resultWithoutWal = resultsWithoutWal.get(i);
assertEquals(resultWithWal.rawCells().length, resultWithoutWal.rawCells().length);
for (int j = 0; j != resultWithWal.rawCells().length; ++j) {
Cell cellWithWal = resultWithWal.rawCells()[j];
Cell cellWithoutWal = resultWithoutWal.rawCells()[j];
assertTrue(Bytes.equals(CellUtil.cloneRow(cellWithWal), CellUtil.cloneRow(cellWithoutWal)));
assertTrue(Bytes.equals(CellUtil.cloneFamily(cellWithWal), CellUtil.cloneFamily(cellWithoutWal)));
assertTrue(Bytes.equals(CellUtil.cloneQualifier(cellWithWal), CellUtil.cloneQualifier(cellWithoutWal)));
assertTrue(Bytes.equals(CellUtil.cloneValue(cellWithWal), CellUtil.cloneValue(cellWithoutWal)));
}
}
}
@Test
public void testClientPoolRoundRobin() throws IOException {