HBASE-26075: Replication is stuck due to zero length wal file in oldWALs dir (#3467)

Signed-off-by: Geoffrey Jacoby <gjacoby@apache.org>
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
Rushabh Shah 2021-07-09 18:24:20 -04:00 committed by GitHub
parent b7fbfdd078
commit 3b3ec323e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 59 additions and 5 deletions

View File

@ -308,10 +308,15 @@ public class ReplicationSourceWALReaderThread extends Thread {
// add current log to recovered source queue so it is safe to remove.
if (e.getCause() instanceof EOFException && (isRecoveredSource || queue.size() > 1)
&& conf.getBoolean("replication.source.eof.autorecovery", false)) {
Path path = queue.peek();
try {
if (fs.getFileStatus(queue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek());
lastReadPath = queue.peek();
if (!fs.exists(path)) {
// There is a chance that wal has moved to oldWALs directory, so look there also.
path = entryStream.getArchivedLog(path);
}
if (fs.getFileStatus(path).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: " + path);
lastReadPath = path;
logQueue.remove(walGroupId);
lastReadPosition = 0;
@ -325,7 +330,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
return true;
}
} catch (IOException ioe) {
LOG.warn("Couldn't get file length information about log " + queue.peek());
LOG.warn("Couldn't get file length information about log " + path, ioe);
}
}

View File

@ -312,7 +312,7 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
return false;
}
private Path getArchivedLog(Path path) throws IOException {
Path getArchivedLog(Path path) throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path archivedLogLocation = new Path(oldLogDir, path.getName());

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
@ -917,4 +918,52 @@ public class TestWALEntryStream {
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
}
}
/**
* Tests that we handle EOFException properly if the wal has moved to oldWALs directory.
* @throws Exception exception
*/
@Test
public void testEOFExceptionInOldWALsDirectory() throws Exception {
assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
FSHLog fsLog = (FSHLog)log;
Path emptyLogFile = fsLog.getCurrentFileName();
log.rollWriter(true);
// There will 2 logs in the queue.
assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));
Configuration localConf = new Configuration(conf);
localConf.setInt("replication.source.maxretriesmultiplier", 1);
localConf.setBoolean("replication.source.eof.autorecovery", true);
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, localConf, logQueue.getMetrics(), fakeWalGroupId)) {
// Get the archived dir path for the first wal.
Path archivePath = entryStream.getArchivedLog(emptyLogFile);
// Make sure that the wal path is not the same as archived Dir path.
assertNotEquals(emptyLogFile.toString(), archivePath.toString());
assertTrue(fs.exists(archivePath));
fs.truncate(archivePath, 0);
// make sure the size of the wal file is 0.
assertEquals(0, fs.getFileStatus(archivePath).getLen());
}
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
// Start the reader thread.
ReplicationSourceWALReaderThread readerThread =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), logQueue, 0,
fs, localConf, getDummyFilter(), logQueue.getMetrics(), source, fakeWalGroupId);
readerThread.start();
// Wait for the replication queue size to be 1. This means that we have handled
// 0 length wal from oldWALs directory.
Waiter.waitFor(conf, 10000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() {
return logQueue.getQueueSize(fakeWalGroupId) == 1;
}
});
}
}