HBASE-23181 Blocked WAL archive: "LogRoller: Failed to schedule flush of XXXX, because it is not online on us" (#753)
Signed-off-by: Lijin Bin <binlijin@apache.org> Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
33e8156ebc
commit
3dba799837
|
@ -48,7 +48,7 @@ public final class ImmutableByteArray {
|
|||
return new ImmutableByteArray(b);
|
||||
}
|
||||
|
||||
public String toStringUtf8() {
|
||||
return Bytes.toString(b);
|
||||
public String toString() {
|
||||
return Bytes.toStringBinary(b);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -134,10 +134,10 @@ public class TestWALRecordReader {
|
|||
long ts = System.currentTimeMillis();
|
||||
WALEdit edit = new WALEdit();
|
||||
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
|
||||
log.append(info, getWalKeyImpl(ts, scopes), edit, true);
|
||||
log.appendData(info, getWalKeyImpl(ts, scopes), edit);
|
||||
edit = new WALEdit();
|
||||
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
|
||||
log.append(info, getWalKeyImpl(ts+1, scopes), edit, true);
|
||||
log.appendData(info, getWalKeyImpl(ts+1, scopes), edit);
|
||||
log.sync();
|
||||
LOG.info("Before 1st WAL roll " + log.toString());
|
||||
log.rollWriter();
|
||||
|
@ -148,10 +148,10 @@ public class TestWALRecordReader {
|
|||
|
||||
edit = new WALEdit();
|
||||
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
|
||||
log.append(info, getWalKeyImpl(ts1+1, scopes), edit, true);
|
||||
log.appendData(info, getWalKeyImpl(ts1+1, scopes), edit);
|
||||
edit = new WALEdit();
|
||||
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
|
||||
log.append(info, getWalKeyImpl(ts1+2, scopes), edit, true);
|
||||
log.appendData(info, getWalKeyImpl(ts1+2, scopes), edit);
|
||||
log.sync();
|
||||
log.shutdown();
|
||||
walfactory.shutdown();
|
||||
|
@ -192,7 +192,7 @@ public class TestWALRecordReader {
|
|||
WALEdit edit = new WALEdit();
|
||||
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
|
||||
System.currentTimeMillis(), value));
|
||||
long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
|
||||
long txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
|
||||
log.sync(txid);
|
||||
|
||||
Thread.sleep(1); // make sure 2nd log gets a later timestamp
|
||||
|
@ -200,9 +200,8 @@ public class TestWALRecordReader {
|
|||
log.rollWriter();
|
||||
|
||||
edit = new WALEdit();
|
||||
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
|
||||
System.currentTimeMillis(), value));
|
||||
txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
|
||||
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value));
|
||||
txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
|
||||
log.sync(txid);
|
||||
log.shutdown();
|
||||
walfactory.shutdown();
|
||||
|
|
|
@ -7940,7 +7940,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
WriteEntry writeEntry = null;
|
||||
try {
|
||||
long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
|
||||
long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
|
||||
// Call sync on our edit.
|
||||
if (txid != 0) {
|
||||
sync(txid, durability);
|
||||
|
|
|
@ -935,7 +935,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
// Noop
|
||||
}
|
||||
|
||||
protected final boolean append(W writer, FSWALEntry entry) throws IOException {
|
||||
protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
|
||||
// TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
|
||||
atHeadOfRingBufferEventHandlerAppend();
|
||||
long start = EnvironmentEdgeManager.currentTime();
|
||||
|
@ -959,8 +959,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
doAppend(writer, entry);
|
||||
assert highestUnsyncedTxid < entry.getTxid();
|
||||
highestUnsyncedTxid = entry.getTxid();
|
||||
sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
|
||||
entry.isInMemStore());
|
||||
if (entry.isCloseRegion()) {
|
||||
// let's clean all the records of this region
|
||||
sequenceIdAccounting.onRegionClose(encodedRegionName);
|
||||
} else {
|
||||
sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
|
||||
entry.isInMemStore());
|
||||
}
|
||||
coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
|
||||
// Update metrics.
|
||||
postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
|
||||
|
@ -1010,11 +1015,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
}
|
||||
|
||||
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
|
||||
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
|
||||
throws IOException {
|
||||
WALEdit edits, boolean inMemstore, boolean closeRegion, RingBuffer<RingBufferTruck> ringBuffer)
|
||||
throws IOException {
|
||||
if (this.closed) {
|
||||
throw new IOException(
|
||||
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
|
||||
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
|
||||
}
|
||||
MutableLong txidHolder = new MutableLong();
|
||||
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
|
||||
|
@ -1024,7 +1029,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
|
||||
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
|
||||
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
|
||||
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
|
||||
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, closeRegion, rpcCall);
|
||||
entry.stampRegionSequenceId(we);
|
||||
ringBuffer.get(txid).load(entry);
|
||||
} finally {
|
||||
|
@ -1060,7 +1065,24 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
|
||||
return append(info, key, edits, true, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
|
||||
throws IOException {
|
||||
return append(info, key, edits, false, closeRegion);
|
||||
}
|
||||
|
||||
/**
|
||||
* Append a set of edits to the WAL.
|
||||
* <p/>
|
||||
* The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
|
||||
* have its region edit/sequence id assigned else it messes up our unification of mvcc and
|
||||
* sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
|
||||
* <p/>
|
||||
* NOTE: This append, at a time that is usually after this call returns, starts an mvcc
|
||||
* transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment
|
||||
* time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
|
||||
|
@ -1071,10 +1093,21 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
* passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not
|
||||
* immediately available on return from this method. It WILL be available subsequent to a sync of
|
||||
* this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
|
||||
* @param info the regioninfo associated with append
|
||||
* @param key Modified by this call; we add to it this edits region edit/sequence id.
|
||||
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
|
||||
* sequence id that is after all currently appended edits.
|
||||
* @param inMemstore Always true except for case where we are writing a region event marker, for
|
||||
* example, a compaction completion record into the WAL; in this case the entry is just
|
||||
* so we can finish an unfinished compaction -- it is not an edit for memstore.
|
||||
* @param closeRegion Whether this is a region close marker, i.e, the last wal edit for this
|
||||
* region on this region server. The WAL implementation should remove all the related
|
||||
* stuff, for example, the sequence id accounting.
|
||||
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
|
||||
* in it.
|
||||
*/
|
||||
@Override
|
||||
public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
|
||||
throws IOException;
|
||||
protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
|
||||
boolean closeRegion) throws IOException;
|
||||
|
||||
protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
|
||||
|
||||
|
|
|
@ -434,7 +434,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
FSWALEntry entry = iter.next();
|
||||
boolean appended;
|
||||
try {
|
||||
appended = append(writer, entry);
|
||||
appended = appendEntry(writer, entry);
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError("should not happen", e);
|
||||
}
|
||||
|
@ -564,10 +564,10 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
|
||||
throws IOException {
|
||||
long txid =
|
||||
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
|
||||
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore,
|
||||
boolean closeRegion) throws IOException {
|
||||
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
|
||||
waitingConsumePayloads);
|
||||
if (shouldScheduleConsumer()) {
|
||||
consumeExecutor.execute(consumer);
|
||||
}
|
||||
|
|
|
@ -434,12 +434,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
}
|
||||
}
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
|
||||
justification = "Will never be null")
|
||||
@Override
|
||||
public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
|
||||
final boolean inMemstore) throws IOException {
|
||||
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
|
||||
protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
|
||||
final boolean inMemstore, boolean closeRegion) throws IOException {
|
||||
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
|
||||
disruptor.getRingBuffer());
|
||||
}
|
||||
|
||||
|
@ -1096,7 +1094,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
*/
|
||||
void append(final FSWALEntry entry) throws Exception {
|
||||
try {
|
||||
FSHLog.this.append(writer, entry);
|
||||
FSHLog.this.appendEntry(writer, entry);
|
||||
} catch (Exception e) {
|
||||
String msg = "Append sequenceId=" + entry.getKey().getSequenceId()
|
||||
+ ", requesting roll of WAL";
|
||||
|
|
|
@ -51,14 +51,16 @@ class FSWALEntry extends Entry {
|
|||
// they are only in memory and held here while passing over the ring buffer.
|
||||
private final transient long txid;
|
||||
private final transient boolean inMemstore;
|
||||
private final transient boolean closeRegion;
|
||||
private final transient RegionInfo regionInfo;
|
||||
private final transient Set<byte[]> familyNames;
|
||||
private final transient ServerCall<?> rpcCall;
|
||||
|
||||
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
|
||||
final boolean inMemstore, ServerCall<?> rpcCall) {
|
||||
final boolean inMemstore, boolean closeRegion, ServerCall<?> rpcCall) {
|
||||
super(key, edit);
|
||||
this.inMemstore = inMemstore;
|
||||
this.closeRegion = closeRegion;
|
||||
this.regionInfo = regionInfo;
|
||||
this.txid = txid;
|
||||
if (inMemstore) {
|
||||
|
@ -98,6 +100,10 @@ class FSWALEntry extends Entry {
|
|||
return this.inMemstore;
|
||||
}
|
||||
|
||||
boolean isCloseRegion() {
|
||||
return closeRegion;
|
||||
}
|
||||
|
||||
RegionInfo getRegionInfo() {
|
||||
return this.regionInfo;
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ImmutableByteArray;
|
||||
|
@ -183,6 +184,30 @@ class SequenceIdAccounting {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all the records of the given region as it is going to be closed.
|
||||
* <p/>
|
||||
* We will call this once we get the region close marker. We need this because that, if we use
|
||||
* Durability.ASYNC_WAL, after calling startCacheFlush, we may still get some ongoing wal entries
|
||||
* that has not been processed yet, this will lead to orphan records in the
|
||||
* lowestUnflushedSequenceIds and then cause too many WAL files.
|
||||
* <p/>
|
||||
* See HBASE-23157 for more details.
|
||||
*/
|
||||
void onRegionClose(byte[] encodedRegionName) {
|
||||
synchronized (tieLock) {
|
||||
this.lowestUnflushedSequenceIds.remove(encodedRegionName);
|
||||
Map<ImmutableByteArray, Long> flushing = this.flushingSequenceIds.remove(encodedRegionName);
|
||||
if (flushing != null) {
|
||||
LOG.warn("Still have flushing records when closing {}, {}",
|
||||
Bytes.toString(encodedRegionName),
|
||||
flushing.entrySet().stream().map(e -> e.getKey().toString() + "->" + e.getValue())
|
||||
.collect(Collectors.joining(",", "{", "}")));
|
||||
}
|
||||
}
|
||||
this.highestSequenceIds.remove(encodedRegionName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the store sequence id, e.g., upon executing in-memory compaction
|
||||
*/
|
||||
|
@ -363,7 +388,7 @@ class SequenceIdAccounting {
|
|||
Long currentId = tmpMap.get(e.getKey());
|
||||
if (currentId != null && currentId.longValue() < e.getValue().longValue()) {
|
||||
String errorStr = Bytes.toString(encodedRegionName) + " family "
|
||||
+ e.getKey().toStringUtf8() + " acquired edits out of order current memstore seq="
|
||||
+ e.getKey().toString() + " acquired edits out of order current memstore seq="
|
||||
+ currentId + ", previous oldest unflushed id=" + e.getValue();
|
||||
LOG.error(errorStr);
|
||||
Runtime.getRuntime().halt(1);
|
||||
|
|
|
@ -56,20 +56,19 @@ public class WALUtil {
|
|||
}
|
||||
|
||||
/**
|
||||
* Write the marker that a compaction has succeeded and is about to be committed.
|
||||
* This provides info to the HMaster to allow it to recover the compaction if this regionserver
|
||||
* dies in the middle. It also prevents the compaction from finishing if this regionserver has
|
||||
* already lost its lease on the log.
|
||||
*
|
||||
* <p>This write is for internal use only. Not for external client consumption.
|
||||
* Write the marker that a compaction has succeeded and is about to be committed. This provides
|
||||
* info to the HMaster to allow it to recover the compaction if this regionserver dies in the
|
||||
* middle. It also prevents the compaction from finishing if this regionserver has already lost
|
||||
* its lease on the log.
|
||||
* <p/>
|
||||
* This write is for internal use only. Not for external client consumption.
|
||||
* @param mvcc Used by WAL to get sequence Id for the waledit.
|
||||
*/
|
||||
public static WALKeyImpl writeCompactionMarker(WAL wal,
|
||||
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
|
||||
MultiVersionConcurrencyControl mvcc)
|
||||
throws IOException {
|
||||
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
|
||||
MultiVersionConcurrencyControl mvcc) throws IOException {
|
||||
WALKeyImpl walKey =
|
||||
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
|
||||
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), false, mvcc, null);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
|
||||
}
|
||||
|
@ -78,14 +77,14 @@ public class WALUtil {
|
|||
|
||||
/**
|
||||
* Write a flush marker indicating a start / abort or a complete of a region flush
|
||||
*
|
||||
* <p>This write is for internal use only. Not for external client consumption.
|
||||
* <p/>
|
||||
* This write is for internal use only. Not for external client consumption.
|
||||
*/
|
||||
public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
|
||||
RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
|
||||
throws IOException {
|
||||
WALKeyImpl walKey = doFullAppendTransaction(wal, replicationScope, hri,
|
||||
WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
|
||||
RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
|
||||
throws IOException {
|
||||
WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
|
||||
WALEdit.createFlushWALEdit(hri, f), false, mvcc, null, sync);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
|
||||
}
|
||||
|
@ -93,15 +92,15 @@ public class WALUtil {
|
|||
}
|
||||
|
||||
/**
|
||||
* Write a region open marker indicating that the region is opened.
|
||||
* This write is for internal use only. Not for external client consumption.
|
||||
* Write a region open marker indicating that the region is opened. This write is for internal use
|
||||
* only. Not for external client consumption.
|
||||
*/
|
||||
public static WALKeyImpl writeRegionEventMarker(WAL wal,
|
||||
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri,
|
||||
final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc)
|
||||
throws IOException {
|
||||
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
|
||||
WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
|
||||
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final RegionEventDescriptor r,
|
||||
final MultiVersionConcurrencyControl mvcc) throws IOException {
|
||||
WALKeyImpl walKey =
|
||||
writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r),
|
||||
r.getEventType() == RegionEventDescriptor.EventType.REGION_CLOSE, mvcc, null);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
|
||||
}
|
||||
|
@ -119,11 +118,11 @@ public class WALUtil {
|
|||
* @throws IOException We will throw an IOException if we can not append to the HLog.
|
||||
*/
|
||||
public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
|
||||
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
|
||||
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
|
||||
throws IOException {
|
||||
WALKeyImpl walKey =
|
||||
writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
|
||||
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
|
||||
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
|
||||
throws IOException {
|
||||
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
|
||||
WALEdit.createBulkLoadEvent(hri, desc), false, mvcc, null);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
|
||||
}
|
||||
|
@ -131,36 +130,32 @@ public class WALUtil {
|
|||
}
|
||||
|
||||
private static WALKeyImpl writeMarker(final WAL wal,
|
||||
final NavigableMap<byte[], Integer> replicationScope,
|
||||
final RegionInfo hri,
|
||||
final WALEdit edit,
|
||||
final MultiVersionConcurrencyControl mvcc,
|
||||
final Map<String, byte[]> extendedAttributes)
|
||||
throws IOException {
|
||||
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
|
||||
boolean closeRegion, final MultiVersionConcurrencyControl mvcc,
|
||||
final Map<String, byte[]> extendedAttributes) throws IOException {
|
||||
// If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
|
||||
return doFullAppendTransaction(wal, replicationScope, hri,
|
||||
edit, mvcc, extendedAttributes, true);
|
||||
return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, closeRegion, mvcc,
|
||||
extendedAttributes, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* A 'full' WAL transaction involves starting an mvcc transaction followed by an append,
|
||||
* an optional sync, and then a call to complete the mvcc transaction. This method does it all.
|
||||
* Good for case of adding a single edit or marker to the WAL.
|
||||
*
|
||||
* <p>This write is for internal use only. Not for external client consumption.
|
||||
* A 'full' WAL transaction involves starting an mvcc transaction followed by an append, an
|
||||
* optional sync, and then a call to complete the mvcc transaction. This method does it all. Good
|
||||
* for case of adding a single edit or marker to the WAL.
|
||||
* <p/>
|
||||
* This write is for internal use only. Not for external client consumption.
|
||||
* @return WALKeyImpl that was added to the WAL.
|
||||
*/
|
||||
public static WALKeyImpl doFullAppendTransaction(final WAL wal,
|
||||
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
|
||||
final WALEdit edit, final MultiVersionConcurrencyControl mvcc,
|
||||
final Map<String, byte[]> extendedAttributes, final boolean sync)
|
||||
throws IOException {
|
||||
private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
|
||||
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
|
||||
boolean closeRegion, final MultiVersionConcurrencyControl mvcc,
|
||||
final Map<String, byte[]> extendedAttributes, final boolean sync) throws IOException {
|
||||
// TODO: Pass in current time to use?
|
||||
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
|
||||
System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes);
|
||||
System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes);
|
||||
long trx = MultiVersionConcurrencyControl.NONE;
|
||||
try {
|
||||
trx = wal.append(hri, walKey, edit, false);
|
||||
trx = wal.appendMarker(hri, walKey, edit, closeRegion);
|
||||
if (sync) {
|
||||
wal.sync(trx);
|
||||
}
|
||||
|
|
|
@ -161,8 +161,18 @@ class DisabledWALProvider implements WALProvider {
|
|||
}
|
||||
|
||||
@Override
|
||||
public long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
|
||||
throws IOException {
|
||||
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
|
||||
return append(info, key, edits, true, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
|
||||
throws IOException {
|
||||
return append(info, key, edits, false, closeRegion);
|
||||
}
|
||||
|
||||
private long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
|
||||
boolean closeRegion) throws IOException {
|
||||
WriteEntry writeEntry = key.getMvcc().begin();
|
||||
if (!edits.isReplay()) {
|
||||
for (Cell cell : edits.getCells()) {
|
||||
|
|
|
@ -57,7 +57,7 @@ public interface WAL extends Closeable, WALFileLengthProvider {
|
|||
/**
|
||||
* Roll the log writer. That is, start writing log messages to a new file.
|
||||
*
|
||||
* <p>
|
||||
* <p/>
|
||||
* The implementation is synchronized in order to make sure there's one rollWriter
|
||||
* running at any given time.
|
||||
*
|
||||
|
@ -70,7 +70,7 @@ public interface WAL extends Closeable, WALFileLengthProvider {
|
|||
/**
|
||||
* Roll the log writer. That is, start writing log messages to a new file.
|
||||
*
|
||||
* <p>
|
||||
* <p/>
|
||||
* The implementation is synchronized in order to make sure there's one rollWriter
|
||||
* running at any given time.
|
||||
*
|
||||
|
@ -98,44 +98,59 @@ public interface WAL extends Closeable, WALFileLengthProvider {
|
|||
void close() throws IOException;
|
||||
|
||||
/**
|
||||
* Append a set of edits to the WAL. The WAL is not flushed/sync'd after this transaction
|
||||
* completes BUT on return this edit must have its region edit/sequence id assigned
|
||||
* else it messes up our unification of mvcc and sequenceid. On return <code>key</code> will
|
||||
* have the region edit/sequence id filled in.
|
||||
* Append a set of data edits to the WAL. 'Data' here means that the content in the edits will
|
||||
* also be added to memstore.
|
||||
* <p/>
|
||||
* The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
|
||||
* have its region edit/sequence id assigned else it messes up our unification of mvcc and
|
||||
* sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
|
||||
* @param info the regioninfo associated with append
|
||||
* @param key Modified by this call; we add to it this edits region edit/sequence id.
|
||||
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
|
||||
* sequence id that is after all currently appended edits.
|
||||
* @param inMemstore Always true except for case where we are writing a compaction completion
|
||||
* record into the WAL; in this case the entry is just so we can finish an unfinished compaction
|
||||
* -- it is not an edit for memstore.
|
||||
* sequence id that is after all currently appended edits.
|
||||
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
|
||||
* in it.
|
||||
* in it.
|
||||
* @see #appendMarker(RegionInfo, WALKeyImpl, WALEdit, boolean)
|
||||
*/
|
||||
long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException;
|
||||
long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException;
|
||||
|
||||
/**
|
||||
* Append a marker edit to the WAL. A marker could be a FlushDescriptor, a compaction marker, or
|
||||
* region event marker. The difference here is that, a marker will not be added to memstore.
|
||||
* <p/>
|
||||
* The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
|
||||
* have its region edit/sequence id assigned else it messes up our unification of mvcc and
|
||||
* sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
|
||||
* @param info the regioninfo associated with append
|
||||
* @param key Modified by this call; we add to it this edits region edit/sequence id.
|
||||
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
|
||||
* sequence id that is after all currently appended edits.
|
||||
* @param closeRegion Whether this is a region close marker, i.e, the last wal edit for this
|
||||
* region on this region server. The WAL implementation should remove all the related
|
||||
* stuff, for example, the sequence id accounting.
|
||||
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
|
||||
* in it.
|
||||
* @see #appendData(RegionInfo, WALKeyImpl, WALEdit)
|
||||
*/
|
||||
long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* updates the seuence number of a specific store.
|
||||
* depending on the flag: replaces current seq number if the given seq id is bigger,
|
||||
* or even if it is lower than existing one
|
||||
* @param encodedRegionName
|
||||
* @param familyName
|
||||
* @param sequenceid
|
||||
* @param onlyIfGreater
|
||||
*/
|
||||
void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
|
||||
boolean onlyIfGreater);
|
||||
|
||||
/**
|
||||
* Sync what we have in the WAL.
|
||||
* @throws IOException
|
||||
*/
|
||||
void sync() throws IOException;
|
||||
|
||||
/**
|
||||
* Sync the WAL if the txId was not already sync'd.
|
||||
* @param txid Transaction id to sync to.
|
||||
* @throws IOException
|
||||
*/
|
||||
void sync(long txid) throws IOException;
|
||||
|
||||
|
|
|
@ -239,9 +239,8 @@ public class TestWALObserver {
|
|||
// it's where WAL write cp should occur.
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
// we use HLogKey here instead of WALKeyImpl directly to support legacy coprocessors.
|
||||
long txid = log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now,
|
||||
new MultiVersionConcurrencyControl(), scopes),
|
||||
edit, true);
|
||||
long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now,
|
||||
new MultiVersionConcurrencyControl(), scopes), edit);
|
||||
log.sync(txid);
|
||||
|
||||
// the edit shall have been change now by the coprocessor.
|
||||
|
@ -291,9 +290,9 @@ public class TestWALObserver {
|
|||
assertFalse(cp.isPostWALWriteCalled());
|
||||
|
||||
final long now = EnvironmentEdgeManager.currentTime();
|
||||
long txid = log.append(hri,
|
||||
new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes),
|
||||
new WALEdit(), true);
|
||||
long txid = log.appendData(hri,
|
||||
new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes),
|
||||
new WALEdit());
|
||||
log.sync(txid);
|
||||
|
||||
assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled());
|
||||
|
@ -340,8 +339,8 @@ public class TestWALObserver {
|
|||
addWALEdits(tableName, hri, TEST_ROW, fam, countPerFamily,
|
||||
EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc);
|
||||
}
|
||||
wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
|
||||
true);
|
||||
wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
|
||||
edit);
|
||||
// sync to fs.
|
||||
wal.sync();
|
||||
|
||||
|
@ -456,8 +455,8 @@ public class TestWALObserver {
|
|||
edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
|
||||
// uses WALKeyImpl instead of HLogKey on purpose. will only work for tests where we don't care
|
||||
// about legacy coprocessors
|
||||
txid = wal.append(hri,
|
||||
new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit, true);
|
||||
txid = wal.appendData(hri,
|
||||
new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit);
|
||||
}
|
||||
if (-1 != txid) {
|
||||
wal.sync(txid);
|
||||
|
|
|
@ -604,9 +604,8 @@ public abstract class AbstractTestDLS {
|
|||
// HBaseTestingUtility.createMultiRegions use 5 bytes key
|
||||
byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
|
||||
e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value));
|
||||
log.append(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc),
|
||||
e, true);
|
||||
log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(),
|
||||
tableName, System.currentTimeMillis(), mvcc), e);
|
||||
if (0 == i % syncEvery) {
|
||||
log.sync();
|
||||
}
|
||||
|
|
|
@ -113,7 +113,7 @@ public class TestBulkLoad {
|
|||
storeFileName = (new Path(storeFileName)).getName();
|
||||
List<String> storeFileNames = new ArrayList<>();
|
||||
storeFileNames.add(storeFileName);
|
||||
when(log.append(any(), any(),
|
||||
when(log.appendMarker(any(), any(),
|
||||
argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(),
|
||||
familyName, storeFileNames)),
|
||||
anyBoolean())).thenAnswer(new Answer() {
|
||||
|
@ -140,7 +140,7 @@ public class TestBulkLoad {
|
|||
|
||||
@Test
|
||||
public void shouldBulkLoadSingleFamilyHLog() throws IOException {
|
||||
when(log.append(any(),
|
||||
when(log.appendMarker(any(),
|
||||
any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
|
||||
anyBoolean())).thenAnswer(new Answer() {
|
||||
@Override
|
||||
|
@ -160,7 +160,7 @@ public class TestBulkLoad {
|
|||
|
||||
@Test
|
||||
public void shouldBulkLoadManyFamilyHLog() throws IOException {
|
||||
when(log.append(any(),
|
||||
when(log.appendMarker(any(),
|
||||
any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
|
||||
anyBoolean())).thenAnswer(new Answer() {
|
||||
@Override
|
||||
|
@ -181,7 +181,7 @@ public class TestBulkLoad {
|
|||
|
||||
@Test
|
||||
public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
|
||||
when(log.append(any(),
|
||||
when(log.appendMarker(any(),
|
||||
any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
|
||||
anyBoolean())).thenAnswer(new Answer() {
|
||||
@Override
|
||||
|
|
|
@ -114,6 +114,7 @@ import org.apache.hadoop.hbase.client.RowMutations;
|
|||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
||||
import org.apache.hadoop.hbase.filter.BigDecimalComparator;
|
||||
|
@ -4501,10 +4502,9 @@ public class TestHRegion {
|
|||
put.setDurability(mutationDurability);
|
||||
region.put(put);
|
||||
|
||||
//verify append called or not
|
||||
verify(wal, expectAppend ? times(1) : never())
|
||||
.append((HRegionInfo)any(), (WALKeyImpl)any(),
|
||||
(WALEdit)any(), Mockito.anyBoolean());
|
||||
// verify append called or not
|
||||
verify(wal, expectAppend ? times(1) : never()).appendData((HRegionInfo) any(),
|
||||
(WALKeyImpl) any(), (WALEdit) any());
|
||||
|
||||
// verify sync called or not
|
||||
if (expectSync || expectSyncFromLogSyncer) {
|
||||
|
@ -5613,12 +5613,10 @@ public class TestHRegion {
|
|||
final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
|
||||
final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
|
||||
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
|
||||
htd.addFamily(new HColumnDescriptor(fam1));
|
||||
htd.addFamily(new HColumnDescriptor(fam2));
|
||||
|
||||
HRegionInfo hri = new HRegionInfo(htd.getTableName(),
|
||||
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
|
||||
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1))
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build();
|
||||
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
|
||||
|
||||
// open the region w/o rss and wal and flush some files
|
||||
region =
|
||||
|
@ -5635,13 +5633,13 @@ public class TestHRegion {
|
|||
|
||||
// capture append() calls
|
||||
WAL wal = mockWAL();
|
||||
when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
|
||||
when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);
|
||||
|
||||
region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
|
||||
TEST_UTIL.getConfiguration(), rss, null);
|
||||
|
||||
verify(wal, times(1)).append((HRegionInfo)any(), (WALKeyImpl)any()
|
||||
, editCaptor.capture(), anyBoolean());
|
||||
verify(wal, times(1)).appendMarker(any(RegionInfo.class), any(WALKeyImpl.class),
|
||||
editCaptor.capture(), anyBoolean());
|
||||
|
||||
WALEdit edit = editCaptor.getValue();
|
||||
assertNotNull(edit);
|
||||
|
@ -5707,15 +5705,14 @@ public class TestHRegion {
|
|||
|
||||
/**
|
||||
* Utility method to setup a WAL mock.
|
||||
* <p/>
|
||||
* Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs.
|
||||
* @return a mock WAL
|
||||
* @throws IOException
|
||||
*/
|
||||
private WAL mockWAL() throws IOException {
|
||||
WAL wal = mock(WAL.class);
|
||||
Mockito.when(wal.append((HRegionInfo)Mockito.any(),
|
||||
(WALKeyImpl)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())).
|
||||
thenAnswer(new Answer<Long>() {
|
||||
when(wal.appendData(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class)))
|
||||
.thenAnswer(new Answer<Long>() {
|
||||
@Override
|
||||
public Long answer(InvocationOnMock invocation) throws Throwable {
|
||||
WALKeyImpl key = invocation.getArgument(1);
|
||||
|
@ -5723,32 +5720,38 @@ public class TestHRegion {
|
|||
key.setWriteEntry(we);
|
||||
return 1L;
|
||||
}
|
||||
|
||||
});
|
||||
});
|
||||
when(wal.appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class),
|
||||
anyBoolean())).thenAnswer(new Answer<Long>() {
|
||||
@Override
|
||||
public Long answer(InvocationOnMock invocation) throws Throwable {
|
||||
WALKeyImpl key = invocation.getArgument(1);
|
||||
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
|
||||
key.setWriteEntry(we);
|
||||
return 1L;
|
||||
}
|
||||
});
|
||||
return wal;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseRegionWrittenToWAL() throws Exception {
|
||||
|
||||
Path rootDir = new Path(dir + name.getMethodName());
|
||||
FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
|
||||
|
||||
final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42);
|
||||
final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
|
||||
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
|
||||
htd.addFamily(new HColumnDescriptor(fam1));
|
||||
htd.addFamily(new HColumnDescriptor(fam2));
|
||||
|
||||
final HRegionInfo hri = new HRegionInfo(htd.getTableName(),
|
||||
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
|
||||
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1))
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build();
|
||||
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
|
||||
|
||||
ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
|
||||
|
||||
// capture append() calls
|
||||
WAL wal = mockWAL();
|
||||
when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
|
||||
when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);
|
||||
|
||||
|
||||
// create and then open a region first so that it can be closed later
|
||||
|
@ -5760,7 +5763,7 @@ public class TestHRegion {
|
|||
region.close(false);
|
||||
|
||||
// 2 times, one for region open, the other close region
|
||||
verify(wal, times(2)).append((HRegionInfo)any(), (WALKeyImpl)any(),
|
||||
verify(wal, times(2)).appendMarker(any(RegionInfo.class), (WALKeyImpl) any(WALKeyImpl.class),
|
||||
editCaptor.capture(), anyBoolean());
|
||||
|
||||
WALEdit edit = editCaptor.getAllValues().get(1);
|
||||
|
|
|
@ -27,7 +27,6 @@ import static org.junit.Assert.assertNull;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
@ -1162,8 +1161,8 @@ public class TestHRegionReplayEvents {
|
|||
|
||||
// test for region open and close
|
||||
secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
|
||||
verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class),
|
||||
any(WALEdit.class), anyBoolean());
|
||||
verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
|
||||
any(WALEdit.class));
|
||||
|
||||
// test for replay prepare flush
|
||||
putDataByReplay(secondaryRegion, 0, 10, cq, families);
|
||||
|
@ -1178,12 +1177,12 @@ public class TestHRegionReplayEvents {
|
|||
primaryRegion.getRegionInfo().getRegionName()))
|
||||
.build());
|
||||
|
||||
verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class),
|
||||
any(WALEdit.class), anyBoolean());
|
||||
verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
|
||||
any(WALEdit.class));
|
||||
|
||||
secondaryRegion.close();
|
||||
verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class),
|
||||
any(WALEdit.class), anyBoolean());
|
||||
verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
|
||||
any(WALEdit.class));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -249,7 +249,7 @@ public class TestWALLockup {
|
|||
LOG.info("SET throwing of exception on append");
|
||||
dodgyWAL.throwException = true;
|
||||
// This append provokes a WAL roll request
|
||||
dodgyWAL.append(region.getRegionInfo(), key, edit, true);
|
||||
dodgyWAL.appendData(region.getRegionInfo(), key, edit);
|
||||
boolean exception = false;
|
||||
try {
|
||||
dodgyWAL.sync(false);
|
||||
|
|
|
@ -25,17 +25,22 @@ import static org.junit.Assert.assertNull;
|
|||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -46,8 +51,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
|
@ -57,13 +64,17 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
|
|||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
|
||||
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.SequenceId;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
|
@ -169,7 +180,7 @@ public abstract class AbstractTestFSWAL {
|
|||
WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(),
|
||||
SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
|
||||
HConstants.NO_NONCE, mvcc, scopes);
|
||||
log.append(hri, key, cols, true);
|
||||
log.appendData(hri, key, cols);
|
||||
}
|
||||
log.sync();
|
||||
}
|
||||
|
@ -418,7 +429,7 @@ public abstract class AbstractTestFSWAL {
|
|||
final RegionInfo info = region.getRegionInfo();
|
||||
final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes);
|
||||
wal.append(info, logkey, edits, true);
|
||||
wal.append(info, logkey, edits, true, false);
|
||||
region.getMVCC().completeAndWait(logkey.getWriteEntry());
|
||||
}
|
||||
region.flush(true);
|
||||
|
@ -468,7 +479,7 @@ public abstract class AbstractTestFSWAL {
|
|||
new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID,
|
||||
timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
|
||||
try {
|
||||
wal.append(ri, key, cols, true);
|
||||
wal.append(ri, key, cols, true, false);
|
||||
fail("Should fail since the wal has already been closed");
|
||||
} catch (IOException e) {
|
||||
// expected
|
||||
|
@ -477,4 +488,94 @@ public abstract class AbstractTestFSWAL {
|
|||
assertNull(key.getWriteEntry());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
|
||||
final String testName = currentTest.getMethodName();
|
||||
final byte[] b = Bytes.toBytes("b");
|
||||
|
||||
final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
|
||||
final CountDownLatch holdAppend = new CountDownLatch(1);
|
||||
final CountDownLatch closeFinished = new CountDownLatch(1);
|
||||
final CountDownLatch putFinished = new CountDownLatch(1);
|
||||
|
||||
try (AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getRootDir(CONF), testName,
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
|
||||
wal.init();
|
||||
wal.registerWALActionsListener(new WALActionsListener() {
|
||||
@Override
|
||||
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
|
||||
if (startHoldingForAppend.get()) {
|
||||
try {
|
||||
holdAppend.await();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error(e.toString(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// open a new region which uses this WAL
|
||||
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
|
||||
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
|
||||
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
|
||||
TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
|
||||
RegionServerServices rsServices = mock(RegionServerServices.class);
|
||||
when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
|
||||
when(rsServices.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
|
||||
final HRegion region = HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal,
|
||||
TEST_UTIL.getConfiguration(), rsServices, null);
|
||||
|
||||
ExecutorService exec = Executors.newFixedThreadPool(2);
|
||||
|
||||
// do a regular write first because of memstore size calculation.
|
||||
region.put(new Put(b).addColumn(b, b, b));
|
||||
|
||||
startHoldingForAppend.set(true);
|
||||
exec.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
region.put(new Put(b).addColumn(b, b, b).setDurability(Durability.ASYNC_WAL));
|
||||
putFinished.countDown();
|
||||
} catch (IOException e) {
|
||||
LOG.error(e.toString(), e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// give the put a chance to start
|
||||
Threads.sleep(3000);
|
||||
|
||||
exec.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Map<?, ?> closeResult = region.close();
|
||||
LOG.info("Close result:" + closeResult);
|
||||
closeFinished.countDown();
|
||||
} catch (IOException e) {
|
||||
LOG.error(e.toString(), e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// give the flush a chance to start. Flush should have got the region lock, and
|
||||
// should have been waiting on the mvcc complete after this.
|
||||
Threads.sleep(3000);
|
||||
|
||||
// let the append to WAL go through now that the flush already started
|
||||
holdAppend.countDown();
|
||||
putFinished.await();
|
||||
closeFinished.await();
|
||||
|
||||
// now check the region's unflushed seqIds.
|
||||
long seqId = wal.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
|
||||
assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
|
||||
seqId);
|
||||
|
||||
wal.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -801,15 +801,15 @@ public abstract class AbstractTestWALReplay {
|
|||
long now = ee.currentTime();
|
||||
edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
|
||||
now, rowName));
|
||||
wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
|
||||
true);
|
||||
wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
|
||||
edit);
|
||||
|
||||
// Delete the c family to verify deletes make it over.
|
||||
edit = new WALEdit();
|
||||
now = ee.currentTime();
|
||||
edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
|
||||
wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
|
||||
true);
|
||||
wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
|
||||
edit);
|
||||
|
||||
// Sync.
|
||||
wal.sync();
|
||||
|
@ -1154,10 +1154,10 @@ public abstract class AbstractTestWALReplay {
|
|||
}
|
||||
|
||||
private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
|
||||
byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
|
||||
int index, NavigableMap<byte[], Integer> scopes) throws IOException {
|
||||
byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
|
||||
int index, NavigableMap<byte[], Integer> scopes) throws IOException {
|
||||
FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
|
||||
createWALEdit(rowName, family, ee, index), hri, true, null);
|
||||
createWALEdit(rowName, family, ee, index), hri, true, false, null);
|
||||
entry.stampRegionSequenceId(mvcc.begin());
|
||||
return entry;
|
||||
}
|
||||
|
@ -1167,8 +1167,8 @@ public abstract class AbstractTestWALReplay {
|
|||
final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc,
|
||||
NavigableMap<byte[], Integer> scopes) throws IOException {
|
||||
for (int j = 0; j < count; j++) {
|
||||
wal.append(hri, createWALKey(tableName, hri, mvcc, scopes),
|
||||
createWALEdit(rowName, family, ee, j), true);
|
||||
wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes),
|
||||
createWALEdit(rowName, family, ee, j));
|
||||
}
|
||||
wal.sync();
|
||||
}
|
||||
|
|
|
@ -196,7 +196,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
|
|||
SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
|
||||
HConstants.NO_NONCE, mvcc, scopes);
|
||||
try {
|
||||
wal.append(ri, key, cols, true);
|
||||
wal.append(ri, key, cols, true, false);
|
||||
} catch (IOException e) {
|
||||
// should not happen
|
||||
throw new UncheckedIOException(e);
|
||||
|
|
|
@ -207,9 +207,8 @@ public class TestLogRollAbort {
|
|||
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
|
||||
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
scopes.put(Bytes.toBytes("column"), 0);
|
||||
log.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes),
|
||||
kvs, true);
|
||||
log.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), kvs);
|
||||
}
|
||||
// Send the data to HDFS datanodes and close the HDFS writer
|
||||
log.sync();
|
||||
|
|
|
@ -166,8 +166,8 @@ public class TestLogRollingNoCluster {
|
|||
for(byte[] fam : htd.getColumnFamilyNames()) {
|
||||
scopes.put(fam, 0);
|
||||
}
|
||||
final long txid = wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
|
||||
TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true);
|
||||
final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
|
||||
TableName.META_TABLE_NAME, now, mvcc, scopes), edit);
|
||||
Threads.sleep(ThreadLocalRandom.current().nextInt(5));
|
||||
wal.sync(txid);
|
||||
}
|
||||
|
|
|
@ -111,9 +111,8 @@ public class TestWALActionsListener {
|
|||
edit.add(kv);
|
||||
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
scopes.put(b, 0);
|
||||
long txid = wal.append(hri,
|
||||
new WALKeyImpl(hri.getEncodedNameAsBytes(), TableName.valueOf(b), 0, mvcc, scopes), edit,
|
||||
true);
|
||||
long txid = wal.appendData(hri,
|
||||
new WALKeyImpl(hri.getEncodedNameAsBytes(), TableName.valueOf(b), 0, mvcc, scopes), edit);
|
||||
wal.sync(txid);
|
||||
if (i == 10) {
|
||||
wal.registerWALActionsListener(laterobserver);
|
||||
|
|
|
@ -424,7 +424,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
|
|||
long now = EnvironmentEdgeManager.currentTime();
|
||||
edit.add(new KeyValue(rowName, famName, qualifier, now, value));
|
||||
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes);
|
||||
wal.append(hri, walKey, edit, true);
|
||||
wal.appendData(hri, walKey, edit);
|
||||
wal.sync();
|
||||
|
||||
Get get = new Get(rowName);
|
||||
|
|
|
@ -290,11 +290,9 @@ public abstract class TestReplicationSourceManager {
|
|||
wal.rollWriter();
|
||||
}
|
||||
LOG.info(Long.toString(i));
|
||||
final long txid = wal.append(
|
||||
hri,
|
||||
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
|
||||
edit,
|
||||
true);
|
||||
final long txid = wal.appendData(hri,
|
||||
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
|
||||
edit);
|
||||
wal.sync(txid);
|
||||
}
|
||||
|
||||
|
@ -306,9 +304,9 @@ public abstract class TestReplicationSourceManager {
|
|||
LOG.info(baseline + " and " + time);
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
wal.append(hri,
|
||||
wal.appendData(hri,
|
||||
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
|
||||
edit, true);
|
||||
edit);
|
||||
}
|
||||
wal.sync();
|
||||
|
||||
|
@ -324,9 +322,9 @@ public abstract class TestReplicationSourceManager {
|
|||
manager.logPositionAndCleanOldLogs("1", false,
|
||||
new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
|
||||
|
||||
wal.append(hri,
|
||||
wal.appendData(hri,
|
||||
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
|
||||
edit, true);
|
||||
edit);
|
||||
wal.sync();
|
||||
|
||||
assertEquals(1, manager.getWALs().size());
|
||||
|
|
|
@ -558,9 +558,9 @@ public class TestWALEntryStream {
|
|||
}
|
||||
|
||||
private void appendToLog(String key) throws IOException {
|
||||
final long txid = log.append(info,
|
||||
final long txid = log.appendData(info,
|
||||
new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(),
|
||||
mvcc, scopes), getWALEdit(key), true);
|
||||
mvcc, scopes), getWALEdit(key));
|
||||
log.sync(txid);
|
||||
}
|
||||
|
||||
|
@ -582,8 +582,8 @@ public class TestWALEntryStream {
|
|||
}
|
||||
|
||||
private long appendToLog(int count) throws IOException {
|
||||
return log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true);
|
||||
return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), getWALEdits(count));
|
||||
}
|
||||
|
||||
private WALEdit getWALEdits(int count) {
|
||||
|
|
|
@ -63,12 +63,12 @@ public class FaultyFSLog extends FSHLog {
|
|||
}
|
||||
|
||||
@Override
|
||||
public long append(RegionInfo info, WALKeyImpl key,
|
||||
WALEdit edits, boolean inMemstore) throws IOException {
|
||||
protected long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
|
||||
boolean closeRegion) throws IOException {
|
||||
if (this.ft == FailureType.APPEND) {
|
||||
throw new IOException("append");
|
||||
}
|
||||
return super.append(info, key, edits, inMemstore);
|
||||
return super.append(info, key, edits, inMemstore, closeRegion);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -156,8 +156,8 @@ public class TestFSHLogProvider {
|
|||
long timestamp = System.currentTimeMillis();
|
||||
WALEdit cols = new WALEdit();
|
||||
cols.add(new KeyValue(row, row, row, timestamp, row));
|
||||
log.append(hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes),
|
||||
cols, true);
|
||||
log.appendData(hri,
|
||||
getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes), cols);
|
||||
}
|
||||
log.sync();
|
||||
}
|
||||
|
|
|
@ -129,8 +129,8 @@ public class TestSecureWAL {
|
|||
for (int i = 0; i < total; i++) {
|
||||
WALEdit kvs = new WALEdit();
|
||||
kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
|
||||
wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), kvs, true);
|
||||
wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), kvs);
|
||||
}
|
||||
wal.sync();
|
||||
final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
|
||||
|
|
|
@ -208,7 +208,7 @@ public class TestWALFactory {
|
|||
LOG.info("Region " + i + ": " + edit);
|
||||
WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes);
|
||||
log.append(infos[i], walKey, edit, true);
|
||||
log.appendData(infos[i], walKey, edit);
|
||||
walKey.getWriteEntry();
|
||||
}
|
||||
log.sync();
|
||||
|
@ -270,8 +270,8 @@ public class TestWALFactory {
|
|||
for (int i = 0; i < total; i++) {
|
||||
WALEdit kvs = new WALEdit();
|
||||
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
|
||||
wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), kvs, true);
|
||||
wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), kvs);
|
||||
}
|
||||
// Now call sync and try reading. Opening a Reader before you sync just
|
||||
// gives you EOFE.
|
||||
|
@ -289,8 +289,8 @@ public class TestWALFactory {
|
|||
for (int i = 0; i < total; i++) {
|
||||
WALEdit kvs = new WALEdit();
|
||||
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
|
||||
wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), kvs, true);
|
||||
wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), kvs);
|
||||
}
|
||||
wal.sync();
|
||||
reader = wals.createReader(fs, walPath);
|
||||
|
@ -311,8 +311,8 @@ public class TestWALFactory {
|
|||
for (int i = 0; i < total; i++) {
|
||||
WALEdit kvs = new WALEdit();
|
||||
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
|
||||
wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), kvs, true);
|
||||
wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), kvs);
|
||||
}
|
||||
// Now I should have written out lots of blocks. Sync then read.
|
||||
wal.sync();
|
||||
|
@ -388,9 +388,8 @@ public class TestWALFactory {
|
|||
for (int i = 0; i < total; i++) {
|
||||
WALEdit kvs = new WALEdit();
|
||||
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
|
||||
wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes),
|
||||
kvs, true);
|
||||
wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), kvs);
|
||||
}
|
||||
// Now call sync to send the data to HDFS datanodes
|
||||
wal.sync();
|
||||
|
@ -522,10 +521,8 @@ public class TestWALFactory {
|
|||
.setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build();
|
||||
final WAL log = wals.getWAL(info);
|
||||
|
||||
final long txid = log.append(info,
|
||||
new WALKeyImpl(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
|
||||
mvcc, scopes),
|
||||
cols, true);
|
||||
final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(),
|
||||
htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
|
||||
log.sync(txid);
|
||||
log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
|
||||
log.completeCacheFlush(info.getEncodedNameAsBytes());
|
||||
|
@ -580,10 +577,8 @@ public class TestWALFactory {
|
|||
}
|
||||
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
|
||||
final WAL log = wals.getWAL(hri);
|
||||
final long txid = log.append(hri,
|
||||
new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
|
||||
mvcc, scopes),
|
||||
cols, true);
|
||||
final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
|
||||
htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
|
||||
log.sync(txid);
|
||||
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
|
||||
log.completeCacheFlush(hri.getEncodedNameAsBytes());
|
||||
|
@ -634,8 +629,8 @@ public class TestWALFactory {
|
|||
cols.add(new KeyValue(row, Bytes.toBytes("column"),
|
||||
Bytes.toBytes(Integer.toString(i)),
|
||||
timestamp, new byte[]{(byte) (i + '0')}));
|
||||
log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), cols, true);
|
||||
log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), cols);
|
||||
}
|
||||
log.sync();
|
||||
assertEquals(COL_COUNT, visitor.increments);
|
||||
|
@ -644,8 +639,8 @@ public class TestWALFactory {
|
|||
cols.add(new KeyValue(row, Bytes.toBytes("column"),
|
||||
Bytes.toBytes(Integer.toString(11)),
|
||||
timestamp, new byte[]{(byte) (11 + '0')}));
|
||||
log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), cols, true);
|
||||
log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), cols);
|
||||
log.sync();
|
||||
assertEquals(COL_COUNT, visitor.increments);
|
||||
}
|
||||
|
|
|
@ -118,8 +118,8 @@ public class TestWALReaderOnSecureWAL {
|
|||
} else {
|
||||
kvs.add(kv);
|
||||
}
|
||||
wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), kvs, true);
|
||||
wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
|
||||
System.currentTimeMillis(), mvcc, scopes), kvs);
|
||||
}
|
||||
wal.sync();
|
||||
final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
|
||||
|
|
|
@ -98,8 +98,8 @@ public class TestWALRootDir {
|
|||
WALEdit edit = new WALEdit();
|
||||
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
|
||||
System.currentTimeMillis(), value));
|
||||
long txid = log.append(regionInfo,
|
||||
getWalKey(System.currentTimeMillis(), regionInfo, 0), edit, true);
|
||||
long txid =
|
||||
log.appendData(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 0), edit);
|
||||
log.sync(txid);
|
||||
assertEquals("Expect 1 log have been created", 1,
|
||||
getWALFiles(walFs, walRootDir).size());
|
||||
|
@ -109,8 +109,7 @@ public class TestWALRootDir {
|
|||
HConstants.HREGION_LOGDIR_NAME)).size());
|
||||
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
|
||||
System.currentTimeMillis(), value));
|
||||
txid = log.append(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 1),
|
||||
edit, true);
|
||||
txid = log.appendData(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 1), edit);
|
||||
log.sync(txid);
|
||||
log.rollWriter();
|
||||
log.shutdown();
|
||||
|
|
|
@ -184,7 +184,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
|
|||
RegionInfo hri = region.getRegionInfo();
|
||||
final WALKeyImpl logkey =
|
||||
new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes);
|
||||
wal.append(hri, logkey, walEdit, true);
|
||||
wal.appendData(hri, logkey, walEdit);
|
||||
if (!this.noSync) {
|
||||
if (++lastSync >= this.syncInterval) {
|
||||
wal.sync();
|
||||
|
|
Loading…
Reference in New Issue