HBASE-26093 Replication is stuck due to zero length wal file in oldWALs directory (#3504)
Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Bharath Vissapragada <bharathv@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
d10847cc3f
commit
9a8fff78e7
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.CellUtil;
|
|||
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
|
@ -274,11 +275,15 @@ class ReplicationSourceWALReader extends Thread {
|
|||
// since we don't add current log to recovered source queue so it is safe to remove.
|
||||
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
|
||||
(source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
|
||||
Path head = queue.peek();
|
||||
Path path = queue.peek();
|
||||
try {
|
||||
if (fs.getFileStatus(head).getLen() == 0) {
|
||||
// head of the queue is an empty log file
|
||||
LOG.warn("Forcing removal of 0 length log in queue: {}", head);
|
||||
if (!fs.exists(path)) {
|
||||
// There is a chance that wal has moved to oldWALs directory, so look there also.
|
||||
path = AbstractFSWALProvider.findArchivedLog(path, conf);
|
||||
// path is null if it couldn't find archive path.
|
||||
}
|
||||
if (path != null && fs.getFileStatus(path).getLen() == 0) {
|
||||
LOG.warn("Forcing removal of 0 length log in queue: {}", path);
|
||||
logQueue.remove(walGroupId);
|
||||
currentPosition = 0;
|
||||
if (batch != null) {
|
||||
|
@ -289,7 +294,7 @@ class ReplicationSourceWALReader extends Thread {
|
|||
return true;
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Couldn't get file length information about log " + queue.peek(), ioe);
|
||||
LOG.warn("Couldn't get file length information about log " + path, ioe);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.trace("Interrupted while adding WAL batch to ship queue");
|
||||
Thread.currentThread().interrupt();
|
||||
|
|
|
@ -27,13 +27,13 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
|
||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
|
||||
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Reader;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
|
@ -315,35 +315,10 @@ class WALEntryStream implements Closeable {
|
|||
return false;
|
||||
}
|
||||
|
||||
private Path getArchivedLog(Path path) throws IOException {
|
||||
Path walRootDir = CommonFSUtils.getWALRootDir(conf);
|
||||
|
||||
// Try found the log in old dir
|
||||
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
Path archivedLogLocation = new Path(oldLogDir, path.getName());
|
||||
if (fs.exists(archivedLogLocation)) {
|
||||
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
|
||||
return archivedLogLocation;
|
||||
}
|
||||
|
||||
// Try found the log in the seperate old log dir
|
||||
oldLogDir =
|
||||
new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
|
||||
.append(Path.SEPARATOR).append(serverName.getServerName()).toString());
|
||||
archivedLogLocation = new Path(oldLogDir, path.getName());
|
||||
if (fs.exists(archivedLogLocation)) {
|
||||
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
|
||||
return archivedLogLocation;
|
||||
}
|
||||
|
||||
LOG.error("Couldn't locate log: " + path);
|
||||
return path;
|
||||
}
|
||||
|
||||
private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
|
||||
// If the log was archived, continue reading from there
|
||||
Path archivedLog = getArchivedLog(path);
|
||||
if (!path.equals(archivedLog)) {
|
||||
Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
|
||||
if (archivedLog != null) {
|
||||
openReader(archivedLog);
|
||||
} else {
|
||||
throw fnfe;
|
||||
|
@ -407,8 +382,8 @@ class WALEntryStream implements Closeable {
|
|||
seek();
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
// If the log was archived, continue reading from there
|
||||
Path archivedLog = getArchivedLog(currentPath);
|
||||
if (!currentPath.equals(archivedLog)) {
|
||||
Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
|
||||
if (archivedLog != null) {
|
||||
openReader(archivedLog);
|
||||
} else {
|
||||
throw fnfe;
|
||||
|
|
|
@ -478,6 +478,43 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the archived WAL file path if it is not able to locate in WALs dir.
|
||||
* @param path - active WAL file path
|
||||
* @param conf - configuration
|
||||
* @return archived path if exists, null - otherwise
|
||||
* @throws IOException exception
|
||||
*/
|
||||
public static Path findArchivedLog(Path path, Configuration conf) throws IOException {
|
||||
// If the path contains oldWALs keyword then exit early.
|
||||
if (path.toString().contains(HConstants.HREGION_OLDLOGDIR_NAME)) {
|
||||
return null;
|
||||
}
|
||||
Path walRootDir = CommonFSUtils.getWALRootDir(conf);
|
||||
FileSystem fs = path.getFileSystem(conf);
|
||||
// Try finding the log in old dir
|
||||
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
Path archivedLogLocation = new Path(oldLogDir, path.getName());
|
||||
if (fs.exists(archivedLogLocation)) {
|
||||
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
|
||||
return archivedLogLocation;
|
||||
}
|
||||
|
||||
ServerName serverName = getServerNameFromWALDirectoryName(path);
|
||||
// Try finding the log in separate old log dir
|
||||
oldLogDir =
|
||||
new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
|
||||
.append(Path.SEPARATOR).append(serverName.getServerName()).toString());
|
||||
archivedLogLocation = new Path(oldLogDir, path.getName());
|
||||
if (fs.exists(archivedLogLocation)) {
|
||||
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
|
||||
return archivedLogLocation;
|
||||
}
|
||||
|
||||
LOG.error("Couldn't locate log: " + path);
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Opens WAL reader with retries and additional exception handling
|
||||
* @param path path to WAL file
|
||||
|
|
|
@ -52,12 +52,14 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
|
||||
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
|
@ -716,4 +718,48 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
|
|||
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that we handle EOFException properly if the wal has moved to oldWALs directory.
|
||||
* @throws Exception exception
|
||||
*/
|
||||
@Test
|
||||
public void testEOFExceptionInOldWALsDirectory() throws Exception {
|
||||
assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
|
||||
AbstractFSWAL abstractWAL = (AbstractFSWAL)log;
|
||||
Path emptyLogFile = abstractWAL.getCurrentFileName();
|
||||
log.rollWriter(true);
|
||||
|
||||
// AsyncFSWAl and FSHLog both moves the log from WALs to oldWALs directory asynchronously.
|
||||
// Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to
|
||||
// oldWALs directory.
|
||||
Waiter.waitFor(CONF, 5000,
|
||||
(Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0);
|
||||
// There will 2 logs in the queue.
|
||||
assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));
|
||||
|
||||
// Get the archived dir path for the first wal.
|
||||
Path archivePath = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF);
|
||||
// Make sure that the wal path is not the same as archived Dir path.
|
||||
assertNotNull(archivePath);
|
||||
assertTrue(fs.exists(archivePath));
|
||||
fs.truncate(archivePath, 0);
|
||||
// make sure the size of the wal file is 0.
|
||||
assertEquals(0, fs.getFileStatus(archivePath).getLen());
|
||||
|
||||
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
|
||||
ReplicationSource source = Mockito.mock(ReplicationSource.class);
|
||||
when(source.isPeerEnabled()).thenReturn(true);
|
||||
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
|
||||
|
||||
Configuration localConf = new Configuration(CONF);
|
||||
localConf.setInt("replication.source.maxretriesmultiplier", 1);
|
||||
localConf.setBoolean("replication.source.eof.autorecovery", true);
|
||||
// Start the reader thread.
|
||||
createReader(false, localConf);
|
||||
// Wait for the replication queue size to be 1. This means that we have handled
|
||||
// 0 length wal from oldWALs directory.
|
||||
Waiter.waitFor(localConf, 10000,
|
||||
(Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue