HBASE-25998: Redo synchronization in SyncFuture (#3400)
Currently uses coarse grained synchronized approach that seems to
create a lot of contention. This patch
- Uses a reentrant lock instead of synchronized monitor
- Switches to a condition variable based waiting rather than busy wait
- Removed synchronization for unnecessary fields
Signed-off-by: Michael Stack <stack@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
(cherry picked from commit 6bafb59642
)
This commit is contained in:
parent
a4ceabdc76
commit
58f067dc4d
|
@ -961,7 +961,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
}
|
||||
|
||||
protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
|
||||
return syncFutureCache.getIfPresentOrNew().reset(sequence).setForceSync(forceSync);
|
||||
return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);
|
||||
}
|
||||
|
||||
protected boolean isLogRollRequested() {
|
||||
|
|
|
@ -31,7 +31,6 @@ import java.util.Comparator;
|
|||
import java.util.Deque;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.Queue;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
|
@ -133,10 +132,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);
|
||||
|
||||
private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> {
|
||||
int c = Long.compare(o1.getTxid(), o2.getTxid());
|
||||
return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
|
||||
};
|
||||
private static final Comparator<SyncFuture> SEQ_COMPARATOR = Comparator.comparingLong(
|
||||
SyncFuture::getTxid).thenComparingInt(System::identityHashCode);
|
||||
|
||||
public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
|
||||
public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;
|
||||
|
@ -373,7 +370,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
// sync futures then just use the default one.
|
||||
private boolean isHsync(long beginTxid, long endTxid) {
|
||||
SortedSet<SyncFuture> futures =
|
||||
syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1));
|
||||
syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
|
||||
new SyncFuture().reset(endTxid + 1, false));
|
||||
if (futures.isEmpty()) {
|
||||
return useHsync;
|
||||
}
|
||||
|
|
|
@ -888,8 +888,9 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
|
||||
|
||||
private void checkIfSyncFailed(SyncFuture syncFuture) throws FailedSyncBeforeLogCloseException {
|
||||
if (syncFuture.isThrowable()) {
|
||||
throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
|
||||
Throwable t = syncFuture.getThrowable();
|
||||
if (t != null) {
|
||||
throw new FailedSyncBeforeLogCloseException(t);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
|
|||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
|
@ -44,15 +45,23 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
class SyncFuture {
|
||||
// Implementation notes: I tried using a cyclicbarrier in here for handler and sync threads
|
||||
// to coordinate on but it did not give any obvious advantage and some issues with order in which
|
||||
// events happen.
|
||||
|
||||
private static final long NOT_DONE = -1L;
|
||||
private Thread t;
|
||||
|
||||
/**
|
||||
* The transaction id of this operation, monotonically increases.
|
||||
* Lock protecting the thread-safe fields.
|
||||
*/
|
||||
private final ReentrantLock doneLock;
|
||||
|
||||
/**
|
||||
* Condition to wait on for client threads.
|
||||
*/
|
||||
private final Condition doneCondition;
|
||||
|
||||
/*
|
||||
* Fields below are protected by {@link SyncFuture#doneLock}.
|
||||
*/
|
||||
private long txid;
|
||||
|
||||
/**
|
||||
* The transaction id that was set in here when we were marked done. Should be equal or > txnId.
|
||||
|
@ -65,16 +74,30 @@ class SyncFuture {
|
|||
*/
|
||||
private Throwable throwable;
|
||||
|
||||
private Thread t;
|
||||
/*
|
||||
* Fields below are created once at reset() and accessed without any lock. Should be ok as they
|
||||
* are immutable for this instance of sync future until it is reset.
|
||||
*/
|
||||
|
||||
/**
|
||||
* The transaction id of this operation, monotonically increases.
|
||||
*/
|
||||
private long txid;
|
||||
|
||||
private boolean forceSync;
|
||||
|
||||
SyncFuture() {
|
||||
this.doneLock = new ReentrantLock();
|
||||
this.doneCondition = doneLock.newCondition();
|
||||
}
|
||||
|
||||
/**
|
||||
* Call this method to clear old usage and get it ready for new deploy.
|
||||
*
|
||||
* @param txid the new transaction id
|
||||
* @return this
|
||||
*/
|
||||
synchronized SyncFuture reset(long txid) {
|
||||
SyncFuture reset(long txid, boolean forceSync) {
|
||||
if (t != null && t != Thread.currentThread()) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
@ -83,30 +106,26 @@ class SyncFuture {
|
|||
throw new IllegalStateException("" + txid + " " + Thread.currentThread());
|
||||
}
|
||||
this.doneTxid = NOT_DONE;
|
||||
this.forceSync = forceSync;
|
||||
this.txid = txid;
|
||||
this.throwable = null;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String toString() {
|
||||
public String toString() {
|
||||
return "done=" + isDone() + ", txid=" + this.txid + " threadID=" + t.getId() +
|
||||
" threadName=" + t.getName();
|
||||
}
|
||||
|
||||
synchronized long getTxid() {
|
||||
long getTxid() {
|
||||
return this.txid;
|
||||
}
|
||||
|
||||
synchronized boolean isForceSync() {
|
||||
boolean isForceSync() {
|
||||
return forceSync;
|
||||
}
|
||||
|
||||
synchronized SyncFuture setForceSync(boolean forceSync) {
|
||||
this.forceSync = forceSync;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the thread that owned this sync future, use with caution as we return the reference to
|
||||
* the actual thread object.
|
||||
|
@ -122,8 +141,10 @@ class SyncFuture {
|
|||
* @return True if we successfully marked this outstanding future as completed/done. Returns false
|
||||
* if this future is already 'done' when this method called.
|
||||
*/
|
||||
synchronized boolean done(final long txid, final Throwable t) {
|
||||
if (isDone()) {
|
||||
boolean done(final long txid, final Throwable t) {
|
||||
doneLock.lock();
|
||||
try {
|
||||
if (doneTxid != NOT_DONE) {
|
||||
return false;
|
||||
}
|
||||
this.throwable = t;
|
||||
|
@ -136,41 +157,51 @@ class SyncFuture {
|
|||
}
|
||||
// Mark done.
|
||||
this.doneTxid = txid;
|
||||
// Wake up waiting threads.
|
||||
notify();
|
||||
doneCondition.signalAll();
|
||||
return true;
|
||||
} finally {
|
||||
doneLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
boolean cancel(boolean mayInterruptIfRunning) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
synchronized long get(long timeoutNs) throws InterruptedException,
|
||||
long get(long timeoutNs) throws InterruptedException,
|
||||
ExecutionException, TimeoutIOException {
|
||||
final long done = System.nanoTime() + timeoutNs;
|
||||
while (!isDone()) {
|
||||
wait(1000);
|
||||
if (System.nanoTime() >= done) {
|
||||
throw new TimeoutIOException(
|
||||
"Failed to get sync result after " + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
|
||||
+ " ms for txid=" + this.txid + ", WAL system stuck?");
|
||||
doneLock.lock();
|
||||
try {
|
||||
while (doneTxid == NOT_DONE) {
|
||||
if (!doneCondition.await(timeoutNs, TimeUnit.NANOSECONDS)) {
|
||||
throw new TimeoutIOException("Failed to get sync result after "
|
||||
+ TimeUnit.NANOSECONDS.toMillis(timeoutNs) + " ms for txid=" + this.txid
|
||||
+ ", WAL system stuck?");
|
||||
}
|
||||
}
|
||||
if (this.throwable != null) {
|
||||
throw new ExecutionException(this.throwable);
|
||||
}
|
||||
return this.doneTxid;
|
||||
} finally {
|
||||
doneLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized boolean isDone() {
|
||||
boolean isDone() {
|
||||
doneLock.lock();
|
||||
try {
|
||||
return this.doneTxid != NOT_DONE;
|
||||
} finally {
|
||||
doneLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized boolean isThrowable() {
|
||||
return isDone() && getThrowable() != null;
|
||||
Throwable getThrowable() {
|
||||
doneLock.lock();
|
||||
try {
|
||||
if (doneTxid == NOT_DONE) {
|
||||
return null;
|
||||
}
|
||||
|
||||
synchronized Throwable getThrowable() {
|
||||
return this.throwable;
|
||||
} finally {
|
||||
doneLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,10 +38,10 @@ public class TestSyncFuture {
|
|||
public void testGet() throws Exception {
|
||||
long timeout = 5000;
|
||||
long txid = 100000;
|
||||
SyncFuture syncFulture = new SyncFuture().reset(txid);
|
||||
SyncFuture syncFulture = new SyncFuture().reset(txid, false);
|
||||
syncFulture.done(txid, null);
|
||||
assertEquals(txid, syncFulture.get(timeout));
|
||||
|
||||
syncFulture.reset(txid).get(timeout);
|
||||
syncFulture.reset(txid, false).get(timeout);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,10 +42,10 @@ public class TestSyncFutureCache {
|
|||
final Configuration conf = HBaseConfiguration.create();
|
||||
SyncFutureCache cache = new SyncFutureCache(conf);
|
||||
try {
|
||||
SyncFuture future0 = cache.getIfPresentOrNew().reset(0);
|
||||
SyncFuture future0 = cache.getIfPresentOrNew().reset(0, false);
|
||||
assertNotNull(future0);
|
||||
// Get another future from the same thread, should be different one.
|
||||
SyncFuture future1 = cache.getIfPresentOrNew().reset(1);
|
||||
SyncFuture future1 = cache.getIfPresentOrNew().reset(1, false);
|
||||
assertNotNull(future1);
|
||||
assertNotSame(future0, future1);
|
||||
cache.offer(future1);
|
||||
|
@ -55,7 +55,8 @@ public class TestSyncFutureCache {
|
|||
assertEquals(future3, future0);
|
||||
final SyncFuture[] future4 = new SyncFuture[1];
|
||||
// From a different thread
|
||||
CompletableFuture.runAsync(() -> future4[0] = cache.getIfPresentOrNew().reset(4)).get();
|
||||
CompletableFuture.runAsync(() ->
|
||||
future4[0] = cache.getIfPresentOrNew().reset(4, false)).get();
|
||||
assertNotNull(future4[0]);
|
||||
assertNotSame(future3, future4[0]);
|
||||
// Clean up
|
||||
|
|
Loading…
Reference in New Issue