HBASE-16768 Inconsistent results from the Append/Increment (ChiaPing Tsai)

This commit is contained in:
tedyu 2016-10-08 13:12:21 -07:00
parent bbaa0e851d
commit e043d450ed
6 changed files with 135 additions and 21 deletions

View File

@ -234,6 +234,15 @@ public class DefaultMemStore implements MemStore {
return internalAdd(toAdd, mslabUsed); return internalAdd(toAdd, mslabUsed);
} }
/**
 * Adds every cell supplied by {@code cells} to this memstore, one at a time, by delegating to
 * the single-cell {@code add(Cell)} overload.
 * @param cells the cells to add
 * @return the sum of the values returned by the per-cell add calls
 */
@Override
public long add(Iterable<Cell> cells) {
  long totalDelta = 0;
  for (Cell current : cells) {
    // Each single-cell add reports its own size contribution; accumulate them.
    long delta = add(current);
    totalDelta += delta;
  }
  return totalDelta;
}
@Override @Override
public long timeOfOldestEdit() { public long timeOfOldestEdit() {
return timeOfOldestEdit; return timeOfOldestEdit;

View File

@ -3324,8 +3324,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
!= OperationStatusCode.NOT_RUN) { != OperationStatusCode.NOT_RUN) {
continue; continue;
} }
// We need to update the sequence id for following reasons.
// 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
// 2) If no WAL, FSWALEntry won't be used
boolean updateSeqId = isInReplay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
if (updateSeqId) {
updateSequenceId(familyMaps[i].values(), mvccNum);
}
doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay); addedSize += applyFamilyMapToMemstore(familyMaps[i]);
} }
// ------------------------------- // -------------------------------
@ -3722,6 +3729,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
manifest.addRegion(this); manifest.addRegion(this);
} }
/**
 * Stamps the given sequence id onto every cell of every non-null list in {@code cellItr}.
 * @param cellItr the per-family cell lists to stamp; individual lists may be null
 * @param sequenceId the sequence id to set on each cell
 * @throws IOException if setting the sequence id on a cell fails
 */
