HBASE-23181 Blocked WAL archive: "LogRoller: Failed to schedule flush of XXXX, because it is not online on us" (#753)

Signed-off-by: Lijin Bin <binlijin@apache.org>
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Duo Zhang 2019-10-26 20:37:37 +08:00 committed by GitHub
parent 50dc288875
commit 7b5cd0152f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 404 additions and 234 deletions

View File

@ -48,7 +48,7 @@ public final class ImmutableByteArray {
return new ImmutableByteArray(b); return new ImmutableByteArray(b);
} }
public String toStringUtf8() { public String toString() {
return Bytes.toString(b); return Bytes.toStringBinary(b);
} }
} }

View File

@ -135,10 +135,10 @@ public class TestWALRecordReader {
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
WALEdit edit = new WALEdit(); WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value)); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
log.append(info, getWalKeyImpl(ts, scopes), edit, true); log.appendData(info, getWalKeyImpl(ts, scopes), edit);
edit = new WALEdit(); edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value)); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
log.append(info, getWalKeyImpl(ts+1, scopes), edit, true); log.appendData(info, getWalKeyImpl(ts+1, scopes), edit);
log.sync(); log.sync();
LOG.info("Before 1st WAL roll " + log.toString()); LOG.info("Before 1st WAL roll " + log.toString());
log.rollWriter(); log.rollWriter();
@ -149,10 +149,10 @@ public class TestWALRecordReader {
edit = new WALEdit(); edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value)); edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
log.append(info, getWalKeyImpl(ts1+1, scopes), edit, true); log.appendData(info, getWalKeyImpl(ts1+1, scopes), edit);
edit = new WALEdit(); edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value)); edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
log.append(info, getWalKeyImpl(ts1+2, scopes), edit, true); log.appendData(info, getWalKeyImpl(ts1+2, scopes), edit);
log.sync(); log.sync();
log.shutdown(); log.shutdown();
walfactory.shutdown(); walfactory.shutdown();
@ -193,7 +193,7 @@ public class TestWALRecordReader {
WALEdit edit = new WALEdit(); WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
System.currentTimeMillis(), value)); System.currentTimeMillis(), value));
long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true); long txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
log.sync(txid); log.sync(txid);
Thread.sleep(1); // make sure 2nd log gets a later timestamp Thread.sleep(1); // make sure 2nd log gets a later timestamp
@ -201,9 +201,8 @@ public class TestWALRecordReader {
log.rollWriter(); log.rollWriter();
edit = new WALEdit(); edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value));
System.currentTimeMillis(), value)); txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
log.sync(txid); log.sync(txid);
log.shutdown(); log.shutdown();
walfactory.shutdown(); walfactory.shutdown();
@ -253,17 +252,15 @@ public class TestWALRecordReader {
WAL log = walfactory.getWAL(info); WAL log = walfactory.getWAL(info);
byte [] value = Bytes.toBytes("value"); byte [] value = Bytes.toBytes("value");
WALEdit edit = new WALEdit(); WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value));
System.currentTimeMillis(), value)); long txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
log.sync(txid); log.sync(txid);
Thread.sleep(10); // make sure 2nd edit gets a later timestamp Thread.sleep(10); // make sure 2nd edit gets a later timestamp
edit = new WALEdit(); edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value));
System.currentTimeMillis(), value)); txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
log.sync(txid); log.sync(txid);
log.shutdown(); log.shutdown();

View File

@ -7995,7 +7995,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
WriteEntry writeEntry = null; WriteEntry writeEntry = null;
try { try {
long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
// Call sync on our edit. // Call sync on our edit.
if (txid != 0) { if (txid != 0) {
sync(txid, durability); sync(txid, durability);

View File

@ -977,7 +977,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// Noop // Noop
} }
protected final boolean append(W writer, FSWALEntry entry) throws IOException { protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
// TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc. // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
atHeadOfRingBufferEventHandlerAppend(); atHeadOfRingBufferEventHandlerAppend();
long start = EnvironmentEdgeManager.currentTime(); long start = EnvironmentEdgeManager.currentTime();
@ -1001,8 +1001,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
doAppend(writer, entry); doAppend(writer, entry);
assert highestUnsyncedTxid < entry.getTxid(); assert highestUnsyncedTxid < entry.getTxid();
highestUnsyncedTxid = entry.getTxid(); highestUnsyncedTxid = entry.getTxid();
sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId, if (entry.isCloseRegion()) {
entry.isInMemStore()); // let's clean all the records of this region
sequenceIdAccounting.onRegionClose(encodedRegionName);
} else {
sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
entry.isInMemStore());
}
coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
// Update metrics. // Update metrics.
postAppend(entry, EnvironmentEdgeManager.currentTime() - start); postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
@ -1052,11 +1057,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
} }
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key, protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) WALEdit edits, boolean inMemstore, boolean closeRegion, RingBuffer<RingBufferTruck> ringBuffer)
throws IOException { throws IOException {
if (this.closed) { if (this.closed) {
throw new IOException( throw new IOException(
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString()); "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
} }
MutableLong txidHolder = new MutableLong(); MutableLong txidHolder = new MutableLong();
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> { MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
@ -1066,7 +1071,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall) ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null); .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) { try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall); FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, closeRegion, rpcCall);
entry.stampRegionSequenceId(we); entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry); ringBuffer.get(txid).load(entry);
} finally { } finally {
@ -1102,7 +1107,24 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
} }
} }
@Override
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
return append(info, key, edits, true, false);
}
@Override
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
throws IOException {
return append(info, key, edits, false, closeRegion);
}
/** /**
* Append a set of edits to the WAL.
* <p/>
* The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
* have its region edit/sequence id assigned else it messes up our unification of mvcc and
* sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
* <p/>
* NOTE: This append, at a time that is usually after this call returns, starts an mvcc * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
* transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment
* time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
@ -1113,10 +1135,21 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not * passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not
* immediately available on return from this method. It WILL be available subsequent to a sync of * immediately available on return from this method. It WILL be available subsequent to a sync of
* this append; otherwise, you will just have to wait on the WriteEntry to get filled in. * this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
* @param info the regioninfo associated with append
* @param key Modified by this call; we add to it this edits region edit/sequence id.
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
* sequence id that is after all currently appended edits.
* @param inMemstore Always true except for case where we are writing a region event marker, for
* example, a compaction completion record into the WAL; in this case the entry is just
* so we can finish an unfinished compaction -- it is not an edit for memstore.
* @param closeRegion Whether this is a region close marker, i.e, the last wal edit for this
* region on this region server. The WAL implementation should remove all the related
* stuff, for example, the sequence id accounting.
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it.
*/ */
@Override protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) boolean closeRegion) throws IOException;
throws IOException;
protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;

View File

@ -434,7 +434,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
FSWALEntry entry = iter.next(); FSWALEntry entry = iter.next();
boolean appended; boolean appended;
try { try {
appended = append(writer, entry); appended = appendEntry(writer, entry);
} catch (IOException e) { } catch (IOException e) {
throw new AssertionError("should not happen", e); throw new AssertionError("should not happen", e);
} }
@ -615,13 +615,13 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
} }
@Override @Override
public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore,
throws IOException { boolean closeRegion) throws IOException {
if (markerEditOnly() && !edits.isMetaEdit()) { if (markerEditOnly() && !edits.isMetaEdit()) {
throw new IOException("WAL is closing, only marker edit is allowed"); throw new IOException("WAL is closing, only marker edit is allowed");
} }
long txid = long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); waitingConsumePayloads);
if (shouldScheduleConsumer()) { if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer); consumeExecutor.execute(consumer);
} }

View File

@ -433,12 +433,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
} }
} }
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
justification = "Will never be null")
@Override @Override
public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits, protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
final boolean inMemstore) throws IOException { final boolean inMemstore, boolean closeRegion) throws IOException {
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
disruptor.getRingBuffer()); disruptor.getRingBuffer());
} }
@ -1100,7 +1098,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
*/ */
void append(final FSWALEntry entry) throws Exception { void append(final FSWALEntry entry) throws Exception {
try { try {
FSHLog.this.append(writer, entry); FSHLog.this.appendEntry(writer, entry);
} catch (Exception e) { } catch (Exception e) {
String msg = "Append sequenceId=" + entry.getKey().getSequenceId() String msg = "Append sequenceId=" + entry.getKey().getSequenceId()
+ ", requesting roll of WAL"; + ", requesting roll of WAL";

View File

@ -51,14 +51,16 @@ class FSWALEntry extends Entry {
// they are only in memory and held here while passing over the ring buffer. // they are only in memory and held here while passing over the ring buffer.
private final transient long txid; private final transient long txid;
private final transient boolean inMemstore; private final transient boolean inMemstore;
private final transient boolean closeRegion;
private final transient RegionInfo regionInfo; private final transient RegionInfo regionInfo;
private final transient Set<byte[]> familyNames; private final transient Set<byte[]> familyNames;
private final transient ServerCall<?> rpcCall; private final transient ServerCall<?> rpcCall;
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo, FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
final boolean inMemstore, ServerCall<?> rpcCall) { final boolean inMemstore, boolean closeRegion, ServerCall<?> rpcCall) {
super(key, edit); super(key, edit);
this.inMemstore = inMemstore; this.inMemstore = inMemstore;
this.closeRegion = closeRegion;
this.regionInfo = regionInfo; this.regionInfo = regionInfo;
this.txid = txid; this.txid = txid;
if (inMemstore) { if (inMemstore) {
@ -98,6 +100,10 @@ class FSWALEntry extends Entry {
return this.inMemstore; return this.inMemstore;
} }
boolean isCloseRegion() {
return closeRegion;
}
RegionInfo getRegionInfo() { RegionInfo getRegionInfo() {
return this.regionInfo; return this.regionInfo;
} }

View File

@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ImmutableByteArray; import org.apache.hadoop.hbase.util.ImmutableByteArray;
@ -184,6 +185,30 @@ class SequenceIdAccounting {
} }
} }
/**
* Clear all the records of the given region as it is going to be closed.
* <p/>
* We will call this once we get the region close marker. We need this because that, if we use
* Durability.ASYNC_WAL, after calling startCacheFlush, we may still get some ongoing wal entries
* that has not been processed yet, this will lead to orphan records in the
* lowestUnflushedSequenceIds and then cause too many WAL files.
* <p/>
* See HBASE-23157 for more details.
*/
void onRegionClose(byte[] encodedRegionName) {
synchronized (tieLock) {
this.lowestUnflushedSequenceIds.remove(encodedRegionName);
Map<ImmutableByteArray, Long> flushing = this.flushingSequenceIds.remove(encodedRegionName);
if (flushing != null) {
LOG.warn("Still have flushing records when closing {}, {}",
Bytes.toString(encodedRegionName),
flushing.entrySet().stream().map(e -> e.getKey().toString() + "->" + e.getValue())
.collect(Collectors.joining(",", "{", "}")));
}
}
this.highestSequenceIds.remove(encodedRegionName);
}
/** /**
* Update the store sequence id, e.g., upon executing in-memory compaction * Update the store sequence id, e.g., upon executing in-memory compaction
*/ */
@ -364,7 +389,7 @@ class SequenceIdAccounting {
Long currentId = tmpMap.get(e.getKey()); Long currentId = tmpMap.get(e.getKey());
if (currentId != null && currentId.longValue() < e.getValue().longValue()) { if (currentId != null && currentId.longValue() < e.getValue().longValue()) {
String errorStr = Bytes.toString(encodedRegionName) + " family " String errorStr = Bytes.toString(encodedRegionName) + " family "
+ e.getKey().toStringUtf8() + " acquired edits out of order current memstore seq=" + e.getKey().toString() + " acquired edits out of order current memstore seq="
+ currentId + ", previous oldest unflushed id=" + e.getValue(); + currentId + ", previous oldest unflushed id=" + e.getValue();
LOG.error(errorStr); LOG.error(errorStr);
Runtime.getRuntime().halt(1); Runtime.getRuntime().halt(1);

View File

@ -59,20 +59,19 @@ public class WALUtil {
} }
/** /**
* Write the marker that a compaction has succeeded and is about to be committed. * Write the marker that a compaction has succeeded and is about to be committed. This provides
* This provides info to the HMaster to allow it to recover the compaction if this regionserver * info to the HMaster to allow it to recover the compaction if this regionserver dies in the
* dies in the middle. It also prevents the compaction from finishing if this regionserver has * middle. It also prevents the compaction from finishing if this regionserver has already lost
* already lost its lease on the log. * its lease on the log.
* * <p/>
* <p>This write is for internal use only. Not for external client consumption. * This write is for internal use only. Not for external client consumption.
* @param mvcc Used by WAL to get sequence Id for the waledit. * @param mvcc Used by WAL to get sequence Id for the waledit.
*/ */
public static WALKeyImpl writeCompactionMarker(WAL wal, public static WALKeyImpl writeCompactionMarker(WAL wal,
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c, NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
MultiVersionConcurrencyControl mvcc) MultiVersionConcurrencyControl mvcc) throws IOException {
throws IOException {
WALKeyImpl walKey = WALKeyImpl walKey =
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null); writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), false, mvcc, null);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
} }
@ -81,14 +80,14 @@ public class WALUtil {
/** /**
* Write a flush marker indicating a start / abort or a complete of a region flush * Write a flush marker indicating a start / abort or a complete of a region flush
* * <p/>
* <p>This write is for internal use only. Not for external client consumption. * This write is for internal use only. Not for external client consumption.
*/ */
public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope, public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc) RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
throws IOException { throws IOException {
WALKeyImpl walKey = doFullAppendTransaction(wal, replicationScope, hri, WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync); WALEdit.createFlushWALEdit(hri, f), false, mvcc, null, sync);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f)); LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
} }
@ -96,15 +95,15 @@ public class WALUtil {
} }
/** /**
* Write a region open marker indicating that the region is opened. * Write a region open marker indicating that the region is opened. This write is for internal use
* This write is for internal use only. Not for external client consumption. * only. Not for external client consumption.
*/ */
public static WALKeyImpl writeRegionEventMarker(WAL wal, public static WALKeyImpl writeRegionEventMarker(WAL wal,
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final RegionEventDescriptor r,
final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc) final MultiVersionConcurrencyControl mvcc) throws IOException {
throws IOException { WALKeyImpl walKey =
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r),
WALEdit.createRegionEventWALEdit(hri, r), mvcc, null); r.getEventType() == RegionEventDescriptor.EventType.REGION_CLOSE, mvcc, null);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r)); LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
} }
@ -122,11 +121,11 @@ public class WALUtil {
* @throws IOException We will throw an IOException if we can not append to the HLog. * @throws IOException We will throw an IOException if we can not append to the HLog.
*/ */
public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal, public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc) final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
throws IOException { throws IOException {
WALKeyImpl walKey = WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, null); WALEdit.createBulkLoadEvent(hri, desc), false, mvcc, null);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc)); LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
} }
@ -134,36 +133,32 @@ public class WALUtil {
} }
private static WALKeyImpl writeMarker(final WAL wal, private static WALKeyImpl writeMarker(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
final RegionInfo hri, boolean closeRegion, final MultiVersionConcurrencyControl mvcc,
final WALEdit edit, final Map<String, byte[]> extendedAttributes) throws IOException {
final MultiVersionConcurrencyControl mvcc,
final Map<String, byte[]> extendedAttributes)
throws IOException {
// If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
return doFullAppendTransaction(wal, replicationScope, hri, return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, closeRegion, mvcc,
edit, mvcc, extendedAttributes, true); extendedAttributes, true);
} }
/** /**
* A 'full' WAL transaction involves starting an mvcc transaction followed by an append, * A 'full' WAL transaction involves starting an mvcc transaction followed by an append, an
* an optional sync, and then a call to complete the mvcc transaction. This method does it all. * optional sync, and then a call to complete the mvcc transaction. This method does it all. Good
* Good for case of adding a single edit or marker to the WAL. * for case of adding a single edit or marker to the WAL.
* * <p/>
* <p>This write is for internal use only. Not for external client consumption. * This write is for internal use only. Not for external client consumption.
* @return WALKeyImpl that was added to the WAL. * @return WALKeyImpl that was added to the WAL.
*/ */
public static WALKeyImpl doFullAppendTransaction(final WAL wal, private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
final WALEdit edit, final MultiVersionConcurrencyControl mvcc, boolean closeRegion, final MultiVersionConcurrencyControl mvcc,
final Map<String, byte[]> extendedAttributes, final boolean sync) final Map<String, byte[]> extendedAttributes, final boolean sync) throws IOException {
throws IOException {
// TODO: Pass in current time to use? // TODO: Pass in current time to use?
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes); System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes);
long trx = MultiVersionConcurrencyControl.NONE; long trx = MultiVersionConcurrencyControl.NONE;
try { try {
trx = wal.append(hri, walKey, edit, false); trx = wal.appendMarker(hri, walKey, edit, closeRegion);
if (sync) { if (sync) {
wal.sync(trx); wal.sync(trx);
} }

View File

@ -161,8 +161,18 @@ class DisabledWALProvider implements WALProvider {
} }
@Override @Override
public long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
throws IOException { return append(info, key, edits, true, false);
}
@Override
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
throws IOException {
return append(info, key, edits, false, closeRegion);
}
private long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
boolean closeRegion) throws IOException {
WriteEntry writeEntry = key.getMvcc().begin(); WriteEntry writeEntry = key.getMvcc().begin();
if (!edits.isReplay()) { if (!edits.isReplay()) {
for (Cell cell : edits.getCells()) { for (Cell cell : edits.getCells()) {

View File

@ -56,7 +56,7 @@ public interface WAL extends Closeable, WALFileLengthProvider {
/** /**
* Roll the log writer. That is, start writing log messages to a new file. * Roll the log writer. That is, start writing log messages to a new file.
* *
* <p> * <p/>
* The implementation is synchronized in order to make sure there's one rollWriter * The implementation is synchronized in order to make sure there's one rollWriter
* running at any given time. * running at any given time.
* *
@ -69,7 +69,7 @@ public interface WAL extends Closeable, WALFileLengthProvider {
/** /**
* Roll the log writer. That is, start writing log messages to a new file. * Roll the log writer. That is, start writing log messages to a new file.
* *
* <p> * <p/>
* The implementation is synchronized in order to make sure there's one rollWriter * The implementation is synchronized in order to make sure there's one rollWriter
* running at any given time. * running at any given time.
* *
@ -97,44 +97,59 @@ public interface WAL extends Closeable, WALFileLengthProvider {
void close() throws IOException; void close() throws IOException;
/** /**
* Append a set of edits to the WAL. The WAL is not flushed/sync'd after this transaction * Append a set of data edits to the WAL. 'Data' here means that the content in the edits will
* completes BUT on return this edit must have its region edit/sequence id assigned * also be added to memstore.
* else it messes up our unification of mvcc and sequenceid. On return <code>key</code> will * <p/>
* have the region edit/sequence id filled in. * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
* have its region edit/sequence id assigned else it messes up our unification of mvcc and
* sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
* @param info the regioninfo associated with append * @param info the regioninfo associated with append
* @param key Modified by this call; we add to it this edits region edit/sequence id. * @param key Modified by this call; we add to it this edits region edit/sequence id.
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
* sequence id that is after all currently appended edits. * sequence id that is after all currently appended edits.
* @param inMemstore Always true except for case where we are writing a compaction completion
* record into the WAL; in this case the entry is just so we can finish an unfinished compaction
* -- it is not an edit for memstore.
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it. * in it.
* @see #appendMarker(RegionInfo, WALKeyImpl, WALEdit, boolean)
*/ */
long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException; long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException;
/**
* Append a marker edit to the WAL. A marker could be a FlushDescriptor, a compaction marker, or
* region event marker. The difference here is that, a marker will not be added to memstore.
* <p/>
* The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
* have its region edit/sequence id assigned else it messes up our unification of mvcc and
* sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
* @param info the regioninfo associated with append
* @param key Modified by this call; we add to it this edits region edit/sequence id.
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
* sequence id that is after all currently appended edits.
* @param closeRegion Whether this is a region close marker, i.e, the last wal edit for this
* region on this region server. The WAL implementation should remove all the related
* stuff, for example, the sequence id accounting.
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it.
* @see #appendData(RegionInfo, WALKeyImpl, WALEdit)
*/
long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
throws IOException;
/** /**
* updates the seuence number of a specific store. * updates the seuence number of a specific store.
* depending on the flag: replaces current seq number if the given seq id is bigger, * depending on the flag: replaces current seq number if the given seq id is bigger,
* or even if it is lower than existing one * or even if it is lower than existing one
* @param encodedRegionName
* @param familyName
* @param sequenceid
* @param onlyIfGreater
*/ */
void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
boolean onlyIfGreater); boolean onlyIfGreater);
/** /**
* Sync what we have in the WAL. * Sync what we have in the WAL.
* @throws IOException
*/ */
void sync() throws IOException; void sync() throws IOException;
/** /**
* Sync the WAL if the txId was not already sync'd. * Sync the WAL if the txId was not already sync'd.
* @param txid Transaction id to sync to. * @param txid Transaction id to sync to.
* @throws IOException
*/ */
void sync(long txid) throws IOException; void sync(long txid) throws IOException;

View File

@ -239,9 +239,8 @@ public class TestWALObserver {
// it's where WAL write cp should occur. // it's where WAL write cp should occur.
long now = EnvironmentEdgeManager.currentTime(); long now = EnvironmentEdgeManager.currentTime();
// we use HLogKey here instead of WALKeyImpl directly to support legacy coprocessors. // we use HLogKey here instead of WALKeyImpl directly to support legacy coprocessors.
long txid = log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now,
new MultiVersionConcurrencyControl(), scopes), new MultiVersionConcurrencyControl(), scopes), edit);
edit, true);
log.sync(txid); log.sync(txid);
// the edit shall have been change now by the coprocessor. // the edit shall have been change now by the coprocessor.
@ -291,9 +290,9 @@ public class TestWALObserver {
assertFalse(cp.isPostWALWriteCalled()); assertFalse(cp.isPostWALWriteCalled());
final long now = EnvironmentEdgeManager.currentTime(); final long now = EnvironmentEdgeManager.currentTime();
long txid = log.append(hri, long txid = log.appendData(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes), new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes),
new WALEdit(), true); new WALEdit());
log.sync(txid); log.sync(txid);
assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled()); assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled());
@ -340,8 +339,8 @@ public class TestWALObserver {
addWALEdits(tableName, hri, TEST_ROW, fam, countPerFamily, addWALEdits(tableName, hri, TEST_ROW, fam, countPerFamily,
EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc); EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc);
} }
wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
true); edit);
// sync to fs. // sync to fs.
wal.sync(); wal.sync();
@ -456,8 +455,8 @@ public class TestWALObserver {
edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
// uses WALKeyImpl instead of HLogKey on purpose. will only work for tests where we don't care // uses WALKeyImpl instead of HLogKey on purpose. will only work for tests where we don't care
// about legacy coprocessors // about legacy coprocessors
txid = wal.append(hri, txid = wal.appendData(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit, true); new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit);
} }
if (-1 != txid) { if (-1 != txid) {
wal.sync(txid); wal.sync(txid);

View File

@ -604,9 +604,8 @@ public abstract class AbstractTestDLS {
// HBaseTestingUtility.createMultiRegions use 5 bytes key // HBaseTestingUtility.createMultiRegions use 5 bytes key
byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value)); e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value));
log.append(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), tableName, log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(),
System.currentTimeMillis(), mvcc), tableName, System.currentTimeMillis(), mvcc), e);
e, true);
if (0 == i % syncEvery) { if (0 == i % syncEvery) {
log.sync(); log.sync();
} }

View File

@ -115,7 +115,7 @@ public class TestBulkLoad {
storeFileName = (new Path(storeFileName)).getName(); storeFileName = (new Path(storeFileName)).getName();
List<String> storeFileNames = new ArrayList<>(); List<String> storeFileNames = new ArrayList<>();
storeFileNames.add(storeFileName); storeFileNames.add(storeFileName);
when(log.append(any(), any(), when(log.appendMarker(any(), any(),
argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(), argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(),
familyName, storeFileNames)), familyName, storeFileNames)),
anyBoolean())).thenAnswer(new Answer() { anyBoolean())).thenAnswer(new Answer() {
@ -142,7 +142,7 @@ public class TestBulkLoad {
@Test @Test
public void shouldBulkLoadSingleFamilyHLog() throws IOException { public void shouldBulkLoadSingleFamilyHLog() throws IOException {
when(log.append(any(), when(log.appendMarker(any(),
any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
anyBoolean())).thenAnswer(new Answer() { anyBoolean())).thenAnswer(new Answer() {
@Override @Override
@ -162,7 +162,7 @@ public class TestBulkLoad {
@Test @Test
public void shouldBulkLoadManyFamilyHLog() throws IOException { public void shouldBulkLoadManyFamilyHLog() throws IOException {
when(log.append(any(), when(log.appendMarker(any(),
any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
anyBoolean())).thenAnswer(new Answer() { anyBoolean())).thenAnswer(new Answer() {
@Override @Override
@ -183,7 +183,7 @@ public class TestBulkLoad {
@Test @Test
public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException { public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
when(log.append(any(), when(log.appendMarker(any(),
any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
anyBoolean())).thenAnswer(new Answer() { anyBoolean())).thenAnswer(new Answer() {
@Override @Override

View File

@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.filter.BigDecimalComparator; import org.apache.hadoop.hbase.filter.BigDecimalComparator;
@ -4514,10 +4515,9 @@ public class TestHRegion {
put.setDurability(mutationDurability); put.setDurability(mutationDurability);
region.put(put); region.put(put);
//verify append called or not // verify append called or not
verify(wal, expectAppend ? times(1) : never()) verify(wal, expectAppend ? times(1) : never()).appendData((HRegionInfo) any(),
.append((HRegionInfo)any(), (WALKeyImpl)any(), (WALKeyImpl) any(), (WALEdit) any());
(WALEdit)any(), Mockito.anyBoolean());
// verify sync called or not // verify sync called or not
if (expectSync || expectSyncFromLogSyncer) { if (expectSync || expectSyncFromLogSyncer) {
@ -5626,12 +5626,10 @@ public class TestHRegion {
final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42); final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
htd.addFamily(new HColumnDescriptor(fam1)); .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1))
htd.addFamily(new HColumnDescriptor(fam2)); .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build();
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
HRegionInfo hri = new HRegionInfo(htd.getTableName(),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
// open the region w/o rss and wal and flush some files // open the region w/o rss and wal and flush some files
region = region =
@ -5648,13 +5646,13 @@ public class TestHRegion {
// capture append() calls // capture append() calls
WAL wal = mockWAL(); WAL wal = mockWAL();
when(rss.getWAL((HRegionInfo) any())).thenReturn(wal); when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);
region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
TEST_UTIL.getConfiguration(), rss, null); TEST_UTIL.getConfiguration(), rss, null);
verify(wal, times(1)).append((HRegionInfo)any(), (WALKeyImpl)any() verify(wal, times(1)).appendMarker(any(RegionInfo.class), any(WALKeyImpl.class),
, editCaptor.capture(), anyBoolean()); editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getValue(); WALEdit edit = editCaptor.getValue();
assertNotNull(edit); assertNotNull(edit);
@ -5720,15 +5718,14 @@ public class TestHRegion {
/** /**
* Utility method to setup a WAL mock. * Utility method to setup a WAL mock.
* <p/>
* Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs. * Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs.
* @return a mock WAL * @return a mock WAL
* @throws IOException
*/ */
private WAL mockWAL() throws IOException { private WAL mockWAL() throws IOException {
WAL wal = mock(WAL.class); WAL wal = mock(WAL.class);
Mockito.when(wal.append((HRegionInfo)Mockito.any(), when(wal.appendData(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class)))
(WALKeyImpl)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())). .thenAnswer(new Answer<Long>() {
thenAnswer(new Answer<Long>() {
@Override @Override
public Long answer(InvocationOnMock invocation) throws Throwable { public Long answer(InvocationOnMock invocation) throws Throwable {
WALKeyImpl key = invocation.getArgument(1); WALKeyImpl key = invocation.getArgument(1);
@ -5736,32 +5733,38 @@ public class TestHRegion {
key.setWriteEntry(we); key.setWriteEntry(we);
return 1L; return 1L;
} }
});
}); when(wal.appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class),
anyBoolean())).thenAnswer(new Answer<Long>() {
@Override
public Long answer(InvocationOnMock invocation) throws Throwable {
WALKeyImpl key = invocation.getArgument(1);
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
key.setWriteEntry(we);
return 1L;
}
});
return wal; return wal;
} }
@Test @Test
public void testCloseRegionWrittenToWAL() throws Exception { public void testCloseRegionWrittenToWAL() throws Exception {
Path rootDir = new Path(dir + name.getMethodName()); Path rootDir = new Path(dir + name.getMethodName());
FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42); final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42);
final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
htd.addFamily(new HColumnDescriptor(fam1)); .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1))
htd.addFamily(new HColumnDescriptor(fam2)); .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build();
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
final HRegionInfo hri = new HRegionInfo(htd.getTableName(),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class); ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
// capture append() calls // capture append() calls
WAL wal = mockWAL(); WAL wal = mockWAL();
when(rss.getWAL((HRegionInfo) any())).thenReturn(wal); when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);
// create and then open a region first so that it can be closed later // create and then open a region first so that it can be closed later
@ -5773,7 +5776,7 @@ public class TestHRegion {
region.close(false); region.close(false);
// 2 times, one for region open, the other close region // 2 times, one for region open, the other close region
verify(wal, times(2)).append((HRegionInfo)any(), (WALKeyImpl)any(), verify(wal, times(2)).appendMarker(any(RegionInfo.class), (WALKeyImpl) any(WALKeyImpl.class),
editCaptor.capture(), anyBoolean()); editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getAllValues().get(1); WALEdit edit = editCaptor.getAllValues().get(1);

View File

@ -27,7 +27,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -1164,8 +1163,8 @@ public class TestHRegionReplayEvents {
// test for region open and close // test for region open and close
secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null); secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class), verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
any(WALEdit.class), anyBoolean()); any(WALEdit.class));
// test for replay prepare flush // test for replay prepare flush
putDataByReplay(secondaryRegion, 0, 10, cq, families); putDataByReplay(secondaryRegion, 0, 10, cq, families);
@ -1180,12 +1179,12 @@ public class TestHRegionReplayEvents {
primaryRegion.getRegionInfo().getRegionName())) primaryRegion.getRegionInfo().getRegionName()))
.build()); .build());
verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class), verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
any(WALEdit.class), anyBoolean()); any(WALEdit.class));
secondaryRegion.close(); secondaryRegion.close();
verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class), verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
any(WALEdit.class), anyBoolean()); any(WALEdit.class));
} }
/** /**

View File

@ -249,7 +249,7 @@ public class TestWALLockup {
LOG.info("SET throwing of exception on append"); LOG.info("SET throwing of exception on append");
dodgyWAL.throwException = true; dodgyWAL.throwException = true;
// This append provokes a WAL roll request // This append provokes a WAL roll request
dodgyWAL.append(region.getRegionInfo(), key, edit, true); dodgyWAL.appendData(region.getRegionInfo(), key, edit);
boolean exception = false; boolean exception = false;
try { try {
dodgyWAL.sync(false); dodgyWAL.sync(false);

View File

@ -25,17 +25,22 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -46,8 +51,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
@ -57,13 +64,17 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SequenceId; import org.apache.hadoop.hbase.regionserver.SequenceId;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
@ -168,7 +179,7 @@ public abstract class AbstractTestFSWAL {
WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(),
SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, scopes); HConstants.NO_NONCE, mvcc, scopes);
log.append(hri, key, cols, true); log.appendData(hri, key, cols);
} }
log.sync(); log.sync();
} }
@ -417,7 +428,7 @@ public abstract class AbstractTestFSWAL {
final RegionInfo info = region.getRegionInfo(); final RegionInfo info = region.getRegionInfo();
final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes); System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes);
wal.append(info, logkey, edits, true); wal.append(info, logkey, edits, true, false);
region.getMVCC().completeAndWait(logkey.getWriteEntry()); region.getMVCC().completeAndWait(logkey.getWriteEntry());
} }
region.flush(true); region.flush(true);
@ -466,7 +477,7 @@ public abstract class AbstractTestFSWAL {
new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID, new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID,
timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes); timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
try { try {
wal.append(ri, key, cols, true); wal.append(ri, key, cols, true, false);
fail("Should fail since the wal has already been closed"); fail("Should fail since the wal has already been closed");
} catch (IOException e) { } catch (IOException e) {
// expected // expected
@ -484,4 +495,94 @@ public abstract class AbstractTestFSWAL {
wal.close(); wal.close();
wal.rollWriter(); wal.rollWriter();
} }
@Test
public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
final String testName = currentTest.getMethodName();
final byte[] b = Bytes.toBytes("b");
final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
final CountDownLatch holdAppend = new CountDownLatch(1);
final CountDownLatch closeFinished = new CountDownLatch(1);
final CountDownLatch putFinished = new CountDownLatch(1);
try (AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getRootDir(CONF), testName,
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
wal.init();
wal.registerWALActionsListener(new WALActionsListener() {
@Override
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
if (startHoldingForAppend.get()) {
try {
holdAppend.await();
} catch (InterruptedException e) {
LOG.error(e.toString(), e);
}
}
}
});
// open a new region which uses this WAL
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
RegionServerServices rsServices = mock(RegionServerServices.class);
when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
when(rsServices.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
final HRegion region = HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal,
TEST_UTIL.getConfiguration(), rsServices, null);
ExecutorService exec = Executors.newFixedThreadPool(2);
// do a regular write first because of memstore size calculation.
region.put(new Put(b).addColumn(b, b, b));
startHoldingForAppend.set(true);
exec.submit(new Runnable() {
@Override
public void run() {
try {
region.put(new Put(b).addColumn(b, b, b).setDurability(Durability.ASYNC_WAL));
putFinished.countDown();
} catch (IOException e) {
LOG.error(e.toString(), e);
}
}
});
// give the put a chance to start
Threads.sleep(3000);
exec.submit(new Runnable() {
@Override
public void run() {
try {
Map<?, ?> closeResult = region.close();
LOG.info("Close result:" + closeResult);
closeFinished.countDown();
} catch (IOException e) {
LOG.error(e.toString(), e);
}
}
});
// give the flush a chance to start. Flush should have got the region lock, and
// should have been waiting on the mvcc complete after this.
Threads.sleep(3000);
// let the append to WAL go through now that the flush already started
holdAppend.countDown();
putFinished.await();
closeFinished.await();
// now check the region's unflushed seqIds.
long seqId = wal.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
seqId);
wal.close();
}
}
} }

View File

@ -801,15 +801,15 @@ public abstract class AbstractTestWALReplay {
long now = ee.currentTime(); long now = ee.currentTime();
edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
now, rowName)); now, rowName));
wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
true); edit);
// Delete the c family to verify deletes make it over. // Delete the c family to verify deletes make it over.
edit = new WALEdit(); edit = new WALEdit();
now = ee.currentTime(); now = ee.currentTime();
edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
true); edit);
// Sync. // Sync.
wal.sync(); wal.sync();
@ -1154,10 +1154,10 @@ public abstract class AbstractTestWALReplay {
} }
private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence, private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
int index, NavigableMap<byte[], Integer> scopes) throws IOException { int index, NavigableMap<byte[], Integer> scopes) throws IOException {
FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
createWALEdit(rowName, family, ee, index), hri, true, null); createWALEdit(rowName, family, ee, index), hri, true, false, null);
entry.stampRegionSequenceId(mvcc.begin()); entry.stampRegionSequenceId(mvcc.begin());
return entry; return entry;
} }
@ -1167,8 +1167,8 @@ public abstract class AbstractTestWALReplay {
final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc, final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> scopes) throws IOException { NavigableMap<byte[], Integer> scopes) throws IOException {
for (int j = 0; j < count; j++) { for (int j = 0; j < count; j++) {
wal.append(hri, createWALKey(tableName, hri, mvcc, scopes), wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes),
createWALEdit(rowName, family, ee, j), true); createWALEdit(rowName, family, ee, j));
} }
wal.sync(); wal.sync();
} }

View File

@ -83,7 +83,7 @@ public final class ProtobufLogTestHelper {
throws IOException { throws IOException {
for (int i = 0; i < recordCount; i++) { for (int i = 0; i < recordCount; i++) {
WAL.Entry entry = generateEdit(i, hri, tableName, row, columnCount, timestamp, mvcc); WAL.Entry entry = generateEdit(i, hri, tableName, row, columnCount, timestamp, mvcc);
wal.append(hri, entry.getKey(), entry.getEdit(), true); wal.appendData(hri, entry.getKey(), entry.getEdit());
} }
wal.sync(); wal.sync();
} }

View File

@ -196,7 +196,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, scopes); HConstants.NO_NONCE, mvcc, scopes);
try { try {
wal.append(ri, key, cols, true); wal.append(ri, key, cols, true, false);
} catch (IOException e) { } catch (IOException e) {
// should not happen // should not happen
throw new UncheckedIOException(e); throw new UncheckedIOException(e);

View File

@ -207,9 +207,8 @@ public class TestLogRollAbort {
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
scopes.put(Bytes.toBytes("column"), 0); scopes.put(Bytes.toBytes("column"), 0);
log.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, log.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), System.currentTimeMillis(), mvcc, scopes), kvs);
kvs, true);
} }
// Send the data to HDFS datanodes and close the HDFS writer // Send the data to HDFS datanodes and close the HDFS writer
log.sync(); log.sync();

View File

@ -166,8 +166,8 @@ public class TestLogRollingNoCluster {
for(byte[] fam : htd.getColumnFamilyNames()) { for(byte[] fam : htd.getColumnFamilyNames()) {
scopes.put(fam, 0); scopes.put(fam, 0);
} }
final long txid = wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true); TableName.META_TABLE_NAME, now, mvcc, scopes), edit);
Threads.sleep(ThreadLocalRandom.current().nextInt(5)); Threads.sleep(ThreadLocalRandom.current().nextInt(5));
wal.sync(txid); wal.sync(txid);
} }

View File

@ -111,9 +111,8 @@ public class TestWALActionsListener {
edit.add(kv); edit.add(kv);
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
scopes.put(b, 0); scopes.put(b, 0);
long txid = wal.append(hri, long txid = wal.appendData(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), TableName.valueOf(b), 0, mvcc, scopes), edit, new WALKeyImpl(hri.getEncodedNameAsBytes(), TableName.valueOf(b), 0, mvcc, scopes), edit);
true);
wal.sync(txid); wal.sync(txid);
if (i == 10) { if (i == 10) {
wal.registerWALActionsListener(laterobserver); wal.registerWALActionsListener(laterobserver);

View File

@ -423,7 +423,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
long now = EnvironmentEdgeManager.currentTime(); long now = EnvironmentEdgeManager.currentTime();
edit.add(new KeyValue(rowName, famName, qualifier, now, value)); edit.add(new KeyValue(rowName, famName, qualifier, now, value));
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes); WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes);
wal.append(hri, walKey, edit, true); wal.appendData(hri, walKey, edit);
wal.sync(); wal.sync();
Get get = new Get(rowName); Get get = new Get(rowName);

View File

@ -300,11 +300,9 @@ public abstract class TestReplicationSourceManager {
wal.rollWriter(); wal.rollWriter();
} }
LOG.info(Long.toString(i)); LOG.info(Long.toString(i));
final long txid = wal.append( final long txid = wal.appendData(hri,
hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), edit);
edit,
true);
wal.sync(txid); wal.sync(txid);
} }
@ -316,9 +314,9 @@ public abstract class TestReplicationSourceManager {
LOG.info(baseline + " and " + time); LOG.info(baseline + " and " + time);
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
wal.append(hri, wal.appendData(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
edit, true); edit);
} }
wal.sync(); wal.sync();
@ -338,9 +336,9 @@ public abstract class TestReplicationSourceManager {
manager.logPositionAndCleanOldLogs(source, manager.logPositionAndCleanOldLogs(source,
new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath())); new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
wal.append(hri, wal.appendData(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
edit, true); edit);
wal.sync(); wal.sync();
assertEquals(1, manager.getWALs().size()); assertEquals(1, manager.getWALs().size());

View File

@ -556,9 +556,9 @@ public class TestWALEntryStream {
} }
private void appendToLog(String key) throws IOException { private void appendToLog(String key) throws IOException {
final long txid = log.append(info, final long txid = log.appendData(info,
new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(),
mvcc, scopes), getWALEdit(key), true); mvcc, scopes), getWALEdit(key));
log.sync(txid); log.sync(txid);
} }
@ -580,8 +580,8 @@ public class TestWALEntryStream {
} }
private long appendToLog(int count) throws IOException { private long appendToLog(int count) throws IOException {
return log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true); System.currentTimeMillis(), mvcc, scopes), getWALEdits(count));
} }
private WALEdit getWALEdits(int count) { private WALEdit getWALEdits(int count) {

View File

@ -63,12 +63,12 @@ public class FaultyFSLog extends FSHLog {
} }
@Override @Override
public long append(RegionInfo info, WALKeyImpl key, protected long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
WALEdit edits, boolean inMemstore) throws IOException { boolean closeRegion) throws IOException {
if (this.ft == FailureType.APPEND) { if (this.ft == FailureType.APPEND) {
throw new IOException("append"); throw new IOException("append");
} }
return super.append(info, key, edits, inMemstore); return super.append(info, key, edits, inMemstore, closeRegion);
} }
} }

View File

@ -156,8 +156,8 @@ public class TestFSHLogProvider {
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
WALEdit cols = new WALEdit(); WALEdit cols = new WALEdit();
cols.add(new KeyValue(row, row, row, timestamp, row)); cols.add(new KeyValue(row, row, row, timestamp, row));
log.append(hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes), log.appendData(hri,
cols, true); getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes), cols);
} }
log.sync(); log.sync();
} }

View File

@ -129,8 +129,8 @@ public class TestSecureWAL {
for (int i = 0; i < total; i++) { for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit(); WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value)); kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true); System.currentTimeMillis(), mvcc, scopes), kvs);
} }
wal.sync(); wal.sync();
final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);

View File

@ -208,7 +208,7 @@ public class TestWALFactory {
LOG.info("Region " + i + ": " + edit); LOG.info("Region " + i + ": " + edit);
WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName, WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes); System.currentTimeMillis(), mvcc, scopes);
log.append(infos[i], walKey, edit, true); log.appendData(infos[i], walKey, edit);
walKey.getWriteEntry(); walKey.getWriteEntry();
} }
log.sync(); log.sync();
@ -270,8 +270,8 @@ public class TestWALFactory {
for (int i = 0; i < total; i++) { for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit(); WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true); System.currentTimeMillis(), mvcc, scopes), kvs);
} }
// Now call sync and try reading. Opening a Reader before you sync just // Now call sync and try reading. Opening a Reader before you sync just
// gives you EOFE. // gives you EOFE.
@ -289,8 +289,8 @@ public class TestWALFactory {
for (int i = 0; i < total; i++) { for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit(); WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true); System.currentTimeMillis(), mvcc, scopes), kvs);
} }
wal.sync(); wal.sync();
reader = wals.createReader(fs, walPath); reader = wals.createReader(fs, walPath);
@ -311,8 +311,8 @@ public class TestWALFactory {
for (int i = 0; i < total; i++) { for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit(); WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value)); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true); System.currentTimeMillis(), mvcc, scopes), kvs);
} }
// Now I should have written out lots of blocks. Sync then read. // Now I should have written out lots of blocks. Sync then read.
wal.sync(); wal.sync();
@ -388,9 +388,8 @@ public class TestWALFactory {
for (int i = 0; i < total; i++) { for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit(); WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), System.currentTimeMillis(), mvcc, scopes), kvs);
kvs, true);
} }
// Now call sync to send the data to HDFS datanodes // Now call sync to send the data to HDFS datanodes
wal.sync(); wal.sync();
@ -522,10 +521,8 @@ public class TestWALFactory {
.setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build(); .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build();
final WAL log = wals.getWAL(info); final WAL log = wals.getWAL(info);
final long txid = log.append(info, final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(),
new WALKeyImpl(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(), htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
mvcc, scopes),
cols, true);
log.sync(txid); log.sync(txid);
log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
log.completeCacheFlush(info.getEncodedNameAsBytes()); log.completeCacheFlush(info.getEncodedNameAsBytes());
@ -580,10 +577,8 @@ public class TestWALFactory {
} }
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
final WAL log = wals.getWAL(hri); final WAL log = wals.getWAL(hri);
final long txid = log.append(hri, final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(), htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
mvcc, scopes),
cols, true);
log.sync(txid); log.sync(txid);
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
log.completeCacheFlush(hri.getEncodedNameAsBytes()); log.completeCacheFlush(hri.getEncodedNameAsBytes());
@ -634,8 +629,8 @@ public class TestWALFactory {
cols.add(new KeyValue(row, Bytes.toBytes("column"), cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes(Integer.toString(i)), Bytes.toBytes(Integer.toString(i)),
timestamp, new byte[]{(byte) (i + '0')})); timestamp, new byte[]{(byte) (i + '0')}));
log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), cols, true); System.currentTimeMillis(), mvcc, scopes), cols);
} }
log.sync(); log.sync();
assertEquals(COL_COUNT, visitor.increments); assertEquals(COL_COUNT, visitor.increments);
@ -644,8 +639,8 @@ public class TestWALFactory {
cols.add(new KeyValue(row, Bytes.toBytes("column"), cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes(Integer.toString(11)), Bytes.toBytes(Integer.toString(11)),
timestamp, new byte[]{(byte) (11 + '0')})); timestamp, new byte[]{(byte) (11 + '0')}));
log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), cols, true); System.currentTimeMillis(), mvcc, scopes), cols);
log.sync(); log.sync();
assertEquals(COL_COUNT, visitor.increments); assertEquals(COL_COUNT, visitor.increments);
} }

View File

@ -118,8 +118,8 @@ public class TestWALReaderOnSecureWAL {
} else { } else {
kvs.add(kv); kvs.add(kv);
} }
wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true); System.currentTimeMillis(), mvcc, scopes), kvs);
} }
wal.sync(); wal.sync();
final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);

View File

@ -98,8 +98,8 @@ public class TestWALRootDir {
WALEdit edit = new WALEdit(); WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
System.currentTimeMillis(), value)); System.currentTimeMillis(), value));
long txid = log.append(regionInfo, long txid =
getWalKey(System.currentTimeMillis(), regionInfo, 0), edit, true); log.appendData(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 0), edit);
log.sync(txid); log.sync(txid);
assertEquals("Expect 1 log have been created", 1, assertEquals("Expect 1 log have been created", 1,
getWALFiles(walFs, walRootDir).size()); getWALFiles(walFs, walRootDir).size());
@ -109,8 +109,7 @@ public class TestWALRootDir {
HConstants.HREGION_LOGDIR_NAME)).size()); HConstants.HREGION_LOGDIR_NAME)).size());
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
System.currentTimeMillis(), value)); System.currentTimeMillis(), value));
txid = log.append(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 1), txid = log.appendData(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 1), edit);
edit, true);
log.sync(txid); log.sync(txid);
log.rollWriter(); log.rollWriter();
log.shutdown(); log.shutdown();

View File

@ -184,7 +184,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
RegionInfo hri = region.getRegionInfo(); RegionInfo hri = region.getRegionInfo();
final WALKeyImpl logkey = final WALKeyImpl logkey =
new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes); new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes);
wal.append(hri, logkey, walEdit, true); wal.appendData(hri, logkey, walEdit);
if (!this.noSync) { if (!this.noSync) {
if (++lastSync >= this.syncInterval) { if (++lastSync >= this.syncInterval) {
wal.sync(); wal.sync();