From 9355506c3008ba4aafef8bc75ce8063fc226c8f1 Mon Sep 17 00:00:00 2001
From: Duo Zhang
Date: Mon, 28 Mar 2022 07:53:28 +0800
Subject: [PATCH] HBASE-26832 Avoid repeated releasing of flushed wal entries in AsyncFSWAL#syncCompleted (#4281)

Signed-off-by: Xiaolin Ha
(cherry picked from commit 4f491fd5e40c0986dc92f8b231d419a2b07e5a0e)
---
 .../hbase/regionserver/wal/AsyncFSWAL.java | 37 +++++++++++++++++--
 1 file changed, 34 insertions(+), 3 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 1e4a2dad993..fca0ff8ebf7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -162,7 +162,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   private static final int MAX_EPOCH = 0x3FFFFFFF;
   // the lowest bit is waitingRoll, which means new writer is created and we are waiting for old
   // writer to be closed.
-  // the second lowest bit is writerBorken which means the current writer is broken and rollWriter
+  // the second lowest bit is writerBroken which means the current writer is broken and rollWriter
   // is needed.
   // all other bits are the epoch number of the current writer, this is used to detect whether the
   // writer is still the one when you issue the sync.
@@ -340,7 +340,38 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     }
   }
 
-  private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
+  private void syncCompleted(long epochWhenSync, AsyncWriter writer, long processedTxid,
+    long startTimeNs) {
+    // Please see the last several comments on HBASE-22761, it is possible that we get a
+    // syncCompleted which acks a previous sync request after we received a syncFailed on the same
+    // writer. So here we will also check on the epoch and state, if the epoch has already been
+    // changed, i.e, we have already rolled the writer, or the writer is already broken, we should
+    // just skip here, to avoid mess up the state or accidentally release some WAL entries and
+    // cause data corruption.
+    // The syncCompleted call is on the critical write path so we should try our best to make it
+    // fast. So here we do not hold consumeLock, for increasing performance. It is safe because
+    // there are only 3 possible situations:
+    // 1. For normal case, the only place where we change epochAndState is when rolling the writer.
+    // Before rolling actually happen, we will only change the state to waitingRoll which is another
+    // bit than writerBroken, and when we actually change the epoch, we can make sure that there is
+    // no out going sync request. So we will always pass the check here and there is no problem.
+    // 2. The writer is broken, but we have not called syncFailed yet. In this case, since
+    // syncFailed and syncCompleted are executed in the same thread, we will just face the same
+    // situation with #1.
+    // 3. The writer is broken, and syncFailed has been called. Then when we arrive here, there are
+    // only 2 possible situations:
+    // a. we arrive before we actually roll the writer, then we will find out the writer is broken
+    // and give up.
+    // b. we arrive after we actually roll the writer, then we will find out the epoch is changed
+    // and give up.
+    // For both #a and #b, we do not need to hold the consumeLock as we will always update the
+    // epochAndState as a whole.
+    // So in general, for all the cases above, we do not need to hold the consumeLock.
+    int epochAndState = this.epochAndState;
+    if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) {
+      LOG.warn("Got a sync complete call after the writer is broken, skip");
+      return;
+    }
     highestSyncedTxid.set(processedTxid);
     for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
       FSWALEntry entry = iter.next();
@@ -396,7 +427,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       if (error != null) {
         syncFailed(epoch, error);
       } else {
-        syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs);
+        syncCompleted(epoch, writer, currentHighestProcessedAppendTxid, startTimeNs);
       }
     }, consumeExecutor); 
   }
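
Note (not part of the patch above): the new guard in syncCompleted works because the writer epoch and two status bits are packed into a single int, so a stale completion can be rejected with one volatile read and no lock. The following is a minimal standalone sketch of that encoding, assuming the bit layout described in the comment at line 162 (lowest bit waitingRoll, second lowest bit writerBroken, remaining bits the epoch). The class name and the shouldAckSync method are hypothetical and exist only for illustration; they are not the AsyncFSWAL implementation.

// Illustrative sketch only; assumes the bit layout documented in AsyncFSWAL.
public class EpochAndStateSketch {

  // Lowest bit: waitingRoll, second lowest bit: writerBroken,
  // remaining bits: the epoch of the current writer.
  private volatile int epochAndState;

  // waitingRoll is shown only to complete the bit layout; the syncCompleted
  // guard in the patch does not test it.
  private static boolean waitingRoll(int epochAndState) {
    return (epochAndState & 1) != 0;
  }

  private static boolean writerBroken(int epochAndState) {
    return ((epochAndState >>> 1) & 1) != 0;
  }

  private static int epoch(int epochAndState) {
    return epochAndState >>> 2;
  }

  // Mirrors the idea of the added check: read the packed state once and refuse
  // to ack a sync that belongs to an older or broken writer, so a stale
  // completion cannot release unacked WAL entries.
  boolean shouldAckSync(long epochWhenSync) {
    int current = this.epochAndState;
    return epoch(current) == epochWhenSync && !writerBroken(current);
  }

  public static void main(String[] args) {
    EpochAndStateSketch s = new EpochAndStateSketch();
    s.epochAndState = 5 << 2;               // epoch 5, no flags set
    System.out.println(s.shouldAckSync(5)); // true: still the same writer
    s.epochAndState = (5 << 2) | 0b10;      // epoch 5, writerBroken set
    System.out.println(s.shouldAckSync(5)); // false: skip the ack
    s.epochAndState = 6 << 2;               // writer rolled to epoch 6
    System.out.println(s.shouldAckSync(5)); // false: stale epoch
  }
}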