HBASE-26832 Avoid repeated releasing of flushed wal entries in AsyncFSWAL#syncCompleted (#4281)

Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
(cherry picked from commit 4f491fd5e4)
Duo Zhang 2022-03-28 07:53:28 +08:00
parent 7ae6be7c25
commit fd621a7c1b
1 changed file with 34 additions and 3 deletions

@@ -159,7 +159,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   private static final int MAX_EPOCH = 0x3FFFFFFF;
   // the lowest bit is waitingRoll, which means new writer is created and we are waiting for old
   // writer to be closed.
-  // the second lowest bit is writerBorken which means the current writer is broken and rollWriter
+  // the second lowest bit is writerBroken which means the current writer is broken and rollWriter
   // is needed.
   // all other bits are the epoch number of the current writer, this is used to detect whether the
   // writer is still the one when you issue the sync.
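
Note: as context for the epochAndState comment above, here is a minimal sketch of how such a packed field could be decoded. The helper names writerBroken and epoch are the ones used by the new check in the next hunk; the exact bit arithmetic is an assumption inferred from the comment, not code copied from this commit.

// Sketch only: plausible decoding of the packed epochAndState field described above.
// Bit 0 = waitingRoll, bit 1 = writerBroken, remaining bits = the writer epoch (<= MAX_EPOCH).
private static boolean waitingRoll(int epochAndState) {
  return (epochAndState & 1) != 0;
}

private static boolean writerBroken(int epochAndState) {
  return ((epochAndState >>> 1) & 1) != 0;
}

private static int epoch(int epochAndState) {
  return epochAndState >>> 2;
}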
@@ -340,7 +340,38 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     }
   }
 
-  private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
+  private void syncCompleted(long epochWhenSync, AsyncWriter writer, long processedTxid,
+    long startTimeNs) {
+    // Please see the last several comments on HBASE-22761, it is possible that we get a
+    // syncCompleted which acks a previous sync request after we received a syncFailed on the same
+    // writer. So here we will also check on the epoch and state, if the epoch has already been
+    // changed, i.e, we have already rolled the writer, or the writer is already broken, we should
+    // just skip here, to avoid mess up the state or accidentally release some WAL entries and
+    // cause data corruption.
+    // The syncCompleted call is on the critical write path so we should try our best to make it
+    // fast. So here we do not hold consumeLock, for increasing performance. It is safe because
+    // there are only 3 possible situations:
+    // 1. For normal case, the only place where we change epochAndState is when rolling the writer.
+    // Before rolling actually happen, we will only change the state to waitingRoll which is another
+    // bit than writerBroken, and when we actually change the epoch, we can make sure that there is
+    // no out going sync request. So we will always pass the check here and there is no problem.
+    // 2. The writer is broken, but we have not called syncFailed yet. In this case, since
+    // syncFailed and syncCompleted are executed in the same thread, we will just face the same
+    // situation with #1.
+    // 3. The writer is broken, and syncFailed has been called. Then when we arrive here, there are
+    // only 2 possible situations:
+    // a. we arrive before we actually roll the writer, then we will find out the writer is broken
+    // and give up.
+    // b. we arrive after we actually roll the writer, then we will find out the epoch is changed
+    // and give up.
+    // For both #a and #b, we do not need to hold the consumeLock as we will always update the
+    // epochAndState as a whole.
+    // So in general, for all the cases above, we do not need to hold the consumeLock.
+    int epochAndState = this.epochAndState;
+    if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) {
+      LOG.warn("Got a sync complete call after the writer is broken, skip");
+      return;
+    }
     highestSyncedTxid.set(processedTxid);
     for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
       FSWALEntry entry = iter.next();
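
Note: the long comment above boils down to a lock-free staleness check: the completion callback snapshots the volatile epochAndState once and refuses to release anything if the writer has been rolled or marked broken since the sync was issued. A self-contained sketch of that guard follows; the class and method names are hypothetical, not HBase code.

// Minimal illustration of the stale-ack guard (hypothetical names): a single volatile read of the
// packed state is enough to detect that the writer changed or broke after the sync was started.
class StaleAckGuardExample {
  private volatile int epochAndState; // bit 1 = broken, bits 2.. = epoch

  private static boolean broken(int s) {
    return ((s >>> 1) & 1) != 0;
  }

  private static int epoch(int s) {
    return s >>> 2;
  }

  /** Returns true if the ack may be applied, false if it is stale and must be ignored. */
  boolean shouldApplyAck(int epochWhenSync) {
    int snapshot = this.epochAndState; // one volatile read, no lock held
    return epoch(snapshot) == epochWhenSync && !broken(snapshot);
  }
}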
@@ -396,7 +427,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       if (error != null) {
         syncFailed(epoch, error);
       } else {
-        syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs);
+        syncCompleted(epoch, writer, currentHighestProcessedAppendTxid, startTimeNs);
       }
     }, consumeExecutor);
   }
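
Note: for completeness, a rough sketch of the issuing side implied by the call-site change above: the epoch is captured before the asynchronous sync is started and carried into the completion callback. The CompletableFuture-style API and surrounding names here are assumptions based on the callback shape visible in the diff (result/error pair plus consumeExecutor), not the exact AsyncFSWAL code.

// Rough sketch (assumed API): snapshot the epoch before issuing the sync, then hand it to the
// completion callback so a late ack from an already-rolled or broken writer can be detected.
long startTimeNs = System.nanoTime();
long epoch = epoch(this.epochAndState);
writer.sync(forceSync).whenCompleteAsync((result, error) -> {
  if (error != null) {
    syncFailed(epoch, error);
  } else {
    syncCompleted(epoch, writer, currentHighestProcessedAppendTxid, startTimeNs);
  }
}, consumeExecutor);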