From e265eccf206b70a30c5d13c921e42f0817cb4798 Mon Sep 17 00:00:00 2001 From: Rushabh Shah Date: Wed, 26 May 2021 13:42:03 -0400 Subject: [PATCH] HBASE-25924 Re-compute size of WAL file while removing from WALEntryStream (#3315) Signed-off-by: Andrew Purtell Signed-off-by: Bharath Vissapragada --- .../MetricsReplicationSourceSource.java | 1 + ...tricsReplicationGlobalSourceSourceImpl.java | 6 ++++++ .../MetricsReplicationSourceSourceImpl.java | 5 +++++ .../regionserver/wal/ProtobufLogReader.java | 4 ++++ .../regionserver/MetricsSource.java | 9 +++++++++ .../regionserver/TestWALEntryStream.java | 18 ++++++++++++++++++ 6 files changed, 43 insertions(+) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index 1360afec6d7..a6cf79b710f 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -72,6 +72,7 @@ public interface MetricsReplicationSourceSource extends BaseSource { void decrSizeOfHFileRefsQueue(long size); void incrUnknownFileLengthForClosedWAL(); void incrUncleanlyClosedWALs(); + long getUncleanlyClosedWALs(); void incrBytesSkippedInUncleanlyClosedWALs(final long bytes); void incrRestartedWALReading(); void incrRepeatedFileBytes(final long bytes); diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java index 6f31b4ec71a..8a6553f882f 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java @@ -189,6 +189,12 @@ public class MetricsReplicationGlobalSourceSourceImpl public void incrUncleanlyClosedWALs() { uncleanlyClosedWAL.incr(1L); } + + @Override + public long getUncleanlyClosedWALs() { + return uncleanlyClosedWAL.value(); + } + @Override public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { uncleanlyClosedSkippedBytes.incr(bytes); diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 68c1f17ba43..016135422a0 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -245,6 +245,11 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou uncleanlyClosedWAL.incr(1L); } + @Override + public long getUncleanlyClosedWALs() { + return uncleanlyClosedWAL.value(); + } + @Override public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { uncleanlyClosedSkippedBytes.incr(bytes); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index c86dd4d130b..8aba943d0fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -208,6 +208,10 @@ public class ProtobufLogReader extends ReaderBase { private String initInternal(FSDataInputStream stream, boolean isFirst) throws IOException { close(); + if (!isFirst) { + // Re-compute the file length. + this.fileLength = fs.getFileStatus(path).getLen(); + } long expectedPos = PB_WAL_MAGIC.length; if (stream == null) { stream = fs.open(path); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 036be91566d..3b97e1ed3ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -311,6 +311,15 @@ public class MetricsSource implements BaseSource { return getTimestampOfLastShippedOp(); } + + /** + * Get the value of uncleanlyClosedWAL counter + * @return uncleanlyClosedWAL + */ + public long getUncleanlyClosedWALs() { + return singleSourceSource.getUncleanlyClosedWALs(); + } + /** * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one * @return lastTimestampForAge diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index b7d2a084862..5507972ab4c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -862,4 +862,22 @@ public class TestWALEntryStream { // After removing one wal, size of log queue will be 1 again. assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue()); } + + /** + * Tests that wals are closed cleanly and we read the trailer when we remove wal + * from WALEntryStream. + */ + @Test + public void testCleanClosedWALs() throws Exception { + try (WALEntryStream entryStream = new WALEntryStream( + logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) { + assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs()); + appendToLogAndSync(); + assertNotNull(entryStream.next()); + log.rollWriter(); + appendToLogAndSync(); + assertNotNull(entryStream.next()); + assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs()); + } + } }