HBASE-17471 Region Seqid will be out of order in WAL if using mvccPreAssign (Allan Yang)
This commit is contained in:
parent aff8de8397
commit 52ad310c80
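The bug in one picture: before this fix, a region seqid (an mvcc write number) and a WAL ring buffer slot could be claimed in two separate steps, so a thread could take an earlier seqid but a later slot, and the single WAL consumer would then write seqids out of order. A minimal standalone sketch of the broken and fixed claim patterns (illustrative names only, not HBase code; run with -ea for the assert):

import java.util.concurrent.atomic.AtomicLong;

public class SeqIdOrderingSketch {
  private final AtomicLong writePoint = new AtomicLong(); // stands in for mvcc write numbers
  private final AtomicLong ringCursor = new AtomicLong(); // stands in for ring buffer slots
  private final Object lock = new Object();

  // Broken shape: the two counters are claimed in separate steps. Thread A can
  // take seqid 5, stall, then thread B takes seqid 6 plus the earlier slot, and
  // the consumer writes seqid 6 into the WAL before seqid 5.
  long[] claimRacy() {
    long seqId = writePoint.incrementAndGet();
    // another thread can interleave here
    long slot = ringCursor.incrementAndGet();
    return new long[] { seqId, slot };
  }

  // Fixed shape (what MultiVersionConcurrencyControl.begin(Runnable) enables in
  // this commit): claim both counters inside one critical section, so slot order
  // always matches seqid order.
  long[] claimOrdered() {
    synchronized (lock) {
      return new long[] { writePoint.incrementAndGet(), ringCursor.incrementAndGet() };
    }
  }

  public static void main(String[] args) throws InterruptedException {
    SeqIdOrderingSketch s = new SeqIdOrderingSketch();
    Runnable r = () -> {
      for (int i = 0; i < 100_000; i++) {
        long[] c = s.claimOrdered();
        assert c[0] == c[1] : "slot order diverged from seqid order";
      }
    };
    Thread a = new Thread(r), b = new Thread(r);
    a.start(); b.start(); a.join(); b.join();
    System.out.println("ordered claims kept seqids and slots aligned");
  }
}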
@@ -104,7 +104,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
@@ -644,9 +643,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region
   // that has non-default scope
   private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<byte[], Integer>(
       Bytes.BYTES_COMPARATOR);
-  // flag and lock for MVCC preassign
-  private final boolean mvccPreAssign;
-  private final ReentrantLock preAssignMvccLock;

   /**
    * HRegion constructor. This constructor should only be used for testing and
@@ -806,14 +802,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region
         false :
         conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
             HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
-
-    // get mvcc pre-assign flag and lock
-    this.mvccPreAssign = conf.getBoolean(HREGION_MVCC_PRE_ASSIGN, DEFAULT_HREGION_MVCC_PRE_ASSIGN);
-    if (this.mvccPreAssign) {
-      this.preAssignMvccLock = new ReentrantLock();
-    } else {
-      this.preAssignMvccLock = null;
-    }
   }

   void setHTableSpecificConf() {
@@ -3349,26 +3337,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region
       } else {
         try {
           if (!walEdit.isEmpty()) {
-            try {
-              if (this.mvccPreAssign) {
-                preAssignMvccLock.lock();
-                writeEntry = mvcc.begin();
-              }
-              // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-              walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
-                  this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
-                  mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc,
-                  this.getReplicationScope());
-              if (this.mvccPreAssign) {
-                walKey.setPreAssignedWriteEntry(writeEntry);
-              }
-              // TODO: Use the doAppend methods below... complicated by the replay stuff above.
-              txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
-            } finally {
-              if (mvccPreAssign) {
-                preAssignMvccLock.unlock();
-              }
-            }
+            // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+            walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
+                this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
+                mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc,
+                this.getReplicationScope());
+            // TODO: Use the doAppend methods below... complicated by the replay stuff above.
+            txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
             if (txid != 0) {
               sync(txid, durability);
             }
@@ -3400,7 +3375,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region
       // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
       // 2) If no WAL, FSWALEntry won't be used
       // we use durability of the original mutation for the mutation passed by CP.
-      boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL || mvccPreAssign;
+      boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
       if (updateSeqId) {
         this.updateSequenceId(familyMaps[i].values(),
             replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
@@ -18,12 +18,12 @@
  */
 package org.apache.hadoop.hbase.regionserver;

+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicLong;

-import com.google.common.annotations.VisibleForTesting;
-
-import com.google.common.base.Objects;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -109,21 +109,31 @@ public class MultiVersionConcurrencyControl {
     return true;
   }

+  /**
+   * Call {@link #begin(Runnable)} with an empty {@link Runnable}.
+   */
+  public WriteEntry begin() {
+    return begin(() -> {});
+  }
+
   /**
    * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
-   * to our queue of ongoing writes. Return this WriteEntry instance.
-   * To complete the write transaction and wait for it to be visible, call
-   * {@link #completeAndWait(WriteEntry)}. If the write failed, call
-   * {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of the failed write
-   * transaction.
+   * to our queue of ongoing writes. Return this WriteEntry instance. To complete the write
+   * transaction and wait for it to be visible, call {@link #completeAndWait(WriteEntry)}. If the
+   * write failed, call {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of
+   * the failed write transaction.
+   * <p>
+   * The {@code action} will be executed under the lock, which means it can keep the same order as
+   * mvcc.
    * @see #complete(WriteEntry)
    * @see #completeAndWait(WriteEntry)
    */
-  public WriteEntry begin() {
+  public WriteEntry begin(Runnable action) {
     synchronized (writeQueue) {
       long nextWriteNumber = writePoint.incrementAndGet();
       WriteEntry e = new WriteEntry(nextWriteNumber);
       writeQueue.add(e);
+      action.run();
       return e;
     }
   }
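A self-contained model of the guarantee the new overload provides (MiniMvcc and its members are assumed stand-ins, not the real class): any side effect run inside begin(action) is ordered consistently with write-number assignment, because both happen while holding the same monitor.

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class MiniMvcc {
  static final class WriteEntry {
    final long writeNumber;
    WriteEntry(long n) { this.writeNumber = n; }
  }

  private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();
  private final AtomicLong writePoint = new AtomicLong();

  WriteEntry begin(Runnable action) {
    synchronized (writeQueue) {
      WriteEntry e = new WriteEntry(writePoint.incrementAndGet());
      writeQueue.add(e);
      action.run(); // runs under the same monitor, so it is ordered with the write number
      return e;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    MiniMvcc mvcc = new MiniMvcc();
    AtomicLong slots = new AtomicLong(); // stand-in for a WAL ring buffer cursor
    List<long[]> claims = new ArrayList<>();
    Runnable writer = () -> {
      for (int i = 0; i < 10_000; i++) {
        long[] claim = new long[2];
        WriteEntry we = mvcc.begin(() -> claim[1] = slots.incrementAndGet());
        claim[0] = we.writeNumber;
        synchronized (claims) { claims.add(claim); }
      }
    };
    Thread a = new Thread(writer), b = new Thread(writer);
    a.start(); b.start(); a.join(); b.join();
    for (long[] c : claims) {
      if (c[0] != c[1]) throw new AssertionError("write number and slot diverged");
    }
    System.out.println("write numbers and ring buffer slots stayed in lockstep");
  }
}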
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;

+import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CollectionUtils;
 import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -73,6 +75,7 @@ import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;

 import com.google.common.annotations.VisibleForTesting;
+import com.lmax.disruptor.RingBuffer;

 /**
  * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
@@ -881,11 +884,8 @@ public abstract class AbstractFSWAL<W> implements WAL {
     atHeadOfRingBufferEventHandlerAppend();
     long start = EnvironmentEdgeManager.currentTime();
     byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
-    long regionSequenceId = WALKey.NO_SEQUENCE_ID;
-    // We are about to append this edit; update the region-scoped sequence number. Do it
-    // here inside this single appending/writing thread. Events are ordered on the ringbuffer
-    // so region sequenceids will also be in order.
-    regionSequenceId = entry.stampRegionSequenceId();
+    long regionSequenceId = entry.getKey().getSequenceId();
+
     // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
     // region sequence id only, a region edit/sequence id that is not associated with an actual
     // edit. It has to go through all the rigmarole to be sure we have the right ordering.
@@ -944,6 +944,28 @@ public abstract class AbstractFSWAL<W> implements WAL {
     }
   }

+  protected long stampSequenceIdAndPublishToRingBuffer(HRegionInfo hri, WALKey key, WALEdit edits,
+      boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
+      throws IOException {
+    if (this.closed) {
+      throw new IOException("Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
+    }
+    TraceScope scope = Trace.startSpan(getClass().getSimpleName() + ".append");
+    MutableLong txidHolder = new MutableLong();
+    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
+      txidHolder.setValue(ringBuffer.next());
+    });
+    long txid = txidHolder.longValue();
+    try {
+      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
+      entry.stampRegionSequenceId(we);
+      ringBuffer.get(txid).load(entry, scope.detach());
+    } finally {
+      ringBuffer.publish(txid);
+    }
+    return txid;
+  }
+
   @Override
   public String toString() {
     return getClass().getSimpleName() + " " + walFilePrefix + ":" + walFileSuffix + "(num "
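Why the MutableLong holder in the method above: a Java lambda can only capture effectively final locals, so the ring buffer sequence claimed inside begin(...) has to escape through a mutable container. A standalone illustration (the value 42 stands in for ringBuffer.next()):

import org.apache.commons.lang.mutable.MutableLong;

public class HolderSketch {
  public static void main(String[] args) {
    MutableLong txidHolder = new MutableLong();
    // A plain local long could not be assigned from inside the lambda; the holder can.
    Runnable claimSlot = () -> txidHolder.setValue(42L); // ringBuffer.next() in the WAL code
    claimSlot.run();
    System.out.println(txidHolder.longValue()); // prints 42
  }
}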
@@ -544,17 +544,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   @Override
   public long append(HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore)
       throws IOException {
-    if (closed) {
-      throw new IOException("Cannot append; log is closed");
-    }
-    TraceScope scope = Trace.startSpan("AsyncFSWAL.append");
-    long txid = waitingConsumePayloads.next();
-    try {
-      RingBufferTruck truck = waitingConsumePayloads.get(txid);
-      truck.load(new FSWALEntry(txid, key, edits, hri, inMemstore), scope.detach());
-    } finally {
-      waitingConsumePayloads.publish(txid);
-    }
+    long txid =
+        stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
     if (shouldScheduleConsumer()) {
       eventLoop.execute(consumer);
     }
@@ -435,31 +435,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
       justification = "Will never be null")
   @Override
-  public long append(final HRegionInfo hri,
-      final WALKey key, final WALEdit edits, final boolean inMemstore) throws IOException {
-    if (this.closed) {
-      throw new IOException("Cannot append; log is closed");
-    }
-    // Make a trace scope for the append. It is closed on other side of the ring buffer by the
-    // single consuming thread. Don't have to worry about it.
-    TraceScope scope = Trace.startSpan("FSHLog.append");
-
-    // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need
-    // all this to make a key and then below to append the edit, we need to carry htd, info,
-    // etc. all over the ring buffer.
-    FSWALEntry entry = null;
-    long sequence = this.disruptor.getRingBuffer().next();
-    try {
-      RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
-      // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
-      // edit with its edit/sequence id.
-      // TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
-      entry = new FSWALEntry(sequence, key, edits, hri, inMemstore);
-      truck.load(entry, scope.detach());
-    } finally {
-      this.disruptor.getRingBuffer().publish(sequence);
-    }
-    return sequence;
+  public long append(final HRegionInfo hri, final WALKey key, final WALEdit edits,
+      final boolean inMemstore) throws IOException {
+    return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
+        disruptor.getRingBuffer());
   }

   /**
@@ -1009,12 +988,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
       try {

         if (this.exception != null) {
-          // We got an exception on an earlier attempt at append. Do not let this append
-          // go through. Fail it but stamp the sequenceid into this append though failed.
-          // We need to do this to close the latch held down deep in WALKey...that is waiting
-          // on sequenceid assignment otherwise it will just hang out (The #append method
-          // called below does this also internally).
-          entry.stampRegionSequenceId();
           // Return to keep processing events coming off the ringbuffer
           return;
         }
@@ -52,9 +52,6 @@ class FSWALEntry extends Entry {
   private final transient boolean inMemstore;
   private final transient HRegionInfo hri;
   private final transient Set<byte[]> familyNames;
-  // In the new WAL logic, we will rewrite failed WAL entries to new WAL file, so we need to avoid
-  // calling stampRegionSequenceId again.
-  private transient boolean stamped = false;

   // The tracing span for this entry when writing WAL.
   private transient Span span;
@@ -105,38 +102,18 @@ class FSWALEntry extends Entry {
   }

   /**
-   * Here is where a WAL edit gets its sequenceid.
-   * SIDE-EFFECT is our stamping the sequenceid into every Cell AND setting the sequenceid into the
-   * MVCC WriteEntry!!!!
+   * Here is where a WAL edit gets its sequenceid. SIDE-EFFECT is our stamping the sequenceid into
+   * every Cell AND setting the sequenceid into the MVCC WriteEntry!!!!
    * @return The sequenceid we stamped on this edit.
    */
-  long stampRegionSequenceId() throws IOException {
-    if (stamped) {
-      return getKey().getSequenceId();
-    }
-    stamped = true;
-    long regionSequenceId = WALKey.NO_SEQUENCE_ID;
-    WALKey key = getKey();
-    MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry();
-    boolean preAssigned = (we != null);
-    if (!preAssigned) {
-      MultiVersionConcurrencyControl mvcc = key.getMvcc();
-      if (mvcc != null) {
-        we = mvcc.begin();
-      }
-    }
-    if (we != null) {
-      regionSequenceId = we.getWriteNumber();
-    }
-
+  long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws IOException {
+    long regionSequenceId = we.getWriteNumber();
     if (!this.getEdit().isReplay() && inMemstore) {
-      for (Cell c:getEdit().getCells()) {
+      for (Cell c : getEdit().getCells()) {
         CellUtil.setSequenceId(c, regionSequenceId);
       }
     }
-    if (!preAssigned) {
-      key.setWriteEntry(we);
-    }
+    getKey().setWriteEntry(we);
     return regionSequenceId;
   }

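The reworked method's contract in miniature: the caller creates the WriteEntry under mvcc's lock, and stampRegionSequenceId only copies that already-assigned write number onto every cell and onto the key, with no fallback assignment paths left. A simplified, self-contained model (Cell and Key here are assumed stand-ins, not the HBase types):

import java.util.Arrays;
import java.util.List;

public class StampSketch {
  static final class Cell { long seqId; }
  static final class Key { long seqId; }

  // Mirrors stampRegionSequenceId(WriteEntry): one write number in, stamped everywhere.
  static long stamp(long writeNumber, Key key, List<Cell> cells) {
    for (Cell c : cells) {
      c.seqId = writeNumber; // CellUtil.setSequenceId(c, regionSequenceId) in HBase
    }
    key.seqId = writeNumber; // getKey().setWriteEntry(we) in HBase
    return writeNumber;
  }

  public static void main(String[] args) {
    Key key = new Key();
    List<Cell> cells = Arrays.asList(new Cell(), new Cell());
    System.out.println(stamp(7L, key, cells)); // prints 7
  }
}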
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.wal;

+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -29,30 +31,24 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;

-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.SequenceId;
 // imports for things that haven't moved from regionserver.wal yet.
 import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

 /**
  * A Key for an entry in the WAL.
@@ -70,8 +66,7 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 // purposes. They need to be merged into WALEntry.
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 public class WALKey implements SequenceId, Comparable<WALKey> {
-  private static final Log LOG = LogFactory.getLog(WALKey.class);
-  private final CountDownLatch sequenceIdAssignedLatch = new CountDownLatch(1);
+
   /**
    * Used to represent when a particular wal key doesn't know/care about the sequence ordering.
    */
@@ -93,35 +88,16 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    */
   @InterfaceAudience.Private // For internal use only.
   public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
-    if (this.preAssignedWriteEntry != null) {
-      // don't wait for seqNumAssignedLatch if writeEntry is preassigned
-      return this.preAssignedWriteEntry;
-    }
-    try {
-      this.sequenceIdAssignedLatch.await();
-    } catch (InterruptedException ie) {
-      MultiVersionConcurrencyControl mvcc = getMvcc();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("mvcc=" + mvcc + ", writeEntry=" + this.writeEntry);
-      }
-      InterruptedIOException iie = new InterruptedIOException();
-      iie.initCause(ie);
-      throw iie;
-    }
+    assert this.writeEntry != null;
     return this.writeEntry;
   }

   @InterfaceAudience.Private // For internal use only.
   public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
-    if (this.writeEntry != null) {
-      throw new RuntimeException("Non-null!!!");
-    }
+    assert this.writeEntry == null;
     this.writeEntry = writeEntry;
-    // Set our sequenceid now using WriteEntry.
-    if (this.writeEntry != null) {
-      this.sequenceId = this.writeEntry.getWriteNumber();
-    }
-    this.sequenceIdAssignedLatch.countDown();
+    this.sequenceId = writeEntry.getWriteNumber();
   }

   // REMOVE!!!! No more Writables!!!!
@@ -208,7 +184,6 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    * Set in a way visible to multiple threads; e.g. synchronized getter/setters.
    */
   private MultiVersionConcurrencyControl.WriteEntry writeEntry;
-  private MultiVersionConcurrencyControl.WriteEntry preAssignedWriteEntry = null;
   public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());

   // visible for deprecated HLogKey
@@ -722,24 +697,4 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
       this.origLogSeqNum = walKey.getOrigSequenceNumber();
     }
   }
-
-  /**
-   * @return The preassigned writeEntry, if any
-   */
-  @InterfaceAudience.Private // For internal use only.
-  public MultiVersionConcurrencyControl.WriteEntry getPreAssignedWriteEntry() {
-    return this.preAssignedWriteEntry;
-  }
-
-  /**
-   * Preassign writeEntry
-   * @param writeEntry the entry to assign
-   */
-  @InterfaceAudience.Private // For internal use only.
-  public void setPreAssignedWriteEntry(WriteEntry writeEntry) {
-    if (writeEntry != null) {
-      this.preAssignedWriteEntry = writeEntry;
-      this.sequenceId = writeEntry.getWriteNumber();
-    }
-  }
 }
@@ -226,8 +226,9 @@ public class TestWALObserver {
     // it's where WAL write cp should occur.
     long now = EnvironmentEdgeManager.currentTime();
     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-    long txid = log.append(hri,
-        new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, scopes), edit, true);
+    long txid = log.append(hri, new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now,
+        new MultiVersionConcurrencyControl(), scopes),
+      edit, true);
     log.sync(txid);

     // the edit shall have been change now by the coprocessor.
@@ -5895,11 +5895,8 @@ public class TestHRegion {
       @Override
       public Long answer(InvocationOnMock invocation) throws Throwable {
         WALKey key = invocation.getArgumentAt(1, WALKey.class);
-        MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry();
-        if (we == null) {
-          we = key.getMvcc().begin();
-          key.setWriteEntry(we);
-        }
+        MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
+        key.setWriteEntry(we);
         return 1L;
       }

@@ -221,6 +221,7 @@ public class TestWALLockup {
     NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
         Bytes.BYTES_COMPARATOR);
     scopes.put(COLUMN_FAMILY_BYTES, 0);
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     try {
       // First get something into memstore. Make a Put and then pull the Cell out of it. Will
       // manage append and sync carefully in below to manufacture hang. We keep adding same
@@ -228,7 +229,7 @@ public class TestWALLockup {
       Put put = new Put(bytes);
       put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
       WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName(),
-          System.currentTimeMillis(), scopes);
+          System.currentTimeMillis(), mvcc, scopes);
       WALEdit edit = new WALEdit();
       CellScanner CellScanner = put.cellScanner();
       assertTrue(CellScanner.advance());
@@ -400,11 +401,12 @@ public class TestWALLockup {
     NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
         Bytes.BYTES_COMPARATOR);
     scopes.put(COLUMN_FAMILY_BYTES, 0);
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     try {
       Put put = new Put(bytes);
       put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
       WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName(),
-          System.currentTimeMillis(), scopes);
+          System.currentTimeMillis(), mvcc, scopes);
       WALEdit edit = new WALEdit();
       CellScanner CellScanner = put.cellScanner();
       assertTrue(CellScanner.advance());
@@ -436,7 +438,7 @@ public class TestWALLockup {
       // make RingBufferEventHandler sleep 1s, so the following sync
       // endOfBatch=false
       key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), TableName.valueOf("sleep"),
-          System.currentTimeMillis(), scopes);
+          System.currentTimeMillis(), mvcc, scopes);
       dodgyWAL2.append(region.getRegionInfo(), key, edit, true);

       Thread t = new Thread("Sync") {
@@ -460,7 +462,7 @@ public class TestWALLockup {
       }
       // make append throw DamagedWALException
       key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
-          TableName.valueOf("DamagedWALException"), System.currentTimeMillis(), scopes);
+          TableName.valueOf("DamagedWALException"), System.currentTimeMillis(), mvcc, scopes);
       dodgyWAL2.append(region.getRegionInfo(), key, edit, true);

       while (latch.getCount() > 0) {
@@ -0,0 +1,210 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Test for HBASE-17471.
+ * MVCCPreAssign was added by HBASE-16698, but pre-assigned mvcc is only used in the put/delete
+ * path. Other write paths, such as increment/append, still assign mvcc in the ringbuffer's
+ * consumer thread. If puts and increments run in parallel, the seqids in the WAL may not
+ * increase monotonically, and disorder in the WALs will lead to data loss.
+ * This case uses two threads to put and increment at the same time in a single region, then
+ * checks the seqids in the WAL. If a seqid in the WAL is not monotonically increasing, the
+ * case fails.
+ */
+@Category({RegionServerTests.class, SmallTests.class})
+public class TestWALMonotonicallyIncreasingSeqId {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Path testDir = TEST_UTIL.getDataTestDir("TestWALMonotonicallyIncreasingSeqId");
+  private WALFactory wals;
+  private FileSystem fileSystem;
+  private Configuration walConf;
+
+  public static final String KEY_SEED = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+  private static final int KEY_SEED_LEN = KEY_SEED.length();
+
+  private static final char[] KEY_SEED_CHARS = KEY_SEED.toCharArray();
+
+  private HTableDescriptor getTableDesc(TableName tableName, byte[]... families) {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for (byte[] family : families) {
+      HColumnDescriptor hcd = new HColumnDescriptor(family);
+      // Set default to be three versions.
+      hcd.setMaxVersions(Integer.MAX_VALUE);
+      htd.addFamily(hcd);
+    }
+    return htd;
+  }
+
+  private Region initHRegion(HTableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId)
+      throws IOException {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean("hbase.hregion.mvcc.preassign", false);
+    Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName());
+
+    HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false, 0, replicaId);
+    fileSystem = tableDir.getFileSystem(conf);
+    HRegionFileSystem fs = new HRegionFileSystem(conf, fileSystem, tableDir, info);
+    final Configuration walConf = new Configuration(conf);
+    FSUtils.setRootDir(walConf, tableDir);
+    this.walConf = walConf;
+    wals = new WALFactory(walConf, null, "log_" + replicaId);
+    HRegion region = new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes(),
+        info.getTable().getNamespace()), conf, htd, null);
+    region.initialize();
+    return region;
+  }
+
+  CountDownLatch latch = new CountDownLatch(1);
+  public class PutThread extends Thread {
+    HRegion region;
+    public PutThread(HRegion region) {
+      this.region = region;
+    }
+
+    @Override
+    public void run() {
+      try {
+        for (int i = 0; i < 100; i++) {
+          byte[] row = Bytes.toBytes("putRow" + i);
+          Put put = new Put(row);
+          put.addColumn("cf".getBytes(), Bytes.toBytes(0), Bytes.toBytes(""));
+          //put.setDurability(Durability.ASYNC_WAL);
+          latch.await();
+          region.batchMutate(new Mutation[] { put });
+          Thread.sleep(10);
+        }
+      } catch (Throwable t) {
+        LOG.warn("Error happened when Put: ", t);
+      }
+    }
+  }
+
+  public class IncThread extends Thread {
+    HRegion region;
+    public IncThread(HRegion region) {
+      this.region = region;
+    }
+
+    @Override
+    public void run() {
+      try {
+        for (int i = 0; i < 100; i++) {
+          byte[] row = Bytes.toBytes("incrementRow" + i);
+          Increment inc = new Increment(row);
+          inc.addColumn("cf".getBytes(), Bytes.toBytes(0), 1);
+          //inc.setDurability(Durability.ASYNC_WAL);
+          region.increment(inc);
+          latch.countDown();
+          Thread.sleep(10);
+        }
+      } catch (Throwable t) {
+        LOG.warn("Error happened when Increment: ", t);
+      }
+    }
+  }
+
+  @Test
+  public void testWALMonotonicallyIncreasingSeqId() throws Exception {
+    byte[][] families = new byte[][] { Bytes.toBytes("cf") };
+    byte[] qf = Bytes.toBytes("cq");
+    HTableDescriptor htd = getTableDesc(TableName.valueOf("TestWALMonotonicallyIncreasingSeqId"),
+        families);
+    HRegion region = (HRegion) initHRegion(htd, HConstants.EMPTY_START_ROW,
+        HConstants.EMPTY_END_ROW, 0);
+    List<Thread> putThreads = new ArrayList<>();
+    for (int i = 0; i < 1; i++) {
+      putThreads.add(new PutThread(region));
+    }
+    IncThread incThread = new IncThread(region);
+    for (int i = 0; i < 1; i++) {
+      putThreads.get(i).start();
+    }
+    incThread.start();
+    incThread.join();
+
+    Path logPath = ((FSHLog) region.getWAL()).getCurrentFileName();
+    region.getWAL().rollWriter();
+    Thread.sleep(10);
+    Path hbaseDir = new Path(walConf.get(HConstants.HBASE_DIR));
+    Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    WAL.Reader reader = null;
+    try {
+      reader = wals.createReader(fileSystem, logPath);
+    } catch (Throwable t) {
+      reader = wals.createReader(fileSystem, new Path(oldWalsDir, logPath.getName()));
+    }
+    WAL.Entry e;
+    try {
+      long currentMaxSeqid = 0;
+      while ((e = reader.next()) != null) {
+        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
+          long currentSeqid = e.getKey().getSequenceId();
+          if (currentSeqid > currentMaxSeqid) {
+            currentMaxSeqid = currentSeqid;
+          } else {
+            Assert.fail("Current max Seqid is " + currentMaxSeqid
+                + ", but the next seqid in wal is smaller:" + currentSeqid);
+          }
+        }
+      }
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
+      if (region != null) {
+        region.close();
+      }
+    }
+  }
+}
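The core assertion of the new test, pulled out as a standalone sketch (assumed input: the per-entry seqids read back from the WAL in file order): fail as soon as any seqid is not strictly greater than the running maximum.

import java.util.Arrays;
import java.util.List;

public class MonotonicCheck {
  // Throws if the sequence is not strictly increasing, mirroring the test's read-back loop.
  static void assertMonotonic(List<Long> seqIds) {
    long currentMax = 0;
    for (long id : seqIds) {
      if (id <= currentMax) {
        throw new AssertionError(
            "Current max seqid is " + currentMax + ", but the next seqid is smaller: " + id);
      }
      currentMax = id;
    }
  }

  public static void main(String[] args) {
    assertMonotonic(Arrays.asList(1L, 2L, 5L, 9L)); // passes
    try {
      assertMonotonic(Arrays.asList(1L, 3L, 2L));   // fails: 2 after 3
    } catch (AssertionError expected) {
      System.out.println(expected.getMessage());
    }
  }
}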
@@ -408,6 +408,7 @@ public abstract class AbstractTestFSWAL {
       final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
           System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes);
       wal.append(info, logkey, edits, true);
+      region.getMVCC().completeAndWait(logkey.getWriteEntry());
     }
     region.flush(true);
     // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
@@ -1178,7 +1178,7 @@ public abstract class AbstractTestWALReplay {
     FSWALEntry entry =
         new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit(
           rowName, family, ee, index), hri, true);
-    entry.stampRegionSequenceId();
+    entry.stampRegionSequenceId(mvcc.begin());
     return entry;
   }

@@ -18,23 +18,30 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;

+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.junit.After;
 import org.junit.Before;
@@ -42,14 +49,11 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;

-import static org.junit.Assert.*;
-
 /**
  * Test that the actions are called while playing with an WAL
  */
 @Category({RegionServerTests.class, SmallTests.class})
 public class TestWALActionsListener {
-  private static final Log LOG = LogFactory.getLog(TestWALActionsListener.class);

   private final static HBaseTestingUtility TEST_UTIL =
       new HBaseTestingUtility();
@@ -92,9 +96,9 @@ public class TestWALActionsListener {
     HRegionInfo hri = new HRegionInfo(TableName.valueOf(SOME_BYTES),
         SOME_BYTES, SOME_BYTES, false);
     final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
-
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     for (int i = 0; i < 20; i++) {
-      byte[] b = Bytes.toBytes(i+"");
+      byte[] b = Bytes.toBytes(i + "");
       KeyValue kv = new KeyValue(b,b,b);
       WALEdit edit = new WALEdit();
       edit.add(kv);
@@ -106,7 +110,7 @@ public class TestWALActionsListener {
         scopes.put(fam, 0);
       }
       final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(),
-          TableName.valueOf(b), 0, scopes), edit, true);
+          TableName.valueOf(b), 0, mvcc, scopes), edit, true);
       wal.sync(txid);
       if (i == 10) {
         wal.registerWALActionsListener(laterobserver);
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.SecureAsyncProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
@@ -134,11 +135,13 @@ public class TestSecureWAL {
     final WAL wal =
         wals.getWAL(regioninfo.getEncodedNameAsBytes(), regioninfo.getTable().getNamespace());

+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+
     for (int i = 0; i < total; i++) {
       WALEdit kvs = new WALEdit();
       kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
       wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis(), scopes), kvs, true);
+          System.currentTimeMillis(), mvcc, scopes), kvs, true);
     }
     wal.sync();
     final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
@@ -382,12 +382,13 @@ public class TestWALFactory {
     for(byte[] fam : htd.getFamiliesKeys()) {
       scopes.put(fam, 0);
     }
-
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     for (int i = 0; i < total; i++) {
       WALEdit kvs = new WALEdit();
       kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
       wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis(), scopes), kvs, true);
+          System.currentTimeMillis(), mvcc, scopes),
+        kvs, true);
     }
     // Now call sync to send the data to HDFS datanodes
     wal.sync();