This bug was exposed by the test from HBASE-25924. Since this wal
implementations close the wal asynchronously, replication can potentially
miss the trailer bytes. (see jira comment for detailed analysis).
While this is not a correctness problem (since trailer does not have any entry data),
it erroneously bumps a metric that is used to track skipped bytes in WAL resulting
in false alarms which is something we should avoid.
Reviewed-by: Rushabh Shah <rushabh.shah@salesforce.com>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by Anoop Sam John <anoopsamjohn@apache.org>
(cherry picked from commit b04c3c7786
)
This commit is contained in:
parent
0ede56d24a
commit
d4285be5c1
|
@ -37,6 +37,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentNavigableMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
@ -188,6 +189,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
/** Listeners that are called on WAL events. */
|
||||
protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
/** Tracks the logs in the process of being closed. */
|
||||
protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
|
||||
* id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
|
||||
|
@ -947,6 +951,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
LOG.info("Closed WAL: " + toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return number of WALs currently in the process of closing.
|
||||
*/
|
||||
public int getInflightWALCloseCount() {
|
||||
return inflightWALClosures.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* updates the sequence number of a specific store. depending on the flag: replaces current seq
|
||||
* number if the given seq id is bigger, or even if it is lower than existing one
|
||||
|
@ -1112,9 +1123,18 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
try {
|
||||
Path currentPath = getOldPath();
|
||||
if (path.equals(currentPath)) {
|
||||
// Currently active path.
|
||||
W writer = this.writer;
|
||||
return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
|
||||
} else {
|
||||
W temp = inflightWALClosures.get(path.getName());
|
||||
if (temp != null) {
|
||||
// In the process of being closed, trailer bytes may or may not be flushed.
|
||||
// Ensuring that we read all the bytes in a file is critical for correctness of tailing
|
||||
// use cases like replication, see HBASE-25924/HBASE-25932.
|
||||
return OptionalLong.of(temp.getSyncedLength());
|
||||
}
|
||||
// Log rolled successfully.
|
||||
return OptionalLong.empty();
|
||||
}
|
||||
} finally {
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.Comparator;
|
|||
import java.util.Deque;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.Queue;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
|
@ -682,14 +683,17 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
}
|
||||
}
|
||||
|
||||
protected final long closeWriter(AsyncWriter writer) {
|
||||
protected final long closeWriter(AsyncWriter writer, Path path) {
|
||||
if (writer != null) {
|
||||
inflightWALClosures.put(path.getName(), writer);
|
||||
long fileLength = writer.getLength();
|
||||
closeExecutor.execute(() -> {
|
||||
try {
|
||||
writer.close();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("close old writer failed", e);
|
||||
} finally {
|
||||
inflightWALClosures.remove(path.getName());
|
||||
}
|
||||
});
|
||||
return fileLength;
|
||||
|
@ -703,7 +707,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
throws IOException {
|
||||
Preconditions.checkNotNull(nextWriter);
|
||||
waitForSafePoint();
|
||||
long oldFileLen = closeWriter(this.writer);
|
||||
long oldFileLen = closeWriter(this.writer, oldPath);
|
||||
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
|
||||
this.writer = nextWriter;
|
||||
if (nextWriter instanceof AsyncProtobufLogWriter) {
|
||||
|
@ -729,7 +733,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
@Override
|
||||
protected void doShutdown() throws IOException {
|
||||
waitForSafePoint();
|
||||
closeWriter(this.writer);
|
||||
closeWriter(this.writer, getOldPath());
|
||||
this.writer = null;
|
||||
closeExecutor.shutdown();
|
||||
try {
|
||||
|
|
|
@ -68,7 +68,7 @@ import org.slf4j.LoggerFactory;
|
|||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* The default implementation of FSWAL.
|
||||
* The original implementation of FSWAL.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class FSHLog extends AbstractFSWAL<Writer> {
|
||||
|
@ -382,6 +382,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
// In case of having unflushed entries or we already reached the
|
||||
// closeErrorsTolerated count, call the closeWriter inline rather than in async
|
||||
// way so that in case of an IOE we will throw it back and abort RS.
|
||||
inflightWALClosures.put(oldPath.getName(), writer);
|
||||
if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) {
|
||||
closeWriter(this.writer, oldPath, true);
|
||||
} else {
|
||||
|
@ -449,6 +450,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
}
|
||||
LOG.warn("Riding over failed WAL close of " + path
|
||||
+ "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
|
||||
} finally {
|
||||
inflightWALClosures.remove(path.getName());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -223,10 +223,9 @@ class WALEntryStream implements Closeable {
|
|||
if (trailerSize < 0) {
|
||||
if (currentPositionOfReader < stat.getLen()) {
|
||||
final long skippedBytes = stat.getLen() - currentPositionOfReader;
|
||||
LOG.debug(
|
||||
"Reached the end of WAL {}. It was not closed cleanly," +
|
||||
" so we did not parse {} bytes of data. This is normally ok.",
|
||||
currentPath, skippedBytes);
|
||||
// See the commits in HBASE-25924/HBASE-25932 for context.
|
||||
LOG.warn("Reached the end of WAL {}. It was not closed cleanly," +
|
||||
" so we did not parse {} bytes of data.", currentPath, skippedBytes);
|
||||
metrics.incrUncleanlyClosedWALs();
|
||||
metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.FSHLogProvider;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* TestWALEntryStream with {@link org.apache.hadoop.hbase.wal.FSHLogProvider} as the WAL provider.
|
||||
*/
|
||||
@Category({ ReplicationTests.class, LargeTests.class })
|
||||
public class TestFSHLogWALEntryStream extends TestWALEntryStream {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestFSHLogWALEntryStream.class);
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL = new HBaseTestingUtility();
|
||||
CONF = TEST_UTIL.getConfiguration();
|
||||
CONF.setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, AbstractFSWALProvider.class);
|
||||
CONF.setLong("replication.source.sleepforretries", 10);
|
||||
TEST_UTIL.startMiniDFSCluster(3);
|
||||
cluster = TEST_UTIL.getDFSCluster();
|
||||
fs = cluster.getFileSystem();
|
||||
}
|
||||
}
|
|
@ -53,11 +53,14 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
|
||||
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
||||
|
@ -91,10 +94,11 @@ public class TestWALEntryStream {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestWALEntryStream.class);
|
||||
|
||||
private static HBaseTestingUtility TEST_UTIL;
|
||||
private static Configuration CONF;
|
||||
private static FileSystem fs;
|
||||
private static MiniDFSCluster cluster;
|
||||
private static final long TEST_TIMEOUT_MS = 5000;
|
||||
protected static HBaseTestingUtility TEST_UTIL;
|
||||
protected static Configuration CONF;
|
||||
protected static FileSystem fs;
|
||||
protected static MiniDFSCluster cluster;
|
||||
private static final TableName tableName = TableName.valueOf("tablename");
|
||||
private static final byte[] family = Bytes.toBytes("column");
|
||||
private static final byte[] qualifier = Bytes.toBytes("qualifier");
|
||||
|
@ -103,6 +107,34 @@ public class TestWALEntryStream {
|
|||
private static final NavigableMap<byte[], Integer> scopes = getScopes();
|
||||
private final String fakeWalGroupId = "fake-wal-group-id";
|
||||
|
||||
/**
|
||||
* Test helper that waits until a non-null entry is available in the stream next or times out.
|
||||
* A {@link WALEntryStream} provides a streaming access to a queue of log files. Since the stream
|
||||
* can be consumed as the file is being written, callers relying on {@link WALEntryStream#next()}
|
||||
* may need to retry multiple times before an entry appended to the WAL is visible to the stream
|
||||
* consumers. One such cause of delay is the close() of writer writing these log files. While the
|
||||
* closure is in progress, the stream does not switch to the next log in the queue and next() may
|
||||
* return null entries. This utility wraps these retries into a single next call and that makes
|
||||
* the test code simpler.
|
||||
*/
|
||||
private static class WALEntryStreamWithRetries extends WALEntryStream {
|
||||
// Class member to be able to set a non-final from within a lambda.
|
||||
private Entry result;
|
||||
|
||||
public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf,
|
||||
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
|
||||
MetricsSource metrics, String walGroupId) throws IOException {
|
||||
super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Entry next() {
|
||||
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> (
|
||||
result = WALEntryStreamWithRetries.super.next()) != null);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
private static NavigableMap<byte[], Integer> getScopes() {
|
||||
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
scopes.put(family, 1);
|
||||
|
@ -148,7 +180,9 @@ public class TestWALEntryStream {
|
|||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
log.close();
|
||||
if (log != null) {
|
||||
log.close();
|
||||
}
|
||||
}
|
||||
|
||||
// Try out different combinations of row count and KeyValue count
|
||||
|
@ -215,7 +249,7 @@ public class TestWALEntryStream {
|
|||
|
||||
appendToLogAndSync();
|
||||
|
||||
try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
|
||||
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos,
|
||||
log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
||||
// Read the newly added entry, make sure we made progress
|
||||
WAL.Entry entry = entryStream.next();
|
||||
|
@ -229,7 +263,7 @@ public class TestWALEntryStream {
|
|||
log.rollWriter();
|
||||
appendToLogAndSync();
|
||||
|
||||
try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
|
||||
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos,
|
||||
log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
||||
WAL.Entry entry = entryStream.next();
|
||||
assertNotEquals(oldPos, entryStream.getPosition());
|
||||
|
@ -255,7 +289,7 @@ public class TestWALEntryStream {
|
|||
appendToLog("1");
|
||||
appendToLog("2");// 2
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(logQueue, CONF, 0, log, null,
|
||||
new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
|
||||
new MetricsSource("1"), fakeWalGroupId)) {
|
||||
assertEquals("1", getRow(entryStream.next()));
|
||||
|
||||
|
@ -530,7 +564,8 @@ public class TestWALEntryStream {
|
|||
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return fs.getFileStatus(walPath).getLen() > 0;
|
||||
return fs.getFileStatus(walPath).getLen() > 0 &&
|
||||
((AbstractFSWAL) log).getInflightWALCloseCount() == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -539,12 +574,13 @@ public class TestWALEntryStream {
|
|||
}
|
||||
|
||||
});
|
||||
long walLength = fs.getFileStatus(walPath).getLen();
|
||||
|
||||
ReplicationSourceWALReader reader = createReader(false, CONF);
|
||||
|
||||
WALEntryBatch entryBatch = reader.take();
|
||||
assertEquals(walPath, entryBatch.getLastWalPath());
|
||||
|
||||
long walLength = fs.getFileStatus(walPath).getLen();
|
||||
assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " +
|
||||
walLength, entryBatch.getLastWalPosition() <= walLength);
|
||||
assertEquals(1, entryBatch.getNbEntries());
|
||||
|
@ -869,7 +905,7 @@ public class TestWALEntryStream {
|
|||
*/
|
||||
@Test
|
||||
public void testCleanClosedWALs() throws Exception {
|
||||
try (WALEntryStream entryStream = new WALEntryStream(
|
||||
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(
|
||||
logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
|
||||
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
|
||||
appendToLogAndSync();
|
||||
|
|
Loading…
Reference in New Issue