diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index a6d87870b49..be262a6d950 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -247,8 +247,10 @@ class ReplicationSourceWALReader extends Thread {
   // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
   // enabled, then dump the log
   private void handleEofException(IOException e) {
+    // Dump the log even if the logQueue size is 1 when the source is a recovered source,
+    // since we never add the current log to a recovered source's queue, so it is safe to remove.
     if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
-      logQueue.size() > 1 && this.eofAutoRecovery) {
+      (source.isRecovered() || logQueue.size() > 1) && this.eofAutoRecovery) {
       try {
         if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
           LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 63e7a8b9049..1db9c175e92 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -652,4 +653,33 @@ public class TestWALEntryStream {
       assertFalse(entryStream.hasNext());
     }
   }
+
+  /**
+   * Tests removal of a 0 length log from the logQueue when the source is a recovered source
+   * and the logQueue size is only 1.
+   */
+  @Test
+  public void testEOFExceptionForRecoveredQueue() throws Exception {
+    PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
+    // Create a 0 length log.
+    Path emptyLog = new Path("emptyLog");
+    FSDataOutputStream fsdos = fs.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
+    queue.add(emptyLog);
+
+    Configuration conf = new Configuration(CONF);
+    // Override the max retries multiplier to fail fast.
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+    // Create a reader thread with source as recovered source.
+    ReplicationSource source = mockReplicationSource(true, conf);
+    when(source.isPeerEnabled()).thenReturn(true);
+    ReplicationSourceWALReader reader =
+      new ReplicationSourceWALReader(fs, conf, queue, 0, getDummyFilter(), source);
+    reader.run();
+    // ReplicationSourceWALReader#handleEofException method will
+    // remove the empty log from the logQueue.
+    assertEquals(0, queue.size());
+  }
 }