HBASE-21688 Address WAL filesystem issues

Amending-Author: Josh Elser <elserj@apache.org>
Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
Vladimir Rodionov 2019-02-20 15:15:27 -08:00 committed by Josh Elser
parent dd19173fbd
commit aea9277ea6
13 changed files with 68 additions and 50 deletions

View File

@ -383,6 +383,7 @@ public abstract class CommonFSUtils {
* @throws IOException e * @throws IOException e
*/ */
public static Path getWALRootDir(final Configuration c) throws IOException { public static Path getWALRootDir(final Configuration c) throws IOException {
Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR))); Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
if (!isValidWALRootDir(p, c)) { if (!isValidWALRootDir(p, c)) {
return getRootDir(c); return getRootDir(c);

View File

@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter; import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKey;
@ -944,10 +945,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
if (keys.isEmpty()) throw new RuntimeException("No keys to find"); if (keys.isEmpty()) throw new RuntimeException("No keys to find");
LOG.info("Count of keys to find: " + keys.size()); LOG.info("Count of keys to find: " + keys.size());
for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key)); for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key));
Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
// Now read all WALs. In two dirs. Presumes certain layout. // Now read all WALs. In two dirs. Presumes certain layout.
Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME); Path walsDir = new Path(
Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
Path oldWalsDir = new Path(
CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers + LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers +
" against " + getConf().get(HConstants.HBASE_DIR)); " against " + getConf().get(HConstants.HBASE_DIR));
int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()), int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()),

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl; import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@ -506,10 +507,9 @@ public void cleanUpCluster() throws Exception {
if (keys.isEmpty()) throw new RuntimeException("No keys to find"); if (keys.isEmpty()) throw new RuntimeException("No keys to find");
LOG.info("Count of keys to find: " + keys.size()); LOG.info("Count of keys to find: " + keys.size());
for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key)); for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key));
Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
// Now read all WALs. In two dirs. Presumes certain layout. // Now read all WALs. In two dirs. Presumes certain layout.
Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME); Path walsDir = new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); Path oldWalsDir = new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
LOG.info("Running Search with keys inputDir=" + inputDir + LOG.info("Running Search with keys inputDir=" + inputDir +
" against " + getConf().get(HConstants.HBASE_DIR)); " against " + getConf().get(HConstants.HBASE_DIR));
int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""}); int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});

View File

