HBASE-25536 Remove 0 length wal file from logQueue if it belongs to old sources (#2908)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org> Signed-off-by: Geoffrey Jacoby <gjacoby@apache.org> Signed-off-by: Bharath Vissapragada <bharathv@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
083b632c4b
commit
8055a64b80
|
@ -246,8 +246,10 @@ class ReplicationSourceWALReader extends Thread {
|
|||
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
|
||||
// enabled, then dump the log
|
||||
private void handleEofException(IOException e) {
|
||||
// Dump the log even if logQueue size is 1 if the source is from recovered Source
|
||||
// since we don't add current log to recovered source queue so it is safe to remove.
|
||||
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
|
||||
logQueue.size() > 1 && this.eofAutoRecovery) {
|
||||
(source.isRecovered() || logQueue.size() > 1) && this.eofAutoRecovery) {
|
||||
try {
|
||||
if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
|
||||
LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
|
||||
|
|
|
@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
|
@ -652,4 +653,33 @@ public class TestWALEntryStream {
|
|||
assertFalse(entryStream.hasNext());
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Test removal of 0 length log from logQueue if the source is a recovered source and
|
||||
size of logQueue is only 1.
|
||||
*/
|
||||
@Test
|
||||
public void testEOFExceptionForRecoveredQueue() throws Exception {
|
||||
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
|
||||
// Create a 0 length log.
|
||||
Path emptyLog = new Path("emptyLog");
|
||||
FSDataOutputStream fsdos = fs.create(emptyLog);
|
||||
fsdos.close();
|
||||
assertEquals(0, fs.getFileStatus(emptyLog).getLen());
|
||||
queue.add(emptyLog);
|
||||
|
||||
Configuration conf = new Configuration(CONF);
|
||||
// Override the max retries multiplier to fail fast.
|
||||
conf.setInt("replication.source.maxretriesmultiplier", 1);
|
||||
conf.setBoolean("replication.source.eof.autorecovery", true);
|
||||
// Create a reader thread with source as recovered source.
|
||||
ReplicationSource source = mockReplicationSource(true, conf);
|
||||
when(source.isPeerEnabled()).thenReturn(true);
|
||||
ReplicationSourceWALReader reader =
|
||||
new ReplicationSourceWALReader(fs, conf, queue, 0, getDummyFilter(), source);
|
||||
reader.run();
|
||||
// ReplicationSourceWALReaderThread#handleEofException method will
|
||||
// remove empty log from logQueue.
|
||||
assertEquals(0, queue.size());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue