diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java index 14bfec72efe..ae9dcb8c8ef 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java @@ -217,8 +217,9 @@ public class WALInputFormat extends InputFormat { } return res; } catch (IOException e) { - Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf); - if (logFile != archivedLog) { + Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf); + // archivedLog can be null if unable to locate in archiveDir. + if (archivedLog != null) { openReader(archivedLog); // Try call again in recursion return nextKeyValue(); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java index 5e1c097b164..c8ff9042932 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.mapreduce; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.List; import java.util.NavigableMap; import java.util.TreeMap; @@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; @@ -92,6 +95,11 @@ public class TestWALRecordReader { return "TestWALRecordReader"; } + private static String getServerName() { + ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1); + return serverName.toString(); + } + @Before public void setUp() throws Exception { fs.delete(hbaseDir, true); @@ -282,7 +290,6 @@ public class TestWALRecordReader { LOG.debug("log="+logDir+" file="+ split.getLogFileName()); testSplitWithMovingWAL(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2")); - } protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap scopes) { @@ -335,13 +342,16 @@ public class TestWALRecordReader { // Move log file to archive directory // While WAL record reader is open WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split; - Path logFile = new Path(split_.getLogFileName()); - Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf); - boolean result = fs.rename(logFile, archivedLog); - assertTrue(result); - result = fs.exists(archivedLog); - assertTrue(result); + Path archivedLogDir = getWALArchiveDir(conf); + Path archivedLogLocation = new Path(archivedLogDir, logFile.getName()); + assertNotEquals(split_.getLogFileName(), archivedLogLocation.toString()); + + assertTrue(fs.rename(logFile, archivedLogLocation)); + assertTrue(fs.exists(archivedLogDir)); + assertFalse(fs.exists(logFile)); + // TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open + // TODO: the archivedLogLocation to read next key value. assertTrue(reader.nextKeyValue()); cell = reader.getCurrentValue().getCells().get(0); if (!Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(), @@ -353,4 +363,10 @@ public class TestWALRecordReader { } reader.close(); } + + private Path getWALArchiveDir(Configuration conf) throws IOException { + Path rootDir = CommonFSUtils.getWALRootDir(conf); + String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName()); + return new Path(rootDir, archiveDir); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index d1268fab94c..11222a5dd24 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog; import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.InvocationTargetException; @@ -396,8 +396,12 @@ public class ReplicationSource implements ReplicationSourceInterface { try { fileSize = fs.getContentSummary(currentPath).getLength(); } catch (FileNotFoundException e) { - currentPath = getArchivedLogPath(currentPath, conf); - fileSize = fs.getContentSummary(currentPath).getLength(); + Path archivedLogPath = findArchivedLog(currentPath, conf); + // archivedLogPath can be null if unable to locate in archiveDir. + if (archivedLogPath == null) { + throw new FileNotFoundException("Couldn't find path: " + currentPath); + } + fileSize = fs.getContentSummary(archivedLogPath).getLength(); } return fileSize; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 9af91e5a59f..11090448c7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -280,7 +280,7 @@ class ReplicationSourceWALReader extends Thread { if (!fs.exists(path)) { // There is a chance that wal has moved to oldWALs directory, so look there also. path = AbstractFSWALProvider.findArchivedLog(path, conf); - // path is null if it couldn't find archive path. + // path can be null if unable to locate in archiveDir. } if (path != null && fs.getFileStatus(path).getLen() == 0) { LOG.warn("Forcing removal of 0 length log in queue: {}", path); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index f04819d4f82..488355c3a2b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -319,6 +319,7 @@ class WALEntryStream implements Closeable { private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { // If the log was archived, continue reading from there Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf); + // archivedLog can be null if unable to locate in archiveDir. if (archivedLog != null) { openReader(archivedLog); } else { @@ -384,6 +385,7 @@ class WALEntryStream implements Closeable { } catch (FileNotFoundException fnfe) { // If the log was archived, continue reading from there Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf); + // archivedLog can be null if unable to locate in archiveDir. if (archivedLog != null) { openReader(archivedLog); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index ffe5c422a5b..75605e604c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -469,36 +468,6 @@ public abstract class AbstractFSWALProvider> implemen return p.toString().contains(oldLog); } - /** - * Get the archived WAL file path - * @param path - active WAL file path - * @param conf - configuration - * @return archived path if exists, path - otherwise - * @throws IOException exception - */ - public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException { - Path rootDir = CommonFSUtils.getWALRootDir(conf); - Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); - if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) { - ServerName serverName = getServerNameFromWALDirectoryName(path); - if (serverName == null) { - LOG.error("Couldn't locate log: " + path); - return path; - } - oldLogDir = new Path(oldLogDir, serverName.getServerName()); - } - Path archivedLogLocation = new Path(oldLogDir, path.getName()); - final FileSystem fs = CommonFSUtils.getWALFileSystem(conf); - - if (fs.exists(archivedLogLocation)) { - LOG.info("Log " + path + " was moved to " + archivedLogLocation); - return archivedLogLocation; - } else { - LOG.error("Couldn't locate log: " + path); - return path; - } - } - /** * Find the archived WAL file path if it is not able to locate in WALs dir. * @param path - active WAL file path @@ -531,7 +500,6 @@ public abstract class AbstractFSWALProvider> implemen LOG.info("Log " + path + " was moved to " + archivedLogLocation); return archivedLogLocation; } - LOG.error("Couldn't locate log: " + path); return null; } @@ -557,8 +525,9 @@ public abstract class AbstractFSWALProvider> implemen return reader; } catch (FileNotFoundException fnfe) { // If the log was archived, continue reading from there - Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf); - if (!Objects.equals(path, archivedLog)) { + Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf); + // archivedLog can be null if unable to locate in archiveDir. + if (archivedLog != null) { return openReader(archivedLog, conf); } else { throw fnfe;