From d234b4dec2d50c9184f3b4affaa68ec232445f29 Mon Sep 17 00:00:00 2001 From: shahrs87 Date: Fri, 29 Jan 2021 04:17:30 -0800 Subject: [PATCH] [HBASE-25536] Remove 0 length wal file from logQueue if it belongs to old sources (#2908) Signed-off-by: Wellington Chevreuil Signed-off-by: Geoffrey Jacoby Signed-off-by: Bharath Vissapragada Signed-off-by: Viraj Jasani --- .../ReplicationSourceWALReader.java | 4 ++- .../regionserver/TestWALEntryStream.java | 30 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index a6d87870b49..be262a6d950 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -247,8 +247,10 @@ class ReplicationSourceWALReader extends Thread { // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is // enabled, then dump the log private void handleEofException(IOException e) { + // Dump the log even if logQueue size is 1 if the source is from recovered Source + // since we don't add current log to recovered source queue so it is safe to remove. if ((e instanceof EOFException || e.getCause() instanceof EOFException) && - logQueue.size() > 1 && this.eofAutoRecovery) { + (source.isRecovered() || logQueue.size() > 1) && this.eofAutoRecovery) { try { if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 63e7a8b9049..1db9c175e92 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -652,4 +653,33 @@ public class TestWALEntryStream { assertFalse(entryStream.hasNext()); } } + + /* + Test removal of 0 length log from logQueue if the source is a recovered source and + size of logQueue is only 1. + */ + @Test + public void testEOFExceptionForRecoveredQueue() throws Exception { + PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); + // Create a 0 length log. + Path emptyLog = new Path("emptyLog"); + FSDataOutputStream fsdos = fs.create(emptyLog); + fsdos.close(); + assertEquals(0, fs.getFileStatus(emptyLog).getLen()); + queue.add(emptyLog); + + Configuration conf = new Configuration(CONF); + // Override the max retries multiplier to fail fast. + conf.setInt("replication.source.maxretriesmultiplier", 1); + conf.setBoolean("replication.source.eof.autorecovery", true); + // Create a reader thread with source as recovered source. + ReplicationSource source = mockReplicationSource(true, conf); + when(source.isPeerEnabled()).thenReturn(true); + ReplicationSourceWALReader reader = + new ReplicationSourceWALReader(fs, conf, queue, 0, getDummyFilter(), source); + reader.run(); + // ReplicationSourceWALReaderThread#handleEofException method will + // remove empty log from logQueue. + assertEquals(0, queue.size()); + } }