HBASE-26106 AbstractFSWALProvider#getArchivedLogPath doesn't look for wal file in all oldWALs directory. (#3636)
Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
f0226921ed
commit
ec747bcb29
|
@ -217,8 +217,9 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
|
||||||
}
|
}
|
||||||
return res;
|
return res;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
|
Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf);
|
||||||
if (logFile != archivedLog) {
|
// archivedLog can be null if unable to locate in archiveDir.
|
||||||
|
if (archivedLog != null) {
|
||||||
openReader(archivedLog);
|
openReader(archivedLog);
|
||||||
// Try call again in recursion
|
// Try call again in recursion
|
||||||
return nextKeyValue();
|
return nextKeyValue();
|
||||||
|
|
|
@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.NavigableMap;
|
import java.util.NavigableMap;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
|
@ -92,6 +95,11 @@ public class TestWALRecordReader {
|
||||||
return "TestWALRecordReader";
|
return "TestWALRecordReader";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String getServerName() {
|
||||||
|
ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1);
|
||||||
|
return serverName.toString();
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
fs.delete(hbaseDir, true);
|
fs.delete(hbaseDir, true);
|
||||||
|
@ -282,7 +290,6 @@ public class TestWALRecordReader {
|
||||||
LOG.debug("log="+logDir+" file="+ split.getLogFileName());
|
LOG.debug("log="+logDir+" file="+ split.getLogFileName());
|
||||||
|
|
||||||
testSplitWithMovingWAL(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
|
testSplitWithMovingWAL(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
|
protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
|
||||||
|
@ -335,13 +342,16 @@ public class TestWALRecordReader {
|
||||||
// Move log file to archive directory
|
// Move log file to archive directory
|
||||||
// While WAL record reader is open
|
// While WAL record reader is open
|
||||||
WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split;
|
WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split;
|
||||||
|
|
||||||
Path logFile = new Path(split_.getLogFileName());
|
Path logFile = new Path(split_.getLogFileName());
|
||||||
Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
|
Path archivedLogDir = getWALArchiveDir(conf);
|
||||||
boolean result = fs.rename(logFile, archivedLog);
|
Path archivedLogLocation = new Path(archivedLogDir, logFile.getName());
|
||||||
assertTrue(result);
|
assertNotEquals(split_.getLogFileName(), archivedLogLocation.toString());
|
||||||
result = fs.exists(archivedLog);
|
|
||||||
assertTrue(result);
|
assertTrue(fs.rename(logFile, archivedLogLocation));
|
||||||
|
assertTrue(fs.exists(archivedLogDir));
|
||||||
|
assertFalse(fs.exists(logFile));
|
||||||
|
// TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open
|
||||||
|
// TODO: the archivedLogLocation to read next key value.
|
||||||
assertTrue(reader.nextKeyValue());
|
assertTrue(reader.nextKeyValue());
|
||||||
cell = reader.getCurrentValue().getCells().get(0);
|
cell = reader.getCurrentValue().getCells().get(0);
|
||||||
if (!Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(),
|
if (!Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(),
|
||||||
|
@ -353,4 +363,10 @@ public class TestWALRecordReader {
|
||||||
}
|
}
|
||||||
reader.close();
|
reader.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Path getWALArchiveDir(Configuration conf) throws IOException {
|
||||||
|
Path rootDir = CommonFSUtils.getWALRootDir(conf);
|
||||||
|
String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName());
|
||||||
|
return new Path(rootDir, archiveDir);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.replication.regionserver;
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
|
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
@ -396,8 +396,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
try {
|
try {
|
||||||
fileSize = fs.getContentSummary(currentPath).getLength();
|
fileSize = fs.getContentSummary(currentPath).getLength();
|
||||||
} catch (FileNotFoundException e) {
|
} catch (FileNotFoundException e) {
|
||||||
currentPath = getArchivedLogPath(currentPath, conf);
|
Path archivedLogPath = findArchivedLog(currentPath, conf);
|
||||||
fileSize = fs.getContentSummary(currentPath).getLength();
|
// archivedLogPath can be null if unable to locate in archiveDir.
|
||||||
|
if (archivedLogPath == null) {
|
||||||
|
throw new FileNotFoundException("Couldn't find path: " + currentPath);
|
||||||
|
}
|
||||||
|
fileSize = fs.getContentSummary(archivedLogPath).getLength();
|
||||||
}
|
}
|
||||||
return fileSize;
|
return fileSize;
|
||||||
}
|
}
|
||||||
|
|
|
@ -280,7 +280,7 @@ class ReplicationSourceWALReader extends Thread {
|
||||||
if (!fs.exists(path)) {
|
if (!fs.exists(path)) {
|
||||||
// There is a chance that wal has moved to oldWALs directory, so look there also.
|
// There is a chance that wal has moved to oldWALs directory, so look there also.
|
||||||
path = AbstractFSWALProvider.findArchivedLog(path, conf);
|
path = AbstractFSWALProvider.findArchivedLog(path, conf);
|
||||||
// path is null if it couldn't find archive path.
|
// path can be null if unable to locate in archiveDir.
|
||||||
}
|
}
|
||||||
if (path != null && fs.getFileStatus(path).getLen() == 0) {
|
if (path != null && fs.getFileStatus(path).getLen() == 0) {
|
||||||
LOG.warn("Forcing removal of 0 length log in queue: {}", path);
|
LOG.warn("Forcing removal of 0 length log in queue: {}", path);
|
||||||
|
|
|
@ -319,6 +319,7 @@ class WALEntryStream implements Closeable {
|
||||||
private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
|
private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
|
||||||
// If the log was archived, continue reading from there
|
// If the log was archived, continue reading from there
|
||||||
Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
|
Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
|
||||||
|
// archivedLog can be null if unable to locate in archiveDir.
|
||||||
if (archivedLog != null) {
|
if (archivedLog != null) {
|
||||||
openReader(archivedLog);
|
openReader(archivedLog);
|
||||||
} else {
|
} else {
|
||||||
|
@ -384,6 +385,7 @@ class WALEntryStream implements Closeable {
|
||||||
} catch (FileNotFoundException fnfe) {
|
} catch (FileNotFoundException fnfe) {
|
||||||
// If the log was archived, continue reading from there
|
// If the log was archived, continue reading from there
|
||||||
Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
|
Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
|
||||||
|
// archivedLog can be null if unable to locate in archiveDir.
|
||||||
if (archivedLog != null) {
|
if (archivedLog != null) {
|
||||||
openReader(archivedLog);
|
openReader(archivedLog);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -23,7 +23,6 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
@ -469,36 +468,6 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
|
||||||
return p.toString().contains(oldLog);
|
return p.toString().contains(oldLog);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the archived WAL file path
|
|
||||||
* @param path - active WAL file path
|
|
||||||
* @param conf - configuration
|
|
||||||
* @return archived path if exists, path - otherwise
|
|
||||||
* @throws IOException exception
|
|
||||||
*/
|
|
||||||
public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
|
|
||||||
Path rootDir = CommonFSUtils.getWALRootDir(conf);
|
|
||||||
Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
|
||||||
if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
|
|
||||||
ServerName serverName = getServerNameFromWALDirectoryName(path);
|
|
||||||
if (serverName == null) {
|
|
||||||
LOG.error("Couldn't locate log: " + path);
|
|
||||||
return path;
|
|
||||||
}
|
|
||||||
oldLogDir = new Path(oldLogDir, serverName.getServerName());
|
|
||||||
}
|
|
||||||
Path archivedLogLocation = new Path(oldLogDir, path.getName());
|
|
||||||
final FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
|
|
||||||
|
|
||||||
if (fs.exists(archivedLogLocation)) {
|
|
||||||
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
|
|
||||||
return archivedLogLocation;
|
|
||||||
} else {
|
|
||||||
LOG.error("Couldn't locate log: " + path);
|
|
||||||
return path;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Find the archived WAL file path if it is not able to locate in WALs dir.
|
* Find the archived WAL file path if it is not able to locate in WALs dir.
|
||||||
* @param path - active WAL file path
|
* @param path - active WAL file path
|
||||||
|
@ -531,7 +500,6 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
|
||||||
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
|
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
|
||||||
return archivedLogLocation;
|
return archivedLogLocation;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.error("Couldn't locate log: " + path);
|
LOG.error("Couldn't locate log: " + path);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -557,8 +525,9 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
|
||||||
return reader;
|
return reader;
|
||||||
} catch (FileNotFoundException fnfe) {
|
} catch (FileNotFoundException fnfe) {
|
||||||
// If the log was archived, continue reading from there
|
// If the log was archived, continue reading from there
|
||||||
Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
|
Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
|
||||||
if (!Objects.equals(path, archivedLog)) {
|
// archivedLog can be null if unable to locate in archiveDir.
|
||||||
|
if (archivedLog != null) {
|
||||||
return openReader(archivedLog, conf);
|
return openReader(archivedLog, conf);
|
||||||
} else {
|
} else {
|
||||||
throw fnfe;
|
throw fnfe;
|
||||||
|
|
Loading…
Reference in New Issue