diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java index 54ed3ab2153..804793dae58 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java @@ -308,10 +308,15 @@ public class ReplicationSourceWALReaderThread extends Thread { // add current log to recovered source queue so it is safe to remove. if (e.getCause() instanceof EOFException && (isRecoveredSource || queue.size() > 1) && conf.getBoolean("replication.source.eof.autorecovery", false)) { + Path path = queue.peek(); try { - if (fs.getFileStatus(queue.peek()).getLen() == 0) { - LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek()); - lastReadPath = queue.peek(); + if (!fs.exists(path)) { + // There is a chance that wal has moved to oldWALs directory, so look there also. + path = entryStream.getArchivedLog(path); + } + if (fs.getFileStatus(path).getLen() == 0) { + LOG.warn("Forcing removal of 0 length log in queue: " + path); + lastReadPath = path; logQueue.remove(walGroupId); lastReadPosition = 0; @@ -325,7 +330,7 @@ public class ReplicationSourceWALReaderThread extends Thread { return true; } } catch (IOException ioe) { - LOG.warn("Couldn't get file length information about log " + queue.peek()); + LOG.warn("Couldn't get file length information about log " + path, ioe); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index fbca0e6c4cf..88979b06ef8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -312,7 +312,7 @@ public class WALEntryStream implements Iterator, Closeable, Iterable() { + @Override public boolean evaluate() { + return logQueue.getQueueSize(fakeWalGroupId) == 1; + } + }); + } }