HBASE-25998: Redo synchronization in SyncFuture (#3399)

Currently uses coarse grained synchronized approach that seems to
create a lot of contention. This patch

- Uses a reentrant lock instead of synchronized monitor
- Switches to a condition variable based waiting rather than busy wait
- Removed synchronization for unnecessary fields

Signed-off-by: Michael Stack <stack@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
(cherry picked from commit 6bafb59642)
This commit is contained in:
Bharath Vissapragada 2021-06-18 10:37:10 -07:00 committed by GitHub
parent 836fd3c52a
commit 3b58919246
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 102 additions and 71 deletions

View File

@ -961,7 +961,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
return syncFutureCache.getIfPresentOrNew().reset(sequence).setForceSync(forceSync);
return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);
}
protected boolean isLogRollRequested() {

View File

@ -31,7 +31,6 @@ import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
@ -133,10 +132,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);
private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> {
int c = Long.compare(o1.getTxid(), o2.getTxid());
return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
};
private static final Comparator<SyncFuture> SEQ_COMPARATOR = Comparator.comparingLong(
SyncFuture::getTxid).thenComparingInt(System::identityHashCode);
public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;
@ -373,7 +370,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
// sync futures then just use the default one.
private boolean isHsync(long beginTxid, long endTxid) {
SortedSet<SyncFuture> futures =
syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1));
syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
new SyncFuture().reset(endTxid + 1, false));
if (futures.isEmpty()) {
return useHsync;
}

View File

@ -888,8 +888,9 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
private void checkIfSyncFailed(SyncFuture syncFuture) throws FailedSyncBeforeLogCloseException {
if (syncFuture.isThrowable()) {
throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
Throwable t = syncFuture.getThrowable();
if (t != null) {
throw new FailedSyncBeforeLogCloseException(t);
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.yetus.audience.InterfaceAudience;
@ -44,15 +45,23 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
class SyncFuture {
// Implementation notes: I tried using a cyclicbarrier in here for handler and sync threads
// to coordinate on but it did not give any obvious advantage and some issues with order in which
// events happen.
private static final long NOT_DONE = -1L;
private Thread t;
/**
* The transaction id of this operation, monotonically increases.
* Lock protecting the thread-safe fields.
*/
private final ReentrantLock doneLock;
/**
* Condition to wait on for client threads.
*/
private final Condition doneCondition;
/*
* Fields below are protected by {@link SyncFuture#doneLock}.
*/
private long txid;
/**
* The transaction id that was set in here when we were marked done. Should be equal or > txnId.
@ -65,16 +74,30 @@ class SyncFuture {
*/
private Throwable throwable;
private Thread t;
/*
* Fields below are created once at reset() and accessed without any lock. Should be ok as they
* are immutable for this instance of sync future until it is reset.
*/
/**
* The transaction id of this operation, monotonically increases.
*/
private long txid;
private boolean forceSync;
SyncFuture() {
this.doneLock = new ReentrantLock();
this.doneCondition = doneLock.newCondition();
}
/**
* Call this method to clear old usage and get it ready for new deploy.
*
* @param txid the new transaction id
* @return this
*/
synchronized SyncFuture reset(long txid) {
SyncFuture reset(long txid, boolean forceSync) {
if (t != null && t != Thread.currentThread()) {
throw new IllegalStateException();
}
@ -83,30 +106,26 @@ class SyncFuture {
throw new IllegalStateException("" + txid + " " + Thread.currentThread());
}
this.doneTxid = NOT_DONE;
this.forceSync = forceSync;
this.txid = txid;
this.throwable = null;
return this;
}
@Override
public synchronized String toString() {
public String toString() {
return "done=" + isDone() + ", txid=" + this.txid + " threadID=" + t.getId() +
" threadName=" + t.getName();
}
synchronized long getTxid() {
long getTxid() {
return this.txid;
}
synchronized boolean isForceSync() {
boolean isForceSync() {
return forceSync;
}
synchronized SyncFuture setForceSync(boolean forceSync) {
this.forceSync = forceSync;
return this;
}
/**
* Returns the thread that owned this sync future, use with caution as we return the reference to
* the actual thread object.
@ -122,8 +141,10 @@ class SyncFuture {
* @return True if we successfully marked this outstanding future as completed/done. Returns false
* if this future is already 'done' when this method called.
*/
synchronized boolean done(final long txid, final Throwable t) {
if (isDone()) {
boolean done(final long txid, final Throwable t) {
doneLock.lock();
try {
if (doneTxid != NOT_DONE) {
return false;
}
this.throwable = t;
@ -136,41 +157,51 @@ class SyncFuture {
}
// Mark done.
this.doneTxid = txid;
// Wake up waiting threads.
notify();
doneCondition.signalAll();
return true;
} finally {
doneLock.unlock();
}
}
boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException();
}
synchronized long get(long timeoutNs) throws InterruptedException,
long get(long timeoutNs) throws InterruptedException,
ExecutionException, TimeoutIOException {
final long done = System.nanoTime() + timeoutNs;
while (!isDone()) {
wait(1000);
if (System.nanoTime() >= done) {
throw new TimeoutIOException(
"Failed to get sync result after " + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
+ " ms for txid=" + this.txid + ", WAL system stuck?");
doneLock.lock();
try {
while (doneTxid == NOT_DONE) {
if (!doneCondition.await(timeoutNs, TimeUnit.NANOSECONDS)) {
throw new TimeoutIOException("Failed to get sync result after "
+ TimeUnit.NANOSECONDS.toMillis(timeoutNs) + " ms for txid=" + this.txid
+ ", WAL system stuck?");
}
}
if (this.throwable != null) {
throw new ExecutionException(this.throwable);
}
return this.doneTxid;
} finally {
doneLock.unlock();
}
}
synchronized boolean isDone() {
boolean isDone() {
doneLock.lock();
try {
return this.doneTxid != NOT_DONE;
} finally {
doneLock.unlock();
}
}
synchronized boolean isThrowable() {
return isDone() && getThrowable() != null;
Throwable getThrowable() {
doneLock.lock();
try {
if (doneTxid == NOT_DONE) {
return null;
}
synchronized Throwable getThrowable() {
return this.throwable;
} finally {
doneLock.unlock();
}
}
}

View File

@ -38,10 +38,10 @@ public class TestSyncFuture {
public void testGet() throws Exception {
long timeout = 5000;
long txid = 100000;
SyncFuture syncFulture = new SyncFuture().reset(txid);
SyncFuture syncFulture = new SyncFuture().reset(txid, false);
syncFulture.done(txid, null);
assertEquals(txid, syncFulture.get(timeout));
syncFulture.reset(txid).get(timeout);
syncFulture.reset(txid, false).get(timeout);
}
}

View File

@ -42,10 +42,10 @@ public class TestSyncFutureCache {
final Configuration conf = HBaseConfiguration.create();
SyncFutureCache cache = new SyncFutureCache(conf);
try {
SyncFuture future0 = cache.getIfPresentOrNew().reset(0);
SyncFuture future0 = cache.getIfPresentOrNew().reset(0, false);
assertNotNull(future0);
// Get another future from the same thread, should be different one.
SyncFuture future1 = cache.getIfPresentOrNew().reset(1);
SyncFuture future1 = cache.getIfPresentOrNew().reset(1, false);
assertNotNull(future1);
assertNotSame(future0, future1);
cache.offer(future1);
@ -55,7 +55,8 @@ public class TestSyncFutureCache {
assertEquals(future3, future0);
final SyncFuture[] future4 = new SyncFuture[1];
// From a different thread
CompletableFuture.runAsync(() -> future4[0] = cache.getIfPresentOrNew().reset(4)).get();
CompletableFuture.runAsync(() ->
future4[0] = cache.getIfPresentOrNew().reset(4, false)).get();
assertNotNull(future4[0]);
assertNotSame(future3, future4[0]);
// Clean up