diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 14fbe10d52a..ce8dafa4a55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -661,9 +661,26 @@ public abstract class AbstractFSWAL implements WAL { } } + protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) { + int oldNumEntries = this.numEntries.getAndSet(0); + String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null; + if (oldPath != null) { + this.walFile2Props.put(oldPath, + new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); + this.totalLogSize.addAndGet(oldFileLen); + LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}", + CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen), + newPathString); + } else { + LOG.info("New WAL {}", newPathString); + } + } + /** + *

* Cleans up current writer closing it and then puts in place the passed in * nextWriter. + *

*

*

+ *

* @param oldPath may be null * @param newPath may be null * @param nextWriter may be null * @return the passed in newPath * @throws IOException if there is a problem flushing or closing the underlying FS */ + @VisibleForTesting Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) { - long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter); - int oldNumEntries = this.numEntries.getAndSet(0); - final String newPathString = (null == newPath ? null : CommonFSUtils.getPath(newPath)); - if (oldPath != null) { - this.walFile2Props.put(oldPath, - new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); - this.totalLogSize.addAndGet(oldFileLen); - LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries + - ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString); - } else { - LOG.info("New WAL " + newPathString); - } + doReplaceWriter(oldPath, newPath, nextWriter); return newPath; } } @@ -1021,10 +1029,7 @@ public abstract class AbstractFSWAL implements WAL { protected abstract W createWriterInstance(Path path) throws IOException, CommonFSUtils.StreamLacksCapabilityException; - /** - * @return old wal file size - */ - protected abstract long doReplaceWriter(Path oldPath, Path newPath, W nextWriter) + protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException; protected abstract void doShutdown() throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 19d89dfec4f..d22d1ec6fe9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -631,11 +631,29 @@ public class AsyncFSWAL extends AbstractFSWAL { } } + private long closeWriter() { + AsyncWriter oldWriter = this.writer; + if (oldWriter != null) { + long fileLength = oldWriter.getLength(); + closeExecutor.execute(() -> { + try { + oldWriter.close(); + } catch (IOException e) { + LOG.warn("close old writer failed", e); + } + }); + return fileLength; + } else { + return 0L; + } + } + @Override - protected long doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter) + protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter) throws IOException { waitForSafePoint(); - final AsyncWriter oldWriter = this.writer; + long oldFileLen = closeWriter(); + logRollAndSetupWalProps(oldPath, newPath, oldFileLen); this.writer = nextWriter; if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) { this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); @@ -654,13 +672,12 @@ public class AsyncFSWAL extends AbstractFSWAL { } finally { consumeLock.unlock(); } - return executeClose(closeExecutor, oldWriter); } @Override protected void doShutdown() throws IOException { waitForSafePoint(); - executeClose(closeExecutor, writer); + closeWriter(); closeExecutor.shutdown(); try { if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { @@ -698,23 +715,6 @@ public class AsyncFSWAL extends AbstractFSWAL { } } - private static long executeClose(ExecutorService closeExecutor, AsyncWriter writer) { - long fileLength; - if (writer != null) { - fileLength = writer.getLength(); - closeExecutor.execute(() -> { - try { - writer.close(); - } catch (IOException e) { - LOG.warn("close old writer failed", e); - } - }); - } else { - fileLength = 0L; - } - return fileLength; - } - @Override protected void doAppend(AsyncWriter writer, FSWALEntry entry) { writer.append(entry); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index 9688bbd323e..67258ec0d62 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -24,7 +24,6 @@ import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -34,13 +33,14 @@ import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; + import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index a9a3835772a..d927b7e609b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -298,7 +298,7 @@ public class FSHLog extends AbstractFSWAL { } @Override - protected long doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException { + protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException { // Ask the ring buffer writer to pause at a safe point. Once we do this, the writer // thread will eventually pause. An error hereafter needs to release the writer thread // regardless -- hence the finally block below. Note, this method is called from the FSHLog @@ -320,7 +320,6 @@ public class FSHLog extends AbstractFSWAL { zigzagLatch = this.ringBufferEventHandler.attainSafePoint(); } afterCreatingZigZagLatch(); - long oldFileLen = 0L; try { // Wait on the safe point to be achieved. Send in a sync in case nothing has hit the // ring buffer between the above notification of writer that we want it to go to @@ -343,6 +342,7 @@ public class FSHLog extends AbstractFSWAL { LOG.warn( "Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage()); } + long oldFileLen = 0L; // It is at the safe point. Swap out writer from under the blocked writer thread. // TODO: This is close is inline with critical section. Should happen in background? if (this.writer != null) { @@ -363,6 +363,7 @@ public class FSHLog extends AbstractFSWAL { } } } + logRollAndSetupWalProps(oldPath, newPath, oldFileLen); this.writer = nextWriter; if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) { this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream(); @@ -397,7 +398,6 @@ public class FSHLog extends AbstractFSWAL { } } } - return oldFileLen; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index aeb2c19c25b..9d36429975a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; @@ -25,15 +23,16 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; /** * Writer for protobuf-based WAL. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java index f4cacb28001..e14ce0c9274 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -29,25 +27,28 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; - import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ImmutableByteArray; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ImmutableByteArray; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** + *

* Accounting of sequence ids per region and then by column family. So we can our accounting * current, call startCacheFlush and then finishedCacheFlush or abortCacheFlush so this instance can * keep abreast of the state of sequence id persistence. Also call update per append. + *

*

* For the implementation, we assume that all the {@code encodedRegionName} passed in is gotten by - * {@link HRegionInfo#getEncodedNameAsBytes()}. So it is safe to use it as a hash key. And for - * family name, we use {@link ImmutableByteArray} as key. This is because hash based map is much - * faster than RBTree or CSLM and here we are on the critical write path. See HBASE-16278 for more - * details. + * {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()}. So it is safe to use + * it as a hash key. And for family name, we use {@link ImmutableByteArray} as key. This is because + * hash based map is much faster than RBTree or CSLM and here we are on the critical write path. See + * HBASE-16278 for more details. + *

*/ @InterfaceAudience.Private class SequenceIdAccounting { @@ -93,14 +94,17 @@ class SequenceIdAccounting { */ private final Map> flushingSequenceIds = new HashMap<>(); - /** - * Map of region encoded names to the latest/highest region sequence id. Updated on each - * call to append. - *

- * This map uses byte[] as the key, and uses reference equality. It works in our use case as we - * use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns - * the same array. - */ + /** + *

+ * Map of region encoded names to the latest/highest region sequence id. Updated on each call to + * append. + *

+ *

+ * This map uses byte[] as the key, and uses reference equality. It works in our use case as we + * use {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()} as keys. For a + * given region, it always returns the same array. + *

+ */ private Map highestSequenceIds = new HashMap<>(); /**