HBASE-17471 Region Seqid will be out of order in WAL if using mvccPreAssign

Signed-off-by: Yu Li <liyu@apache.org>
This commit is contained in:
Allan Yang 2017-05-06 23:28:14 +08:00 committed by Yu Li
parent 408645c4ef
commit 9a97e28bdf
13 changed files with 92 additions and 197 deletions

View File

@ -664,10 +664,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final Durability durability;
private final boolean regionStatsEnabled;
// flag and lock for MVCC preassign
private final boolean mvccPreAssign;
private final ReentrantLock preAssignMvccLock;
// whether to unassign region if we hit FNFE
private final RegionUnassigner regionUnassigner;
/**
@ -820,13 +816,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
// get mvcc pre-assign flag and lock
this.mvccPreAssign = conf.getBoolean(HREGION_MVCC_PRE_ASSIGN, DEFAULT_HREGION_MVCC_PRE_ASSIGN);
if (this.mvccPreAssign) {
this.preAssignMvccLock = new ReentrantLock();
} else {
this.preAssignMvccLock = null;
}
boolean unassignForFNFE =
conf.getBoolean(HREGION_UNASSIGN_FOR_FNFE, DEFAULT_HREGION_UNASSIGN_FOR_FNFE);
if (unassignForFNFE) {
@ -2674,9 +2663,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// a timeout. May happen in tests after we tightened the semantic via HBASE-14317.
// Also, the getSequenceId blocks on a latch. There is no global list of outstanding latches
// so if an abort or stop, there is no way to call them in.
WALKey key = this.appendEmptyEdit(wal, null);
WALKey key = this.appendEmptyEdit(wal);
mvcc.complete(key.getWriteEntry());
return key.getSequenceId(this.maxWaitForSeqId);
return key.getSequenceId();
}
//////////////////////////////////////////////////////////////////////////////
@ -3418,29 +3407,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
}
} else {
try {
if (mvccPreAssign) {
preAssignMvccLock.lock();
writeEntry = mvcc.begin();
}
if (walEdit.size() > 0) {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
if (mvccPreAssign) {
walKey.setPreAssignedWriteEntry(writeEntry);
}
txid =
this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
} else {
// If this is a skip wal operation just get the read point from mvcc
walKey = this.appendEmptyEdit(this.wal, writeEntry);
}
} finally {
if (mvccPreAssign) {
preAssignMvccLock.unlock();
}
if (walEdit.size() > 0) {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
txid = this.wal
.append(this.htableDescriptor, this.getRegionInfo(), walKey,
walEdit, true);
} else {
walKey = appendEmptyEdit(wal);
}
}
// ------------------------------------
@ -3478,7 +3454,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// before apply to memstore to avoid scan return incorrect value.
// we use durability of the original mutation for the mutation passed by CP.
boolean updateSeqId = isInReplay
|| batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL || mvccPreAssign;
|| batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
if (updateSeqId) {
updateSequenceId(familyMaps[i].values(), mvccNum);
}
@ -7402,7 +7378,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if(walKey == null){
// since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
// to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
walKey = this.appendEmptyEdit(this.wal, null);
walKey = this.appendEmptyEdit(this.wal);
}
// 7. Start mvcc transaction
@ -7701,7 +7677,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean updateSeqId = false;
if (walKey == null) {
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
walKey = this.appendEmptyEdit(this.wal, null);
walKey = this.appendEmptyEdit(this.wal);
// If no WAL, FSWALEntry won't be used and no update for sequence id
updateSeqId = true;
}
@ -7934,7 +7910,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true);
} else {
// Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
walKey = this.appendEmptyEdit(this.wal, null);
walKey = this.appendEmptyEdit(this.wal);
// If no WAL, FSWALEntry won't be used and no update for sequence id
updateSeqId = true;
}
@ -8160,9 +8136,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
47 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
46 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) +
6 * Bytes.SIZEOF_BOOLEAN);
5 * Bytes.SIZEOF_BOOLEAN);
// woefully out of date - currently missing:
// 1 x HashMap - coprocessorServiceHandlers
@ -8744,19 +8720,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore
* the WALEdit append later.
* @param wal
* @param writeEntry Preassigned writeEntry, if any
* @return Return the key used appending with no sync and no append.
* @throws IOException
*/
private WALKey appendEmptyEdit(final WAL wal, WriteEntry writeEntry) throws IOException {
private WALKey appendEmptyEdit(final WAL wal) throws IOException {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
@SuppressWarnings("deprecation")
WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null,
HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC());
if (writeEntry != null) {
key.setPreAssignedWriteEntry(writeEntry);
}
// Call append but with an empty WALEdit. The returned sequence id will not be associated
// with any edit and we can be sure it went in after all outstanding appends.

View File

@ -18,12 +18,12 @@
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -110,21 +110,35 @@ public class MultiVersionConcurrencyControl {
return true;
}
/**
* Call {@link #begin(Runnable)} with an empty {@link Runnable}.
*/
public WriteEntry begin() {
return begin(new Runnable() {
@Override public void run() {
}
});
}
/**
* Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
* to our queue of ongoing writes. Return this WriteEntry instance.
* To complete the write transaction and wait for it to be visible, call
* {@link #completeAndWait(WriteEntry)}. If the write failed, call
* {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of the failed write
* transaction.
* to our queue of ongoing writes. Return this WriteEntry instance. To complete the write
* transaction and wait for it to be visible, call {@link #completeAndWait(WriteEntry)}. If the
* write failed, call {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of
* the failed write transaction.
* <p>
* The {@code action} will be executed under the lock which means it can keep the same order with
* mvcc.
* @see #complete(WriteEntry)
* @see #completeAndWait(WriteEntry)
*/
public WriteEntry begin() {
public WriteEntry begin(Runnable action) {
synchronized (writeQueue) {
long nextWriteNumber = writePoint.incrementAndGet();
WriteEntry e = new WriteEntry(nextWriteNumber);
writeQueue.add(e);
action.run();
return e;
}
}

View File

@ -48,6 +48,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import com.lmax.disruptor.*;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -65,6 +67,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.DrainBarrier;
@ -89,11 +92,6 @@ import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
@ -1112,21 +1110,22 @@ public class FSHLog implements WAL {
// Make a trace scope for the append. It is closed on other side of the ring buffer by the
// single consuming thread. Don't have to worry about it.
TraceScope scope = Trace.startSpan("FSHLog.append");
// This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need
// all this to make a key and then below to append the edit, we need to carry htd, info,
// etc. all over the ring buffer.
FSWALEntry entry = null;
long sequence = this.disruptor.getRingBuffer().next();
final MutableLong txidHolder = new MutableLong();
final RingBuffer<RingBufferTruck> ringBuffer = disruptor.getRingBuffer();
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(new Runnable() {
@Override public void run() {
txidHolder.setValue(ringBuffer.next());
}
});
long txid = txidHolder.longValue();
try {
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
// TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
truck.loadPayload(entry, scope.detach());
FSWALEntry entry = new FSWALEntry(txid, key, edits, htd, hri, inMemstore);
entry.stampRegionSequenceId(we);
ringBuffer.get(txid).loadPayload(entry, scope.detach());
} finally {
this.disruptor.getRingBuffer().publish(sequence);
ringBuffer.publish(txid);
}
return sequence;
return txid;
}
/**
@ -1814,12 +1813,6 @@ public class FSHLog implements WAL {
try {
FSWALEntry entry = truck.unloadFSWALEntryPayload();
if (this.exception != null) {
// We got an exception on an earlier attempt at append. Do not let this append
// go through. Fail it but stamp the sequenceid into this append though failed.
// We need to do this to close the latch held down deep in WALKey...that is waiting
// on sequenceid assignment otherwise it will just hang out (The #append method
// called below does this also internally).
entry.stampRegionSequenceId();
// Return to keep processing events coming off the ringbuffer
return;
}
@ -1940,10 +1933,8 @@ public class FSHLog implements WAL {
byte [] encodedRegionName = entry.getKey().getEncodedRegionName();
long regionSequenceId = WALKey.NO_SEQUENCE_ID;
try {
// We are about to append this edit; update the region-scoped sequence number. Do it
// here inside this single appending/writing thread. Events are ordered on the ringbuffer
// so region sequenceids will also be in order.
regionSequenceId = entry.stampRegionSequenceId();
regionSequenceId = entry.getKey().getSequenceId();
// Edits are empty, there is nothing to append. Maybe empty when we are looking for a
// region sequence id only, a region edit/sequence id that is not associated with an actual
// edit. It has to go through all the rigmarole to be sure we have the right ordering.

View File

@ -106,34 +106,19 @@ class FSWALEntry extends Entry {
/**
* Here is where a WAL edit gets its sequenceid.
* @param we after HBASE-17471 we already get the mvcc number
* in WriteEntry, just stamp the writenumber to cells and walkey
* @return The sequenceid we stamped on this edit.
* @throws IOException
*/
long stampRegionSequenceId() throws IOException {
long regionSequenceId = WALKey.NO_SEQUENCE_ID;
WALKey key = getKey();
MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry();
boolean preAssigned = (we != null);
if (!preAssigned) {
MultiVersionConcurrencyControl mvcc = key.getMvcc();
if (mvcc != null) {
we = mvcc.begin();
}
}
if (we != null) {
regionSequenceId = we.getWriteNumber();
}
long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws IOException {
long regionSequenceId = we.getWriteNumber();
if (!this.getEdit().isReplay() && inMemstore) {
for (Cell c:getEdit().getCells()) {
for (Cell c : getEdit().getCells()) {
CellUtil.setSequenceId(c, regionSequenceId);
}
}
// This has to stay in this order
if (!preAssigned) {
key.setWriteEntry(we);
}
getKey().setWriteEntry(we);
return regionSequenceId;
}

View File

@ -95,37 +95,16 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
*/
@InterfaceAudience.Private // For internal use only.
public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
if (this.preAssignedWriteEntry != null) {
// don't wait for seqNumAssignedLatch if writeEntry is preassigned
return this.preAssignedWriteEntry;
}
try {
this.seqNumAssignedLatch.await();
} catch (InterruptedException ie) {
// If interrupted... clear out our entry else we can block up mvcc.
MultiVersionConcurrencyControl mvcc = getMvcc();
LOG.debug("mvcc=" + mvcc + ", writeEntry=" + this.writeEntry);
if (mvcc != null) {
if (this.writeEntry != null) {
mvcc.complete(this.writeEntry);
}
}
InterruptedIOException iie = new InterruptedIOException();
iie.initCause(ie);
throw iie;
}
assert this.writeEntry != null;
return this.writeEntry;
}
@InterfaceAudience.Private // For internal use only.
public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
assert this.writeEntry == null : "Non-null writeEntry when trying to set one";
assert this.writeEntry == null;
this.writeEntry = writeEntry;
// Set our sequenceid now using WriteEntry.
if (this.writeEntry != null) {
this.logSeqNum = this.writeEntry.getWriteNumber();
}
this.seqNumAssignedLatch.countDown();
this.logSeqNum = writeEntry.getWriteNumber();
}
// should be < 0 (@see HLogKey#readFields(DataInput))
@ -189,7 +168,6 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
@InterfaceAudience.Private
protected long logSeqNum;
private long origLogSeqNum = 0;
private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
// Time at which this edit was written.
// visible for deprecated HLogKey
@InterfaceAudience.Private
@ -206,7 +184,6 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
private long nonce = HConstants.NO_NONCE;
private MultiVersionConcurrencyControl mvcc;
private MultiVersionConcurrencyControl.WriteEntry writeEntry;
private MultiVersionConcurrencyControl.WriteEntry preAssignedWriteEntry = null;
public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
// visible for deprecated HLogKey
@ -393,36 +370,6 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
*/
@Override
public long getSequenceId() throws IOException {
return getSequenceId(-1);
}
/**
* Wait for sequence number to be assigned &amp; return the assigned value.
* @param maxWaitForSeqId maximum time to wait in milliseconds for sequenceid
* @return long the new assigned sequence number
* @throws IOException
*/
public long getSequenceId(final long maxWaitForSeqId) throws IOException {
// TODO: This implementation waiting on a latch is problematic because if a higher level
// determines we should stop or abort, there is no global list of all these blocked WALKeys
// waiting on a sequence id; they can't be cancelled... interrupted. See getNextSequenceId.
//
// UPDATE: I think we can remove the timeout now we are stamping all walkeys with sequenceid,
// even those that have failed (previously we were not... so they would just hang out...).
// St.Ack 20150910
try {
if (maxWaitForSeqId < 0) {
this.seqNumAssignedLatch.await();
} else if (!this.seqNumAssignedLatch.await(maxWaitForSeqId, TimeUnit.MILLISECONDS)) {
throw new TimeoutIOException("Failed to get sequenceid after " + maxWaitForSeqId +
"ms; WAL system stuck or has gone away?");
}
} catch (InterruptedException ie) {
LOG.warn("Thread interrupted waiting for next log sequence number");
InterruptedIOException iie = new InterruptedIOException();
iie.initCause(ie);
throw iie;
}
return this.logSeqNum;
}
@ -667,23 +614,4 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
}
}
/**
* @return The preassigned writeEntry, if any
*/
@InterfaceAudience.Private // For internal use only.
public MultiVersionConcurrencyControl.WriteEntry getPreAssignedWriteEntry() {
return this.preAssignedWriteEntry;
}
/**
* Preassign writeEntry
* @param writeEntry the entry to assign
*/
@InterfaceAudience.Private // For internal use only.
public void setPreAssignedWriteEntry(WriteEntry writeEntry) {
if (writeEntry != null) {
this.preAssignedWriteEntry = writeEntry;
this.logSeqNum = writeEntry.getWriteNumber();
}
}
}

