From 7b5cd0152f09ce67fee12ca9a4737e1f7d527155 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Sat, 26 Oct 2019 20:37:37 +0800 Subject: [PATCH] HBASE-23181 Blocked WAL archive: "LogRoller: Failed to schedule flush of XXXX, because it is not online on us" (#753) Signed-off-by: Lijin Bin Signed-off-by: stack --- .../hadoop/hbase/util/ImmutableByteArray.java | 4 +- .../hbase/mapreduce/TestWALRecordReader.java | 25 ++-- .../hadoop/hbase/regionserver/HRegion.java | 2 +- .../hbase/regionserver/wal/AbstractFSWAL.java | 53 +++++++-- .../hbase/regionserver/wal/AsyncFSWAL.java | 10 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 10 +- .../hbase/regionserver/wal/FSWALEntry.java | 8 +- .../wal/SequenceIdAccounting.java | 27 ++++- .../hbase/regionserver/wal/WALUtil.java | 91 +++++++-------- .../hadoop/hbase/wal/DisabledWALProvider.java | 14 ++- .../java/org/apache/hadoop/hbase/wal/WAL.java | 51 +++++--- .../hbase/coprocessor/TestWALObserver.java | 19 ++- .../hadoop/hbase/master/AbstractTestDLS.java | 5 +- .../hbase/regionserver/TestBulkLoad.java | 8 +- .../hbase/regionserver/TestHRegion.java | 59 +++++----- .../regionserver/TestHRegionReplayEvents.java | 13 +-- .../hbase/regionserver/TestWALLockup.java | 2 +- .../regionserver/wal/AbstractTestFSWAL.java | 109 +++++++++++++++++- .../wal/AbstractTestWALReplay.java | 18 +-- .../wal/ProtobufLogTestHelper.java | 2 +- .../regionserver/wal/TestAsyncFSWAL.java | 2 +- .../regionserver/wal/TestLogRollAbort.java | 5 +- .../wal/TestLogRollingNoCluster.java | 4 +- .../wal/TestWALActionsListener.java | 5 +- .../TestReplicationSmallTests.java | 2 +- .../TestReplicationSourceManager.java | 16 ++- .../regionserver/TestWALEntryStream.java | 8 +- .../apache/hadoop/hbase/wal/FaultyFSLog.java | 6 +- .../hadoop/hbase/wal/TestFSHLogProvider.java | 4 +- .../hadoop/hbase/wal/TestSecureWAL.java | 4 +- .../hadoop/hbase/wal/TestWALFactory.java | 39 +++---- .../hbase/wal/TestWALReaderOnSecureWAL.java | 4 +- .../hadoop/hbase/wal/TestWALRootDir.java | 7 +- .../hbase/wal/WALPerformanceEvaluation.java | 2 +- 34 files changed, 404 insertions(+), 234 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java index 3e8fee8db1f..1232b9ce35e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java @@ -48,7 +48,7 @@ public final class ImmutableByteArray { return new ImmutableByteArray(b); } - public String toStringUtf8() { - return Bytes.toString(b); + public String toString() { + return Bytes.toStringBinary(b); } } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java index 449c4b7985a..4614c259331 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -135,10 +135,10 @@ public class TestWALRecordReader { long ts = System.currentTimeMillis(); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value)); - log.append(info, getWalKeyImpl(ts, scopes), edit, true); + log.appendData(info, getWalKeyImpl(ts, scopes), edit); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value)); - log.append(info, getWalKeyImpl(ts+1, scopes), edit, true); + log.appendData(info, getWalKeyImpl(ts+1, scopes), edit); log.sync(); LOG.info("Before 1st WAL roll " + log.toString()); log.rollWriter(); @@ -149,10 +149,10 @@ public class TestWALRecordReader { edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value)); - log.append(info, getWalKeyImpl(ts1+1, scopes), edit, true); + log.appendData(info, getWalKeyImpl(ts1+1, scopes), edit); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value)); - log.append(info, getWalKeyImpl(ts1+2, scopes), edit, true); + log.appendData(info, getWalKeyImpl(ts1+2, scopes), edit); log.sync(); log.shutdown(); walfactory.shutdown(); @@ -193,7 +193,7 @@ public class TestWALRecordReader { WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value)); - long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true); + long txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit); log.sync(txid); Thread.sleep(1); // make sure 2nd log gets a later timestamp @@ -201,9 +201,8 @@ public class TestWALRecordReader { log.rollWriter(); edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), - System.currentTimeMillis(), value)); - txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value)); + txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit); log.sync(txid); log.shutdown(); walfactory.shutdown(); @@ -253,17 +252,15 @@ public class TestWALRecordReader { WAL log = walfactory.getWAL(info); byte [] value = Bytes.toBytes("value"); WALEdit edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), - System.currentTimeMillis(), value)); - long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value)); + long txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit); log.sync(txid); Thread.sleep(10); // make sure 2nd edit gets a later timestamp edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), - System.currentTimeMillis(), value)); - txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value)); + txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit); log.sync(txid); log.shutdown(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 59b8502d5b2..5d6cbf0ab29 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -7995,7 +7995,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } WriteEntry writeEntry = null; try { - long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); + long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit); // Call sync on our edit. if (txid != 0) { sync(txid, durability); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index add883a3415..2eb7c7436ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -977,7 +977,7 @@ public abstract class AbstractFSWAL implements WAL { // Noop } - protected final boolean append(W writer, FSWALEntry entry) throws IOException { + protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException { // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc. atHeadOfRingBufferEventHandlerAppend(); long start = EnvironmentEdgeManager.currentTime(); @@ -1001,8 +1001,13 @@ public abstract class AbstractFSWAL implements WAL { doAppend(writer, entry); assert highestUnsyncedTxid < entry.getTxid(); highestUnsyncedTxid = entry.getTxid(); - sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId, - entry.isInMemStore()); + if (entry.isCloseRegion()) { + // let's clean all the records of this region + sequenceIdAccounting.onRegionClose(encodedRegionName); + } else { + sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId, + entry.isInMemStore()); + } coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); // Update metrics. postAppend(entry, EnvironmentEdgeManager.currentTime() - start); @@ -1052,11 +1057,11 @@ public abstract class AbstractFSWAL implements WAL { } protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key, - WALEdit edits, boolean inMemstore, RingBuffer ringBuffer) - throws IOException { + WALEdit edits, boolean inMemstore, boolean closeRegion, RingBuffer ringBuffer) + throws IOException { if (this.closed) { throw new IOException( - "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString()); + "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString()); } MutableLong txidHolder = new MutableLong(); MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> { @@ -1066,7 +1071,7 @@ public abstract class AbstractFSWAL implements WAL { ServerCall rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall) .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null); try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) { - FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall); + FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, closeRegion, rpcCall); entry.stampRegionSequenceId(we); ringBuffer.get(txid).load(entry); } finally { @@ -1102,7 +1107,24 @@ public abstract class AbstractFSWAL implements WAL { } } + @Override + public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException { + return append(info, key, edits, true, false); + } + + @Override + public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion) + throws IOException { + return append(info, key, edits, false, closeRegion); + } + /** + * Append a set of edits to the WAL. + *

+ * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must + * have its region edit/sequence id assigned else it messes up our unification of mvcc and + * sequenceid. On return key will have the region edit/sequence id filled in. + *

* NOTE: This append, at a time that is usually after this call returns, starts an mvcc * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must @@ -1113,10 +1135,21 @@ public abstract class AbstractFSWAL implements WAL { * passed in WALKey walKey parameter. Be warned that the WriteEntry is not * immediately available on return from this method. It WILL be available subsequent to a sync of * this append; otherwise, you will just have to wait on the WriteEntry to get filled in. + * @param info the regioninfo associated with append + * @param key Modified by this call; we add to it this edits region edit/sequence id. + * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit + * sequence id that is after all currently appended edits. + * @param inMemstore Always true except for case where we are writing a region event marker, for + * example, a compaction completion record into the WAL; in this case the entry is just + * so we can finish an unfinished compaction -- it is not an edit for memstore. + * @param closeRegion Whether this is a region close marker, i.e, the last wal edit for this + * region on this region server. The WAL implementation should remove all the related + * stuff, for example, the sequence id accounting. + * @return Returns a 'transaction id' and key will have the region edit/sequence id + * in it. */ - @Override - public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) - throws IOException; + protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore, + boolean closeRegion) throws IOException; protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index e93642dfda9..14edaaee964 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -434,7 +434,7 @@ public class AsyncFSWAL extends AbstractFSWAL { FSWALEntry entry = iter.next(); boolean appended; try { - appended = append(writer, entry); + appended = appendEntry(writer, entry); } catch (IOException e) { throw new AssertionError("should not happen", e); } @@ -615,13 +615,13 @@ public class AsyncFSWAL extends AbstractFSWAL { } @Override - public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) - throws IOException { + protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore, + boolean closeRegion) throws IOException { if (markerEditOnly() && !edits.isMetaEdit()) { throw new IOException("WAL is closing, only marker edit is allowed"); } - long txid = - stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); + long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion, + waitingConsumePayloads); if (shouldScheduleConsumer()) { consumeExecutor.execute(consumer); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index b61a0768670..d5187233feb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -433,12 +433,10 @@ public class FSHLog extends AbstractFSWAL { } } - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION", - justification = "Will never be null") @Override - public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits, - final boolean inMemstore) throws IOException { - return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, + protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits, + final boolean inMemstore, boolean closeRegion) throws IOException { + return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion, disruptor.getRingBuffer()); } @@ -1100,7 +1098,7 @@ public class FSHLog extends AbstractFSWAL { */ void append(final FSWALEntry entry) throws Exception { try { - FSHLog.this.append(writer, entry); + FSHLog.this.appendEntry(writer, entry); } catch (Exception e) { String msg = "Append sequenceId=" + entry.getKey().getSequenceId() + ", requesting roll of WAL"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 5bdec137d54..acb49b7cca3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -51,14 +51,16 @@ class FSWALEntry extends Entry { // they are only in memory and held here while passing over the ring buffer. private final transient long txid; private final transient boolean inMemstore; + private final transient boolean closeRegion; private final transient RegionInfo regionInfo; private final transient Set familyNames; private final transient ServerCall rpcCall; FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo, - final boolean inMemstore, ServerCall rpcCall) { + final boolean inMemstore, boolean closeRegion, ServerCall rpcCall) { super(key, edit); this.inMemstore = inMemstore; + this.closeRegion = closeRegion; this.regionInfo = regionInfo; this.txid = txid; if (inMemstore) { @@ -98,6 +100,10 @@ class FSWALEntry extends Entry { return this.inMemstore; } + boolean isCloseRegion() { + return closeRegion; + } + RegionInfo getRegionInfo() { return this.regionInfo; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java index 9736d8bd1f8..0832f82f93e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ImmutableByteArray; @@ -184,6 +185,30 @@ class SequenceIdAccounting { } } + /** + * Clear all the records of the given region as it is going to be closed. + *

+ * We will call this once we get the region close marker. We need this because that, if we use + * Durability.ASYNC_WAL, after calling startCacheFlush, we may still get some ongoing wal entries + * that has not been processed yet, this will lead to orphan records in the + * lowestUnflushedSequenceIds and then cause too many WAL files. + *

+ * See HBASE-23157 for more details. + */ + void onRegionClose(byte[] encodedRegionName) { + synchronized (tieLock) { + this.lowestUnflushedSequenceIds.remove(encodedRegionName); + Map flushing = this.flushingSequenceIds.remove(encodedRegionName); + if (flushing != null) { + LOG.warn("Still have flushing records when closing {}, {}", + Bytes.toString(encodedRegionName), + flushing.entrySet().stream().map(e -> e.getKey().toString() + "->" + e.getValue()) + .collect(Collectors.joining(",", "{", "}"))); + } + } + this.highestSequenceIds.remove(encodedRegionName); + } + /** * Update the store sequence id, e.g., upon executing in-memory compaction */ @@ -364,7 +389,7 @@ class SequenceIdAccounting { Long currentId = tmpMap.get(e.getKey()); if (currentId != null && currentId.longValue() < e.getValue().longValue()) { String errorStr = Bytes.toString(encodedRegionName) + " family " - + e.getKey().toStringUtf8() + " acquired edits out of order current memstore seq=" + + e.getKey().toString() + " acquired edits out of order current memstore seq=" + currentId + ", previous oldest unflushed id=" + e.getValue(); LOG.error(errorStr); Runtime.getRuntime().halt(1); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java index 1808cd67b1a..027e412d38c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -59,20 +59,19 @@ public class WALUtil { } /** - * Write the marker that a compaction has succeeded and is about to be committed. - * This provides info to the HMaster to allow it to recover the compaction if this regionserver - * dies in the middle. It also prevents the compaction from finishing if this regionserver has - * already lost its lease on the log. - * - *

This write is for internal use only. Not for external client consumption. + * Write the marker that a compaction has succeeded and is about to be committed. This provides + * info to the HMaster to allow it to recover the compaction if this regionserver dies in the + * middle. It also prevents the compaction from finishing if this regionserver has already lost + * its lease on the log. + *

+ * This write is for internal use only. Not for external client consumption. * @param mvcc Used by WAL to get sequence Id for the waledit. */ public static WALKeyImpl writeCompactionMarker(WAL wal, - NavigableMap replicationScope, RegionInfo hri, final CompactionDescriptor c, - MultiVersionConcurrencyControl mvcc) - throws IOException { + NavigableMap replicationScope, RegionInfo hri, final CompactionDescriptor c, + MultiVersionConcurrencyControl mvcc) throws IOException { WALKeyImpl walKey = - writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null); + writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), false, mvcc, null); if (LOG.isTraceEnabled()) { LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); } @@ -81,14 +80,14 @@ public class WALUtil { /** * Write a flush marker indicating a start / abort or a complete of a region flush - * - *

This write is for internal use only. Not for external client consumption. + *

+ * This write is for internal use only. Not for external client consumption. */ public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap replicationScope, - RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc) - throws IOException { - WALKeyImpl walKey = doFullAppendTransaction(wal, replicationScope, hri, - WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync); + RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc) + throws IOException { + WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri, + WALEdit.createFlushWALEdit(hri, f), false, mvcc, null, sync); if (LOG.isTraceEnabled()) { LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f)); } @@ -96,15 +95,15 @@ public class WALUtil { } /** - * Write a region open marker indicating that the region is opened. - * This write is for internal use only. Not for external client consumption. + * Write a region open marker indicating that the region is opened. This write is for internal use + * only. Not for external client consumption. */ public static WALKeyImpl writeRegionEventMarker(WAL wal, - NavigableMap replicationScope, RegionInfo hri, - final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc) - throws IOException { - WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, - WALEdit.createRegionEventWALEdit(hri, r), mvcc, null); + NavigableMap replicationScope, RegionInfo hri, final RegionEventDescriptor r, + final MultiVersionConcurrencyControl mvcc) throws IOException { + WALKeyImpl walKey = + writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r), + r.getEventType() == RegionEventDescriptor.EventType.REGION_CLOSE, mvcc, null); if (LOG.isTraceEnabled()) { LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r)); } @@ -122,11 +121,11 @@ public class WALUtil { * @throws IOException We will throw an IOException if we can not append to the HLog. */ public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal, - final NavigableMap replicationScope, final RegionInfo hri, - final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc) - throws IOException { - WALKeyImpl walKey = - writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, null); + final NavigableMap replicationScope, final RegionInfo hri, + final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc) + throws IOException { + WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, + WALEdit.createBulkLoadEvent(hri, desc), false, mvcc, null); if (LOG.isTraceEnabled()) { LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc)); } @@ -134,36 +133,32 @@ public class WALUtil { } private static WALKeyImpl writeMarker(final WAL wal, - final NavigableMap replicationScope, - final RegionInfo hri, - final WALEdit edit, - final MultiVersionConcurrencyControl mvcc, - final Map extendedAttributes) - throws IOException { + final NavigableMap replicationScope, final RegionInfo hri, final WALEdit edit, + boolean closeRegion, final MultiVersionConcurrencyControl mvcc, + final Map extendedAttributes) throws IOException { // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT - return doFullAppendTransaction(wal, replicationScope, hri, - edit, mvcc, extendedAttributes, true); + return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, closeRegion, mvcc, + extendedAttributes, true); } /** - * A 'full' WAL transaction involves starting an mvcc transaction followed by an append, - * an optional sync, and then a call to complete the mvcc transaction. This method does it all. - * Good for case of adding a single edit or marker to the WAL. - * - *

This write is for internal use only. Not for external client consumption. + * A 'full' WAL transaction involves starting an mvcc transaction followed by an append, an + * optional sync, and then a call to complete the mvcc transaction. This method does it all. Good + * for case of adding a single edit or marker to the WAL. + *

+ * This write is for internal use only. Not for external client consumption. * @return WALKeyImpl that was added to the WAL. */ - public static WALKeyImpl doFullAppendTransaction(final WAL wal, - final NavigableMap replicationScope, final RegionInfo hri, - final WALEdit edit, final MultiVersionConcurrencyControl mvcc, - final Map extendedAttributes, final boolean sync) - throws IOException { + private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal, + final NavigableMap replicationScope, final RegionInfo hri, final WALEdit edit, + boolean closeRegion, final MultiVersionConcurrencyControl mvcc, + final Map extendedAttributes, final boolean sync) throws IOException { // TODO: Pass in current time to use? WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), - System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes); + System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes); long trx = MultiVersionConcurrencyControl.NONE; try { - trx = wal.append(hri, walKey, edit, false); + trx = wal.appendMarker(hri, walKey, edit, closeRegion); if (sync) { wal.sync(trx); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 5f787fef599..7e894aedc89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -161,8 +161,18 @@ class DisabledWALProvider implements WALProvider { } @Override - public long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) - throws IOException { + public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException { + return append(info, key, edits, true, false); + } + + @Override + public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion) + throws IOException { + return append(info, key, edits, false, closeRegion); + } + + private long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore, + boolean closeRegion) throws IOException { WriteEntry writeEntry = key.getMvcc().begin(); if (!edits.isReplay()) { for (Cell cell : edits.getCells()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index cf367cde159..74dab64646d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -56,7 +56,7 @@ public interface WAL extends Closeable, WALFileLengthProvider { /** * Roll the log writer. That is, start writing log messages to a new file. * - *

+ *

* The implementation is synchronized in order to make sure there's one rollWriter * running at any given time. * @@ -69,7 +69,7 @@ public interface WAL extends Closeable, WALFileLengthProvider { /** * Roll the log writer. That is, start writing log messages to a new file. * - *

+ *

* The implementation is synchronized in order to make sure there's one rollWriter * running at any given time. * @@ -97,44 +97,59 @@ public interface WAL extends Closeable, WALFileLengthProvider { void close() throws IOException; /** - * Append a set of edits to the WAL. The WAL is not flushed/sync'd after this transaction - * completes BUT on return this edit must have its region edit/sequence id assigned - * else it messes up our unification of mvcc and sequenceid. On return key will - * have the region edit/sequence id filled in. + * Append a set of data edits to the WAL. 'Data' here means that the content in the edits will + * also be added to memstore. + *

+ * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must + * have its region edit/sequence id assigned else it messes up our unification of mvcc and + * sequenceid. On return key will have the region edit/sequence id filled in. * @param info the regioninfo associated with append * @param key Modified by this call; we add to it this edits region edit/sequence id. * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit - * sequence id that is after all currently appended edits. - * @param inMemstore Always true except for case where we are writing a compaction completion - * record into the WAL; in this case the entry is just so we can finish an unfinished compaction - * -- it is not an edit for memstore. + * sequence id that is after all currently appended edits. * @return Returns a 'transaction id' and key will have the region edit/sequence id - * in it. + * in it. + * @see #appendMarker(RegionInfo, WALKeyImpl, WALEdit, boolean) */ - long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException; + long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException; + + /** + * Append a marker edit to the WAL. A marker could be a FlushDescriptor, a compaction marker, or + * region event marker. The difference here is that, a marker will not be added to memstore. + *

+ * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must + * have its region edit/sequence id assigned else it messes up our unification of mvcc and + * sequenceid. On return key will have the region edit/sequence id filled in. + * @param info the regioninfo associated with append + * @param key Modified by this call; we add to it this edits region edit/sequence id. + * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit + * sequence id that is after all currently appended edits. + * @param closeRegion Whether this is a region close marker, i.e, the last wal edit for this + * region on this region server. The WAL implementation should remove all the related + * stuff, for example, the sequence id accounting. + * @return Returns a 'transaction id' and key will have the region edit/sequence id + * in it. + * @see #appendData(RegionInfo, WALKeyImpl, WALEdit) + */ + long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion) + throws IOException; /** * updates the seuence number of a specific store. * depending on the flag: replaces current seq number if the given seq id is bigger, * or even if it is lower than existing one - * @param encodedRegionName - * @param familyName - * @param sequenceid - * @param onlyIfGreater */ void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, boolean onlyIfGreater); /** * Sync what we have in the WAL. - * @throws IOException */ void sync() throws IOException; /** * Sync the WAL if the txId was not already sync'd. * @param txid Transaction id to sync to. - * @throws IOException */ void sync(long txid) throws IOException; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index dde020d4326..9381ef36eb8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -239,9 +239,8 @@ public class TestWALObserver { // it's where WAL write cp should occur. long now = EnvironmentEdgeManager.currentTime(); // we use HLogKey here instead of WALKeyImpl directly to support legacy coprocessors. - long txid = log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, - new MultiVersionConcurrencyControl(), scopes), - edit, true); + long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, + new MultiVersionConcurrencyControl(), scopes), edit); log.sync(txid); // the edit shall have been change now by the coprocessor. @@ -291,9 +290,9 @@ public class TestWALObserver { assertFalse(cp.isPostWALWriteCalled()); final long now = EnvironmentEdgeManager.currentTime(); - long txid = log.append(hri, - new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes), - new WALEdit(), true); + long txid = log.appendData(hri, + new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes), + new WALEdit()); log.sync(txid); assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled()); @@ -340,8 +339,8 @@ public class TestWALObserver { addWALEdits(tableName, hri, TEST_ROW, fam, countPerFamily, EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc); } - wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, - true); + wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), + edit); // sync to fs. wal.sync(); @@ -456,8 +455,8 @@ public class TestWALObserver { edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); // uses WALKeyImpl instead of HLogKey on purpose. will only work for tests where we don't care // about legacy coprocessors - txid = wal.append(hri, - new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit, true); + txid = wal.appendData(hri, + new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit); } if (-1 != txid) { wal.sync(txid); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java index db15ca61ea0..a576adc8d19 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java @@ -604,9 +604,8 @@ public abstract class AbstractTestDLS { // HBaseTestingUtility.createMultiRegions use 5 bytes key byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value)); - log.append(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc), - e, true); + log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), + tableName, System.currentTimeMillis(), mvcc), e); if (0 == i % syncEvery) { log.sync(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java index 4893982f937..f2db9464a47 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java @@ -115,7 +115,7 @@ public class TestBulkLoad { storeFileName = (new Path(storeFileName)).getName(); List storeFileNames = new ArrayList<>(); storeFileNames.add(storeFileName); - when(log.append(any(), any(), + when(log.appendMarker(any(), any(), argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(), familyName, storeFileNames)), anyBoolean())).thenAnswer(new Answer() { @@ -142,7 +142,7 @@ public class TestBulkLoad { @Test public void shouldBulkLoadSingleFamilyHLog() throws IOException { - when(log.append(any(), + when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), anyBoolean())).thenAnswer(new Answer() { @Override @@ -162,7 +162,7 @@ public class TestBulkLoad { @Test public void shouldBulkLoadManyFamilyHLog() throws IOException { - when(log.append(any(), + when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), anyBoolean())).thenAnswer(new Answer() { @Override @@ -183,7 +183,7 @@ public class TestBulkLoad { @Test public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException { - when(log.append(any(), + when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), anyBoolean())).thenAnswer(new Answer() { @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 4e7c5ad66c4..4f717abe392 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.filter.BigDecimalComparator; @@ -4514,10 +4515,9 @@ public class TestHRegion { put.setDurability(mutationDurability); region.put(put); - //verify append called or not - verify(wal, expectAppend ? times(1) : never()) - .append((HRegionInfo)any(), (WALKeyImpl)any(), - (WALEdit)any(), Mockito.anyBoolean()); + // verify append called or not + verify(wal, expectAppend ? times(1) : never()).appendData((HRegionInfo) any(), + (WALKeyImpl) any(), (WALEdit) any()); // verify sync called or not if (expectSync || expectSyncFromLogSyncer) { @@ -5626,12 +5626,10 @@ public class TestHRegion { final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42); final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); - HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); - htd.addFamily(new HColumnDescriptor(fam1)); - htd.addFamily(new HColumnDescriptor(fam2)); - - HRegionInfo hri = new HRegionInfo(htd.getTableName(), - HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY); + TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1)) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build(); + RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); // open the region w/o rss and wal and flush some files region = @@ -5648,13 +5646,13 @@ public class TestHRegion { // capture append() calls WAL wal = mockWAL(); - when(rss.getWAL((HRegionInfo) any())).thenReturn(wal); + when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal); region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(), rss, null); - verify(wal, times(1)).append((HRegionInfo)any(), (WALKeyImpl)any() - , editCaptor.capture(), anyBoolean()); + verify(wal, times(1)).appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), + editCaptor.capture(), anyBoolean()); WALEdit edit = editCaptor.getValue(); assertNotNull(edit); @@ -5720,15 +5718,14 @@ public class TestHRegion { /** * Utility method to setup a WAL mock. + *

* Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs. * @return a mock WAL - * @throws IOException */ private WAL mockWAL() throws IOException { WAL wal = mock(WAL.class); - Mockito.when(wal.append((HRegionInfo)Mockito.any(), - (WALKeyImpl)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())). - thenAnswer(new Answer() { + when(wal.appendData(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class))) + .thenAnswer(new Answer() { @Override public Long answer(InvocationOnMock invocation) throws Throwable { WALKeyImpl key = invocation.getArgument(1); @@ -5736,32 +5733,38 @@ public class TestHRegion { key.setWriteEntry(we); return 1L; } - - }); + }); + when(wal.appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class), + anyBoolean())).thenAnswer(new Answer() { + @Override + public Long answer(InvocationOnMock invocation) throws Throwable { + WALKeyImpl key = invocation.getArgument(1); + MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(); + key.setWriteEntry(we); + return 1L; + } + }); return wal; } @Test public void testCloseRegionWrittenToWAL() throws Exception { - Path rootDir = new Path(dir + name.getMethodName()); FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42); final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); - HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); - htd.addFamily(new HColumnDescriptor(fam1)); - htd.addFamily(new HColumnDescriptor(fam2)); - - final HRegionInfo hri = new HRegionInfo(htd.getTableName(), - HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY); + TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1)) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build(); + RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); ArgumentCaptor editCaptor = ArgumentCaptor.forClass(WALEdit.class); // capture append() calls WAL wal = mockWAL(); - when(rss.getWAL((HRegionInfo) any())).thenReturn(wal); + when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal); // create and then open a region first so that it can be closed later @@ -5773,7 +5776,7 @@ public class TestHRegion { region.close(false); // 2 times, one for region open, the other close region - verify(wal, times(2)).append((HRegionInfo)any(), (WALKeyImpl)any(), + verify(wal, times(2)).appendMarker(any(RegionInfo.class), (WALKeyImpl) any(WALKeyImpl.class), editCaptor.capture(), anyBoolean()); WALEdit edit = editCaptor.getAllValues().get(1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index b08877e4fa6..770ee8d1ca3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -1164,8 +1163,8 @@ public class TestHRegionReplayEvents { // test for region open and close secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null); - verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class), - any(WALEdit.class), anyBoolean()); + verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), + any(WALEdit.class)); // test for replay prepare flush putDataByReplay(secondaryRegion, 0, 10, cq, families); @@ -1180,12 +1179,12 @@ public class TestHRegionReplayEvents { primaryRegion.getRegionInfo().getRegionName())) .build()); - verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class), - any(WALEdit.class), anyBoolean()); + verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), + any(WALEdit.class)); secondaryRegion.close(); - verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class), - any(WALEdit.class), anyBoolean()); + verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), + any(WALEdit.class)); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index 37270abad46..5bd342288ec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -249,7 +249,7 @@ public class TestWALLockup { LOG.info("SET throwing of exception on append"); dodgyWAL.throwException = true; // This append provokes a WAL roll request - dodgyWAL.append(region.getRegionInfo(), key, edit, true); + dodgyWAL.appendData(region.getRegionInfo(), key, edit); boolean exception = false; try { dodgyWAL.sync(false); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java index 68eebc17a45..ec3c28d3eda 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java @@ -25,17 +25,22 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.NavigableMap; import java.util.Set; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -46,8 +51,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; @@ -57,13 +64,17 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.SequenceId; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; @@ -168,7 +179,7 @@ public abstract class AbstractTestFSWAL { WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes); - log.append(hri, key, cols, true); + log.appendData(hri, key, cols); } log.sync(); } @@ -417,7 +428,7 @@ public abstract class AbstractTestFSWAL { final RegionInfo info = region.getRegionInfo(); final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes); - wal.append(info, logkey, edits, true); + wal.append(info, logkey, edits, true, false); region.getMVCC().completeAndWait(logkey.getWriteEntry()); } region.flush(true); @@ -466,7 +477,7 @@ public abstract class AbstractTestFSWAL { new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes); try { - wal.append(ri, key, cols, true); + wal.append(ri, key, cols, true, false); fail("Should fail since the wal has already been closed"); } catch (IOException e) { // expected @@ -484,4 +495,94 @@ public abstract class AbstractTestFSWAL { wal.close(); wal.rollWriter(); } + + @Test + public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException { + final String testName = currentTest.getMethodName(); + final byte[] b = Bytes.toBytes("b"); + + final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false); + final CountDownLatch holdAppend = new CountDownLatch(1); + final CountDownLatch closeFinished = new CountDownLatch(1); + final CountDownLatch putFinished = new CountDownLatch(1); + + try (AbstractFSWAL wal = newWAL(FS, FSUtils.getRootDir(CONF), testName, + HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) { + wal.init(); + wal.registerWALActionsListener(new WALActionsListener() { + @Override + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { + if (startHoldingForAppend.get()) { + try { + holdAppend.await(); + } catch (InterruptedException e) { + LOG.error(e.toString(), e); + } + } + } + }); + + // open a new region which uses this WAL + TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table")) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build(); + RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + TEST_UTIL.createLocalHRegion(hri, htd, wal).close(); + RegionServerServices rsServices = mock(RegionServerServices.class); + when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456)); + when(rsServices.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration()); + final HRegion region = HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, + TEST_UTIL.getConfiguration(), rsServices, null); + + ExecutorService exec = Executors.newFixedThreadPool(2); + + // do a regular write first because of memstore size calculation. + region.put(new Put(b).addColumn(b, b, b)); + + startHoldingForAppend.set(true); + exec.submit(new Runnable() { + @Override + public void run() { + try { + region.put(new Put(b).addColumn(b, b, b).setDurability(Durability.ASYNC_WAL)); + putFinished.countDown(); + } catch (IOException e) { + LOG.error(e.toString(), e); + } + } + }); + + // give the put a chance to start + Threads.sleep(3000); + + exec.submit(new Runnable() { + @Override + public void run() { + try { + Map closeResult = region.close(); + LOG.info("Close result:" + closeResult); + closeFinished.countDown(); + } catch (IOException e) { + LOG.error(e.toString(), e); + } + } + }); + + // give the flush a chance to start. Flush should have got the region lock, and + // should have been waiting on the mvcc complete after this. + Threads.sleep(3000); + + // let the append to WAL go through now that the flush already started + holdAppend.countDown(); + putFinished.await(); + closeFinished.await(); + + // now check the region's unflushed seqIds. + long seqId = wal.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes()); + assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM, + seqId); + + wal.close(); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index a20748ae662..a71011347cc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -801,15 +801,15 @@ public abstract class AbstractTestWALReplay { long now = ee.currentTime(); edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName)); - wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, - true); + wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), + edit); // Delete the c family to verify deletes make it over. edit = new WALEdit(); now = ee.currentTime(); edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); - wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, - true); + wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), + edit); // Sync. wal.sync(); @@ -1154,10 +1154,10 @@ public abstract class AbstractTestWALReplay { } private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence, - byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, - int index, NavigableMap scopes) throws IOException { + byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, + int index, NavigableMap scopes) throws IOException { FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), - createWALEdit(rowName, family, ee, index), hri, true, null); + createWALEdit(rowName, family, ee, index), hri, true, false, null); entry.stampRegionSequenceId(mvcc.begin()); return entry; } @@ -1167,8 +1167,8 @@ public abstract class AbstractTestWALReplay { final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc, NavigableMap scopes) throws IOException { for (int j = 0; j < count; j++) { - wal.append(hri, createWALKey(tableName, hri, mvcc, scopes), - createWALEdit(rowName, family, ee, j), true); + wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes), + createWALEdit(rowName, family, ee, j)); } wal.sync(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java index 420585f448c..cf4862b2c33 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java @@ -83,7 +83,7 @@ public final class ProtobufLogTestHelper { throws IOException { for (int i = 0; i < recordCount; i++) { WAL.Entry entry = generateEdit(i, hri, tableName, row, columnCount, timestamp, mvcc); - wal.append(hri, entry.getKey(), entry.getEdit(), true); + wal.appendData(hri, entry.getKey(), entry.getEdit()); } wal.sync(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java index 4cfe49e9e9e..d156c5ecd43 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java @@ -196,7 +196,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL { SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes); try { - wal.append(ri, key, cols, true); + wal.append(ri, key, cols, true, false); } catch (IOException e) { // should not happen throw new UncheckedIOException(e); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index 3eed1372a1e..a7d4a55bccd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -207,9 +207,8 @@ public class TestLogRollAbort { kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); scopes.put(Bytes.toBytes("column"), 0); - log.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), - kvs, true); + log.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); } // Send the data to HDFS datanodes and close the HDFS writer log.sync(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java index 819df673c94..63c3de18ef1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -166,8 +166,8 @@ public class TestLogRollingNoCluster { for(byte[] fam : htd.getColumnFamilyNames()) { scopes.put(fam, 0); } - final long txid = wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), - TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true); + final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), + TableName.META_TABLE_NAME, now, mvcc, scopes), edit); Threads.sleep(ThreadLocalRandom.current().nextInt(5)); wal.sync(txid); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index 0967a756f45..dd83c7c1ce9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -111,9 +111,8 @@ public class TestWALActionsListener { edit.add(kv); NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); scopes.put(b, 0); - long txid = wal.append(hri, - new WALKeyImpl(hri.getEncodedNameAsBytes(), TableName.valueOf(b), 0, mvcc, scopes), edit, - true); + long txid = wal.appendData(hri, + new WALKeyImpl(hri.getEncodedNameAsBytes(), TableName.valueOf(b), 0, mvcc, scopes), edit); wal.sync(txid); if (i == 10) { wal.registerWALActionsListener(laterobserver); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 40a3f2a80f0..8178a236c5a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -423,7 +423,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { long now = EnvironmentEdgeManager.currentTime(); edit.add(new KeyValue(rowName, famName, qualifier, now, value)); WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes); - wal.append(hri, walKey, edit, true); + wal.appendData(hri, walKey, edit); wal.sync(); Get get = new Get(rowName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 3a1320cf66e..0cdf1cf66b8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -300,11 +300,9 @@ public abstract class TestReplicationSourceManager { wal.rollWriter(); } LOG.info(Long.toString(i)); - final long txid = wal.append( - hri, - new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), - edit, - true); + final long txid = wal.appendData(hri, + new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), + edit); wal.sync(txid); } @@ -316,9 +314,9 @@ public abstract class TestReplicationSourceManager { LOG.info(baseline + " and " + time); for (int i = 0; i < 3; i++) { - wal.append(hri, + wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), - edit, true); + edit); } wal.sync(); @@ -338,9 +336,9 @@ public abstract class TestReplicationSourceManager { manager.logPositionAndCleanOldLogs(source, new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath())); - wal.append(hri, + wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), - edit, true); + edit); wal.sync(); assertEquals(1, manager.getWALs().size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index a873320ef91..2a21660dd47 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -556,9 +556,9 @@ public class TestWALEntryStream { } private void appendToLog(String key) throws IOException { - final long txid = log.append(info, + final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), - mvcc, scopes), getWALEdit(key), true); + mvcc, scopes), getWALEdit(key)); log.sync(txid); } @@ -580,8 +580,8 @@ public class TestWALEntryStream { } private long appendToLog(int count) throws IOException { - return log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true); + return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), getWALEdits(count)); } private WALEdit getWALEdits(int count) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java index c7f1c411448..01de1f47731 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java @@ -63,12 +63,12 @@ public class FaultyFSLog extends FSHLog { } @Override - public long append(RegionInfo info, WALKeyImpl key, - WALEdit edits, boolean inMemstore) throws IOException { + protected long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore, + boolean closeRegion) throws IOException { if (this.ft == FailureType.APPEND) { throw new IOException("append"); } - return super.append(info, key, edits, inMemstore); + return super.append(info, key, edits, inMemstore, closeRegion); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java index 3205d7328e3..be84eabbf06 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java @@ -156,8 +156,8 @@ public class TestFSHLogProvider { long timestamp = System.currentTimeMillis(); WALEdit cols = new WALEdit(); cols.add(new KeyValue(row, row, row, timestamp, row)); - log.append(hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes), - cols, true); + log.appendData(hri, + getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes), cols); } log.sync(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java index 81938065bbc..eebc11c0b95 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java @@ -129,8 +129,8 @@ public class TestSecureWAL { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value)); - wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), kvs, true); + wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); } wal.sync(); final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index 8fbe09dd30b..97a755121a3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -208,7 +208,7 @@ public class TestWALFactory { LOG.info("Region " + i + ": " + edit); WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes); - log.append(infos[i], walKey, edit, true); + log.appendData(infos[i], walKey, edit); walKey.getWriteEntry(); } log.sync(); @@ -270,8 +270,8 @@ public class TestWALFactory { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); - wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), kvs, true); + wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); } // Now call sync and try reading. Opening a Reader before you sync just // gives you EOFE. @@ -289,8 +289,8 @@ public class TestWALFactory { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); - wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), kvs, true); + wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); } wal.sync(); reader = wals.createReader(fs, walPath); @@ -311,8 +311,8 @@ public class TestWALFactory { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value)); - wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), kvs, true); + wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); } // Now I should have written out lots of blocks. Sync then read. wal.sync(); @@ -388,9 +388,8 @@ public class TestWALFactory { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); - wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), - kvs, true); + wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); } // Now call sync to send the data to HDFS datanodes wal.sync(); @@ -522,10 +521,8 @@ public class TestWALFactory { .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build(); final WAL log = wals.getWAL(info); - final long txid = log.append(info, - new WALKeyImpl(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(), - mvcc, scopes), - cols, true); + final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), + htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols); log.sync(txid); log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); log.completeCacheFlush(info.getEncodedNameAsBytes()); @@ -580,10 +577,8 @@ public class TestWALFactory { } RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); final WAL log = wals.getWAL(hri); - final long txid = log.append(hri, - new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(), - mvcc, scopes), - cols, true); + final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), + htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols); log.sync(txid); log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); log.completeCacheFlush(hri.getEncodedNameAsBytes()); @@ -634,8 +629,8 @@ public class TestWALFactory { cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)), timestamp, new byte[]{(byte) (i + '0')})); - log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), cols, true); + log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), cols); } log.sync(); assertEquals(COL_COUNT, visitor.increments); @@ -644,8 +639,8 @@ public class TestWALFactory { cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(11)), timestamp, new byte[]{(byte) (11 + '0')})); - log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), cols, true); + log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), cols); log.sync(); assertEquals(COL_COUNT, visitor.increments); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java index bc21a65b163..83ad5fa677f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java @@ -118,8 +118,8 @@ public class TestWALReaderOnSecureWAL { } else { kvs.add(kv); } - wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), kvs, true); + wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); } wal.sync(); final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java index 40fad6ad520..6ea1daf47a6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java @@ -98,8 +98,8 @@ public class TestWALRootDir { WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value)); - long txid = log.append(regionInfo, - getWalKey(System.currentTimeMillis(), regionInfo, 0), edit, true); + long txid = + log.appendData(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 0), edit); log.sync(txid); assertEquals("Expect 1 log have been created", 1, getWALFiles(walFs, walRootDir).size()); @@ -109,8 +109,7 @@ public class TestWALRootDir { HConstants.HREGION_LOGDIR_NAME)).size()); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value)); - txid = log.append(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 1), - edit, true); + txid = log.appendData(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 1), edit); log.sync(txid); log.rollWriter(); log.shutdown(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java index 861b289f144..7e6ed8fc426 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java @@ -184,7 +184,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { RegionInfo hri = region.getRegionInfo(); final WALKeyImpl logkey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes); - wal.append(hri, logkey, walEdit, true); + wal.appendData(hri, logkey, walEdit); if (!this.noSync) { if (++lastSync >= this.syncInterval) { wal.sync();