@ -24,7 +24,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
/** /**
* WALLink describes a link to a WAL. * WALLink describes a link to a WAL.
@ -45,7 +45,7 @@ public class WALLink extends FileLink {
*/ */
public WALLink(final Configuration conf, public WALLink(final Configuration conf,
final String serverName, final String logName) throws IOException { final String serverName, final String logName) throws IOException {
this(FSUtils.getWALRootDir(conf), serverName, logName); this(CommonFSUtils.getWALRootDir(conf), serverName, logName);
} }
/** /**

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@ -93,15 +94,14 @@ public class MasterWalManager {
private volatile boolean fsOk = true; private volatile boolean fsOk = true;
public MasterWalManager(MasterServices services) throws IOException { public MasterWalManager(MasterServices services) throws IOException {
this(services.getConfiguration(), services.getMasterFileSystem().getWALFileSystem(), this(services.getConfiguration(), services.getMasterFileSystem().getWALFileSystem(), services);
services.getMasterFileSystem().getWALRootDir(), services);
} }
public MasterWalManager(Configuration conf, FileSystem fs, Path rootDir, MasterServices services) public MasterWalManager(Configuration conf, FileSystem fs, MasterServices services)
throws IOException { throws IOException {
this.fs = fs; this.fs = fs;
this.conf = conf; this.conf = conf;
this.rootDir = rootDir; this.rootDir = CommonFSUtils.getWALRootDir(conf);
this.services = services; this.services = services;
this.splitLogManager = new SplitLogManager(services, conf); this.splitLogManager = new SplitLogManager(services, conf);
@ -190,9 +190,10 @@ public class MasterWalManager {
/** /**
* @return Returns the WALs dir under <code>rootDir</code> * @return Returns the WALs dir under <code>rootDir</code>
* @throws IOException
*/ */
Path getWALDirPath() { Path getWALDirPath() throws IOException {
return new Path(this.rootDir, HConstants.HREGION_LOGDIR_NAME); return new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
} }
/** /**
@ -213,7 +214,7 @@ public class MasterWalManager {
* it. * it.
*/ */
@Deprecated @Deprecated
public Set<ServerName> getFailedServersFromLogFolders() { public Set<ServerName> getFailedServersFromLogFolders() throws IOException {
boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors", boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT); WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);

View File

@ -124,7 +124,7 @@ class ReplicationSourceWALReader extends Thread {
int sleepMultiplier = 1; int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, conf, currentPosition, new WALEntryStream(logQueue, conf, currentPosition,
source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
source.getSourceMetrics())) { source.getSourceMetrics())) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can while (isReaderRunning()) { // loop here to keep reusing stream while we can

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
@ -73,18 +74,17 @@ class WALEntryStream implements Closeable {
/** /**
* Create an entry stream over the given queue at the given start position * Create an entry stream over the given queue at the given start position
* @param logQueue the queue of WAL paths * @param logQueue the queue of WAL paths
* @param fs {@link FileSystem} to use to create {@link Reader} for this stream
* @param conf {@link Configuration} to use to create {@link Reader} for this stream * @param conf {@link Configuration} to use to create {@link Reader} for this stream
* @param startPosition the position in the first WAL to start reading at * @param startPosition the position in the first WAL to start reading at
* @param serverName the server name which all WALs belong to * @param serverName the server name which all WALs belong to
* @param metrics replication metrics * @param metrics replication metrics
* @throws IOException * @throws IOException
*/ */
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf, public WALEntryStream(PriorityBlockingQueue<Path> logQueue, Configuration conf,
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
MetricsSource metrics) throws IOException { MetricsSource metrics) throws IOException {
this.logQueue = logQueue; this.logQueue = logQueue;
this.fs = fs; this.fs = CommonFSUtils.getWALFileSystem(conf);
this.conf = conf; this.conf = conf;
this.currentPositionOfEntry = startPosition; this.currentPositionOfEntry = startPosition;
this.walFileLengthProvider = walFileLengthProvider; this.walFileLengthProvider = walFileLengthProvider;
@ -311,10 +311,10 @@ class WALEntryStream implements Closeable {
} }
private Path getArchivedLog(Path path) throws IOException { private Path getArchivedLog(Path path) throws IOException {
Path rootDir = FSUtils.getRootDir(conf); Path walRootDir = CommonFSUtils.getWALRootDir(conf);
// Try found the log in old dir // Try found the log in old dir
Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path archivedLogLocation = new Path(oldLogDir, path.getName()); Path archivedLogLocation = new Path(oldLogDir, path.getName());
if (fs.exists(archivedLogLocation)) { if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation); LOG.info("Log " + path + " was moved to " + archivedLogLocation);
@ -323,7 +323,7 @@ class WALEntryStream implements Closeable {
// Try found the log in the seperate old log dir // Try found the log in the seperate old log dir
oldLogDir = oldLogDir =
new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
.append(Path.SEPARATOR).append(serverName.getServerName()).toString()); .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
archivedLogLocation = new Path(oldLogDir, path.getName()); archivedLogLocation = new Path(oldLogDir, path.getName());
if (fs.exists(archivedLogLocation)) { if (fs.exists(archivedLogLocation)) {
@ -380,7 +380,8 @@ class WALEntryStream implements Closeable {
// For HBASE-15019 // For HBASE-15019
private void recoverLease(final Configuration conf, final Path path) { private void recoverLease(final Configuration conf, final Path path) {
try { try {
final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf);
FSUtils fsUtils = FSUtils.getInstance(dfs, conf); FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
@Override @Override

View File

@ -1681,9 +1681,9 @@ public class HBaseFsck extends Configured implements Closeable {
* Meta recovery WAL directory inside WAL directory path. * Meta recovery WAL directory inside WAL directory path.
*/ */
private void removeHBCKMetaRecoveryWALDir(String walFactoryId) throws IOException { private void removeHBCKMetaRecoveryWALDir(String walFactoryId) throws IOException {
Path rootdir = FSUtils.getRootDir(getConf()); Path walLogDir = new Path(new Path(CommonFSUtils.getWALRootDir(getConf()),
Path walLogDir = new Path(new Path(rootdir, HConstants.HREGION_LOGDIR_NAME), walFactoryId); HConstants.HREGION_LOGDIR_NAME), walFactoryId);
FileSystem fs = FSUtils.getCurrentFileSystem(getConf()); FileSystem fs = CommonFSUtils.getWALFileSystem(getConf());
FileStatus[] walFiles = FSUtils.listStatus(fs, walLogDir, null); FileStatus[] walFiles = FSUtils.listStatus(fs, walLogDir, null);
if (walFiles == null || walFiles.length == 0) { if (walFiles == null || walFiles.length == 0) {
LOG.info("HBCK meta recovery WAL directory is empty, removing it now."); LOG.info("HBCK meta recovery WAL directory is empty, removing it now.");

View File

@ -418,8 +418,8 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
* @throws IOException exception * @throws IOException exception
*/ */
public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException { public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
Path rootDir = FSUtils.getRootDir(conf); Path walRootDir = FSUtils.getWALRootDir(conf);
Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) { if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
ServerName serverName = getServerNameFromWALDirectoryName(path); ServerName serverName = getServerNameFromWALDirectoryName(path);
if (serverName == null) { if (serverName == null) {
@ -429,7 +429,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
oldLogDir = new Path(oldLogDir, serverName.getServerName()); oldLogDir = new Path(oldLogDir, serverName.getServerName());
} }
Path archivedLogLocation = new Path(oldLogDir, path.getName()); Path archivedLogLocation = new Path(oldLogDir, path.getName());
final FileSystem fs = FSUtils.getCurrentFileSystem(conf); final FileSystem fs = FSUtils.getWALFileSystem(conf);
if (fs.exists(archivedLogLocation)) { if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation); LOG.info("Log " + path + " was moved to " + archivedLogLocation);

View File

@ -141,7 +141,7 @@ public class TestBlockReorderMultiBlocks {
// Now we need to find the log file, its locations, and look at it // Now we need to find the log file, its locations, and look at it
String rootDir = new Path(FSUtils.getRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME + String rootDir = new Path(FSUtils.getWALRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
"/" + targetRs.getServerName().toString()).toUri().getPath(); "/" + targetRs.getServerName().toString()).toUri().getPath();
DistributedFileSystem mdfs = (DistributedFileSystem) DistributedFileSystem mdfs = (DistributedFileSystem)

View File

@ -416,7 +416,7 @@ public abstract class AbstractTestDLS {
startCluster(1); startCluster(1);
final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
final FileSystem fs = master.getMasterFileSystem().getFileSystem(); final FileSystem fs = master.getMasterFileSystem().getFileSystem();
final Path logDir = new Path(new Path(FSUtils.getRootDir(conf), HConstants.HREGION_LOGDIR_NAME), final Path logDir = new Path(new Path(FSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME),
ServerName.valueOf("x", 1, 1).toString()); ServerName.valueOf("x", 1, 1).toString());
fs.mkdirs(logDir); fs.mkdirs(logDir);
ExecutorService executor = null; ExecutorService executor = null;

View File

@ -25,7 +25,6 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.Before; import org.junit.Before;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
@ -60,14 +60,27 @@ public class TestMasterWALManager {
public void before() throws IOException { public void before() throws IOException {
MasterFileSystem mfs = Mockito.mock(MasterFileSystem.class); MasterFileSystem mfs = Mockito.mock(MasterFileSystem.class);
Mockito.when(mfs.getWALFileSystem()).thenReturn(HTU.getTestFileSystem()); Mockito.when(mfs.getWALFileSystem()).thenReturn(HTU.getTestFileSystem());
Path walRootDir = HTU.createWALRootDir(); final Path walRootDir = HTU.getDataTestDir();;
Mockito.when(mfs.getWALRootDir()).thenReturn(walRootDir); Mockito.when(mfs.getWALRootDir()).thenReturn(walRootDir);
this.masterServices = Mockito.mock(MasterServices.class); this.masterServices = Mockito.mock(MasterServices.class);
Mockito.when(this.masterServices.getConfiguration()).thenReturn(HTU.getConfiguration()); Mockito.when(this.masterServices.getConfiguration()).thenReturn(HTU.getConfiguration());
Mockito.when(this.masterServices.getMasterFileSystem()).thenReturn(mfs); Mockito.when(this.masterServices.getMasterFileSystem()).thenReturn(mfs);
Mockito.when(this.masterServices.getServerName()). Mockito.when(this.masterServices.getServerName())
thenReturn(ServerName.parseServerName("master.example.org,0123,456")); .thenReturn(ServerName.parseServerName("master.example.org,0123,456"));
this.mwm = new MasterWalManager(this.masterServices); this.mwm = new MasterWalManager(this.masterServices) {
@Override
Path getWALDirPath() throws IOException {
return walRootDir;
}
@Override
Path getWALDirectoryName(ServerName serverName) {
return new Path(walRootDir,
AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
}
};
} }
@Test @Test

View File

@ -156,7 +156,7 @@ public class TestWALEntryStream {
log.rollWriter(); log.rollWriter();
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
int i = 0; int i = 0;
while (entryStream.hasNext()) { while (entryStream.hasNext()) {
assertNotNull(entryStream.next()); assertNotNull(entryStream.next());
@ -183,7 +183,7 @@ public class TestWALEntryStream {
appendToLogAndSync(); appendToLogAndSync();
long oldPos; long oldPos;
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
// There's one edit in the log, read it. Reading past it needs to throw exception // There's one edit in the log, read it. Reading past it needs to throw exception
assertTrue(entryStream.hasNext()); assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.peek(); WAL.Entry entry = entryStream.peek();
@ -197,7 +197,7 @@ public class TestWALEntryStream {
appendToLogAndSync(); appendToLogAndSync();
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos, try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos,
log, null, new MetricsSource("1"))) { log, null, new MetricsSource("1"))) {
// Read the newly added entry, make sure we made progress // Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next(); WAL.Entry entry = entryStream.next();
@ -211,7 +211,7 @@ public class TestWALEntryStream {
log.rollWriter(); log.rollWriter();
appendToLogAndSync(); appendToLogAndSync();
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos, try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos,
log, null, new MetricsSource("1"))) { log, null, new MetricsSource("1"))) {
WAL.Entry entry = entryStream.next(); WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition()); assertNotEquals(oldPos, entryStream.getPosition());
@ -237,7 +237,7 @@ public class TestWALEntryStream {
appendToLog("1"); appendToLog("1");
appendToLog("2");// 2 appendToLog("2");// 2
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
assertEquals("1", getRow(entryStream.next())); assertEquals("1", getRow(entryStream.next()));
appendToLog("3"); // 3 - comes in after reader opened appendToLog("3"); // 3 - comes in after reader opened
@ -262,7 +262,7 @@ public class TestWALEntryStream {
public void testNewEntriesWhileStreaming() throws Exception { public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1"); appendToLog("1");
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point entryStream.next(); // we've hit the end of the stream at this point
// some new entries come in while we're streaming // some new entries come in while we're streaming
@ -285,7 +285,7 @@ public class TestWALEntryStream {
long lastPosition = 0; long lastPosition = 0;
appendToLog("1"); appendToLog("1");
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point entryStream.next(); // we've hit the end of the stream at this point
appendToLog("2"); appendToLog("2");
appendToLog("3"); appendToLog("3");
@ -293,7 +293,7 @@ public class TestWALEntryStream {
} }
// next stream should picks up where we left off // next stream should picks up where we left off
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) { new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) {
assertEquals("2", getRow(entryStream.next())); assertEquals("2", getRow(entryStream.next()));
assertEquals("3", getRow(entryStream.next())); assertEquals("3", getRow(entryStream.next()));
assertFalse(entryStream.hasNext()); // done assertFalse(entryStream.hasNext()); // done
@ -310,14 +310,14 @@ public class TestWALEntryStream {
long lastPosition = 0; long lastPosition = 0;
appendEntriesToLogAndSync(3); appendEntriesToLogAndSync(3);
// read only one element // read only one element
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, lastPosition, try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, lastPosition,
log, null, new MetricsSource("1"))) { log, null, new MetricsSource("1"))) {
entryStream.next(); entryStream.next();
lastPosition = entryStream.getPosition(); lastPosition = entryStream.getPosition();
} }
// there should still be two more entries from where we left off // there should still be two more entries from where we left off
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) { new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) {
assertNotNull(entryStream.next()); assertNotNull(entryStream.next());
assertNotNull(entryStream.next()); assertNotNull(entryStream.next());
assertFalse(entryStream.hasNext()); assertFalse(entryStream.hasNext());
@ -328,7 +328,7 @@ public class TestWALEntryStream {
@Test @Test
public void testEmptyStream() throws Exception { public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
assertFalse(entryStream.hasNext()); assertFalse(entryStream.hasNext());
} }
} }
@ -361,7 +361,7 @@ public class TestWALEntryStream {
// get ending position // get ending position
long position; long position;
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
entryStream.next(); entryStream.next();
entryStream.next(); entryStream.next();
entryStream.next(); entryStream.next();
@ -478,7 +478,7 @@ public class TestWALEntryStream {
// get ending position // get ending position
long position; long position;
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
entryStream.next(); entryStream.next();
entryStream.next(); entryStream.next();
entryStream.next(); entryStream.next();
@ -594,7 +594,7 @@ public class TestWALEntryStream {
appendToLog("2"); appendToLog("2");
long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong(); long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
AtomicLong fileLength = new AtomicLong(size - 1); AtomicLong fileLength = new AtomicLong(size - 1);
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, 0, try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, 0,
p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) { p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
assertTrue(entryStream.hasNext()); assertTrue(entryStream.hasNext());
assertNotNull(entryStream.next()); assertNotNull(entryStream.next());