This bug was exposed by the test from HBASE-25924. Since this wal
implementations close the wal asynchronously, replication can potentially
miss the trailer bytes. (see jira comment for detailed analysis).
While this is not a correctness problem (since trailer does not have any entry data),
it erroneously bumps a metric that is used to track skipped bytes in WAL resulting
in false alarms which is something we should avoid.
Reviewed-by: Rushabh Shah <rushabh.shah@salesforce.com>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by Anoop Sam John <anoopsamjohn@apache.org>
(cherry picked from commit b04c3c7786
)
This commit is contained in:
parent
1972261826
commit
8a41cb7f1d
|
@ -37,6 +37,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.OptionalLong;
|
import java.util.OptionalLong;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentNavigableMap;
|
import java.util.concurrent.ConcurrentNavigableMap;
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
@ -188,6 +189,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
/** Listeners that are called on WAL events. */
|
/** Listeners that are called on WAL events. */
|
||||||
protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
|
protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
|
/** Tracks the logs in the process of being closed. */
|
||||||
|
protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
|
* Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
|
||||||
* id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
|
* id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
|
||||||
|
@ -947,6 +951,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
LOG.info("Closed WAL: " + toString());
|
LOG.info("Closed WAL: " + toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return number of WALs currently in the process of closing.
|
||||||
|
*/
|
||||||
|
public int getInflightWALCloseCount() {
|
||||||
|
return inflightWALClosures.size();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* updates the sequence number of a specific store. depending on the flag: replaces current seq
|
* updates the sequence number of a specific store. depending on the flag: replaces current seq
|
||||||
* number if the given seq id is bigger, or even if it is lower than existing one
|
* number if the given seq id is bigger, or even if it is lower than existing one
|
||||||
|
@ -1112,9 +1123,18 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
try {
|
try {
|
||||||
Path currentPath = getOldPath();
|
Path currentPath = getOldPath();
|
||||||
if (path.equals(currentPath)) {
|
if (path.equals(currentPath)) {
|
||||||
|
// Currently active path.
|
||||||
W writer = this.writer;
|
W writer = this.writer;
|
||||||
return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
|
return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
|
||||||
} else {
|
} else {
|
||||||
|
W temp = inflightWALClosures.get(path.getName());
|
||||||
|
if (temp != null) {
|
||||||
|
// In the process of being closed, trailer bytes may or may not be flushed.
|
||||||
|
// Ensuring that we read all the bytes in a file is critical for correctness of tailing
|
||||||
|
// use cases like replication, see HBASE-25924/HBASE-25932.
|
||||||
|
return OptionalLong.of(temp.getSyncedLength());
|
||||||
|
}
|
||||||
|
// Log rolled successfully.
|
||||||
return OptionalLong.empty();
|
return OptionalLong.empty();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -31,6 +31,7 @@ import java.util.Comparator;
|
||||||
import java.util.Deque;
|
import java.util.Deque;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.OptionalLong;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.SortedSet;
|
import java.util.SortedSet;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
@ -682,14 +683,17 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final long closeWriter(AsyncWriter writer) {
|
protected final long closeWriter(AsyncWriter writer, Path path) {
|
||||||
if (writer != null) {
|
if (writer != null) {
|
||||||
|
inflightWALClosures.put(path.getName(), writer);
|
||||||
long fileLength = writer.getLength();
|
long fileLength = writer.getLength();
|
||||||
closeExecutor.execute(() -> {
|
closeExecutor.execute(() -> {
|
||||||
try {
|
try {
|
||||||
writer.close();
|
writer.close();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("close old writer failed", e);
|
LOG.warn("close old writer failed", e);
|
||||||
|
} finally {
|
||||||
|
inflightWALClosures.remove(path.getName());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
return fileLength;
|
return fileLength;
|
||||||
|
@ -703,7 +707,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Preconditions.checkNotNull(nextWriter);
|
Preconditions.checkNotNull(nextWriter);
|
||||||
waitForSafePoint();
|
waitForSafePoint();
|
||||||
long oldFileLen = closeWriter(this.writer);
|
long oldFileLen = closeWriter(this.writer, oldPath);
|
||||||
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
|
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
|
||||||
this.writer = nextWriter;
|
this.writer = nextWriter;
|
||||||
if (nextWriter instanceof AsyncProtobufLogWriter) {
|
if (nextWriter instanceof AsyncProtobufLogWriter) {
|
||||||
|
@ -729,7 +733,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
@Override
|
@Override
|
||||||
protected void doShutdown() throws IOException {
|
protected void doShutdown() throws IOException {
|
||||||
waitForSafePoint();
|
waitForSafePoint();
|
||||||
closeWriter(this.writer);
|
closeWriter(this.writer, getOldPath());
|
||||||
this.writer = null;
|
this.writer = null;
|
||||||
closeExecutor.shutdown();
|
closeExecutor.shutdown();
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -68,7 +68,7 @@ import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The default implementation of FSWAL.
|
* The original implementation of FSWAL.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class FSHLog extends AbstractFSWAL<Writer> {
|
public class FSHLog extends AbstractFSWAL<Writer> {
|
||||||
|
@ -382,6 +382,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
||||||
// In case of having unflushed entries or we already reached the
|
// In case of having unflushed entries or we already reached the
|
||||||
// closeErrorsTolerated count, call the closeWriter inline rather than in async
|
// closeErrorsTolerated count, call the closeWriter inline rather than in async
|
||||||
// way so that in case of an IOE we will throw it back and abort RS.
|
// way so that in case of an IOE we will throw it back and abort RS.
|
||||||
|
inflightWALClosures.put(oldPath.getName(), writer);
|
||||||
if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) {
|
if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) {
|
||||||
closeWriter(this.writer, oldPath, true);
|
closeWriter(this.writer, oldPath, true);
|
||||||
} else {
|
} else {
|
||||||
|
@ -449,6 +450,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
||||||
}
|
}
|
||||||
LOG.warn("Riding over failed WAL close of " + path
|
LOG.warn("Riding over failed WAL close of " + path
|
||||||
+ "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
|
+ "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
|
||||||
|
} finally {
|
||||||
|
inflightWALClosures.remove(path.getName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -223,10 +223,9 @@ class WALEntryStream implements Closeable {
|
||||||
if (trailerSize < 0) {
|
if (trailerSize < 0) {
|
||||||
if (currentPositionOfReader < stat.getLen()) {
|
if (currentPositionOfReader < stat.getLen()) {
|
||||||
final long skippedBytes = stat.getLen() - currentPositionOfReader;
|
final long skippedBytes = stat.getLen() - currentPositionOfReader;
|
||||||
LOG.debug(
|
// See the commits in HBASE-25924/HBASE-25932 for context.
|
||||||
"Reached the end of WAL {}. It was not closed cleanly," +
|
LOG.warn("Reached the end of WAL {}. It was not closed cleanly," +
|
||||||
" so we did not parse {} bytes of data. This is normally ok.",
|
" so we did not parse {} bytes of data.", currentPath, skippedBytes);
|
||||||
currentPath, skippedBytes);
|
|
||||||
metrics.incrUncleanlyClosedWALs();
|
metrics.incrUncleanlyClosedWALs();
|
||||||
metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
|
metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,51 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
|
import org.apache.hadoop.hbase.wal.FSHLogProvider;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TestWALEntryStream with {@link org.apache.hadoop.hbase.wal.FSHLogProvider} as the WAL provider.
|
||||||
|
*/
|
||||||
|
@Category({ ReplicationTests.class, LargeTests.class })
|
||||||
|
public class TestFSHLogWALEntryStream extends TestWALEntryStream {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestFSHLogWALEntryStream.class);
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
CONF = TEST_UTIL.getConfiguration();
|
||||||
|
CONF.setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, AbstractFSWALProvider.class);
|
||||||
|
CONF.setLong("replication.source.sleepforretries", 10);
|
||||||
|
TEST_UTIL.startMiniDFSCluster(3);
|
||||||
|
cluster = TEST_UTIL.getDFSCluster();
|
||||||
|
fs = cluster.getFileSystem();
|
||||||
|
}
|
||||||
|
}
|
|
@ -53,11 +53,14 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.Waiter;
|
||||||
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
|
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
|
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
|
||||||
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
||||||
|
@ -91,10 +94,11 @@ public class TestWALEntryStream {
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestWALEntryStream.class);
|
HBaseClassTestRule.forClass(TestWALEntryStream.class);
|
||||||
|
|
||||||
private static HBaseTestingUtility TEST_UTIL;
|
private static final long TEST_TIMEOUT_MS = 5000;
|
||||||
private static Configuration CONF;
|
protected static HBaseTestingUtility TEST_UTIL;
|
||||||
private static FileSystem fs;
|
protected static Configuration CONF;
|
||||||
private static MiniDFSCluster cluster;
|
protected static FileSystem fs;
|
||||||
|
protected static MiniDFSCluster cluster;
|
||||||
private static final TableName tableName = TableName.valueOf("tablename");
|
private static final TableName tableName = TableName.valueOf("tablename");
|
||||||
private static final byte[] family = Bytes.toBytes("column");
|
private static final byte[] family = Bytes.toBytes("column");
|
||||||
private static final byte[] qualifier = Bytes.toBytes("qualifier");
|
private static final byte[] qualifier = Bytes.toBytes("qualifier");
|
||||||
|
@ -103,6 +107,34 @@ public class TestWALEntryStream {
|
||||||
private static final NavigableMap<byte[], Integer> scopes = getScopes();
|
private static final NavigableMap<byte[], Integer> scopes = getScopes();
|
||||||
private final String fakeWalGroupId = "fake-wal-group-id";
|
private final String fakeWalGroupId = "fake-wal-group-id";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test helper that waits until a non-null entry is available in the stream next or times out.
|
||||||
|
* A {@link WALEntryStream} provides a streaming access to a queue of log files. Since the stream
|
||||||
|
* can be consumed as the file is being written, callers relying on {@link WALEntryStream#next()}
|
||||||
|
* may need to retry multiple times before an entry appended to the WAL is visible to the stream
|
||||||
|
* consumers. One such cause of delay is the close() of writer writing these log files. While the
|
||||||
|
* closure is in progress, the stream does not switch to the next log in the queue and next() may
|
||||||
|
* return null entries. This utility wraps these retries into a single next call and that makes
|
||||||
|
* the test code simpler.
|
||||||
|
*/
|
||||||
|
private static class WALEntryStreamWithRetries extends WALEntryStream {
|
||||||
|
// Class member to be able to set a non-final from within a lambda.
|
||||||
|
private Entry result;
|
||||||
|
|
||||||
|
public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf,
|
||||||
|
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
|
||||||
|
MetricsSource metrics, String walGroupId) throws IOException {
|
||||||
|
super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Entry next() {
|
||||||
|
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> (
|
||||||
|
result = WALEntryStreamWithRetries.super.next()) != null);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static NavigableMap<byte[], Integer> getScopes() {
|
private static NavigableMap<byte[], Integer> getScopes() {
|
||||||
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
scopes.put(family, 1);
|
scopes.put(family, 1);
|
||||||
|
@ -148,7 +180,9 @@ public class TestWALEntryStream {
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
log.close();
|
if (log != null) {
|
||||||
|
log.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try out different combinations of row count and KeyValue count
|
// Try out different combinations of row count and KeyValue count
|
||||||
|
@ -215,7 +249,7 @@ public class TestWALEntryStream {
|
||||||
|
|
||||||
appendToLogAndSync();
|
appendToLogAndSync();
|
||||||
|
|
||||||
try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
|
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos,
|
||||||
log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
// Read the newly added entry, make sure we made progress
|
// Read the newly added entry, make sure we made progress
|
||||||
WAL.Entry entry = entryStream.next();
|
WAL.Entry entry = entryStream.next();
|
||||||
|
@ -229,7 +263,7 @@ public class TestWALEntryStream {
|
||||||
log.rollWriter();
|
log.rollWriter();
|
||||||
appendToLogAndSync();
|
appendToLogAndSync();
|
||||||
|
|
||||||
try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
|
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos,
|
||||||
log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
WAL.Entry entry = entryStream.next();
|
WAL.Entry entry = entryStream.next();
|
||||||
assertNotEquals(oldPos, entryStream.getPosition());
|
assertNotEquals(oldPos, entryStream.getPosition());
|
||||||
|
@ -255,7 +289,7 @@ public class TestWALEntryStream {
|
||||||
appendToLog("1");
|
appendToLog("1");
|
||||||
appendToLog("2");// 2
|
appendToLog("2");// 2
|
||||||
try (WALEntryStream entryStream =
|
try (WALEntryStream entryStream =
|
||||||
new WALEntryStream(logQueue, CONF, 0, log, null,
|
new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
|
||||||
new MetricsSource("1"), fakeWalGroupId)) {
|
new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
assertEquals("1", getRow(entryStream.next()));
|
assertEquals("1", getRow(entryStream.next()));
|
||||||
|
|
||||||
|
@ -530,7 +564,8 @@ public class TestWALEntryStream {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean evaluate() throws Exception {
|
public boolean evaluate() throws Exception {
|
||||||
return fs.getFileStatus(walPath).getLen() > 0;
|
return fs.getFileStatus(walPath).getLen() > 0 &&
|
||||||
|
((AbstractFSWAL) log).getInflightWALCloseCount() == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -539,12 +574,13 @@ public class TestWALEntryStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
long walLength = fs.getFileStatus(walPath).getLen();
|
|
||||||
|
|
||||||
ReplicationSourceWALReader reader = createReader(false, CONF);
|
ReplicationSourceWALReader reader = createReader(false, CONF);
|
||||||
|
|
||||||
WALEntryBatch entryBatch = reader.take();
|
WALEntryBatch entryBatch = reader.take();
|
||||||
assertEquals(walPath, entryBatch.getLastWalPath());
|
assertEquals(walPath, entryBatch.getLastWalPath());
|
||||||
|
|
||||||
|
long walLength = fs.getFileStatus(walPath).getLen();
|
||||||
assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " +
|
assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " +
|
||||||
walLength, entryBatch.getLastWalPosition() <= walLength);
|
walLength, entryBatch.getLastWalPosition() <= walLength);
|
||||||
assertEquals(1, entryBatch.getNbEntries());
|
assertEquals(1, entryBatch.getNbEntries());
|
||||||
|
@ -869,7 +905,7 @@ public class TestWALEntryStream {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testCleanClosedWALs() throws Exception {
|
public void testCleanClosedWALs() throws Exception {
|
||||||
try (WALEntryStream entryStream = new WALEntryStream(
|
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(
|
||||||
logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
|
logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
|
||||||
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
|
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
|
||||||
appendToLogAndSync();
|
appendToLogAndSync();
|
||||||
|
|
Loading…
Reference in New Issue