diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index bf0eee293f1..0d9c14048bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -961,7 +961,7 @@ public abstract class AbstractFSWAL implements WAL { } protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) { - return syncFutureCache.getIfPresentOrNew().reset(sequence).setForceSync(forceSync); + return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync); } protected boolean isLogRollRequested() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 3bb0a63415d..ae26a47a494 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -31,7 +31,6 @@ import java.util.Comparator; import java.util.Deque; import java.util.Iterator; import java.util.List; -import java.util.OptionalLong; import java.util.Queue; import java.util.SortedSet; import java.util.TreeSet; @@ -133,10 +132,8 @@ public class AsyncFSWAL extends AbstractFSWAL { private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class); - private static final Comparator SEQ_COMPARATOR = (o1, o2) -> { - int c = Long.compare(o1.getTxid(), o2.getTxid()); - return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2)); - }; + private static final Comparator SEQ_COMPARATOR = Comparator.comparingLong( + SyncFuture::getTxid).thenComparingInt(System::identityHashCode); public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size"; public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024; @@ -373,7 +370,8 @@ public class AsyncFSWAL extends AbstractFSWAL { // sync futures then just use the default one. private boolean isHsync(long beginTxid, long endTxid) { SortedSet futures = - syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1)); + syncFutures.subSet(new SyncFuture().reset(beginTxid, false), + new SyncFuture().reset(endTxid + 1, false)); if (futures.isEmpty()) { return useHsync; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index ba1c640712a..690f54520ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -888,8 +888,9 @@ public class FSHLog extends AbstractFSWAL { private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1); private void checkIfSyncFailed(SyncFuture syncFuture) throws FailedSyncBeforeLogCloseException { - if (syncFuture.isThrowable()) { - throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable()); + Throwable t = syncFuture.getThrowable(); + if (t != null) { + throw new FailedSyncBeforeLogCloseException(t); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java index edba5df2aa3..862e91826b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.yetus.audience.InterfaceAudience; @@ -44,15 +45,23 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.Private class SyncFuture { - // Implementation notes: I tried using a cyclicbarrier in here for handler and sync threads - // to coordinate on but it did not give any obvious advantage and some issues with order in which - // events happen. + private static final long NOT_DONE = -1L; + private Thread t; /** - * The transaction id of this operation, monotonically increases. + * Lock protecting the thread-safe fields. + */ + private final ReentrantLock doneLock; + + /** + * Condition to wait on for client threads. + */ + private final Condition doneCondition; + + /* + * Fields below are protected by {@link SyncFuture#doneLock}. */ - private long txid; /** * The transaction id that was set in here when we were marked done. Should be equal or > txnId. @@ -65,16 +74,30 @@ class SyncFuture { */ private Throwable throwable; - private Thread t; + /* + * Fields below are created once at reset() and accessed without any lock. Should be ok as they + * are immutable for this instance of sync future until it is reset. + */ + + /** + * The transaction id of this operation, monotonically increases. + */ + private long txid; private boolean forceSync; + SyncFuture() { + this.doneLock = new ReentrantLock(); + this.doneCondition = doneLock.newCondition(); + } + /** * Call this method to clear old usage and get it ready for new deploy. + * * @param txid the new transaction id * @return this */ - synchronized SyncFuture reset(long txid) { + SyncFuture reset(long txid, boolean forceSync) { if (t != null && t != Thread.currentThread()) { throw new IllegalStateException(); } @@ -83,30 +106,26 @@ class SyncFuture { throw new IllegalStateException("" + txid + " " + Thread.currentThread()); } this.doneTxid = NOT_DONE; + this.forceSync = forceSync; this.txid = txid; this.throwable = null; return this; } @Override - public synchronized String toString() { + public String toString() { return "done=" + isDone() + ", txid=" + this.txid + " threadID=" + t.getId() + " threadName=" + t.getName(); } - synchronized long getTxid() { + long getTxid() { return this.txid; } - synchronized boolean isForceSync() { + boolean isForceSync() { return forceSync; } - synchronized SyncFuture setForceSync(boolean forceSync) { - this.forceSync = forceSync; - return this; - } - /** * Returns the thread that owned this sync future, use with caution as we return the reference to * the actual thread object. @@ -122,55 +141,67 @@ class SyncFuture { * @return True if we successfully marked this outstanding future as completed/done. Returns false * if this future is already 'done' when this method called. */ - synchronized boolean done(final long txid, final Throwable t) { - if (isDone()) { - return false; - } - this.throwable = t; - if (txid < this.txid) { - // Something badly wrong. - if (throwable == null) { - this.throwable = - new IllegalStateException("done txid=" + txid + ", my txid=" + this.txid); + boolean done(final long txid, final Throwable t) { + doneLock.lock(); + try { + if (doneTxid != NOT_DONE) { + return false; } + this.throwable = t; + if (txid < this.txid) { + // Something badly wrong. + if (throwable == null) { + this.throwable = + new IllegalStateException("done txid=" + txid + ", my txid=" + this.txid); + } + } + // Mark done. + this.doneTxid = txid; + doneCondition.signalAll(); + return true; + } finally { + doneLock.unlock(); } - // Mark done. - this.doneTxid = txid; - // Wake up waiting threads. - notify(); - return true; } - boolean cancel(boolean mayInterruptIfRunning) { - throw new UnsupportedOperationException(); - } - - synchronized long get(long timeoutNs) throws InterruptedException, + long get(long timeoutNs) throws InterruptedException, ExecutionException, TimeoutIOException { - final long done = System.nanoTime() + timeoutNs; - while (!isDone()) { - wait(1000); - if (System.nanoTime() >= done) { - throw new TimeoutIOException( - "Failed to get sync result after " + TimeUnit.NANOSECONDS.toMillis(timeoutNs) - + " ms for txid=" + this.txid + ", WAL system stuck?"); + doneLock.lock(); + try { + while (doneTxid == NOT_DONE) { + if (!doneCondition.await(timeoutNs, TimeUnit.NANOSECONDS)) { + throw new TimeoutIOException("Failed to get sync result after " + + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + " ms for txid=" + this.txid + + ", WAL system stuck?"); + } } + if (this.throwable != null) { + throw new ExecutionException(this.throwable); + } + return this.doneTxid; + } finally { + doneLock.unlock(); } - if (this.throwable != null) { - throw new ExecutionException(this.throwable); + } + + boolean isDone() { + doneLock.lock(); + try { + return this.doneTxid != NOT_DONE; + } finally { + doneLock.unlock(); } - return this.doneTxid; } - synchronized boolean isDone() { - return this.doneTxid != NOT_DONE; - } - - synchronized boolean isThrowable() { - return isDone() && getThrowable() != null; - } - - synchronized Throwable getThrowable() { - return this.throwable; + Throwable getThrowable() { + doneLock.lock(); + try { + if (doneTxid == NOT_DONE) { + return null; + } + return this.throwable; + } finally { + doneLock.unlock(); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java index 1b2477c044a..64956536dc0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java @@ -38,10 +38,10 @@ public class TestSyncFuture { public void testGet() throws Exception { long timeout = 5000; long txid = 100000; - SyncFuture syncFulture = new SyncFuture().reset(txid); + SyncFuture syncFulture = new SyncFuture().reset(txid, false); syncFulture.done(txid, null); assertEquals(txid, syncFulture.get(timeout)); - syncFulture.reset(txid).get(timeout); + syncFulture.reset(txid, false).get(timeout); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java index 070aaf27a01..dd4590a6a59 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java @@ -42,10 +42,10 @@ public class TestSyncFutureCache { final Configuration conf = HBaseConfiguration.create(); SyncFutureCache cache = new SyncFutureCache(conf); try { - SyncFuture future0 = cache.getIfPresentOrNew().reset(0); + SyncFuture future0 = cache.getIfPresentOrNew().reset(0, false); assertNotNull(future0); // Get another future from the same thread, should be different one. - SyncFuture future1 = cache.getIfPresentOrNew().reset(1); + SyncFuture future1 = cache.getIfPresentOrNew().reset(1, false); assertNotNull(future1); assertNotSame(future0, future1); cache.offer(future1); @@ -55,7 +55,8 @@ public class TestSyncFutureCache { assertEquals(future3, future0); final SyncFuture[] future4 = new SyncFuture[1]; // From a different thread - CompletableFuture.runAsync(() -> future4[0] = cache.getIfPresentOrNew().reset(4)).get(); + CompletableFuture.runAsync(() -> + future4[0] = cache.getIfPresentOrNew().reset(4, false)).get(); assertNotNull(future4[0]); assertNotSame(future3, future4[0]); // Clean up