From 0294c73f1f0d47a58bd6a8486d5ac3275139f807 Mon Sep 17 00:00:00 2001 From: Rushabh Shah Date: Fri, 23 Jul 2021 12:32:55 -0400 Subject: [PATCH] HBASE-26093 Replication is stuck due to zero length wal file in oldWALs directory (#3504) Signed-off-by: Andrew Purtell Signed-off-by: Bharath Vissapragada Signed-off-by: Duo Zhang --- .../ReplicationSourceWALReader.java | 15 ++++-- .../regionserver/WALEntryStream.java | 35 ++------------ .../hbase/wal/AbstractFSWALProvider.java | 38 ++++++++++++++- .../regionserver/TestBasicWALEntryStream.java | 46 +++++++++++++++++++ 4 files changed, 98 insertions(+), 36 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index ca411848077..9af91e5a59f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; @@ -274,11 +275,15 @@ class ReplicationSourceWALReader extends Thread { // since we don't add current log to recovered source queue so it is safe to remove. if ((e instanceof EOFException || e.getCause() instanceof EOFException) && (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) { - Path head = queue.peek(); + Path path = queue.peek(); try { - if (fs.getFileStatus(head).getLen() == 0) { - // head of the queue is an empty log file - LOG.warn("Forcing removal of 0 length log in queue: {}", head); + if (!fs.exists(path)) { + // There is a chance that wal has moved to oldWALs directory, so look there also. + path = AbstractFSWALProvider.findArchivedLog(path, conf); + // path is null if it couldn't find archive path. + } + if (path != null && fs.getFileStatus(path).getLen() == 0) { + LOG.warn("Forcing removal of 0 length log in queue: {}", path); logQueue.remove(walGroupId); currentPosition = 0; if (batch != null) { @@ -289,7 +294,7 @@ class ReplicationSourceWALReader extends Thread { return true; } } catch (IOException ioe) { - LOG.warn("Couldn't get file length information about log " + queue.peek(), ioe); + LOG.warn("Couldn't get file length information about log " + path, ioe); } catch (InterruptedException ie) { LOG.trace("Interrupted while adding WAL batch to ship queue"); Thread.currentThread().interrupt(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 53cd0845a9c..f04819d4f82 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -27,13 +27,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALFactory; @@ -316,35 +316,10 @@ class WALEntryStream implements Closeable { return false; } - private Path getArchivedLog(Path path) throws IOException { - Path walRootDir = CommonFSUtils.getWALRootDir(conf); - - // Try found the log in old dir - Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); - Path archivedLogLocation = new Path(oldLogDir, path.getName()); - if (fs.exists(archivedLogLocation)) { - LOG.info("Log " + path + " was moved to " + archivedLogLocation); - return archivedLogLocation; - } - - // Try found the log in the seperate old log dir - oldLogDir = - new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) - .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); - archivedLogLocation = new Path(oldLogDir, path.getName()); - if (fs.exists(archivedLogLocation)) { - LOG.info("Log " + path + " was moved to " + archivedLogLocation); - return archivedLogLocation; - } - - LOG.error("Couldn't locate log: " + path); - return path; - } - private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { // If the log was archived, continue reading from there - Path archivedLog = getArchivedLog(path); - if (!path.equals(archivedLog)) { + Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf); + if (archivedLog != null) { openReader(archivedLog); } else { throw fnfe; @@ -408,8 +383,8 @@ class WALEntryStream implements Closeable { seek(); } catch (FileNotFoundException fnfe) { // If the log was archived, continue reading from there - Path archivedLog = getArchivedLog(currentPath); - if (!currentPath.equals(archivedLog)) { + Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf); + if (archivedLog != null) { openReader(archivedLog); } else { throw fnfe; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 4b74e10b35f..ffe5c422a5b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.wal; - import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -500,6 +499,43 @@ public abstract class AbstractFSWALProvider> implemen } } + /** + * Find the archived WAL file path if it is not able to locate in WALs dir. + * @param path - active WAL file path + * @param conf - configuration + * @return archived path if exists, null - otherwise + * @throws IOException exception + */ + public static Path findArchivedLog(Path path, Configuration conf) throws IOException { + // If the path contains oldWALs keyword then exit early. + if (path.toString().contains(HConstants.HREGION_OLDLOGDIR_NAME)) { + return null; + } + Path walRootDir = CommonFSUtils.getWALRootDir(conf); + FileSystem fs = path.getFileSystem(conf); + // Try finding the log in old dir + Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); + Path archivedLogLocation = new Path(oldLogDir, path.getName()); + if (fs.exists(archivedLogLocation)) { + LOG.info("Log " + path + " was moved to " + archivedLogLocation); + return archivedLogLocation; + } + + ServerName serverName = getServerNameFromWALDirectoryName(path); + // Try finding the log in separate old log dir + oldLogDir = + new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) + .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); + archivedLogLocation = new Path(oldLogDir, path.getName()); + if (fs.exists(archivedLogLocation)) { + LOG.info("Log " + path + " was moved to " + archivedLogLocation); + return archivedLogLocation; + } + + LOG.error("Couldn't locate log: " + path); + return null; + } + /** * Opens WAL reader with retries and additional exception handling * @param path path to WAL file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java index ad77c9d90e6..b07b5b42dc9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java @@ -52,12 +52,14 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; @@ -716,4 +718,48 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase { assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs()); } } + + /** + * Tests that we handle EOFException properly if the wal has moved to oldWALs directory. + * @throws Exception exception + */ + @Test + public void testEOFExceptionInOldWALsDirectory() throws Exception { + assertEquals(1, logQueue.getQueueSize(fakeWalGroupId)); + AbstractFSWAL abstractWAL = (AbstractFSWAL)log; + Path emptyLogFile = abstractWAL.getCurrentFileName(); + log.rollWriter(true); + + // AsyncFSWAl and FSHLog both moves the log from WALs to oldWALs directory asynchronously. + // Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to + // oldWALs directory. + Waiter.waitFor(CONF, 5000, + (Waiter.Predicate) () -> abstractWAL.getInflightWALCloseCount() == 0); + // There will 2 logs in the queue. + assertEquals(2, logQueue.getQueueSize(fakeWalGroupId)); + + // Get the archived dir path for the first wal. + Path archivePath = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF); + // Make sure that the wal path is not the same as archived Dir path. + assertNotNull(archivePath); + assertTrue(fs.exists(archivePath)); + fs.truncate(archivePath, 0); + // make sure the size of the wal file is 0. + assertEquals(0, fs.getFileStatus(archivePath).getLen()); + + ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); + ReplicationSource source = Mockito.mock(ReplicationSource.class); + when(source.isPeerEnabled()).thenReturn(true); + when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + + Configuration localConf = new Configuration(CONF); + localConf.setInt("replication.source.maxretriesmultiplier", 1); + localConf.setBoolean("replication.source.eof.autorecovery", true); + // Start the reader thread. + createReader(false, localConf); + // Wait for the replication queue size to be 1. This means that we have handled + // 0 length wal from oldWALs directory. + Waiter.waitFor(localConf, 10000, + (Waiter.Predicate) () -> logQueue.getQueueSize(fakeWalGroupId) == 1); + } }