HBASE-25998: Redo synchronization in SyncFuture

Currently uses coarse grained synchronized approach that seems to
create a lot of contention. This patch

- Uses a reentrant lock instead of synchronized monitor
- Switches to a condition variable based waiting rather than busy wait
- Removed synchronization for unnecessary fields

Signed-off-by: Michael Stack <stack@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Bharath Vissapragada 2021-06-10 21:45:22 -07:00
parent eb242be674
commit 6bafb59642
No known key found for this signature in database
GPG Key ID: 18AE42A0B5A93FA7
6 changed files with 102 additions and 71 deletions

View File

@ -1041,7 +1041,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
} }
protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) { protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
return syncFutureCache.getIfPresentOrNew().reset(sequence).setForceSync(forceSync); return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);
} }
protected boolean isLogRollRequested() { protected boolean isLogRollRequested() {

View File

@ -31,7 +31,6 @@ import java.util.Comparator;
import java.util.Deque; import java.util.Deque;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.OptionalLong;
import java.util.Queue; import java.util.Queue;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
@ -131,10 +130,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class); private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);
private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> { private static final Comparator<SyncFuture> SEQ_COMPARATOR = Comparator.comparingLong(
int c = Long.compare(o1.getTxid(), o2.getTxid()); SyncFuture::getTxid).thenComparingInt(System::identityHashCode);
return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
};
public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size"; public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024; public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;
@ -371,7 +368,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
// sync futures then just use the default one. // sync futures then just use the default one.
private boolean isHsync(long beginTxid, long endTxid) { private boolean isHsync(long beginTxid, long endTxid) {
SortedSet<SyncFuture> futures = SortedSet<SyncFuture> futures =
syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1)); syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
new SyncFuture().reset(endTxid + 1, false));
if (futures.isEmpty()) { if (futures.isEmpty()) {
return useHsync; return useHsync;
} }

View File

@ -870,8 +870,9 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1); private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
private void checkIfSyncFailed(SyncFuture syncFuture) throws FailedSyncBeforeLogCloseException { private void checkIfSyncFailed(SyncFuture syncFuture) throws FailedSyncBeforeLogCloseException {
if (syncFuture.isThrowable()) { Throwable t = syncFuture.getThrowable();
throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable()); if (t != null) {
throw new FailedSyncBeforeLogCloseException(t);
} }
} }

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -44,15 +45,23 @@ import org.apache.yetus.audience.InterfaceAudience;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class SyncFuture { class SyncFuture {
// Implementation notes: I tried using a cyclicbarrier in here for handler and sync threads
// to coordinate on but it did not give any obvious advantage and some issues with order in which
// events happen.
private static final long NOT_DONE = -1L; private static final long NOT_DONE = -1L;
private Thread t;
/** /**
* The transaction id of this operation, monotonically increases. * Lock protecting the thread-safe fields.
*/
private final ReentrantLock doneLock;
/**
* Condition to wait on for client threads.
*/
private final Condition doneCondition;
/*
* Fields below are protected by {@link SyncFuture#doneLock}.
*/ */
private long txid;
/** /**
* The transaction id that was set in here when we were marked done. Should be equal or > txnId. * The transaction id that was set in here when we were marked done. Should be equal or > txnId.
@ -65,16 +74,30 @@ class SyncFuture {
*/ */
private Throwable throwable; private Throwable throwable;
private Thread t; /*
* Fields below are created once at reset() and accessed without any lock. Should be ok as they
* are immutable for this instance of sync future until it is reset.
*/
/**
* The transaction id of this operation, monotonically increases.
*/
private long txid;
private boolean forceSync; private boolean forceSync;
SyncFuture() {
this.doneLock = new ReentrantLock();
this.doneCondition = doneLock.newCondition();
}
/** /**
* Call this method to clear old usage and get it ready for new deploy. * Call this method to clear old usage and get it ready for new deploy.
*
* @param txid the new transaction id * @param txid the new transaction id
* @return this * @return this
*/ */
synchronized SyncFuture reset(long txid) { SyncFuture reset(long txid, boolean forceSync) {
if (t != null && t != Thread.currentThread()) { if (t != null && t != Thread.currentThread()) {
throw new IllegalStateException(); throw new IllegalStateException();
} }
@ -83,30 +106,26 @@ class SyncFuture {
throw new IllegalStateException("" + txid + " " + Thread.currentThread()); throw new IllegalStateException("" + txid + " " + Thread.currentThread());
} }
this.doneTxid = NOT_DONE; this.doneTxid = NOT_DONE;
this.forceSync = forceSync;
this.txid = txid; this.txid = txid;
this.throwable = null; this.throwable = null;
return this; return this;
} }
@Override @Override
public synchronized String toString() { public String toString() {
return "done=" + isDone() + ", txid=" + this.txid + " threadID=" + t.getId() + return "done=" + isDone() + ", txid=" + this.txid + " threadID=" + t.getId() +
" threadName=" + t.getName(); " threadName=" + t.getName();
} }
synchronized long getTxid() { long getTxid() {
return this.txid; return this.txid;
} }
synchronized boolean isForceSync() { boolean isForceSync() {
return forceSync; return forceSync;
} }
synchronized SyncFuture setForceSync(boolean forceSync) {
this.forceSync = forceSync;
return this;
}
/** /**
* Returns the thread that owned this sync future, use with caution as we return the reference to * Returns the thread that owned this sync future, use with caution as we return the reference to
* the actual thread object. * the actual thread object.
@ -122,8 +141,10 @@ class SyncFuture {
* @return True if we successfully marked this outstanding future as completed/done. Returns false * @return True if we successfully marked this outstanding future as completed/done. Returns false
* if this future is already 'done' when this method called. * if this future is already 'done' when this method called.
*/ */
synchronized boolean done(final long txid, final Throwable t) { boolean done(final long txid, final Throwable t) {
if (isDone()) { doneLock.lock();
try {
if (doneTxid != NOT_DONE) {
return false; return false;
} }
this.throwable = t; this.throwable = t;
@ -136,41 +157,51 @@ class SyncFuture {
} }
// Mark done. // Mark done.
this.doneTxid = txid; this.doneTxid = txid;
// Wake up waiting threads. doneCondition.signalAll();
notify();
return true; return true;
} finally {
doneLock.unlock();
}
} }
boolean cancel(boolean mayInterruptIfRunning) { long get(long timeoutNs) throws InterruptedException,
throw new UnsupportedOperationException();
}
synchronized long get(long timeoutNs) throws InterruptedException,
ExecutionException, TimeoutIOException { ExecutionException, TimeoutIOException {
final long done = System.nanoTime() + timeoutNs; doneLock.lock();
while (!isDone()) { try {
wait(1000); while (doneTxid == NOT_DONE) {
if (System.nanoTime() >= done) { if (!doneCondition.await(timeoutNs, TimeUnit.NANOSECONDS)) {
throw new TimeoutIOException( throw new TimeoutIOException("Failed to get sync result after "
"Failed to get sync result after " + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + " ms for txid=" + this.txid
+ " ms for txid=" + this.txid + ", WAL system stuck?"); + ", WAL system stuck?");
} }
} }
if (this.throwable != null) { if (this.throwable != null) {
throw new ExecutionException(this.throwable); throw new ExecutionException(this.throwable);
} }
return this.doneTxid; return this.doneTxid;
} finally {
doneLock.unlock();
}
} }
synchronized boolean isDone() { boolean isDone() {
doneLock.lock();
try {
return this.doneTxid != NOT_DONE; return this.doneTxid != NOT_DONE;
} finally {
doneLock.unlock();
}
} }
synchronized boolean isThrowable() { Throwable getThrowable() {
return isDone() && getThrowable() != null; doneLock.lock();
try {
if (doneTxid == NOT_DONE) {
return null;
} }
synchronized Throwable getThrowable() {
return this.throwable; return this.throwable;
} finally {
doneLock.unlock();
}
} }
} }

View File

@ -38,10 +38,10 @@ public class TestSyncFuture {
public void testGet() throws Exception { public void testGet() throws Exception {
long timeout = 5000; long timeout = 5000;
long txid = 100000; long txid = 100000;
SyncFuture syncFulture = new SyncFuture().reset(txid); SyncFuture syncFulture = new SyncFuture().reset(txid, false);
syncFulture.done(txid, null); syncFulture.done(txid, null);
assertEquals(txid, syncFulture.get(timeout)); assertEquals(txid, syncFulture.get(timeout));
syncFulture.reset(txid).get(timeout); syncFulture.reset(txid, false).get(timeout);
} }
} }

View File

@ -42,10 +42,10 @@ public class TestSyncFutureCache {
final Configuration conf = HBaseConfiguration.create(); final Configuration conf = HBaseConfiguration.create();
SyncFutureCache cache = new SyncFutureCache(conf); SyncFutureCache cache = new SyncFutureCache(conf);
try { try {
SyncFuture future0 = cache.getIfPresentOrNew().reset(0); SyncFuture future0 = cache.getIfPresentOrNew().reset(0, false);
assertNotNull(future0); assertNotNull(future0);
// Get another future from the same thread, should be different one. // Get another future from the same thread, should be different one.
SyncFuture future1 = cache.getIfPresentOrNew().reset(1); SyncFuture future1 = cache.getIfPresentOrNew().reset(1, false);
assertNotNull(future1); assertNotNull(future1);
assertNotSame(future0, future1); assertNotSame(future0, future1);
cache.offer(future1); cache.offer(future1);
@ -55,7 +55,8 @@ public class TestSyncFutureCache {
assertEquals(future3, future0); assertEquals(future3, future0);
final SyncFuture[] future4 = new SyncFuture[1]; final SyncFuture[] future4 = new SyncFuture[1];
// From a different thread // From a different thread
CompletableFuture.runAsync(() -> future4[0] = cache.getIfPresentOrNew().reset(4)).get(); CompletableFuture.runAsync(() ->
future4[0] = cache.getIfPresentOrNew().reset(4, false)).get();
assertNotNull(future4[0]); assertNotNull(future4[0]);
assertNotSame(future3, future4[0]); assertNotSame(future3, future4[0]);
// Clean up // Clean up