Signed-off-by: Viraj Jasani vjasani@apache.org
(cherry picked from commit 5a19bcf
)
This commit is contained in:
parent
a40f4583e3
commit
2e24bad826
|
@ -202,12 +202,7 @@ public class FSHLog implements WAL {
|
||||||
*/
|
*/
|
||||||
private final RingBufferEventHandler ringBufferEventHandler;
|
private final RingBufferEventHandler ringBufferEventHandler;
|
||||||
|
|
||||||
/**
|
private final SyncFutureCache syncFutureCache;
|
||||||
* Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures.
|
|
||||||
* Thread local is used so JVM can GC the terminated thread for us. See HBASE-21228
|
|
||||||
* <p>
|
|
||||||
*/
|
|
||||||
private final ThreadLocal<SyncFuture> cachedSyncFutures;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The highest known outstanding unsync'd WALEdit sequence number where sequence number is the
|
* The highest known outstanding unsync'd WALEdit sequence number where sequence number is the
|
||||||
|
@ -597,12 +592,7 @@ public class FSHLog implements WAL {
|
||||||
this.ringBufferEventHandler = new RingBufferEventHandler(syncerCount, maxBatchCount);
|
this.ringBufferEventHandler = new RingBufferEventHandler(syncerCount, maxBatchCount);
|
||||||
this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
|
this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
|
||||||
this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});
|
this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});
|
||||||
this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
|
this.syncFutureCache = new SyncFutureCache(conf);
|
||||||
@Override
|
|
||||||
protected SyncFuture initialValue() {
|
|
||||||
return new SyncFuture();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
// Starting up threads in constructor is a no no; Interface should have an init call.
|
// Starting up threads in constructor is a no no; Interface should have an init call.
|
||||||
this.disruptor.start();
|
this.disruptor.start();
|
||||||
}
|
}
|
||||||
|
@ -1126,6 +1116,10 @@ public class FSHLog implements WAL {
|
||||||
// With disruptor down, this is safe to let go.
|
// With disruptor down, this is safe to let go.
|
||||||
if (this.appendExecutor != null) this.appendExecutor.shutdown();
|
if (this.appendExecutor != null) this.appendExecutor.shutdown();
|
||||||
|
|
||||||
|
if (syncFutureCache != null) {
|
||||||
|
syncFutureCache.clear();
|
||||||
|
}
|
||||||
|
|
||||||
// Tell our listeners that the log is closing
|
// Tell our listeners that the log is closing
|
||||||
if (!this.listeners.isEmpty()) {
|
if (!this.listeners.isEmpty()) {
|
||||||
for (WALActionsListener i : this.listeners) {
|
for (WALActionsListener i : this.listeners) {
|
||||||
|
@ -1496,7 +1490,8 @@ public class FSHLog implements WAL {
|
||||||
return this.disruptor.getRingBuffer().next();
|
return this.disruptor.getRingBuffer().next();
|
||||||
}
|
}
|
||||||
|
|
||||||
private SyncFuture publishSyncOnRingBuffer(Span span, boolean forceSync) {
|
@InterfaceAudience.Private
|
||||||
|
public SyncFuture publishSyncOnRingBuffer(Span span, boolean forceSync) {
|
||||||
long sequence = this.disruptor.getRingBuffer().next();
|
long sequence = this.disruptor.getRingBuffer().next();
|
||||||
return publishSyncOnRingBuffer(sequence, span, forceSync);
|
return publishSyncOnRingBuffer(sequence, span, forceSync);
|
||||||
}
|
}
|
||||||
|
@ -1523,10 +1518,6 @@ public class FSHLog implements WAL {
|
||||||
syncFuture.get(walSyncTimeout);
|
syncFuture.get(walSyncTimeout);
|
||||||
return syncFuture.getSpan();
|
return syncFuture.getSpan();
|
||||||
} catch (TimeoutIOException tioe) {
|
} catch (TimeoutIOException tioe) {
|
||||||
// SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
|
|
||||||
// still refer to it, so if this thread use it next time may get a wrong
|
|
||||||
// result.
|
|
||||||
this.cachedSyncFutures.remove();
|
|
||||||
throw tioe;
|
throw tioe;
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
LOG.warn("Interrupted", ie);
|
LOG.warn("Interrupted", ie);
|
||||||
|
@ -1544,7 +1535,7 @@ public class FSHLog implements WAL {
|
||||||
}
|
}
|
||||||
|
|
||||||
private SyncFuture getSyncFuture(final long sequence, Span span) {
|
private SyncFuture getSyncFuture(final long sequence, Span span) {
|
||||||
return cachedSyncFutures.get().reset(sequence);
|
return syncFutureCache.getIfPresentOrNew().reset(sequence);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void postSync(final long timeInNanos, final int handlerSyncs) {
|
private void postSync(final long timeInNanos, final int handlerSyncs) {
|
||||||
|
@ -1815,6 +1806,10 @@ public class FSHLog implements WAL {
|
||||||
return syncFuture;
|
return syncFuture;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean isSafePointAttained() {
|
||||||
|
return safePointAttainedLatch.getCount() == 0;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called by Thread B when it attains the 'safe point'. In this method, Thread B signals
|
* Called by Thread B when it attains the 'safe point'. In this method, Thread B signals
|
||||||
* Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()}
|
* Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()}
|
||||||
|
@ -1902,7 +1897,7 @@ public class FSHLog implements WAL {
|
||||||
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
|
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
|
||||||
this.syncFutures[i].done(sequence, e);
|
this.syncFutures[i].done(sequence, e);
|
||||||
}
|
}
|
||||||
this.syncFuturesCount.set(0);
|
offerDoneSyncsBackToCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2018,12 +2013,25 @@ public class FSHLog implements WAL {
|
||||||
new DamagedWALException("On sync", this.exception));
|
new DamagedWALException("On sync", this.exception));
|
||||||
}
|
}
|
||||||
attainSafePoint(sequence);
|
attainSafePoint(sequence);
|
||||||
this.syncFuturesCount.set(0);
|
// It is critical that we offer the futures back to the cache for reuse here after the
|
||||||
|
// safe point is attained and all the clean up has been done. There have been
|
||||||
|
// issues with reusing sync futures early causing WAL lockups, see HBASE-25984.
|
||||||
|
offerDoneSyncsBackToCache();
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
|
LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Offers the finished syncs back to the cache for reuse.
|
||||||
|
*/
|
||||||
|
private void offerDoneSyncsBackToCache() {
|
||||||
|
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
|
||||||
|
syncFutureCache.offer(syncFutures[i]);
|
||||||
|
}
|
||||||
|
this.syncFuturesCount.set(0);
|
||||||
|
}
|
||||||
|
|
||||||
SafePointZigZagLatch attainSafePoint() {
|
SafePointZigZagLatch attainSafePoint() {
|
||||||
this.zigzagLatch = new SafePointZigZagLatch();
|
this.zigzagLatch = new SafePointZigZagLatch();
|
||||||
return this.zigzagLatch;
|
return this.zigzagLatch;
|
||||||
|
|
|
@ -115,7 +115,8 @@ class SyncFuture {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized String toString() {
|
public synchronized String toString() {
|
||||||
return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence;
|
return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence +
|
||||||
|
" threadID=" + t.getId() + " threadName=" + t.getName();
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized long getRingBufferSequence() {
|
synchronized long getRingBufferSequence() {
|
||||||
|
@ -191,6 +192,15 @@ class SyncFuture {
|
||||||
return this.doneSequence;
|
return this.doneSequence;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the thread that owned this sync future, use with caution as we return the reference to
|
||||||
|
* the actual thread object.
|
||||||
|
* @return the associated thread instance.
|
||||||
|
*/
|
||||||
|
public Thread getThread() {
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
|
||||||
public Long get(long timeout, TimeUnit unit)
|
public Long get(long timeout, TimeUnit unit)
|
||||||
throws InterruptedException, ExecutionException {
|
throws InterruptedException, ExecutionException {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import com.google.common.cache.Cache;
|
||||||
|
import com.google.common.cache.CacheBuilder;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A cache of {@link SyncFuture}s. This class supports two methods
|
||||||
|
* {@link SyncFutureCache#getIfPresentOrNew()} and {@link SyncFutureCache#offer(
|
||||||
|
* org.apache.hadoop.hbase.regionserver.wal.SyncFuture)}.
|
||||||
|
*
|
||||||
|
* Usage pattern:
|
||||||
|
* SyncFuture sf = syncFutureCache.getIfPresentOrNew();
|
||||||
|
* sf.reset(...);
|
||||||
|
* // Use the sync future
|
||||||
|
* finally: syncFutureCache.offer(sf);
|
||||||
|
*
|
||||||
|
* Offering the sync future back to the cache makes it eligible for reuse within the same thread
|
||||||
|
* context. Cache keyed by the accessing thread instance and automatically invalidated if it remains
|
||||||
|
* unused for {@link SyncFutureCache#SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS} minutes.
|
||||||
|
*/
|
||||||
|
public final class SyncFutureCache {
|
||||||
|
|
||||||
|
private static final long SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS = 2;
|
||||||
|
|
||||||
|
private final Cache<Thread, SyncFuture> syncFutureCache;
|
||||||
|
|
||||||
|
public SyncFutureCache(final Configuration conf) {
|
||||||
|
final int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
|
||||||
|
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
|
||||||
|
syncFutureCache = CacheBuilder.newBuilder().initialCapacity(handlerCount)
|
||||||
|
.expireAfterWrite(SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS, TimeUnit.MINUTES).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public SyncFuture getIfPresentOrNew() {
|
||||||
|
// Invalidate the entry if a mapping exists. We do not want it to be reused at the same time.
|
||||||
|
SyncFuture future = syncFutureCache.asMap().remove(Thread.currentThread());
|
||||||
|
return (future == null) ? new SyncFuture() : future;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Offers the sync future back to the cache for reuse.
|
||||||
|
*/
|
||||||
|
public void offer(SyncFuture syncFuture) {
|
||||||
|
// It is ok to overwrite an existing mapping.
|
||||||
|
syncFutureCache.asMap().put(syncFuture.getThread(), syncFuture);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void clear() {
|
||||||
|
if (syncFutureCache != null) {
|
||||||
|
syncFutureCache.invalidateAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.hbase.regionserver.wal;
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotEquals;
|
import static org.junit.Assert.assertNotEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
@ -34,6 +35,7 @@ import java.util.UUID;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.commons.lang.mutable.MutableBoolean;
|
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||||
|
@ -53,6 +55,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.Waiter;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
@ -69,6 +72,7 @@ import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
import org.apache.hadoop.hbase.wal.WALKey;
|
import org.apache.hadoop.hbase.wal.WALKey;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -85,6 +89,8 @@ import org.junit.rules.TestName;
|
||||||
public class TestFSHLog {
|
public class TestFSHLog {
|
||||||
private static final Log LOG = LogFactory.getLog(TestFSHLog.class);
|
private static final Log LOG = LogFactory.getLog(TestFSHLog.class);
|
||||||
|
|
||||||
|
private static final long TEST_TIMEOUT_MS = 10000;
|
||||||
|
|
||||||
protected static Configuration conf;
|
protected static Configuration conf;
|
||||||
protected static FileSystem fs;
|
protected static FileSystem fs;
|
||||||
protected static Path dir;
|
protected static Path dir;
|
||||||
|
@ -162,6 +168,87 @@ public class TestFSHLog {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for WAL stall due to sync future overwrites. See HBASE-25984.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDeadlockWithSyncOverwrites() throws Exception {
|
||||||
|
final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1);
|
||||||
|
|
||||||
|
class FailingWriter implements WALProvider.Writer {
|
||||||
|
@Override public void sync(boolean forceSync) throws IOException {
|
||||||
|
throw new IOException("Injected failure..");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void append(WAL.Entry entry) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public long getLength() throws IOException {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
@Override public void close() throws IOException {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Custom FSHLog implementation with a conditional wait before attaining safe point.
|
||||||
|
*/
|
||||||
|
class CustomFSHLog extends FSHLog {
|
||||||
|
public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir,
|
||||||
|
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
|
||||||
|
String prefix, String suffix) throws IOException {
|
||||||
|
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void beforeWaitOnSafePoint() {
|
||||||
|
try {
|
||||||
|
assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try (FSHLog log = new CustomFSHLog(fs, walRootDir, dir.toString(),
|
||||||
|
HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null)) {
|
||||||
|
log.setWriter(new FailingWriter());
|
||||||
|
Field ringBufferEventHandlerField =
|
||||||
|
FSHLog.class.getDeclaredField("ringBufferEventHandler");
|
||||||
|
ringBufferEventHandlerField.setAccessible(true);
|
||||||
|
FSHLog.RingBufferEventHandler ringBufferEventHandler =
|
||||||
|
(FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
|
||||||
|
// Force a safe point
|
||||||
|
final FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint();
|
||||||
|
try {
|
||||||
|
final SyncFuture future0 = log.publishSyncOnRingBuffer(null, false);
|
||||||
|
// Wait for the sync to be done.
|
||||||
|
Waiter.waitFor(conf, TEST_TIMEOUT_MS, new Waiter.Predicate<Exception>() {
|
||||||
|
@Override
|
||||||
|
public boolean evaluate() throws Exception {
|
||||||
|
return future0.isDone();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// Publish another sync from the same thread, this should not overwrite the done sync.
|
||||||
|
SyncFuture future1 = log.publishSyncOnRingBuffer(null, false);
|
||||||
|
assertFalse(future1.isDone());
|
||||||
|
// Unblock the safe point trigger..
|
||||||
|
blockBeforeSafePoint.countDown();
|
||||||
|
// Wait for the safe point to be reached. With the deadlock in HBASE-25984, this is never
|
||||||
|
// possible, thus blocking the sync pipeline.
|
||||||
|
Waiter.waitFor(conf, TEST_TIMEOUT_MS, new Waiter.Predicate<Exception>() {
|
||||||
|
@Override
|
||||||
|
public boolean evaluate() throws Exception {
|
||||||
|
return latch.isSafePointAttained();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
// Force release the safe point, for the clean up.
|
||||||
|
latch.releaseSafePoint();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected void addEdits(WAL log,
|
protected void addEdits(WAL log,
|
||||||
HRegionInfo hri,
|
HRegionInfo hri,
|
||||||
HTableDescriptor htd,
|
HTableDescriptor htd,
|
||||||
|
|
|
@ -0,0 +1,68 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNotSame;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ RegionServerTests.class, SmallTests.class })
|
||||||
|
public class TestSyncFutureCache {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSyncFutureCacheLifeCycle() throws Exception {
|
||||||
|
final Configuration conf = HBaseConfiguration.create();
|
||||||
|
final SyncFutureCache cache = new SyncFutureCache(conf);
|
||||||
|
try {
|
||||||
|
SyncFuture future0 = cache.getIfPresentOrNew().reset(0);
|
||||||
|
assertNotNull(future0);
|
||||||
|
// Get another future from the same thread, should be different one.
|
||||||
|
SyncFuture future1 = cache.getIfPresentOrNew().reset(1);
|
||||||
|
assertNotNull(future1);
|
||||||
|
assertNotSame(future0, future1);
|
||||||
|
cache.offer(future1);
|
||||||
|
// Should override.
|
||||||
|
cache.offer(future0);
|
||||||
|
SyncFuture future3 = cache.getIfPresentOrNew();
|
||||||
|
// Should return the cached entry that was first offered back.
|
||||||
|
assertEquals(future3, future0);
|
||||||
|
final SyncFuture[] future4 = new SyncFuture[1];
|
||||||
|
// From a different thread
|
||||||
|
Thread t = new Thread(new Runnable() {
|
||||||
|
@Override public void run() {
|
||||||
|
future4[0] = cache.getIfPresentOrNew().reset(4);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
t.start();
|
||||||
|
t.join();
|
||||||
|
assertNotNull(future4[0]);
|
||||||
|
assertNotSame(future3, future4[0]);
|
||||||
|
// Clean up
|
||||||
|
cache.offer(future3);
|
||||||
|
cache.offer(future4[0]);
|
||||||
|
} finally {
|
||||||
|
cache.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue