diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 14fbe10d52a..ce8dafa4a55 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -661,9 +661,26 @@ public abstract class AbstractFSWAL implements WAL {
}
}
+ protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
+ int oldNumEntries = this.numEntries.getAndSet(0);
+ String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
+ if (oldPath != null) {
+ this.walFile2Props.put(oldPath,
+ new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
+ this.totalLogSize.addAndGet(oldFileLen);
+ LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
+ CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
+ newPathString);
+ } else {
+ LOG.info("New WAL {}", newPathString);
+ }
+ }
+
/**
+ *
* Cleans up current writer closing it and then puts in place the passed in
* nextWriter
.
+ *
*
*
* - In the case of creating a new WAL, oldPath will be null.
@@ -672,26 +689,17 @@ public abstract class AbstractFSWAL implements WAL {
* - In the case of closing out this FSHLog with no further use newPath and nextWriter will be
* null.
*
+ *
* @param oldPath may be null
* @param newPath may be null
* @param nextWriter may be null
* @return the passed in newPath
* @throws IOException if there is a problem flushing or closing the underlying FS
*/
+ @VisibleForTesting
Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) {
- long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter);
- int oldNumEntries = this.numEntries.getAndSet(0);
- final String newPathString = (null == newPath ? null : CommonFSUtils.getPath(newPath));
- if (oldPath != null) {
- this.walFile2Props.put(oldPath,
- new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
- this.totalLogSize.addAndGet(oldFileLen);
- LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
- ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
- } else {
- LOG.info("New WAL " + newPathString);
- }
+ doReplaceWriter(oldPath, newPath, nextWriter);
return newPath;
}
}
@@ -1021,10 +1029,7 @@ public abstract class AbstractFSWAL implements WAL {
protected abstract W createWriterInstance(Path path)
throws IOException, CommonFSUtils.StreamLacksCapabilityException;
- /**
- * @return old wal file size
- */
- protected abstract long doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
+ protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
throws IOException;
protected abstract void doShutdown() throws IOException;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 19d89dfec4f..d22d1ec6fe9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -631,11 +631,29 @@ public class AsyncFSWAL extends AbstractFSWAL {
}
}
+ private long closeWriter() {
+ AsyncWriter oldWriter = this.writer;
+ if (oldWriter != null) {
+ long fileLength = oldWriter.getLength();
+ closeExecutor.execute(() -> {
+ try {
+ oldWriter.close();
+ } catch (IOException e) {
+ LOG.warn("close old writer failed", e);
+ }
+ });
+ return fileLength;
+ } else {
+ return 0L;
+ }
+ }
+
@Override
- protected long doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
+ protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
throws IOException {
waitForSafePoint();
- final AsyncWriter oldWriter = this.writer;
+ long oldFileLen = closeWriter();
+ logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
this.writer = nextWriter;
if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) {
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
@@ -654,13 +672,12 @@ public class AsyncFSWAL extends AbstractFSWAL {
} finally {
consumeLock.unlock();
}
- return executeClose(closeExecutor, oldWriter);
}
@Override
protected void doShutdown() throws IOException {
waitForSafePoint();
- executeClose(closeExecutor, writer);
+ closeWriter();
closeExecutor.shutdown();
try {
if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
@@ -698,23 +715,6 @@ public class AsyncFSWAL extends AbstractFSWAL {
}
}
- private static long executeClose(ExecutorService closeExecutor, AsyncWriter writer) {
- long fileLength;
- if (writer != null) {
- fileLength = writer.getLength();
- closeExecutor.execute(() -> {
- try {
- writer.close();
- } catch (IOException e) {
- LOG.warn("close old writer failed", e);
- }
- });
- } else {
- fileLength = 0L;
- }
- return fileLength;
- }
-
@Override
protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
writer.append(entry);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index 9688bbd323e..67258ec0d62 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
-
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -34,13 +33,14 @@ import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 2023932de4a..4cc3393e58f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -298,7 +298,7 @@ public class FSHLog extends AbstractFSWAL {
}
@Override
- protected long doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException {
+ protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException {
// Ask the ring buffer writer to pause at a safe point. Once we do this, the writer
// thread will eventually pause. An error hereafter needs to release the writer thread
// regardless -- hence the finally block below. Note, this method is called from the FSHLog
@@ -320,7 +320,6 @@ public class FSHLog extends AbstractFSWAL {
zigzagLatch = this.ringBufferEventHandler.attainSafePoint();
}
afterCreatingZigZagLatch();
- long oldFileLen = 0L;
try {
// Wait on the safe point to be achieved. Send in a sync in case nothing has hit the
// ring buffer between the above notification of writer that we want it to go to
@@ -343,6 +342,7 @@ public class FSHLog extends AbstractFSWAL {
LOG.warn(
"Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage());
}
+ long oldFileLen = 0L;
// It is at the safe point. Swap out writer from under the blocked writer thread.
// TODO: This is close is inline with critical section. Should happen in background?
if (this.writer != null) {
@@ -363,6 +363,7 @@ public class FSHLog extends AbstractFSWAL {
}
}
}
+ logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
this.writer = nextWriter;
if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {
this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();
@@ -397,7 +398,6 @@ public class FSHLog extends AbstractFSWAL {
}
}
}
- return oldFileLen;
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index aeb2c19c25b..9d36429975a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
@@ -25,15 +23,16 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
/**
* Writer for protobuf-based WAL.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
index f4cacb28001..e14ce0c9274 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -29,25 +27,28 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ImmutableByteArray;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ImmutableByteArray;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
+ *
* Accounting of sequence ids per region and then by column family. So we can our accounting
* current, call startCacheFlush and then finishedCacheFlush or abortCacheFlush so this instance can
* keep abreast of the state of sequence id persistence. Also call update per append.
+ *
*
* For the implementation, we assume that all the {@code encodedRegionName} passed in is gotten by
- * {@link HRegionInfo#getEncodedNameAsBytes()}. So it is safe to use it as a hash key. And for
- * family name, we use {@link ImmutableByteArray} as key. This is because hash based map is much
- * faster than RBTree or CSLM and here we are on the critical write path. See HBASE-16278 for more
- * details.
+ * {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()}. So it is safe to use
+ * it as a hash key. And for family name, we use {@link ImmutableByteArray} as key. This is because
+ * hash based map is much faster than RBTree or CSLM and here we are on the critical write path. See
+ * HBASE-16278 for more details.
+ *
*/
@InterfaceAudience.Private
class SequenceIdAccounting {
@@ -93,14 +94,17 @@ class SequenceIdAccounting {
*/
private final Map> flushingSequenceIds = new HashMap<>();
- /**
- * Map of region encoded names to the latest/highest region sequence id. Updated on each
- * call to append.
- *
- * This map uses byte[] as the key, and uses reference equality. It works in our use case as we
- * use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns
- * the same array.
- */
+ /**
+ *
+ * Map of region encoded names to the latest/highest region sequence id. Updated on each call to
+ * append.
+ *
+ *
+ * This map uses byte[] as the key, and uses reference equality. It works in our use case as we
+ * use {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()} as keys. For a
+ * given region, it always returns the same array.
+ *
+ */
private Map highestSequenceIds = new HashMap<>();
/**