diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java index b410fc22d89..ffc202ab0dc 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java @@ -217,8 +217,9 @@ public class WALInputFormat extends InputFormat { } return res; } catch (IOException e) { - Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf); - if (logFile != archivedLog) { + Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf); + // archivedLog can be null if unable to locate in archiveDir. + if (archivedLog != null) { openReader(archivedLog); // Try call again in recursion return nextKeyValue(); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java index 0fa73f6e832..812addafa47 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.mapreduce; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.List; import java.util.NavigableMap; import java.util.TreeMap; @@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import 
org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -91,6 +94,12 @@ public class TestWALRecordReader {
     return "TestWALRecordReader";
   }
 
+  /** Server name string passed to the archive-directory lookup in getWALArchiveDir. */
+  private static String getServerName() {
+    ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1);
+    return serverName.toString();
+  }
+
   @Before
   public void setUp() throws Exception {
     fs.delete(hbaseDir, true);
@@ -269,4 +278,56 @@ public class TestWALRecordReader {
     assertFalse(reader.nextKeyValue());
     reader.close();
   }
+
+  /**
+   * Create a new reader from the split, read the first edit, move the WAL to the archive
+   * directory while the reader is still open, then read the next edit.
+   * @param split WAL split to read; must be a {@link WALInputFormat.WALSplit}
+   * @param col1 qualifier expected on the first cell of the edit read before the move
+   * @param col2 qualifier expected on the first cell of the edit read after the move
+   */
+  private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2) throws Exception {
+    WALRecordReader reader = getReader();
+    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
+    try {
+      assertTrue(reader.nextKeyValue());
+      assertQualifierEquals(col1, reader.getCurrentValue().getCells().get(0));
+
+      // Move the log file to the archive directory while the WAL record reader is open.
+      WALInputFormat.WALSplit walSplit = (WALInputFormat.WALSplit) split;
+      Path logFile = new Path(walSplit.getLogFileName());
+      Path archivedLogDir = getWALArchiveDir(conf);
+      Path archivedLogLocation = new Path(archivedLogDir, logFile.getName());
+      assertNotEquals(walSplit.getLogFileName(), archivedLogLocation.toString());
+
+      assertTrue(fs.rename(logFile, archivedLogLocation));
+      // Check the moved file itself; an existing directory does not prove the move landed.
+      assertTrue(fs.exists(archivedLogLocation));
+      assertFalse(fs.exists(logFile));
+      // TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open
+      // TODO: the archivedLogLocation to read next key value.
+      assertTrue(reader.nextKeyValue());
+      assertQualifierEquals(col2, reader.getCurrentValue().getCells().get(0));
+    } finally {
+      // Close the reader even when an assertion above fails.
+      reader.close();
+    }
+  }
+
+  /** Assert that the qualifier of {@code cell} equals {@code expected}. */
+  private static void assertQualifierEquals(byte[] expected, Cell cell) {
+    assertTrue(
+      "expected [" + Bytes.toString(expected) + "], actual [" + Bytes.toString(
+        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
+      Bytes.equals(expected, 0, expected.length, cell.getQualifierArray(),
+        cell.getQualifierOffset(), cell.getQualifierLength()));
+  }
+
+  /** Build the archive directory path for getServerName() under the WAL root dir. */
+  private Path getWALArchiveDir(Configuration conf) throws IOException {
+    Path rootDir = CommonFSUtils.getWALRootDir(conf);
+    String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName());
+    return new Path(rootDir, archiveDir);
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 9f8b3364945..92f1979ecae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
@@ -400,8 +400,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
     try {
       fileSize = fs.getContentSummary(currentPath).getLength();
     } catch (FileNotFoundException e) {
-      currentPath = getArchivedLogPath(currentPath, conf);
-      fileSize = fs.getContentSummary(currentPath).getLength();
+      Path archivedLogPath = findArchivedLog(currentPath, conf);
+      // archivedLogPath can be null if unable to locate in archiveDir.
+ if (archivedLogPath == null) { + throw new FileNotFoundException("Couldn't find path: " + currentPath); + } + fileSize = fs.getContentSummary(archivedLogPath).getLength(); } return fileSize; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 9af91e5a59f..11090448c7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -280,7 +280,7 @@ class ReplicationSourceWALReader extends Thread { if (!fs.exists(path)) { // There is a chance that wal has moved to oldWALs directory, so look there also. path = AbstractFSWALProvider.findArchivedLog(path, conf); - // path is null if it couldn't find archive path. + // path can be null if unable to locate in archiveDir. } if (path != null && fs.getFileStatus(path).getLen() == 0) { LOG.warn("Forcing removal of 0 length log in queue: {}", path); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 956024b0482..441bc1beaee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -318,6 +318,7 @@ class WALEntryStream implements Closeable { private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { // If the log was archived, continue reading from there Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf); + // archivedLog can be null if unable to locate in archiveDir. 
if (archivedLog != null) { openReader(archivedLog); } else { @@ -383,6 +384,7 @@ class WALEntryStream implements Closeable { } catch (FileNotFoundException fnfe) { // If the log was archived, continue reading from there Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf); + // archivedLog can be null if unable to locate in archiveDir. if (archivedLog != null) { openReader(archivedLog); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 5fbeca38187..989210b8de2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -448,36 +447,6 @@ public abstract class AbstractFSWALProvider> implemen return p.toString().contains(oldLog); } - /** - * Get the archived WAL file path - * @param path - active WAL file path - * @param conf - configuration - * @return archived path if exists, path - otherwise - * @throws IOException exception - */ - public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException { - Path rootDir = CommonFSUtils.getWALRootDir(conf); - Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); - if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) { - ServerName serverName = getServerNameFromWALDirectoryName(path); - if (serverName == null) { - LOG.error("Couldn't locate log: " + path); - return path; - } - oldLogDir = new Path(oldLogDir, serverName.getServerName()); - } - Path archivedLogLocation = new 
Path(oldLogDir, path.getName()); - final FileSystem fs = CommonFSUtils.getWALFileSystem(conf); - - if (fs.exists(archivedLogLocation)) { - LOG.info("Log " + path + " was moved to " + archivedLogLocation); - return archivedLogLocation; - } else { - LOG.error("Couldn't locate log: " + path); - return path; - } - } - /** * Find the archived WAL file path if it is not able to locate in WALs dir. * @param path - active WAL file path @@ -510,7 +479,6 @@ public abstract class AbstractFSWALProvider> implemen LOG.info("Log " + path + " was moved to " + archivedLogLocation); return archivedLogLocation; } - LOG.error("Couldn't locate log: " + path); return null; } @@ -536,8 +504,9 @@ public abstract class AbstractFSWALProvider> implemen return reader; } catch (FileNotFoundException fnfe) { // If the log was archived, continue reading from there - Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf); - if (!Objects.equals(path, archivedLog)) { + Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf); + // archivedLog can be null if unable to locate in archiveDir. + if (archivedLog != null) { return openReader(archivedLog, conf); } else { throw fnfe;