hbase-8763: Combine MVCC and SeqId

This commit is contained in:
Jeffrey Zhong 2014-06-06 18:25:46 -07:00
parent d6cc2fb1ea
commit c682d57e92
20 changed files with 555 additions and 339 deletions

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
@ -210,12 +211,13 @@ public class DefaultMemStore implements MemStore {
/**
* Write an update
* @param cell
* @return approximate size of the passed key and value.
* @return a pair of the approximate size of the passed KV and the newly added KV, which may be
* different from the passed-in KV
*/
@Override
public long add(Cell cell) {
public Pair<Long, Cell> add(Cell cell) {
KeyValue toAdd = maybeCloneWithAllocator(KeyValueUtil.ensureKeyValue(cell));
return internalAdd(toAdd);
return new Pair<Long, Cell>(internalAdd(toAdd), toAdd);
}
@Override
@ -1051,18 +1053,21 @@ public class DefaultMemStore implements MemStore {
byte [] empty = new byte[0];
for (int i = 0; i < count; i++) {
// Give each its own ts
size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
Pair<Long, Cell> ret = memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
size += ret.getFirst();
}
LOG.info("memstore1 estimated size=" + size);
for (int i = 0; i < count; i++) {
size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
Pair<Long, Cell> ret = memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
size += ret.getFirst();
}
LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
// Make a variably sized memstore.
DefaultMemStore memstore2 = new DefaultMemStore();
for (int i = 0; i < count; i++) {
size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
Pair<Long, Cell> ret = memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
new byte[i]));
size += ret.getFirst();
}
LOG.info("memstore2 estimated size=" + size);
final int seconds = 30;

View File

@ -824,10 +824,11 @@ public class HRegion implements HeapSize { // , Writable{
}
}
}
mvcc.initialize(maxMemstoreTS + 1);
// Recover any edits if available.
maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
mvcc.initialize(maxSeqId);
return maxSeqId;
}
@ -1684,7 +1685,7 @@ public class HRegion implements HeapSize { // , Writable{
// wal can be null replaying edits.
return wal != null?
new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
getNextSequenceId(wal, startTime), "Nothing to flush"):
getNextSequenceId(wal), "Nothing to flush"):
new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
}
} finally {
@ -1714,10 +1715,10 @@ public class HRegion implements HeapSize { // , Writable{
getRegionInfo().getEncodedName());
List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
long flushSeqId = -1L;
try {
try {
// Record the mvcc for all transactions in progress.
w = mvcc.beginMemstoreInsert();
mvcc.advanceMemstore(w);
if (wal != null) {
if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
// This should never happen.
@ -1729,7 +1730,7 @@ public class HRegion implements HeapSize { // , Writable{
// Get a sequence id that we can use to denote the flush. It will be one beyond the last
// edit that made it into the hfile (the below does not add an edit, it just asks the
// WAL system to return next sequence edit).
flushSeqId = getNextSequenceId(wal, startTime);
flushSeqId = getNextSequenceId(wal);
} else {
// use the provided sequence Id as WAL is not being used for this flush.
flushSeqId = myseqid;
@ -1748,10 +1749,9 @@ public class HRegion implements HeapSize { // , Writable{
this.updatesLock.writeLock().unlock();
}
String s = "Finished memstore snapshotting " + this +
", syncing WAL and waiting on mvcc, flushSize=" + totalFlushableSize;
", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
status.setStatus(s);
if (LOG.isTraceEnabled()) LOG.trace(s);
// sync unflushed WAL changes when deferred log sync is enabled
// see HBASE-8208 for details
if (wal != null && !shouldSyncLog()) wal.sync();
@ -1761,11 +1761,18 @@ public class HRegion implements HeapSize { // , Writable{
// uncommitted transactions from being written into HFiles.
// We have to block before we start the flush, otherwise keys that
// were removed via a rollbackMemstore could be written to Hfiles.
mvcc.waitForRead(w);
mvcc.waitForPreviousTransactionsComplete(w);
// set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
w = null;
s = "Flushing stores of " + this;
status.setStatus(s);
if (LOG.isTraceEnabled()) LOG.trace(s);
} finally {
if (w != null) {
// in case of failure just mark current w as complete
mvcc.advanceMemstore(w);
}
}
// Any failure from here on out will be catastrophic requiring server
// restart so hlog content can be replayed and put back into the memstore.
@ -1849,13 +1856,9 @@ public class HRegion implements HeapSize { // , Writable{
* @return Next sequence number unassociated with any actual edit.
* @throws IOException
*/
private long getNextSequenceId(final HLog wal, final long now) throws IOException {
HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable());
// Call append but with an empty WALEdit. The returned sequence id will not be associated
// with any edit and we can be sure it went in after all outstanding appends.
wal.appendNoSync(getTableDesc(), getRegionInfo(), key,
WALEdit.EMPTY_WALEDIT, this.sequenceId, false);
return key.getLogSeqNum();
private long getNextSequenceId(final HLog wal) throws IOException {
HLogKey key = this.appendNoSyncNoAppend(wal, null);
return key.getSequenceNumber();
}
//////////////////////////////////////////////////////////////////////////////
@ -2349,11 +2352,14 @@ public class HRegion implements HeapSize { // , Writable{
List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
// reference family maps directly so coprocessors can mutate them if desired
Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
// We try to set up a batch in the range [firstIndex,lastIndexExclusive)
int firstIndex = batchOp.nextIndexToProcess;
int lastIndexExclusive = firstIndex;
boolean success = false;
int noOfPuts = 0, noOfDeletes = 0;
HLogKey walKey = null;
long mvccNum = 0;
try {
// ------------------------------------
// STEP 1. Try to acquire as many locks as we can, and ensure
@ -2475,12 +2481,12 @@ public class HRegion implements HeapSize { // , Writable{
lock(this.updatesLock.readLock(), numReadyToWrite);
locked = true;
mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
//
// ------------------------------------
// Acquire the latest mvcc number
// ----------------------------------
w = mvcc.beginMemstoreInsert();
w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
// calling the pre CP hook for batch mutation
if (!isInReplay && coprocessorHost != null) {
@ -2506,13 +2512,12 @@ public class HRegion implements HeapSize { // , Writable{
continue;
}
doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
}
// ------------------------------------
// STEP 4. Build WAL edit
// ----------------------------------
boolean hasWalAppends = false;
Durability durability = Durability.USE_DEFAULT;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// Skip puts that were determined to be invalid during preprocessing
@ -2543,13 +2548,13 @@ public class HRegion implements HeapSize { // , Writable{
throw new IOException("Multiple nonces per batch and not in replay");
}
// txid should always increase, so having the one from the last call is ok.
HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, m.getClusterIds(), currentNonceGroup,
currentNonce);
txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), key,
walEdit, getSequenceId(), true);
hasWalAppends = true;
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, m.getClusterIds(),
currentNonceGroup, currentNonce);
txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey,
walEdit, getSequenceId(), true, null);
walEdit = new WALEdit(isInReplay);
walKey = null;
}
currentNonceGroup = nonceGroup;
currentNonce = nonce;
@ -2570,12 +2575,15 @@ public class HRegion implements HeapSize { // , Writable{
// -------------------------
Mutation mutation = batchOp.getMutation(firstIndex);
if (walEdit.size() > 0) {
HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, mutation.getClusterIds(),
currentNonceGroup, currentNonce);
txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), key, walEdit,
getSequenceId(), true);
hasWalAppends = true;
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce);
txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
getSequenceId(), true, memstoreCells);
}
if(walKey == null){
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
}
// -------------------------------
@ -2590,9 +2598,10 @@ public class HRegion implements HeapSize { // , Writable{
// -------------------------
// STEP 7. Sync wal.
// -------------------------
if (hasWalAppends) {
if (txid != 0) {
syncOrDefer(txid, durability);
}
doRollBackMemstore = false;
// calling the post CP hook for batch mutation
if (!isInReplay && coprocessorHost != null) {
@ -2606,7 +2615,7 @@ public class HRegion implements HeapSize { // , Writable{
// STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
// ------------------------------------------------------------------
if (w != null) {
mvcc.completeMemstoreInsert(w);
mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
w = null;
}
@ -2636,9 +2645,11 @@ public class HRegion implements HeapSize { // , Writable{
// if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) {
rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
rollbackMemstore(memstoreCells);
}
if (w != null) {
mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
}
if (w != null) mvcc.completeMemstoreInsert(w);
if (locked) {
this.updatesLock.readLock().unlock();
@ -2727,7 +2738,7 @@ public class HRegion implements HeapSize { // , Writable{
// Lock row - note that doBatchMutate will relock this row if called
RowLock rowLock = getRowLock(get.getRow());
// wait for all previous transactions to complete (with lock held)
mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
mvcc.waitForPreviousTransactionsComplete();
try {
if (this.getCoprocessorHost() != null) {
Boolean processed = null;
@ -2903,19 +2914,13 @@ public class HRegion implements HeapSize { // , Writable{
* @param familyMap Map of kvs per family
* @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
* If null, then this method internally creates a mvcc transaction.
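* @param mvccNum the mvcc version that is set on every KV before it is added to the memstore
* @param memstoreCells output list to which the KVs newly added into the memstore are appended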
* @return the additional memory usage of the memstore caused by the
* new entries.
*/
private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
long mvccNum, List<KeyValue> memstoreCells) {
long size = 0;
boolean freemvcc = false;
try {
if (localizedWriteEntry == null) {
localizedWriteEntry = mvcc.beginMemstoreInsert();
freemvcc = true;
}
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
@ -2924,13 +2929,10 @@ public class HRegion implements HeapSize { // , Writable{
Store store = getStore(family);
for (Cell cell: cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
kv.setMvccVersion(localizedWriteEntry.getWriteNumber());
size += store.add(kv);
}
}
} finally {
if (freemvcc) {
mvcc.completeMemstoreInsert(localizedWriteEntry);
kv.setMvccVersion(mvccNum);
Pair<Long, Cell> ret = store.add(kv);
size += ret.getFirst();
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
}
}
@ -2942,35 +2944,16 @@ public class HRegion implements HeapSize { // , Writable{
* called when a Put/Delete has updated memstore but subsequently fails to update
* the wal. This method is then invoked to rollback the memstore.
*/
private void rollbackMemstore(BatchOperationInProgress<?> batchOp,
Map<byte[], List<Cell>>[] familyMaps,
int start, int end) {
private void rollbackMemstore(List<KeyValue> memstoreCells) {
int kvsRolledback = 0;
for (int i = start; i < end; i++) {
// skip over request that never succeeded in the first place.
if (batchOp.retCodeDetails[i].getOperationStatusCode()
!= OperationStatusCode.SUCCESS) {
continue;
}
// Rollback all the kvs for this row.
Map<byte[], List<Cell>> familyMap = familyMaps[i];
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
List<Cell> cells = e.getValue();
// Remove those keys from the memstore that matches our
// key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
// that even the memstoreTS has to match for keys that will be rolled-back.
for (KeyValue kv : memstoreCells) {
byte[] family = kv.getFamily();
Store store = getStore(family);
for (Cell cell: cells) {
store.rollback(KeyValueUtil.ensureKeyValue(cell));
store.rollback(kv);
kvsRolledback++;
}
}
}
LOG.debug("rollbackMemstore rolled back " + kvsRolledback +
" keyvalues from start:" + start + " to end:" + end);
LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
}
/**
@ -3378,7 +3361,7 @@ public class HRegion implements HeapSize { // , Writable{
* @return True if we should flush.
*/
protected boolean restoreEdit(final Store s, final KeyValue kv) {
long kvSize = s.add(kv);
long kvSize = s.add(kv).getFirst();
if (this.rsAccounting != null) {
rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
}
@ -4883,7 +4866,10 @@ public class HRegion implements HeapSize { // , Writable{
List<RowLock> acquiredRowLocks;
long addedSize = 0;
List<KeyValue> mutations = new ArrayList<KeyValue>();
List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();
long mvccNum = 0;
HLogKey walKey = null;
try {
// 2. Acquire the row lock(s)
acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
@ -4894,6 +4880,7 @@ public class HRegion implements HeapSize { // , Writable{
// 3. Region lock
lock(this.updatesLock.readLock(), acquiredRowLocks.size());
locked = true;
mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
long now = EnvironmentEdgeManager.currentTimeMillis();
try {
@ -4904,27 +4891,35 @@ public class HRegion implements HeapSize { // , Writable{
if (!mutations.isEmpty()) {
// 5. Get a mvcc write number
writeEntry = mvcc.beginMemstoreInsert();
writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
// 6. Apply to memstore
for (KeyValue kv : mutations) {
kv.setMvccVersion(writeEntry.getWriteNumber());
kv.setMvccVersion(mvccNum);
Store store = getStore(kv);
if (store == null) {
checkFamily(CellUtil.cloneFamily(kv));
// unreachable
}
addedSize += store.add(kv);
Pair<Long, Cell> ret = store.add(kv);
addedSize += ret.getFirst();
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
}
long txid = 0;
// 7. Append no sync
if (!walEdit.isEmpty()) {
HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, processor.getClusterIds(), nonceGroup,
nonce);
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
processor.getClusterIds(), nonceGroup, nonce);
txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
key, walEdit, getSequenceId(), true);
walKey, walEdit, getSequenceId(), true, memstoreCells);
}
if(walKey == null){
// since we use log sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
// to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
}
// 8. Release region lock
if (locked) {
this.updatesLock.readLock().unlock();
@ -4951,7 +4946,7 @@ public class HRegion implements HeapSize { // , Writable{
}
// 11. Roll mvcc forward
if (writeEntry != null) {
mvcc.completeMemstoreInsert(writeEntry);
mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
}
if (locked) {
this.updatesLock.readLock().unlock();
@ -5055,8 +5050,12 @@ public class HRegion implements HeapSize { // , Writable{
// Lock row
startRegionOperation(Operation.APPEND);
this.writeRequestsCount.increment();
long mvccNum = 0;
WriteEntry w = null;
RowLock rowLock;
HLogKey walKey = null;
RowLock rowLock = null;
List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
boolean doRollBackMemstore = false;
try {
rowLock = getRowLock(row);
try {
@ -5064,7 +5063,7 @@ public class HRegion implements HeapSize { // , Writable{
try {
// wait for all prior MVCC transactions to finish - while we hold the row lock
// (so that we are guaranteed to see the latest state)
mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
mvcc.waitForPreviousTransactionsComplete();
if (this.coprocessorHost != null) {
Result r = this.coprocessorHost.preAppendAfterRowLock(append);
if(r!= null) {
@ -5072,7 +5071,8 @@ public class HRegion implements HeapSize { // , Writable{
}
}
// now start my own transaction
w = mvcc.beginMemstoreInsert();
mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
@ -5140,7 +5140,7 @@ public class HRegion implements HeapSize { // , Writable{
// so only need to update the timestamp to 'now'
newKV.updateLatestStamp(Bytes.toBytes(now));
}
newKV.setMvccVersion(w.getWriteNumber());
newKV.setMvccVersion(mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
@ -5161,34 +5161,43 @@ public class HRegion implements HeapSize { // , Writable{
tempMemstore.put(store, kvs);
}
// Actually write to WAL now
if (writeToWAL) {
// Using default cluster id, as this can only happen in the originating
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), nonceGroup, nonce);
txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), key, walEdits,
this.sequenceId, true);
} else {
recordMutationWithoutWal(append.getFamilyCellMap());
}
//Actually write to Memstore now
for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
Store store = entry.getKey();
if (store.getFamily().getMaxVersions() == 1) {
// upsert if VERSIONS for this CF == 1
size += store.upsert(entry.getValue(), getSmallestReadPoint());
memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue()));
} else {
// otherwise keep older versions around
for (Cell cell: entry.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
size += store.add(kv);
Pair<Long, Cell> ret = store.add(kv);
size += ret.getFirst();
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
doRollBackMemstore = true;
}
}
allKVs.addAll(entry.getValue());
}
// Actually write to WAL now
if (writeToWAL) {
// Using default cluster id, as this can only happen in the originating
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
this.sequenceId, true, memstoreCells);
} else {
recordMutationWithoutWal(append.getFamilyCellMap());
}
if(walKey == null){
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
}
size = this.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size);
} finally {
@ -5196,14 +5205,23 @@ public class HRegion implements HeapSize { // , Writable{
}
} finally {
rowLock.release();
rowLock = null;
}
if (writeToWAL) {
// sync the transaction log outside the rowlock
if(txid != 0){
syncOrDefer(txid, durability);
}
doRollBackMemstore = false;
} finally {
if (rowLock != null) {
rowLock.release();
}
// if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) {
rollbackMemstore(memstoreCells);
}
if (w != null) {
mvcc.completeMemstoreInsert(w);
mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
}
closeRegionOperation(Operation.APPEND);
}
@ -5250,15 +5268,20 @@ public class HRegion implements HeapSize { // , Writable{
// Lock row
startRegionOperation(Operation.INCREMENT);
this.writeRequestsCount.increment();
RowLock rowLock = null;
WriteEntry w = null;
HLogKey walKey = null;
long mvccNum = 0;
List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
boolean doRollBackMemstore = false;
try {
RowLock rowLock = getRowLock(row);
rowLock = getRowLock(row);
try {
lock(this.updatesLock.readLock());
try {
// wait for all prior MVCC transactions to finish - while we hold the row lock
// (so that we are guaranteed to see the latest state)
mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
mvcc.waitForPreviousTransactionsComplete();
if (this.coprocessorHost != null) {
Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
if (r != null) {
@ -5266,7 +5289,8 @@ public class HRegion implements HeapSize { // , Writable{
}
}
// now start my own transaction
w = mvcc.beginMemstoreInsert();
mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
for (Map.Entry<byte [], List<Cell>> family:
@ -5330,7 +5354,7 @@ public class HRegion implements HeapSize { // , Writable{
System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
}
newKV.setMvccVersion(w.getWriteNumber());
newKV.setMvccVersion(mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
@ -5357,20 +5381,6 @@ public class HRegion implements HeapSize { // , Writable{
}
}
// Actually write to WAL now
if (walEdits != null && !walEdits.isEmpty()) {
if (writeToWAL) {
// Using default cluster id, as this can only happen in the originating
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), nonceGroup, nonce);
txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
key, walEdits, getSequenceId(), true);
} else {
recordMutationWithoutWal(increment.getFamilyCellMap());
}
}
//Actually write to Memstore now
if (!tempMemstore.isEmpty()) {
for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
@ -5378,30 +5388,62 @@ public class HRegion implements HeapSize { // , Writable{
if (store.getFamily().getMaxVersions() == 1) {
// upsert if VERSIONS for this CF == 1
size += store.upsert(entry.getValue(), getSmallestReadPoint());
memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue()));
} else {
// otherwise keep older versions around
for (Cell cell : entry.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
size += store.add(kv);
Pair<Long, Cell> ret = store.add(kv);
size += ret.getFirst();
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
doRollBackMemstore = true;
}
}
}
size = this.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size);
}
// Actually write to WAL now
if (walEdits != null && !walEdits.isEmpty()) {
if (writeToWAL) {
// Using default cluster id, as this can only happen in the originating
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
walKey, walEdits, getSequenceId(), true, memstoreCells);
} else {
recordMutationWithoutWal(increment.getFamilyCellMap());
}
}
if(walKey == null){
// Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
}
} finally {
this.updatesLock.readLock().unlock();
}
} finally {
rowLock.release();
rowLock = null;
}
if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) {
// sync the transaction log outside the rowlock
if(txid != 0){
syncOrDefer(txid, durability);
}
doRollBackMemstore = false;
} finally {
if (rowLock != null) {
rowLock.release();
}
// if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) {
rollbackMemstore(memstoreCells);
}
if (w != null) {
mvcc.completeMemstoreInsert(w);
mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
}
closeRegionOperation(Operation.INCREMENT);
if (this.metricsRegion != null) {
@ -6130,4 +6172,23 @@ public class HRegion implements HeapSize { // , Writable{
}
}
}
/**
 * Append a faked (empty) WALEdit so that a log sequence number gets assigned; the log syncer is
 * expected to ignore this append later on.
 * @param wal the log to append the empty edit to
 * @param cells list of KeyValues inserted into memstore. Those KeyValues are passed in order to
 *          be updated with the right mvcc values (their log sequence number)
 * @return the HLogKey that was used for the faked append
 * @throws IOException if the append to the log fails
 */
private HLogKey appendNoSyncNoAppend(final HLog wal, List<KeyValue> cells) throws IOException {
  // The key starts out with no sequence id, a zero write time, no cluster ids and no nonces.
  final HLogKey fakedKey = new HLogKey(
      getRegionInfo().getEncodedNameAsBytes(),
      getRegionInfo().getTable(),
      HLog.NO_SEQUENCE_ID,
      0,
      null,
      HConstants.NO_NONCE,
      HConstants.NO_NONCE);
  // Call append with an empty WALEdit. The resulting sequence id will not be associated with
  // any edit, and we can be sure it went in after all outstanding appends.
  wal.appendNoSync(getTableDesc(), getRegionInfo(), fakedKey, WALEdit.EMPTY_WALEDIT,
      this.sequenceId, false, cells);
  return fakedKey;
}
}

View File

@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@ -564,7 +565,7 @@ public class HStore implements Store {
}
@Override
public long add(final KeyValue kv) {
public Pair<Long, Cell> add(final KeyValue kv) {
lock.readLock().lock();
try {
return this.memstore.add(kv);

View File

@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Pair;
/**
* The MemStore holds in-memory modifications to the Store. Modifications are {@link Cell}s.
@ -61,9 +62,10 @@ public interface MemStore extends HeapSize {
/**
* Write an update
* @param cell
* @return approximate size of the passed key and value.
* @return a pair of the approximate size of the passed KV and the newly added KV, which may be
* different from the passed-in KV.
*/
long add(final Cell cell);
Pair<Long, Cell> add(final Cell cell);
/**
* @return Oldest timestamp of all the Cells in the MemStore

View File

@ -18,7 +18,9 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
@ -32,9 +34,8 @@ import org.apache.hadoop.hbase.util.ClassSize;
*/
@InterfaceAudience.Private
public class MultiVersionConsistencyControl {
private static final long NO_WRITE_NUMBER = 0;
private volatile long memstoreRead = 0;
private volatile long memstoreWrite = 0;
private final Object readWaiters = new Object();
// This is the pending queue of writes.
@ -45,7 +46,6 @@ public class MultiVersionConsistencyControl {
* Default constructor. Initializes the memstoreRead/Write points to 0.
*/
public MultiVersionConsistencyControl() {
this.memstoreRead = this.memstoreWrite = 0;
}
/**
@ -54,37 +54,86 @@ public class MultiVersionConsistencyControl {
*/
public void initialize(long startPoint) {
synchronized (writeQueue) {
if (this.memstoreWrite != this.memstoreRead) {
throw new RuntimeException("Already used this mvcc. Too late to initialize");
}
this.memstoreRead = this.memstoreWrite = startPoint;
writeQueue.clear();
memstoreRead = startPoint;
}
}
/**
* Generate and return a {@link WriteEntry} with a new write number.
* To complete the WriteEntry and wait for it to be visible,
* call {@link #completeMemstoreInsert(WriteEntry)}.
*
* @return a WriteEntry whose initial write number is NO_WRITE_NUMBER; that value is expected to
* be reset later
*/
public WriteEntry beginMemstoreInsert() {
WriteEntry beginMemstoreInsert() {
return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
}
/**
 * Hand out a provisional mvcc write number to use before the actual one (the log sequence id)
 * has been assigned.
 * @param sequenceId sequence id counter; it is incremented by one as a side effect of this call
 * @return the incremented sequence id plus a fixed bump of one billion, a faked write number
 *         meant to be big enough not to be seen by others before a real one is assigned
 */
public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
  // One billion is just an arbitrary big number guarding that no scanner reaches the faked
  // value before the current MVCC transaction completes. Theoretically the bump only needs to
  // be 2 * the number of handlers, because each handler could increment the sequence number
  // twice and the max number of concurrent in-flight transactions is the number of RPC handlers.
  // Long.MAX_VALUE cannot be used because ordering must still be maintained when multiple
  // changes touch the same row key.
  // If the bumped value is not reset because of a failure, the write number is reset to
  // NO_WRITE_NUMBER so that the memstore read point is not advanced at all.
  final long safetyBump = 1000000000L;
  final long nextSequenceId = sequenceId.incrementAndGet();
  return nextSequenceId + safetyBump;
}
/**
* This function starts a MVCC transaction with current region's log change sequence number. Since
* we set change sequence number when flushing current change to WAL(late binding), the flush
* order may differ from the order to start a MVCC transaction. For example, a change begins a
* MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we
* add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
* transactions will reuse the number till current MVCC completes(success or fail). The "faked"
* big number is safe because we only need it to prevent current change being seen and the number
* will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
* for MVCC to align with flush sequence.
* @param curSeqNum
* @return WriteEntry a WriteEntry instance with the passed in curSeqNum
*/
public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
WriteEntry e = new WriteEntry(curSeqNum);
synchronized (writeQueue) {
long nextWriteNumber = ++memstoreWrite;
WriteEntry e = new WriteEntry(nextWriteNumber);
writeQueue.add(e);
return e;
}
}
/**
* Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}.
*
* At the end of this call, the global read point is at least as large as the write point
* of the passed in WriteEntry. Thus, the write is visible to MVCC readers.
* Complete a {@link WriteEntry} that was created by
* {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
* point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
* visible to MVCC readers.
* @throws IOException
*/
public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceNumber seqNum)
    throws IOException {
  // Nothing to complete when no entry was handed in.
  if (e == null) {
    return;
  }
  // Without a sequence number, set the value to NO_WRITE_NUMBER in order NOT to advance memstore
  // readpoint inside function beginMemstoreInsertWithSeqNum in case of failures. Otherwise
  // stamp the entry with the number obtained from seqNum.
  long finalWriteNumber = (seqNum == null) ? NO_WRITE_NUMBER : seqNum.getSequenceNumber();
  e.setWriteNumber(finalWriteNumber);
  waitForPreviousTransactionsComplete(e);
}
/**
* Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
* end of this call, the global read point is at least as large as the write point of the passed
* in WriteEntry. Thus, the write is visible to MVCC readers.
*/
public void completeMemstoreInsert(WriteEntry e) {
advanceMemstore(e);
waitForRead(e);
waitForPreviousTransactionsComplete(e);
}
/**
@ -99,75 +148,94 @@ public class MultiVersionConsistencyControl {
* @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
*/
boolean advanceMemstore(WriteEntry e) {
long nextReadValue = -1;
synchronized (writeQueue) {
e.markCompleted();
long nextReadValue = -1;
boolean ranOnce=false;
while (!writeQueue.isEmpty()) {
ranOnce=true;
WriteEntry queueFirst = writeQueue.getFirst();
if (nextReadValue > 0) {
if (nextReadValue+1 != queueFirst.getWriteNumber()) {
throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
+ nextReadValue + " next: " + queueFirst.getWriteNumber());
}
}
if (queueFirst.isCompleted()) {
nextReadValue = queueFirst.getWriteNumber();
// Using Max because Edit complete in WAL sync order not arriving order
nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
writeQueue.removeFirst();
} else {
break;
}
}
if (!ranOnce) {
throw new RuntimeException("never was a first");
if (nextReadValue > memstoreRead) {
memstoreRead = nextReadValue;
}
// notify waiters on writeQueue before return
writeQueue.notifyAll();
}
if (nextReadValue > 0) {
synchronized (readWaiters) {
memstoreRead = nextReadValue;
readWaiters.notifyAll();
}
}
if (memstoreRead >= e.getWriteNumber()) {
return true;
}
return false;
}
}
/**
* Wait for the global readPoint to advance upto
* the specified transaction number.
* Wait for all previous MVCC transactions to complete
*/
public void waitForRead(WriteEntry e) {
public void waitForPreviousTransactionsComplete() {
  // Start a fresh write entry and hand it straight to the WriteEntry overload, which does the
  // actual waiting.
  waitForPreviousTransactionsComplete(beginMemstoreInsert());
}
public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
boolean interrupted = false;
synchronized (readWaiters) {
while (memstoreRead < e.getWriteNumber()) {
WriteEntry w = waitedEntry;
try {
readWaiters.wait(0);
WriteEntry firstEntry = null;
do {
synchronized (writeQueue) {
// writeQueue won't be empty at this point, the following is just a safety check
if (writeQueue.isEmpty()) {
break;
}
firstEntry = writeQueue.getFirst();
if (firstEntry == w) {
// all previous in-flight transactions are done
break;
}
try {
writeQueue.wait(0);
} catch (InterruptedException ie) {
// We were interrupted... finish the loop -- i.e. cleanup --and then
// on our way out, reset the interrupt flag.
interrupted = true;
break;
}
}
} while (firstEntry != null);
} finally {
if (w != null) {
advanceMemstore(w);
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
if (interrupted) Thread.currentThread().interrupt();
}
/**
 * @return the current value of the memstoreRead field
 */
public long memstoreReadPoint() {
return memstoreRead;
}
public static class WriteEntry {
private long writeNumber;
private boolean completed = false;
WriteEntry(long writeNumber) {
this.writeNumber = writeNumber;
}
@ -180,6 +248,9 @@ public class MultiVersionConsistencyControl {
long getWriteNumber() {
return this.writeNumber;
}
/**
 * Overwrite the write number carried by this entry.
 * @param val the new write number
 */
void setWriteNumber(long val){
this.writeNumber = val;
}
}
public static final long FIXED_SIZE = ClassSize.align(

View File

@ -0,0 +1,31 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
 * Abstraction over anything that can supply a log sequence number, independent of how that
 * number gets assigned.
 */
@InterfaceAudience.Private
public interface SequenceNumber {
  /**
   * @return the log sequence number
   * @throws IOException if the sequence number cannot be obtained
   */
  long getSequenceNumber() throws IOException;
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Pair;
/**
* Interface for objects that hold a column family in a Region. Its a memstore and a set of zero or
@ -122,9 +123,9 @@ public interface Store extends HeapSize, StoreConfigInformation {
/**
* Adds a value to the memstore
* @param kv
* @return memstore size delta
* @return memstore size delta & newly added KV which may be different than the passed in KV
*/
long add(KeyValue kv);
Pair<Long, Cell> add(KeyValue kv);
/**
* When was the last edit done in the memstore

View File

@ -121,12 +121,6 @@ abstract class StoreFlusher {
// set its memstoreTS to 0. This will help us save space when writing to
// disk.
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
if (kv.getMvccVersion() <= smallestReadPoint) {
// let us not change the original KV. It could be in the memstore
// changing its memstoreTS could affect other threads/scanners.
kv = kv.shallowCopy();
kv.setMvccVersion(0);
}
sink.append(kv);
}
kvs.clear();

View File

@ -1064,13 +1064,26 @@ class FSHLog implements HLog, Syncable {
}
}
/**
 * Create the log key for an edit by forwarding every argument to the {@link HLogKey}
 * constructor.
 * @param encodedRegionName Encoded name of the region as returned by
 * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
 * @param tableName table name handed to the key
 * @param seqnum sequence number handed to the key
 * @param now time value handed to the key
 * @param clusterIds that have consumed the change
 * @param nonceGroup nonce group handed to the key
 * @param nonce nonce handed to the key
 * @return New log key.
 */
protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
    long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
  HLogKey newKey =
      new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
  return newKey;
}
@Override
@VisibleForTesting
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
final long now, HTableDescriptor htd, AtomicLong sequenceId)
throws IOException {
HLogKey logKey = new HLogKey(info.getEncodedNameAsBytes(), tableName, now);
append(htd, info, logKey, edits, sequenceId, true, true);
append(htd, info, logKey, edits, sequenceId, true, true, null);
}
@Override
@ -1079,14 +1092,15 @@ class FSHLog implements HLog, Syncable {
boolean inMemstore, long nonceGroup, long nonce) throws IOException {
HLogKey logKey =
new HLogKey(info.getEncodedNameAsBytes(), tableName, now, clusterIds, nonceGroup, nonce);
return append(htd, info, logKey, edits, sequenceId, false, inMemstore);
return append(htd, info, logKey, edits, sequenceId, false, inMemstore, null);
}
@Override
public long appendNoSync(final HTableDescriptor htd, final HRegionInfo info, final HLogKey key,
final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore)
final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore,
final List<KeyValue> memstoreKVs)
throws IOException {
return append(htd, info, key, edits, sequenceId, false, inMemstore);
return append(htd, info, key, edits, sequenceId, false, inMemstore, memstoreKVs);
}
/**
@ -1101,19 +1115,22 @@ class FSHLog implements HLog, Syncable {
* @param sync shall we sync after we call the append?
* @param inMemstore
* @param sequenceId The region sequence id reference.
* @param memstoreKVs
* @return txid of this transaction or if nothing to do, the last txid
* @throws IOException
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
justification="Will never be null")
private long append(HTableDescriptor htd, final HRegionInfo hri, final HLogKey key,
WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore)
WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore,
List<KeyValue> memstoreKVs)
throws IOException {
if (!this.enabled) return this.highestUnsyncedSequence;
if (this.closed) throw new IOException("Cannot append; log is closed");
// Make a trace scope for the append. It is closed on other side of the ring buffer by the
// single consuming thread. Don't have to worry about it.
TraceScope scope = Trace.startSpan("FSHLog.append");
// This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need
// all this to make a key and then below to append the edit, we need to carry htd, info,
// etc. all over the ring buffer.
@ -1124,19 +1141,10 @@ class FSHLog implements HLog, Syncable {
// Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
// edit with its edit/sequence id. The below entry.getRegionSequenceId will wait on the
// latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri);
entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreKVs);
truck.loadPayload(entry, scope.detach());
} finally {
this.disruptor.getRingBuffer().publish(sequence);
// Now wait until the region edit/sequence id is available. The 'entry' has an internal
// latch that is thrown when the region edit/sequence id is set. Calling
// entry.getRegionSequenceId will cause us block until the latch is thrown. The return is
// the region edit/sequence id, not the ring buffer txid.
try {
entry.getRegionSequenceId();
} catch (InterruptedException e) {
throw convertInterruptedExceptionToIOException(e);
}
}
// doSync is set in tests. Usually we arrive in here via appendNoSync w/ the sync called after
// all edits on a handler have been added.
@ -1894,6 +1902,14 @@ class FSHLog implements HLog, Syncable {
// here inside this single appending/writing thread. Events are ordered on the ringbuffer
// so region sequenceids will also be in order.
regionSequenceId = entry.stampRegionSequenceId();
// Edits are empty, so there is nothing to append. An edit may be empty when we are only
// looking for a region sequence id, i.e. a region edit/sequence id that is not associated
// with an actual edit. It still has to go through all the rigmarole to be sure we have the
// right ordering.
if (entry.getEdit().isEmpty()) {
return;
}
// Coprocessor hook.
if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(),
entry.getEdit())) {
@ -1909,10 +1925,7 @@ class FSHLog implements HLog, Syncable {
entry.getEdit());
}
}
// If empty, there is nothing to append. Maybe empty when we are looking for a region
// sequence id only, a region edit/sequence id that is not associated with an actual edit.
// It has to go through all the rigmarole to be sure we have the right ordering.
if (!entry.getEdit().isEmpty()) {
writer.append(entry);
assert highestUnsyncedSequence < entry.getSequence();
highestUnsyncedSequence = entry.getSequence();
@ -1921,7 +1934,7 @@ class FSHLog implements HLog, Syncable {
if (entry.isInMemstore()) {
oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId);
}
}
coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
// Update metrics.
postAppend(entry, EnvironmentEdgeManager.currentTimeMillis() - start);

View File

@ -17,12 +17,14 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
/**
* A WAL Entry for {@link FSHLog} implementation. Immutable.
@ -41,19 +43,18 @@ class FSWALEntry extends HLog.Entry {
private final transient boolean inMemstore;
private final transient HTableDescriptor htd;
private final transient HRegionInfo hri;
// Latch that is set on creation and then is undone on the other side of the ring buffer by the
// consumer thread just after it sets the region edit/sequence id in here.
private final transient CountDownLatch latch = new CountDownLatch(1);
private final transient List<KeyValue> memstoreKVs;
FSWALEntry(final long sequence, final HLogKey key, final WALEdit edit,
final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
final HTableDescriptor htd, final HRegionInfo hri) {
final HTableDescriptor htd, final HRegionInfo hri, List<KeyValue> memstoreKVs) {
super(key, edit);
this.regionSequenceIdReference = referenceToRegionSequenceId;
this.inMemstore = inMemstore;
this.htd = htd;
this.hri = hri;
this.sequence = sequence;
this.memstoreKVs = memstoreKVs;
}
public String toString() {
@ -90,15 +91,13 @@ class FSWALEntry extends HLog.Entry {
*/
long stampRegionSequenceId() {
long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
getKey().setLogSeqNum(regionSequenceId);
// On creation, a latch was set. Count it down when sequence id is set. This will free
// up anyone blocked on {@link #getRegionSequenceId()}
this.latch.countDown();
if(memstoreKVs != null && !memstoreKVs.isEmpty()) {
for(KeyValue kv : this.memstoreKVs){
kv.setMvccVersion(regionSequenceId);
}
}
HLogKey key = getKey();
key.setLogSeqNum(regionSequenceId);
return regionSequenceId;
}
long getRegionSequenceId() throws InterruptedException {
this.latch.await();
return getKey().getLogSeqNum();
}
}

View File

@ -34,8 +34,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.io.Writable;
@ -290,8 +292,8 @@ public interface HLog {
* @param sequenceId
* @throws IOException
* @deprecated For tests only and even then, should use
* {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean)}
* and {@link #sync()} instead.
* {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean,
* List)} and {@link #sync()} instead.
*/
@VisibleForTesting
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
@ -337,7 +339,7 @@ public interface HLog {
* able to sync an explicit edit only (the current default implementation syncs up to the time
* of the sync call syncing whatever is behind the sync).
* @throws IOException
* @deprecated Use {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean)}
* @deprecated Use {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean, List)}
* instead because you can get back the region edit/sequenceid; it is set into the passed in
* <code>key</code>.
*/
@ -361,12 +363,13 @@ public interface HLog {
* @param inMemstore Always true except for case where we are writing a compaction completion
* record into the WAL; in this case the entry is just so we can finish an unfinished compaction
* -- it is not an edit for memstore.
* @param memstoreKVs list of KVs added into memstore
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it.
* @throws IOException
*/
long appendNoSync(HTableDescriptor htd, HRegionInfo info, HLogKey key, WALEdit edits,
AtomicLong sequenceId, boolean inMemstore)
AtomicLong sequenceId, boolean inMemstore, List<KeyValue> memstoreKVs)
throws IOException;
// TODO: Do we need all these versions of sync?

View File

@ -22,6 +22,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -31,6 +32,10 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import com.google.protobuf.HBaseZeroCopyByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -42,6 +47,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.regionserver.SequenceNumber;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.WritableComparable;
@ -49,7 +55,6 @@ import org.apache.hadoop.io.WritableUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;
/**
* A Key for an entry in the change log.
@ -64,7 +69,7 @@ import com.google.protobuf.HBaseZeroCopyByteString;
// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
// purposes. They need to be merged into HLogEntry.
@InterfaceAudience.Private
public class HLogKey implements WritableComparable<HLogKey> {
public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
public static final Log LOG = LogFactory.getLog(HLogKey.class);
// should be < 0 (@see #readFields(DataInput))
@ -114,6 +119,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
private byte [] encodedRegionName;
private TableName tablename;
private long logSeqNum;
private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
// Time at which this edit was written.
private long writeTime;
@ -184,7 +190,8 @@ public class HLogKey implements WritableComparable<HLogKey> {
*/
public HLogKey(final byte [] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce);
init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds,
nonceGroup, nonce);
}
/**
@ -195,13 +202,14 @@ public class HLogKey implements WritableComparable<HLogKey> {
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename
* @param logSeqNum
* @param nonceGroup
* @param nonce
*/
public HLogKey(final byte [] encodedRegionName, final TableName tablename, long nonceGroup,
long nonce) {
init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID,
EnvironmentEdgeManager.currentTimeMillis(), EMPTY_UUIDS, nonceGroup, nonce);
public HLogKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
long nonceGroup, long nonce) {
init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTimeMillis(),
EMPTY_UUIDS, nonceGroup, nonce);
}
protected void init(final byte [] encodedRegionName, final TableName tablename,
@ -238,11 +246,30 @@ public class HLogKey implements WritableComparable<HLogKey> {
}
/**
* Allow that the log sequence id to be set post-construction.
* Allow the log sequence id to be set post-construction, and release all waiters blocked on
* the assigned sequence number.
* @param sequence
*/
void setLogSeqNum(final long sequence) {
// Store the number first, then open the latch so that threads blocked in
// getSequenceNumber() wake up and read the value just written.
this.logSeqNum = sequence;
this.seqNumAssignedLatch.countDown();
}
/**
 * Block until a sequence number has been assigned to this key, then return the assigned value.
 * @return long the newly assigned sequence number
 * @throws IOException an {@link InterruptedIOException}, carrying the original
 *         {@link InterruptedException} as its cause, if the thread is interrupted while waiting
 */
public long getSequenceNumber() throws IOException {
  try {
    // Released by setLogSeqNum(long) once the number is known.
    this.seqNumAssignedLatch.await();
    return this.logSeqNum;
  } catch (InterruptedException ie) {
    LOG.warn("Thread interrupted waiting for next log sequence number");
    // initCause returns the throwable it was called on, so the cast is safe.
    throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
  }
}
/**
@ -358,7 +385,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
if (result == 0) {
if (this.logSeqNum < o.logSeqNum) {
result = -1;
} else if (this.logSeqNum > o.logSeqNum ) {
} else if (this.logSeqNum > o.logSeqNum) {
result = 1;
}
if (result == 0) {

View File

@ -1972,8 +1972,8 @@ public class HLogSplitter {
clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
}
key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
.getTableName().toByteArray()), walKey.getLogSequenceNumber(), walKey.getWriteTime(),
clusterIds, walKey.getNonceGroup(), walKey.getNonce());
.getTableName().toByteArray()), walKey.getLogSequenceNumber(),
walKey.getWriteTime(), clusterIds, walKey.getNonceGroup(), walKey.getNonce());
logEntry.setFirst(key);
logEntry.setSecond(val);
}

View File

@ -262,7 +262,7 @@ public class HLogUtil {
final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
TableName tn = TableName.valueOf(c.getTableName().toByteArray());
HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
log.appendNoSync(htd, info, key, WALEdit.createCompaction(c), sequenceId, false);
log.appendNoSync(htd, info, key, WALEdit.createCompaction(c), sequenceId, false, null);
log.sync();
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));

View File

@ -68,7 +68,7 @@ public class TestMultiParallel {
private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
private static final byte [][] KEYS = makeKeys();
private static final int slaves = 2; // also used for testing HTable pool size
private static final int slaves = 3; // also used for testing HTable pool size
@BeforeClass public static void beforeClass() throws Exception {
((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
@ -238,7 +238,7 @@ public class TestMultiParallel {
*
* @throws Exception
*/
@Test (timeout=300000)
@Test (timeout=360000)
public void testFlushCommitsWithAbort() throws Exception {
LOG.info("test=testFlushCommitsWithAbort");
doTestFlushCommits(true);

View File

@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
@ -61,6 +62,7 @@ public class TestDefaultMemStore extends TestCase {
private static final int QUALIFIER_COUNT = ROW_COUNT;
private static final byte [] FAMILY = Bytes.toBytes("column");
private MultiVersionConsistencyControl mvcc;
private AtomicLong startSeqNum = new AtomicLong(0);
@Override
public void setUp() throws Exception {
@ -236,7 +238,7 @@ public class TestDefaultMemStore extends TestCase {
final byte[] v = Bytes.toBytes("value");
MultiVersionConsistencyControl.WriteEntry w =
mvcc.beginMemstoreInsert();
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv1 = new KeyValue(row, f, q1, v);
kv1.setMvccVersion(w.getWriteNumber());
@ -250,7 +252,7 @@ public class TestDefaultMemStore extends TestCase {
s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
assertScannerResults(s, new KeyValue[]{kv1});
w = mvcc.beginMemstoreInsert();
w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv2 = new KeyValue(row, f, q2, v);
kv2.setMvccVersion(w.getWriteNumber());
memstore.add(kv2);
@ -280,7 +282,7 @@ public class TestDefaultMemStore extends TestCase {
// INSERT 1: Write both columns val1
MultiVersionConsistencyControl.WriteEntry w =
mvcc.beginMemstoreInsert();
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv11 = new KeyValue(row, f, q1, v1);
kv11.setMvccVersion(w.getWriteNumber());
@ -296,7 +298,7 @@ public class TestDefaultMemStore extends TestCase {
assertScannerResults(s, new KeyValue[]{kv11, kv12});
// START INSERT 2: Write both columns val2
w = mvcc.beginMemstoreInsert();
w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv21 = new KeyValue(row, f, q1, v2);
kv21.setMvccVersion(w.getWriteNumber());
memstore.add(kv21);
@ -332,7 +334,7 @@ public class TestDefaultMemStore extends TestCase {
final byte[] v1 = Bytes.toBytes("value1");
// INSERT 1: Write both columns val1
MultiVersionConsistencyControl.WriteEntry w =
mvcc.beginMemstoreInsert();
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv11 = new KeyValue(row, f, q1, v1);
kv11.setMvccVersion(w.getWriteNumber());
@ -348,7 +350,7 @@ public class TestDefaultMemStore extends TestCase {
assertScannerResults(s, new KeyValue[]{kv11, kv12});
// START DELETE: Insert delete for one of the columns
w = mvcc.beginMemstoreInsert();
w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(),
KeyValue.Type.DeleteColumn);
kvDel.setMvccVersion(w.getWriteNumber());
@ -377,6 +379,7 @@ public class TestDefaultMemStore extends TestCase {
final MultiVersionConsistencyControl mvcc;
final MemStore memstore;
final AtomicLong startSeqNum;
AtomicReference<Throwable> caughtException;
@ -384,12 +387,14 @@ public class TestDefaultMemStore extends TestCase {
public ReadOwnWritesTester(int id,
MemStore memstore,
MultiVersionConsistencyControl mvcc,
AtomicReference<Throwable> caughtException)
AtomicReference<Throwable> caughtException,
AtomicLong startSeqNum)
{
this.mvcc = mvcc;
this.memstore = memstore;
this.caughtException = caughtException;
row = Bytes.toBytes(id);
this.startSeqNum = startSeqNum;
}
public void run() {
@ -403,7 +408,7 @@ public class TestDefaultMemStore extends TestCase {
private void internalRun() throws IOException {
for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
MultiVersionConsistencyControl.WriteEntry w =
mvcc.beginMemstoreInsert();
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
// Insert the sequence value (i)
byte[] v = Bytes.toBytes(i);
@ -433,7 +438,7 @@ public class TestDefaultMemStore extends TestCase {
AtomicReference<Throwable> caught = new AtomicReference<Throwable>();
for (int i = 0; i < NUM_THREADS; i++) {
threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught);
threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught, this.startSeqNum);
threads[i].start();
}

View File

@ -4152,15 +4152,16 @@ public class TestHRegion {
durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
// expect skip wal cases
durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false);
durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, true, false, false);
durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, true, false, false);
durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, true, false, false);
}
@SuppressWarnings("unchecked")
private void durabilityTest(String method, Durability tableDurability,
Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
final boolean expectSyncFromLogSyncer) throws Exception {
@ -4183,7 +4184,7 @@ public class TestHRegion {
//verify append called or not
verify(log, expectAppend ? times(1) : never())
.appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
(WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean());
(WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean(), (List<KeyValue>)any());
// verify sync called or not
if (expectSync || expectSyncFromLogSyncer) {
@ -4202,7 +4203,7 @@ public class TestHRegion {
}
});
} else {
verify(log, never()).sync(anyLong());
//verify(log, never()).sync(anyLong());
verify(log, never()).sync();
}

View File

@ -46,8 +46,10 @@ public class TestMultiVersionConsistencyControl extends TestCase {
public boolean failed = false;
public void run() {
AtomicLong startPoint = new AtomicLong();
while (!finished.get()) {
MultiVersionConsistencyControl.WriteEntry e = mvcc.beginMemstoreInsert();
MultiVersionConsistencyControl.WriteEntry e =
mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet());
// System.out.println("Begin write: " + e.getWriteNumber());
// 10 usec - 500usec (including 0)
int sleepTime = rnd.nextInt(500);

View File

@ -208,7 +208,7 @@ public class TestStore {
long size = store.memstore.getFlushableSize();
Assert.assertEquals(0, size);
LOG.info("Adding some data");
long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null)).getFirst();
size = store.memstore.getFlushableSize();
Assert.assertEquals(kvSize, size);
// Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
@ -604,19 +604,19 @@ public class TestStore {
size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1,
System.currentTimeMillis(),
Bytes.toBytes(newValue)));
Bytes.toBytes(newValue))).getFirst();
size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1,
System.currentTimeMillis(),
Bytes.toBytes(newValue)));
Bytes.toBytes(newValue))).getFirst();
size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1,
System.currentTimeMillis(),
Bytes.toBytes(newValue)));
Bytes.toBytes(newValue))).getFirst();
size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1,
System.currentTimeMillis(),
Bytes.toBytes(newValue)));
Bytes.toBytes(newValue))).getFirst();
size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1,
System.currentTimeMillis(),
Bytes.toBytes(newValue)));
Bytes.toBytes(newValue))).getFirst();
for ( int i = 0 ; i < 10000 ; ++i) {