View File

@ -242,7 +242,9 @@ public class TestWALObserver {
// it's where WAL write cp should occur.
long now = EnvironmentEdgeManager.currentTime();
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
long txid = log.append(htd, hri,
new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now,
new MultiVersionConcurrencyControl()),
edit, true);
log.sync(txid);
@ -326,7 +328,7 @@ public class TestWALObserver {
LOG.debug("write a log edit that supports legacy cps.");
final long now = EnvironmentEdgeManager.currentTime();
final WALKey legacyKey = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now);
final WALKey legacyKey = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc);
final WALEdit edit = new WALEdit();
final byte[] nonce = Bytes.toBytes("1772");
edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));

View File

@ -6272,11 +6272,8 @@ public class TestHRegion {
@Override
public Long answer(InvocationOnMock invocation) throws Throwable {
WALKey key = invocation.getArgumentAt(2, WALKey.class);
MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry();
if (we == null) {
we = key.getMvcc().begin();
key.setWriteEntry(we);
}
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
key.setWriteEntry(we);
return 1L;
}

View File

@ -216,13 +216,15 @@ public class TestWALLockup {
HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
final HRegion region = initHRegion(tableName, null, null, dodgyWAL);
byte [] bytes = Bytes.toBytes(getName());
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
try {
// First get something into memstore. Make a Put and then pull the Cell out of it. Will
// manage append and sync carefully in below to manufacture hang. We keep adding same
// edit. WAL subsystem doesn't care.
Put put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName());
WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
htd.getTableName(), System.currentTimeMillis(), mvcc);
WALEdit edit = new WALEdit();
CellScanner CellScanner = put.cellScanner();
assertTrue(CellScanner.advance());
@ -388,12 +390,12 @@ public class TestWALLockup {
HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
final HRegion region = initHRegion(tableName, null, null, dodgyWAL1);
byte[] bytes = Bytes.toBytes(getName());
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
try {
Put put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
htd.getTableName());
htd.getTableName(), System.currentTimeMillis(), mvcc);
WALEdit edit = new WALEdit();
CellScanner CellScanner = put.cellScanner();
assertTrue(CellScanner.advance());
@ -425,7 +427,7 @@ public class TestWALLockup {
// make RingBufferEventHandler sleep 1s, so the following sync
// endOfBatch=false
key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
TableName.valueOf("sleep"));
TableName.valueOf("sleep"), System.currentTimeMillis(), mvcc);
dodgyWAL2.append(htd, region.getRegionInfo(), key, edit, true);
Thread t = new Thread("Sync") {
@ -449,7 +451,7 @@ public class TestWALLockup {
}
// make append throw DamagedWALException
key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
TableName.valueOf("DamagedWALException"));
TableName.valueOf("DamagedWALException"), System.currentTimeMillis(), mvcc);
dodgyWAL2.append(htd, region.getRegionInfo(), key, edit, true);
while (latch.getCount() > 0) {

View File

@ -416,6 +416,7 @@ public class TestFSHLog {
final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC());
wal.append(htd, info, logkey, edits, true);
region.getMVCC().completeAndWait(logkey.getWriteEntry());
}
region.flush(true);
// FlushResult.flushSequenceId is not visible here so go get the current sequence id.

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@ -101,7 +102,7 @@ public class TestWALActionsListener {
HRegionInfo hri = new HRegionInfo(TableName.valueOf(SOME_BYTES),
SOME_BYTES, SOME_BYTES, false);
final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
for (int i = 0; i < 20; i++) {
byte[] b = Bytes.toBytes(i+"");
KeyValue kv = new KeyValue(b,b,b);
@ -111,7 +112,7 @@ public class TestWALActionsListener {
htd.addFamily(new HColumnDescriptor(b));
final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
TableName.valueOf(b), 0), edit, true);
TableName.valueOf(b), 0, mvcc), edit, true);
wal.sync(txid);
if (i == 10) {
wal.registerWALActionsListener(laterobserver);

View File

@ -1170,7 +1170,7 @@ public class TestWALReplay {
FSWALEntry entry =
new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc), createWALEdit(
rowName, family, ee, index), htd, hri, true);
entry.stampRegionSequenceId();
entry.stampRegionSequenceId(mvcc.begin());
return entry;
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
@ -95,11 +96,12 @@ public class TestSecureWAL {
final WAL wal =
wals.getWAL(regioninfo.getEncodedNameAsBytes(), regioninfo.getTable().getNamespace());
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis()), kvs, true);
System.currentTimeMillis(), mvcc), kvs, true);
}
wal.sync();
final Path walPath = DefaultWALProvider.getCurrentFileName(wal);

View File

@ -370,12 +370,12 @@ public class TestWALFactory {
HTableDescriptor htd = new HTableDescriptor();
htd.addFamily(new HColumnDescriptor(tableName.getName()));
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis()), kvs, true);
System.currentTimeMillis(), mvcc), kvs, true);
}
// Now call sync to send the data to HDFS datanodes
wal.sync();