[HBASE-25536] Remove 0 length wal file from logQueue if it belongs to old sources (#2908)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Signed-off-by: Geoffrey Jacoby <gjacoby@apache.org>
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Author: shahrs87
Date: 2021-01-29 04:17:30 -08:00 (committed via GitHub)
Parent: de51a40b53
Commit: d234b4dec2
2 changed files with 33 additions and 1 deletion


@@ -247,8 +247,10 @@ class ReplicationSourceWALReader extends Thread {
   // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
   // enabled, then dump the log
   private void handleEofException(IOException e) {
+    // Dump the log even if logQueue size is 1 if the source is a recovered source,
+    // since we don't add the current log to a recovered source's queue, so it is safe to remove.
     if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
-      logQueue.size() > 1 && this.eofAutoRecovery) {
+      (source.isRecovered() || logQueue.size() > 1) && this.eofAutoRecovery) {
       try {
         if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
           LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());


@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -652,4 +653,33 @@ public class TestWALEntryStream {
       assertFalse(entryStream.hasNext());
     }
   }
+
+  /*
+   * Test removal of a 0 length log from logQueue if the source is a recovered source and
+   * the size of logQueue is only 1.
+   */
+  @Test
+  public void testEOFExceptionForRecoveredQueue() throws Exception {
+    PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
+    // Create a 0 length log.
+    Path emptyLog = new Path("emptyLog");
+    FSDataOutputStream fsdos = fs.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
+    queue.add(emptyLog);
+
+    Configuration conf = new Configuration(CONF);
+    // Override the max retries multiplier to fail fast.
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+    // Create a reader thread with the source marked as recovered.
+    ReplicationSource source = mockReplicationSource(true, conf);
+    when(source.isPeerEnabled()).thenReturn(true);
+    ReplicationSourceWALReader reader =
+      new ReplicationSourceWALReader(fs, conf, queue, 0, getDummyFilter(), source);
+    reader.run();
+    // ReplicationSourceWALReader#handleEofException will
+    // remove the empty log from logQueue.
+    assertEquals(0, queue.size());
+  }
 }
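The test above leans on helpers that are outside this diff (mockReplicationSource and getDummyFilter). For orientation only, here is a rough sketch of what a mockReplicationSource(recovered, conf) helper could look like, assuming Mockito; the actual helper in TestWALEntryStream may stub more of ReplicationSource than shown here.

    // Hypothetical helper, assuming Mockito is available to the test; the real
    // TestWALEntryStream helper may stub additional ReplicationSource methods,
    // and may or may not use the conf argument.
    private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
      ReplicationSource source = Mockito.mock(ReplicationSource.class);
      // handleEofException consults isRecovered() to decide whether the only
      // (zero-length) log in the queue may be dropped.
      Mockito.when(source.isRecovered()).thenReturn(recovered);
      return source;
    }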