HBASE-25808 [branch-1] Backport improvements to FSHLog from branch-2 (#3197)

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Reid Chan <reidchan@apache.org>
This commit is contained in:
Andrew Purtell 2021-04-23 19:29:29 -07:00 committed by GitHub
parent 38b21f44ea
commit 3c87447810
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 48 additions and 21 deletions

View File

@ -1233,7 +1233,9 @@ public class FSHLog implements WAL {
*/
private int releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence,
final Throwable t) {
if (!syncFuture.done(currentSequence, t)) throw new IllegalStateException();
if (!syncFuture.done(currentSequence, t)) {
throw new IllegalStateException();
}
// This function releases one sync future only.
return 1;
}
@ -1247,7 +1249,9 @@ public class FSHLog implements WAL {
private int releaseSyncFutures(final long currentSequence, final Throwable t) {
int syncCount = 0;
for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) {
if (syncFuture.getRingBufferSequence() > currentSequence) break;
if (syncFuture.getRingBufferSequence() > currentSequence) {
break;
}
releaseSyncFuture(syncFuture, currentSequence, t);
if (!this.syncFutures.remove(syncFuture)) {
throw new IllegalStateException(syncFuture.toString());
@ -1290,12 +1294,20 @@ public class FSHLog implements WAL {
int syncCount = 0;
try {
// Make a local copy of takeSyncFuture after we get it. We've been running into NPEs
// 2020-03-22 16:54:32,180 WARN [sync.1] wal.FSHLog$SyncRunner(589): UNEXPECTED
// java.lang.NullPointerException
// at org.apache.hadoop.hbase.regionserver.wal.FSHLog$SyncRunner.run(FSHLog.java:582)
// at java.lang.Thread.run(Thread.java:748)
SyncFuture sf;
while (true) {
takeSyncFuture = null;
// We have to process what we 'take' from the queue
takeSyncFuture = this.syncFutures.take();
// Make local copy.
sf = takeSyncFuture;
currentSequence = this.sequence;
long syncFutureSequence = takeSyncFuture.getRingBufferSequence();
long syncFutureSequence = sf.getRingBufferSequence();
if (syncFutureSequence > currentSequence) {
throw new IllegalStateException("currentSequence=" + currentSequence +
", syncFutureSequence=" + syncFutureSequence);
@ -1303,7 +1315,7 @@ public class FSHLog implements WAL {
// See if we can process any syncfutures BEFORE we go sync.
long currentHighestSyncedSequence = highestSyncedSequence.get();
if (currentSequence < currentHighestSyncedSequence) {
syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
syncCount += releaseSyncFuture(sf, currentHighestSyncedSequence, null);
// Done with the 'take'. Go around again and do a new 'take'.
continue;
}
@ -1311,13 +1323,13 @@ public class FSHLog implements WAL {
}
// I got something. Lets run. Save off current sequence number in case it changes
// while we run.
TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
TraceScope scope = Trace.continueSpan(sf.getSpan());
long start = System.nanoTime();
Throwable lastException = null;
try {
Trace.addTimelineAnnotation("syncing writer");
long unSyncedFlushSeq = highestUnsyncedSequence;
writer.sync(takeSyncFuture.isForceSync());
writer.sync(sf.isForceSync());
Trace.addTimelineAnnotation("writer synced");
if (unSyncedFlushSeq > currentSequence) {
currentSequence = unSyncedFlushSeq;
@ -1331,9 +1343,9 @@ public class FSHLog implements WAL {
lastException = e;
} finally {
// reattach the span to the future before releasing.
takeSyncFuture.setSpan(scope.detach());
sf.setSpan(scope.detach());
// First release what we 'took' from the queue.
syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
syncCount += releaseSyncFuture(sf, currentSequence, lastException);
// Can we release other syncs?
syncCount += releaseSyncFutures(currentSequence, lastException);
if (lastException != null) {
@ -1856,7 +1868,7 @@ public class FSHLog implements WAL {
private final SyncFuture [] syncFutures;
// Had 'interesting' issues when this was non-volatile. On occasion, we'd not pass all
// syncFutures to the next sync'ing thread.
private volatile int syncFuturesCount = 0;
private AtomicInteger syncFuturesCount = new AtomicInteger();
private volatile SafePointZigZagLatch zigzagLatch;
/**
* Set if we get an exception appending or syncing so that all subsequence appends and syncs
@ -1884,8 +1896,10 @@ public class FSHLog implements WAL {
private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) {
// There could be handler-count syncFutures outstanding.
for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e);
this.syncFuturesCount = 0;
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
this.syncFutures[i].done(sequence, e);
}
this.syncFuturesCount.set(0);
}
/**
@ -1893,8 +1907,10 @@ public class FSHLog implements WAL {
*/
private boolean isOutstandingSyncs() {
// Look at SyncFutures in the EventHandler
for (int i = 0; i < this.syncFuturesCount; i++) {
if (!this.syncFutures[i].isDone()) return true;
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
if (!this.syncFutures[i].isDone()) {
return true;
}
}
return false;
@ -1924,9 +1940,12 @@ public class FSHLog implements WAL {
try {
if (truck.hasSyncFuturePayload()) {
this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload();
this.syncFutures[this.syncFuturesCount.getAndIncrement()] =
truck.unloadSyncFuturePayload();
// Force flush of syncs if we are carrying a full complement of syncFutures.
if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true;
if (this.syncFuturesCount.get() == this.syncFutures.length) {
endOfBatch = true;
}
} else if (truck.hasFSWALEntryPayload()) {
TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
try {
@ -1964,7 +1983,9 @@ public class FSHLog implements WAL {
if (this.exception == null) {
// If not a batch, return to consume more events from the ring buffer before proceeding;
// we want to get up a batch of syncs and appends before we go do a filesystem sync.
if (!endOfBatch || this.syncFuturesCount <= 0) return;
if (!endOfBatch || this.syncFuturesCount.get() <= 0) {
return;
}
// syncRunnerIndex is bound to the range [0, Integer.MAX_INT - 1] as follows:
// * The maximum value possible for syncRunners.length is Integer.MAX_INT
// * syncRunnerIndex starts at 0 and is incremented only here
@ -1979,7 +2000,7 @@ public class FSHLog implements WAL {
// Below expects that the offer 'transfers' responsibility for the outstanding syncs to
// the syncRunner. We should never get an exception in here.
this.syncRunners[this.syncRunnerIndex].offer(sequence, this.syncFutures,
this.syncFuturesCount);
this.syncFuturesCount.get());
} catch (Exception e) {
// Should NEVER get here.
requestLogRoll();
@ -1994,7 +2015,7 @@ public class FSHLog implements WAL {
new DamagedWALException("On sync", this.exception));
}
attainSafePoint(sequence);
this.syncFuturesCount = 0;
this.syncFuturesCount.set(0);
} catch (Throwable t) {
LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
}
@ -2010,7 +2031,9 @@ public class FSHLog implements WAL {
* we proceeding.
*/
private void attainSafePoint(final long currentSequence) {
if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return;
if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) {
return;
}
// If here, another thread is waiting on us to get to safe point. Don't leave it hanging.
beforeWaitOnSafePoint();
try {
@ -2096,12 +2119,16 @@ public class FSHLog implements WAL {
@Override
public void onStart() {
for (SyncRunner syncRunner: this.syncRunners) syncRunner.start();
for (SyncRunner syncRunner: this.syncRunners) {
syncRunner.start();
}
}
@Override
public void onShutdown() {
for (SyncRunner syncRunner: this.syncRunners) syncRunner.interrupt();
for (SyncRunner syncRunner: this.syncRunners) {
syncRunner.interrupt();
}
}
}