Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit 5a19bcfa98
)
This commit is contained in:
parent
49131fa9d2
commit
a4ceabdc76
|
@ -304,11 +304,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
|
||||
|
||||
/**
|
||||
* Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures.
|
||||
* Thread local is used so JVM can GC the terminated thread for us. See HBASE-21228
|
||||
* <p>
|
||||
* A cache of sync futures reused by threads.
|
||||
*/
|
||||
private final ThreadLocal<SyncFuture> cachedSyncFutures;
|
||||
protected final SyncFutureCache syncFutureCache;
|
||||
|
||||
/**
|
||||
* The class name of the runtime implementation, used as prefix for logging/tracing.
|
||||
|
@ -478,12 +476,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
|
||||
this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
|
||||
conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS)));
|
||||
this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
|
||||
@Override
|
||||
protected SyncFuture initialValue() {
|
||||
return new SyncFuture();
|
||||
}
|
||||
};
|
||||
this.syncFutureCache = new SyncFutureCache(conf);
|
||||
this.implClassName = getClass().getSimpleName();
|
||||
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
|
||||
archiveRetries = this.conf.getInt("hbase.regionserver.logroll.archive.retries", 0);
|
||||
|
@ -808,10 +801,6 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
}
|
||||
}
|
||||
} catch (TimeoutIOException tioe) {
|
||||
// SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
|
||||
// still refer to it, so if this thread use it next time may get a wrong
|
||||
// result.
|
||||
this.cachedSyncFutures.remove();
|
||||
throw tioe;
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Interrupted", ie);
|
||||
|
@ -913,6 +902,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
rollWriterLock.lock();
|
||||
try {
|
||||
doShutdown();
|
||||
if (syncFutureCache != null) {
|
||||
syncFutureCache.clear();
|
||||
}
|
||||
if (logArchiveExecutor != null) {
|
||||
logArchiveExecutor.shutdownNow();
|
||||
}
|
||||
|
@ -969,7 +961,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
}
|
||||
|
||||
protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
|
||||
return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync);
|
||||
return syncFutureCache.getIfPresentOrNew().reset(sequence).setForceSync(forceSync);
|
||||
}
|
||||
|
||||
protected boolean isLogRollRequested() {
|
||||
|
|
|
@ -265,6 +265,14 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper that marks the future as DONE and offers it back to the cache.
|
||||
*/
|
||||
private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
|
||||
future.done(txid, t);
|
||||
syncFutureCache.offer(future);
|
||||
}
|
||||
|
||||
private static boolean waitingRoll(int epochAndState) {
|
||||
return (epochAndState & 1) != 0;
|
||||
}
|
||||
|
@ -405,7 +413,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
|
||||
SyncFuture sync = iter.next();
|
||||
if (sync.getTxid() <= txid) {
|
||||
sync.done(txid, null);
|
||||
markFutureDoneAndOffer(sync, txid, null);
|
||||
iter.remove();
|
||||
finished++;
|
||||
if (addSyncTrace) {
|
||||
|
@ -427,7 +435,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
long maxSyncTxid = highestSyncedTxid.get();
|
||||
for (SyncFuture sync : syncFutures) {
|
||||
maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
|
||||
sync.done(maxSyncTxid, null);
|
||||
markFutureDoneAndOffer(sync, maxSyncTxid, null);
|
||||
if (addSyncTrace) {
|
||||
addTimeAnnotation(sync, "writer synced");
|
||||
}
|
||||
|
@ -766,7 +774,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
}
|
||||
}
|
||||
// and fail them
|
||||
syncFutures.forEach(f -> f.done(f.getTxid(), error));
|
||||
syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
|
||||
if (!(consumeExecutor instanceof EventLoop)) {
|
||||
consumeExecutor.shutdown();
|
||||
}
|
||||
|
|
|
@ -909,6 +909,14 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
return syncFuture;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return if the safepoint has been attained.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
boolean isSafePointAttained() {
|
||||
return this.safePointAttainedLatch.getCount() == 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by Thread B when it attains the 'safe point'. In this method, Thread B signals Thread
|
||||
* A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} is called
|
||||
|
@ -995,6 +1003,16 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
|
||||
this.syncFutures[i].done(sequence, e);
|
||||
}
|
||||
offerDoneSyncsBackToCache();
|
||||
}
|
||||
|
||||
/**
|
||||
* Offers the finished syncs back to the cache for reuse.
|
||||
*/
|
||||
private void offerDoneSyncsBackToCache() {
|
||||
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
|
||||
syncFutureCache.offer(syncFutures[i]);
|
||||
}
|
||||
this.syncFuturesCount.set(0);
|
||||
}
|
||||
|
||||
|
@ -1109,7 +1127,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
? this.exception : new DamagedWALException("On sync", this.exception));
|
||||
}
|
||||
attainSafePoint(sequence);
|
||||
this.syncFuturesCount.set(0);
|
||||
// It is critical that we offer the futures back to the cache for reuse here after the
|
||||
// safe point is attained and all the clean up has been done. There have been
|
||||
// issues with reusing sync futures early causing WAL lockups, see HBASE-25984.
|
||||
offerDoneSyncsBackToCache();
|
||||
} catch (Throwable t) {
|
||||
LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
|
||||
}
|
||||
|
|
|
@ -90,7 +90,8 @@ class SyncFuture {
|
|||
|
||||
@Override
|
||||
public synchronized String toString() {
|
||||
return "done=" + isDone() + ", txid=" + this.txid;
|
||||
return "done=" + isDone() + ", txid=" + this.txid + " threadID=" + t.getId() +
|
||||
" threadName=" + t.getName();
|
||||
}
|
||||
|
||||
synchronized long getTxid() {
|
||||
|
@ -106,6 +107,15 @@ class SyncFuture {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the thread that owned this sync future, use with caution as we return the reference to
|
||||
* the actual thread object.
|
||||
* @return the associated thread instance.
|
||||
*/
|
||||
Thread getThread() {
|
||||
return t;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param txid the transaction id at which this future 'completed'.
|
||||
* @param t Can be null. Set if we are 'completing' on error (and this 't' is the error).
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.wal;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
|
||||
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
|
||||
|
||||
/**
|
||||
* A cache of {@link SyncFuture}s. This class supports two methods
|
||||
* {@link SyncFutureCache#getIfPresentOrNew()} and {@link SyncFutureCache#offer(SyncFuture)}.
|
||||
*
|
||||
* Usage pattern:
|
||||
* SyncFuture sf = syncFutureCache.getIfPresentOrNew();
|
||||
* sf.reset(...);
|
||||
* // Use the sync future
|
||||
* finally: syncFutureCache.offer(sf);
|
||||
*
|
||||
* Offering the sync future back to the cache makes it eligible for reuse within the same thread
|
||||
* context. Cache keyed by the accessing thread instance and automatically invalidated if it remains
|
||||
* unused for {@link SyncFutureCache#SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS} minutes.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class SyncFutureCache {
|
||||
|
||||
private static final long SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS = 2;
|
||||
|
||||
private final Cache<Thread, SyncFuture> syncFutureCache;
|
||||
|
||||
public SyncFutureCache(final Configuration conf) {
|
||||
final int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
|
||||
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
|
||||
syncFutureCache = CacheBuilder.newBuilder().initialCapacity(handlerCount)
|
||||
.expireAfterWrite(SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS, TimeUnit.MINUTES).build();
|
||||
}
|
||||
|
||||
public SyncFuture getIfPresentOrNew() {
|
||||
// Invalidate the entry if a mapping exists. We do not want it to be reused at the same time.
|
||||
SyncFuture future = syncFutureCache.asMap().remove(Thread.currentThread());
|
||||
return (future == null) ? new SyncFuture() : future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Offers the sync future back to the cache for reuse.
|
||||
*/
|
||||
public void offer(SyncFuture syncFuture) {
|
||||
// It is ok to overwrite an existing mapping.
|
||||
syncFutureCache.asMap().put(syncFuture.getThread(), syncFuture);
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
if (syncFutureCache != null) {
|
||||
syncFutureCache.invalidateAll();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,6 +18,8 @@
|
|||
package org.apache.hadoop.hbase.regionserver.wal;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
|
@ -27,6 +29,7 @@ import java.util.TreeMap;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -34,6 +37,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
|
@ -49,8 +53,10 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
@ -67,6 +73,8 @@ public class TestFSHLog extends AbstractTestFSWAL {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestFSHLog.class);
|
||||
|
||||
private static final long TEST_TIMEOUT_MS = 10000;
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
|
@ -131,6 +139,89 @@ public class TestFSHLog extends AbstractTestFSWAL {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for WAL stall due to sync future overwrites. See HBASE-25984.
|
||||
*/
|
||||
@Test
|
||||
public void testDeadlockWithSyncOverwrites() throws Exception {
|
||||
final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1);
|
||||
|
||||
class FailingWriter implements WALProvider.Writer {
|
||||
@Override public void sync(boolean forceSync) throws IOException {
|
||||
throw new IOException("Injected failure..");
|
||||
}
|
||||
|
||||
@Override public void append(WAL.Entry entry) throws IOException {
|
||||
}
|
||||
|
||||
@Override public long getLength() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override public void close() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Custom FSHLog implementation with a conditional wait before attaining safe point.
|
||||
*/
|
||||
class CustomFSHLog extends FSHLog {
|
||||
public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir,
|
||||
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
|
||||
String prefix, String suffix) throws IOException {
|
||||
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void beforeWaitOnSafePoint() {
|
||||
try {
|
||||
assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public SyncFuture publishSyncOnRingBuffer() {
|
||||
long sequence = getSequenceOnRingBuffer();
|
||||
return publishSyncOnRingBuffer(sequence, false);
|
||||
}
|
||||
}
|
||||
|
||||
final String name = this.name.getMethodName();
|
||||
try (CustomFSHLog log = new CustomFSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
|
||||
log.setWriter(new FailingWriter());
|
||||
Field ringBufferEventHandlerField =
|
||||
FSHLog.class.getDeclaredField("ringBufferEventHandler");
|
||||
ringBufferEventHandlerField.setAccessible(true);
|
||||
FSHLog.RingBufferEventHandler ringBufferEventHandler =
|
||||
(FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
|
||||
// Force a safe point
|
||||
FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint();
|
||||
try {
|
||||
SyncFuture future0 = log.publishSyncOnRingBuffer();
|
||||
// Wait for the sync to be done.
|
||||
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, future0::isDone);
|
||||
// Publish another sync from the same thread, this should not overwrite the done sync.
|
||||
SyncFuture future1 = log.publishSyncOnRingBuffer();
|
||||
assertFalse(future1.isDone());
|
||||
// Unblock the safe point trigger..
|
||||
blockBeforeSafePoint.countDown();
|
||||
// Wait for the safe point to be reached.
|
||||
// With the deadlock in HBASE-25984, this is never possible, thus blocking the sync pipeline.
|
||||
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, latch::isSafePointAttained);
|
||||
} finally {
|
||||
// Force release the safe point, for the clean up.
|
||||
latch.releaseSafePoint();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test case for https://issues.apache.org/jira/browse/HBASE-16721
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.wal;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ RegionServerTests.class, SmallTests.class })
|
||||
public class TestSyncFutureCache {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSyncFutureCache.class);
|
||||
|
||||
@Test
|
||||
public void testSyncFutureCacheLifeCycle() throws Exception {
|
||||
final Configuration conf = HBaseConfiguration.create();
|
||||
SyncFutureCache cache = new SyncFutureCache(conf);
|
||||
try {
|
||||
SyncFuture future0 = cache.getIfPresentOrNew().reset(0);
|
||||
assertNotNull(future0);
|
||||
// Get another future from the same thread, should be different one.
|
||||
SyncFuture future1 = cache.getIfPresentOrNew().reset(1);
|
||||
assertNotNull(future1);
|
||||
assertNotSame(future0, future1);
|
||||
cache.offer(future1);
|
||||
// Should override.
|
||||
cache.offer(future0);
|
||||
SyncFuture future3 = cache.getIfPresentOrNew();
|
||||
assertEquals(future3, future0);
|
||||
final SyncFuture[] future4 = new SyncFuture[1];
|
||||
// From a different thread
|
||||
CompletableFuture.runAsync(() -> future4[0] = cache.getIfPresentOrNew().reset(4)).get();
|
||||
assertNotNull(future4[0]);
|
||||
assertNotSame(future3, future4[0]);
|
||||
// Clean up
|
||||
cache.offer(future3);
|
||||
cache.offer(future4[0]);
|
||||
} finally {
|
||||
cache.clear();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue