diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 446d41caa45..e9aac92a880 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1233,7 +1233,9 @@ public class FSHLog implements WAL { */ private int releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence, final Throwable t) { - if (!syncFuture.done(currentSequence, t)) throw new IllegalStateException(); + if (!syncFuture.done(currentSequence, t)) { + throw new IllegalStateException(); + } // This function releases one sync future only. return 1; } @@ -1247,7 +1249,9 @@ public class FSHLog implements WAL { private int releaseSyncFutures(final long currentSequence, final Throwable t) { int syncCount = 0; for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) { - if (syncFuture.getRingBufferSequence() > currentSequence) break; + if (syncFuture.getRingBufferSequence() > currentSequence) { + break; + } releaseSyncFuture(syncFuture, currentSequence, t); if (!this.syncFutures.remove(syncFuture)) { throw new IllegalStateException(syncFuture.toString()); @@ -1290,12 +1294,20 @@ public class FSHLog implements WAL { int syncCount = 0; try { + // Make a local copy of takeSyncFuture after we get it. We've been running into NPEs + // 2020-03-22 16:54:32,180 WARN [sync.1] wal.FSHLog$SyncRunner(589): UNEXPECTED + // java.lang.NullPointerException + // at org.apache.hadoop.hbase.regionserver.wal.FSHLog$SyncRunner.run(FSHLog.java:582) + // at java.lang.Thread.run(Thread.java:748) + SyncFuture sf; while (true) { takeSyncFuture = null; // We have to process what we 'take' from the queue takeSyncFuture = this.syncFutures.take(); + // Make local copy. + sf = takeSyncFuture; currentSequence = this.sequence; - long syncFutureSequence = takeSyncFuture.getRingBufferSequence(); + long syncFutureSequence = sf.getRingBufferSequence(); if (syncFutureSequence > currentSequence) { throw new IllegalStateException("currentSequence=" + currentSequence + ", syncFutureSequence=" + syncFutureSequence); @@ -1303,7 +1315,7 @@ public class FSHLog implements WAL { // See if we can process any syncfutures BEFORE we go sync. long currentHighestSyncedSequence = highestSyncedSequence.get(); if (currentSequence < currentHighestSyncedSequence) { - syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null); + syncCount += releaseSyncFuture(sf, currentHighestSyncedSequence, null); // Done with the 'take'. Go around again and do a new 'take'. continue; } @@ -1311,13 +1323,13 @@ public class FSHLog implements WAL { } // I got something. Lets run. Save off current sequence number in case it changes // while we run. - TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan()); + TraceScope scope = Trace.continueSpan(sf.getSpan()); long start = System.nanoTime(); Throwable lastException = null; try { Trace.addTimelineAnnotation("syncing writer"); long unSyncedFlushSeq = highestUnsyncedSequence; - writer.sync(takeSyncFuture.isForceSync()); + writer.sync(sf.isForceSync()); Trace.addTimelineAnnotation("writer synced"); if (unSyncedFlushSeq > currentSequence) { currentSequence = unSyncedFlushSeq; @@ -1331,9 +1343,9 @@ public class FSHLog implements WAL { lastException = e; } finally { // reattach the span to the future before releasing. - takeSyncFuture.setSpan(scope.detach()); + sf.setSpan(scope.detach()); // First release what we 'took' from the queue. - syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException); + syncCount += releaseSyncFuture(sf, currentSequence, lastException); // Can we release other syncs? syncCount += releaseSyncFutures(currentSequence, lastException); if (lastException != null) { @@ -1856,7 +1868,7 @@ public class FSHLog implements WAL { private final SyncFuture [] syncFutures; // Had 'interesting' issues when this was non-volatile. On occasion, we'd not pass all // syncFutures to the next sync'ing thread. - private volatile int syncFuturesCount = 0; + private AtomicInteger syncFuturesCount = new AtomicInteger(); private volatile SafePointZigZagLatch zigzagLatch; /** * Set if we get an exception appending or syncing so that all subsequence appends and syncs @@ -1884,8 +1896,10 @@ public class FSHLog implements WAL { private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) { // There could be handler-count syncFutures outstanding. - for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e); - this.syncFuturesCount = 0; + for (int i = 0; i < this.syncFuturesCount.get(); i++) { + this.syncFutures[i].done(sequence, e); + } + this.syncFuturesCount.set(0); } /** @@ -1893,8 +1907,10 @@ public class FSHLog implements WAL { */ private boolean isOutstandingSyncs() { // Look at SyncFutures in the EventHandler - for (int i = 0; i < this.syncFuturesCount; i++) { - if (!this.syncFutures[i].isDone()) return true; + for (int i = 0; i < this.syncFuturesCount.get(); i++) { + if (!this.syncFutures[i].isDone()) { + return true; + } } return false; @@ -1924,9 +1940,12 @@ public class FSHLog implements WAL { try { if (truck.hasSyncFuturePayload()) { - this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload(); + this.syncFutures[this.syncFuturesCount.getAndIncrement()] = + truck.unloadSyncFuturePayload(); // Force flush of syncs if we are carrying a full complement of syncFutures. - if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true; + if (this.syncFuturesCount.get() == this.syncFutures.length) { + endOfBatch = true; + } } else if (truck.hasFSWALEntryPayload()) { TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload()); try { @@ -1964,7 +1983,9 @@ public class FSHLog implements WAL { if (this.exception == null) { // If not a batch, return to consume more events from the ring buffer before proceeding; // we want to get up a batch of syncs and appends before we go do a filesystem sync. - if (!endOfBatch || this.syncFuturesCount <= 0) return; + if (!endOfBatch || this.syncFuturesCount.get() <= 0) { + return; + } // syncRunnerIndex is bound to the range [0, Integer.MAX_INT - 1] as follows: // * The maximum value possible for syncRunners.length is Integer.MAX_INT // * syncRunnerIndex starts at 0 and is incremented only here @@ -1979,7 +2000,7 @@ public class FSHLog implements WAL { // Below expects that the offer 'transfers' responsibility for the outstanding syncs to // the syncRunner. We should never get an exception in here. this.syncRunners[this.syncRunnerIndex].offer(sequence, this.syncFutures, - this.syncFuturesCount); + this.syncFuturesCount.get()); } catch (Exception e) { // Should NEVER get here. requestLogRoll(); @@ -1994,7 +2015,7 @@ public class FSHLog implements WAL { new DamagedWALException("On sync", this.exception)); } attainSafePoint(sequence); - this.syncFuturesCount = 0; + this.syncFuturesCount.set(0); } catch (Throwable t) { LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t); } @@ -2010,7 +2031,9 @@ public class FSHLog implements WAL { * we proceeding. */ private void attainSafePoint(final long currentSequence) { - if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return; + if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) { + return; + } // If here, another thread is waiting on us to get to safe point. Don't leave it hanging. beforeWaitOnSafePoint(); try { @@ -2096,12 +2119,16 @@ public class FSHLog implements WAL { @Override public void onStart() { - for (SyncRunner syncRunner: this.syncRunners) syncRunner.start(); + for (SyncRunner syncRunner: this.syncRunners) { + syncRunner.start(); + } } @Override public void onShutdown() { - for (SyncRunner syncRunner: this.syncRunners) syncRunner.interrupt(); + for (SyncRunner syncRunner: this.syncRunners) { + syncRunner.interrupt(); + } } }