diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java index 0cc9f9fdec6..105beb55216 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java @@ -165,6 +165,12 @@ public class MetricsReplicationGlobalSourceSourceImpl public void incrUncleanlyClosedWALs() { uncleanlyClosedWAL.incr(1L); } + + @Override + public long getUncleanlyClosedWALs() { + return uncleanlyClosedWAL.value(); + } + @Override public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { uncleanlyClosedSkippedBytes.incr(bytes); diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index 490aec92003..42e28f5d0f3 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -69,6 +69,7 @@ public interface MetricsReplicationSourceSource extends BaseSource { void decrSizeOfHFileRefsQueue(long size); void incrUnknownFileLengthForClosedWAL(); void incrUncleanlyClosedWALs(); + long getUncleanlyClosedWALs(); void incrBytesSkippedInUncleanlyClosedWALs(final long bytes); void incrRestartedWALReading(); void incrRepeatedFileBytes(final long bytes); diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 18d536a65e2..57aea3f63a8 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -233,6 +233,11 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou uncleanlyClosedWAL.incr(1L); } + @Override + public long getUncleanlyClosedWALs() { + return uncleanlyClosedWAL.value(); + } + @Override public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { uncleanlyClosedSkippedBytes.incr(bytes); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index c86dd4d130b..8aba943d0fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -208,6 +208,10 @@ public class ProtobufLogReader extends ReaderBase { private String initInternal(FSDataInputStream stream, boolean isFirst) throws IOException { close(); + if (!isFirst) { + // Re-compute the file length. + this.fileLength = fs.getFileStatus(path).getLen(); + } long expectedPos = PB_WAL_MAGIC.length; if (stream == null) { stream = fs.open(path); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index a91963ec461..3ab08065ca7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -300,6 +300,15 @@ public class MetricsSource implements BaseSource { return singleSourceSource.getSizeOfLogQueue(); } + + /** + * Get the value of uncleanlyClosedWAL counter + * @return uncleanlyClosedWAL + */ + public long getUncleanlyClosedWALs() { + return singleSourceSource.getUncleanlyClosedWALs(); + } + /** * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one * @return lastTimestampForAge diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index b7d2a084862..5507972ab4c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -862,4 +862,22 @@ public class TestWALEntryStream { // After removing one wal, size of log queue will be 1 again. assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue()); } + + /** + * Tests that wals are closed cleanly and we read the trailer when we remove wal + * from WALEntryStream. + */ + @Test + public void testCleanClosedWALs() throws Exception { + try (WALEntryStream entryStream = new WALEntryStream( + logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) { + assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs()); + appendToLogAndSync(); + assertNotNull(entryStream.next()); + log.rollWriter(); + appendToLogAndSync(); + assertNotNull(entryStream.next()); + assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs()); + } + } }