HBASE-26093 Replication is stuck due to zero length wal file in oldWALs directory (#3504)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Rushabh Shah authored 2021-07-23 12:32:55 -04:00, committed by GitHub
parent 51ed95c0cb
commit 0294c73f1f
4 changed files with 98 additions and 36 deletions
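Before the per-file hunks, a minimal editorial sketch of the decision this commit adds: a WAL that has vanished from the WALs directory may have been archived to oldWALs, and if the archived copy is zero length it can be dropped from the replication queue instead of being retried forever. This sketch is not code from the commit; only the Hadoop FileSystem/Path calls are real APIs, while the class name, method name, and the single oldWalsDir parameter are simplifications (the real helper, AbstractFSWALProvider.findArchivedLog introduced below, also checks the per-server subdirectory of oldWALs).

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class ZeroLengthWalCheck {
  /** Returns true when the queued WAL is an empty (0 length) file, looking in the archive dir if needed. */
  static boolean isZeroLengthWal(FileSystem fs, Path walPath, Path oldWalsDir) throws IOException {
    Path path = walPath;
    if (!fs.exists(path)) {
      // The WAL may already have been moved to the oldWALs directory.
      path = new Path(oldWalsDir, walPath.getName());
      if (!fs.exists(path)) {
        // Not found in either place; nothing we can safely skip.
        return false;
      }
    }
    return fs.getFileStatus(path).getLen() == 0;
  }
}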

ReplicationSourceWALReader.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -274,11 +275,15 @@ class ReplicationSourceWALReader extends Thread {
// since we don't add current log to recovered source queue so it is safe to remove.
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
(source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
-Path head = queue.peek();
+Path path = queue.peek();
try {
-if (fs.getFileStatus(head).getLen() == 0) {
-// head of the queue is an empty log file
-LOG.warn("Forcing removal of 0 length log in queue: {}", head);
+if (!fs.exists(path)) {
+// There is a chance that wal has moved to oldWALs directory, so look there also.
+path = AbstractFSWALProvider.findArchivedLog(path, conf);
+// path is null if it couldn't find archive path.
+}
+if (path != null && fs.getFileStatus(path).getLen() == 0) {
+LOG.warn("Forcing removal of 0 length log in queue: {}", path);
logQueue.remove(walGroupId);
currentPosition = 0;
if (batch != null) {
@@ -289,7 +294,7 @@ class ReplicationSourceWALReader extends Thread {
return true;
}
} catch (IOException ioe) {
LOG.warn("Couldn't get file length information about log " + queue.peek(), ioe);
LOG.warn("Couldn't get file length information about log " + path, ioe);
} catch (InterruptedException ie) {
LOG.trace("Interrupted while adding WAL batch to ship queue");
Thread.currentThread().interrupt();
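Note that the recovery branch above is gated on this.eofAutoRecovery, which the new test at the end of this commit enables through the replication.source.eof.autorecovery key. Below is a minimal sketch of building such a configuration, assuming only the two keys the test uses; HBaseConfiguration, Configuration, and the key names are real, while the wrapper class and method are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public final class EofAutoRecoveryConf {
  static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    // Allow the reader to drop a 0 length WAL instead of retrying it indefinitely.
    conf.setBoolean("replication.source.eof.autorecovery", true);
    // Keep retries short so the EOF handling path is reached quickly, as the test does.
    conf.setInt("replication.source.maxretriesmultiplier", 1);
    return conf;
  }
}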

WALEntryStream.java

@@ -27,13 +27,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -316,35 +316,10 @@ class WALEntryStream implements Closeable {
return false;
}
-private Path getArchivedLog(Path path) throws IOException {
-Path walRootDir = CommonFSUtils.getWALRootDir(conf);
-// Try found the log in old dir
-Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-Path archivedLogLocation = new Path(oldLogDir, path.getName());
-if (fs.exists(archivedLogLocation)) {
-LOG.info("Log " + path + " was moved to " + archivedLogLocation);
-return archivedLogLocation;
-}
-// Try found the log in the seperate old log dir
-oldLogDir =
-new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
-.append(Path.SEPARATOR).append(serverName.getServerName()).toString());
-archivedLogLocation = new Path(oldLogDir, path.getName());
-if (fs.exists(archivedLogLocation)) {
-LOG.info("Log " + path + " was moved to " + archivedLogLocation);
-return archivedLogLocation;
-}
-LOG.error("Couldn't locate log: " + path);
-return path;
-}
private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
// If the log was archived, continue reading from there
-Path archivedLog = getArchivedLog(path);
-if (!path.equals(archivedLog)) {
+Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
+if (archivedLog != null) {
openReader(archivedLog);
} else {
throw fnfe;
@@ -408,8 +383,8 @@ class WALEntryStream implements Closeable {
seek();
} catch (FileNotFoundException fnfe) {
// If the log was archived, continue reading from there
-Path archivedLog = getArchivedLog(currentPath);
-if (!currentPath.equals(archivedLog)) {
+Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
+if (archivedLog != null) {
openReader(archivedLog);
} else {
throw fnfe;

AbstractFSWALProvider.java

@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.wal;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
@@ -500,6 +499,43 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
}
}
/**
* Find the archived WAL file path if it cannot be located in the WALs dir.
* @param path - active WAL file path
* @param conf - configuration
* @return archived path if it exists, null otherwise
* @throws IOException exception
*/
public static Path findArchivedLog(Path path, Configuration conf) throws IOException {
// If the path contains oldWALs keyword then exit early.
if (path.toString().contains(HConstants.HREGION_OLDLOGDIR_NAME)) {
return null;
}
Path walRootDir = CommonFSUtils.getWALRootDir(conf);
FileSystem fs = path.getFileSystem(conf);
// Try finding the log in the old dir
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path archivedLogLocation = new Path(oldLogDir, path.getName());
if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
return archivedLogLocation;
}
ServerName serverName = getServerNameFromWALDirectoryName(path);
// Try finding the log in the separate old log dir
oldLogDir =
new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
.append(Path.SEPARATOR).append(serverName.getServerName()).toString());
archivedLogLocation = new Path(oldLogDir, path.getName());
if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
return archivedLogLocation;
}
LOG.error("Couldn't locate log: " + path);
return null;
}
/**
* Opens WAL reader with retries and additional exception handling
* @param path path to WAL file
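A hedged usage sketch for the findArchivedLog helper added above, mirroring how the call sites in this commit treat a null return; AbstractFSWALProvider.findArchivedLog and the Hadoop types are the only real identifiers here, while the wrapper class and method are illustrative.

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

public final class FindArchivedLogUsage {
  /** Resolve the archived copy of a WAL that is no longer under the WALs directory. */
  static Path locateArchivedOrFail(Path walPath, Configuration conf) throws IOException {
    // findArchivedLog checks oldWALs/<wal name> and oldWALs/<server name>/<wal name>; it returns
    // null when the file is in neither place (or when walPath already points into oldWALs).
    Path archived = AbstractFSWALProvider.findArchivedLog(walPath, conf);
    if (archived == null) {
      throw new FileNotFoundException("WAL " + walPath + " not found under oldWALs");
    }
    return archived;
  }
}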

TestBasicWALEntryStream.java

@@ -52,12 +52,14 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -716,4 +718,48 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
}
}
/**
* Tests that we handle an EOFException properly if the WAL has moved to the oldWALs directory.
* @throws Exception exception
*/
@Test
public void testEOFExceptionInOldWALsDirectory() throws Exception {
assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
AbstractFSWAL abstractWAL = (AbstractFSWAL)log;
Path emptyLogFile = abstractWAL.getCurrentFileName();
log.rollWriter(true);
// AsyncFSWAL and FSHLog both move the log from the WALs to the oldWALs directory asynchronously.
// Wait for the in-flight wal close count to become 0. This makes sure that the empty wal is
// moved to the oldWALs directory.
Waiter.waitFor(CONF, 5000,
(Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0);
// There will be 2 logs in the queue.
assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));
// Get the archived path for the first wal.
Path archivePath = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF);
// Make sure that the wal was found in the archive and the archived file exists.
assertNotNull(archivePath);
assertTrue(fs.exists(archivePath));
fs.truncate(archivePath, 0);
// Make sure the size of the wal file is 0.
assertEquals(0, fs.getFileStatus(archivePath).getLen());
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
Configuration localConf = new Configuration(CONF);
localConf.setInt("replication.source.maxretriesmultiplier", 1);
localConf.setBoolean("replication.source.eof.autorecovery", true);
// Start the reader thread.
createReader(false, localConf);
// Wait for the replication queue size to be 1. This means that we have handled the
// 0 length wal from the oldWALs directory.
Waiter.waitFor(localConf, 10000,
(Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
}
}