private void updateSequenceId(final Iterable<List<Cell>> cellItr, final long sequenceId)
    throws IOException {
  for (List<Cell> cells : cellItr) {
    if (cells == null) {
      // Skip only this list. Returning here would leave the cells of every
      // remaining list without the sequence id.
      continue;
    }
    for (Cell cell : cells) {
      CellUtil.setSequenceId(cell, sequenceId);
    }
  }
}
@Override @Override
public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now) public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
throws IOException { throws IOException {
@ -3846,8 +3865,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* new entries. * new entries.
* @throws IOException * @throws IOException
*/ */
private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap, private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap) throws IOException {
long mvccNum, boolean isInReplay) throws IOException {
long size = 0; long size = 0;
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) { for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
@ -3855,14 +3873,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
List<Cell> cells = e.getValue(); List<Cell> cells = e.getValue();
assert cells instanceof RandomAccess; assert cells instanceof RandomAccess;
Store store = getStore(family); Store store = getStore(family);
int listSize = cells.size(); size += store.add(cells);
for (int i=0; i < listSize; i++) {
Cell cell = cells.get(i);
if (cell.getSequenceId() == 0 || isInReplay) {
CellUtil.setSequenceId(cell, mvccNum);
}
size += store.add(cell);
}
} }
return size; return size;
@ -7520,9 +7531,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
recordMutationWithoutWal(mutate.getFamilyCellMap()); recordMutationWithoutWal(mutate.getFamilyCellMap());
} }
} }
boolean updateSeqId = false;
if (walKey == null) { if (walKey == null) {
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
walKey = this.appendEmptyEdit(this.wal); walKey = this.appendEmptyEdit(this.wal);
// If no WAL, FSWALEntry won't be used and no update for sequence id
updateSeqId = true;
} }
// Do a get on the write entry... this will block until sequenceid is assigned... w/o it, // Do a get on the write entry... this will block until sequenceid is assigned... w/o it,
// TestAtomicOperation fails. // TestAtomicOperation fails.
@ -7533,21 +7547,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
writeEntry.getWriteNumber()); writeEntry.getWriteNumber());
} }
if (updateSeqId) {
updateSequenceId(tempMemstore.values(), writeEntry.getWriteNumber());
}
// Actually write to Memstore now // Actually write to Memstore now
if (!tempMemstore.isEmpty()) { if (!tempMemstore.isEmpty()) {
for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) { for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
Store store = entry.getKey(); Store store = entry.getKey();
if (store.getFamily().getMaxVersions() == 1) { if (store.getFamily().getMaxVersions() == 1) {
// upsert if VERSIONS for this CF == 1 // upsert if VERSIONS for this CF == 1
// Is this right? It immediately becomes visible? St.Ack 20150907
size += store.upsert(entry.getValue(), getSmallestReadPoint()); size += store.upsert(entry.getValue(), getSmallestReadPoint());
} else { } else {
// otherwise keep older versions around // otherwise keep older versions around
for (Cell cell: entry.getValue()) { size += store.add(entry.getValue());
// This stamping of sequenceid seems redundant; it is happening down in if (!entry.getValue().isEmpty()) {
// FSHLog when we consume edits off the ring buffer.
CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber());
size += store.add(cell);
doRollBackMemstore = true; doRollBackMemstore = true;
} }
} }
@ -7746,6 +7760,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
} }
boolean updateSeqId = false;
// Actually write to WAL now. If walEdits is non-empty, we write the WAL. // Actually write to WAL now. If walEdits is non-empty, we write the WAL.
if (walEdits != null && !walEdits.isEmpty()) { if (walEdits != null && !walEdits.isEmpty()) {
// Using default cluster id, as this can only happen in the originating cluster. // Using default cluster id, as this can only happen in the originating cluster.
@ -7759,6 +7774,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} else { } else {
// Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
walKey = this.appendEmptyEdit(this.wal); walKey = this.appendEmptyEdit(this.wal);
// If no WAL, FSWALEntry won't be used and no update for sequence id
updateSeqId = true;
} }
// Get WriteEntry. Will wait on assign of the sequence id. // Get WriteEntry. Will wait on assign of the sequence id.
WriteEntry writeEntry = walKey.getWriteEntry(); WriteEntry writeEntry = walKey.getWriteEntry();
@ -7768,6 +7785,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
writeEntry.getWriteNumber()); writeEntry.getWriteNumber());
} }
if (updateSeqId) {
updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber());
}
// Now write to memstore, a family at a time. // Now write to memstore, a family at a time.
for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) { for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
Store store = entry.getKey(); Store store = entry.getKey();
@ -7778,10 +7799,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// TODO: St.Ack 20151222 Why no rollback in this case? // TODO: St.Ack 20151222 Why no rollback in this case?
} else { } else {
// Otherwise keep older versions around // Otherwise keep older versions around
for (Cell cell: results) { accumulatedResultSize += store.add(entry.getValue());
// Why we need this? if (!entry.getValue().isEmpty()) {
CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber());
accumulatedResultSize += store.add(cell);
doRollBackMemstore = true; doRollBackMemstore = true;
} }
} }

View File

@ -665,6 +665,16 @@ public class HStore implements Store {
} }
} }
/**
 * Adds the given cells to the underlying memstore while holding this store's read lock.
 * @param cells the cells to add
 * @return the value reported by the memstore for the added cells
 */
@Override
public long add(Iterable<Cell> cells) {
  this.lock.readLock().lock();
  try {
    final long sizeDelta = memstore.add(cells);
    return sizeDelta;
  } finally {
    // Always release the read lock, even if the memstore add throws.
    this.lock.readLock().unlock();
  }
}
@Override @Override
public long timeOfOldestEdit() { public long timeOfOldestEdit() {
return memstore.timeOfOldestEdit(); return memstore.timeOfOldestEdit();

View File

@ -72,6 +72,13 @@ public interface MemStore extends HeapSize {
*/ */
long add(final Cell cell); long add(final Cell cell);
/**
 * Write a batch of updates: adds each of the given cells to this memstore.
 * @param cells the cells to add
 * @return approximate total size of the passed cells
 */
long add(Iterable<Cell> cells);
/** /**
* @return Oldest timestamp of all the Cells in the MemStore * @return Oldest timestamp of all the Cells in the MemStore
*/ */

View File

@ -150,6 +150,13 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
*/ */
long add(Cell cell); long add(Cell cell);
/**
 * Adds the specified cells to the memstore
 * @param cells the cells to add
 * @return memstore size delta
 */
long add(Iterable<Cell> cells);
/** /**
* When was the last edit done in the memstore * When was the last edit done in the memstore
*/ */

View File

@ -34,6 +34,7 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap; import java.util.NavigableMap;
@ -4120,6 +4121,67 @@ public class TestFromClientSide {
assertEquals(true, ok); assertEquals(true, ok);
} }
/**
 * Creates a single-family table, writes one append to row "c" and one put each to rows "b" and
 * "a", then opens a scanner (caching of 1) and issues three further appends to row "c" before
 * draining the scanner.
 * <p>
 * The table handle is closed and the table is deleted even when an operation fails, so a
 * failure here does not leak the table into other tests.
 * @param walUsed when false, the third append is sent with {@link Durability#SKIP_WAL}
 * @return every Result returned by the scanner, in scan order
 * @throws IOException if any table operation fails
 */
private List<Result> doAppend(final boolean walUsed) throws IOException {
  LOG.info("Starting testAppend, walUsed is " + walUsed);
  final TableName TABLENAME = TableName.valueOf(walUsed ? "testAppendWithWAL" : "testAppendWithoutWAL");
  final byte[] row1 = Bytes.toBytes("c");
  final byte[] row2 = Bytes.toBytes("b");
  final byte[] row3 = Bytes.toBytes("a");
  final byte[] qual = Bytes.toBytes("qual");
  Put put_0 = new Put(row2);
  put_0.addColumn(FAMILY, qual, Bytes.toBytes("put"));
  Put put_1 = new Put(row3);
  put_1.addColumn(FAMILY, qual, Bytes.toBytes("put"));
  Append append_0 = new Append(row1);
  append_0.add(FAMILY, qual, Bytes.toBytes("i"));
  Append append_1 = new Append(row1);
  append_1.add(FAMILY, qual, Bytes.toBytes("k"));
  Append append_2 = new Append(row1);
  append_2.add(FAMILY, qual, Bytes.toBytes("e"));
  if (!walUsed) {
    append_2.setDurability(Durability.SKIP_WAL);
  }
  Append append_3 = new Append(row1);
  append_3.add(FAMILY, qual, Bytes.toBytes("a"));
  Scan s = new Scan();
  s.setCaching(1);
  List<Result> results = new LinkedList<>();
  // try-with-resources closes the Table handle; the inner finally drops the table
  // only once it has actually been created.
  try (Table t = TEST_UTIL.createTable(TABLENAME, FAMILY)) {
    try {
      t.append(append_0);
      t.put(put_0);
      t.put(put_1);
      try (ResultScanner scanner = t.getScanner(s)) {
        // These appends are issued after the scanner is opened and before it is drained.
        t.append(append_1);
        t.append(append_2);
        t.append(append_3);
        for (Result r : scanner) {
          results.add(r);
        }
      }
    } finally {
      TEST_UTIL.deleteTable(TABLENAME);
    }
  }
  return results;
}
/**
 * Runs the same append/put/scan sequence once with the WAL and once with one append using
 * SKIP_WAL, then checks that both scans return the same number of results and that every cell
 * matches on row, family, qualifier and value.
 */
@Test
public void testAppendWithoutWAL() throws Exception {
  final List<Result> withWal = doAppend(true);
  final List<Result> withoutWal = doAppend(false);
  assertEquals(withWal.size(), withoutWal.size());
  for (int rowIdx = 0; rowIdx < withWal.size(); rowIdx++) {
    final Cell[] walCells = withWal.get(rowIdx).rawCells();
    final Cell[] noWalCells = withoutWal.get(rowIdx).rawCells();
    assertEquals(walCells.length, noWalCells.length);
    for (int cellIdx = 0; cellIdx < walCells.length; cellIdx++) {
      final Cell walCell = walCells[cellIdx];
      final Cell noWalCell = noWalCells[cellIdx];
      // Compare the four coordinates/value components individually.
      assertTrue(Bytes.equals(CellUtil.cloneRow(walCell), CellUtil.cloneRow(noWalCell)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(walCell), CellUtil.cloneFamily(noWalCell)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(walCell), CellUtil.cloneQualifier(noWalCell)));
      assertTrue(Bytes.equals(CellUtil.cloneValue(walCell), CellUtil.cloneValue(noWalCell)));
    }
  }
}
/** /**
* test for HBASE-737 * test for HBASE-737
* @throws IOException * @throws IOException