HBASE-12055 TestBucketWriterThread hangs flakily based on timing (Nick Dimiduk)
This commit is contained in:
parent
bcbacefdd5
commit
7b7648322b
|
@ -256,8 +256,9 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
writerThreads[i] = new WriterThread(writerQueues.get(i), i);
|
writerThreads[i] = new WriterThread(writerQueues.get(i), i);
|
||||||
writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
|
writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
|
||||||
writerThreads[i].setDaemon(true);
|
writerThreads[i].setDaemon(true);
|
||||||
writerThreads[i].start();
|
|
||||||
}
|
}
|
||||||
|
startWriterThreads();
|
||||||
|
|
||||||
// Run the statistics thread periodically to print the cache statistics log
|
// Run the statistics thread periodically to print the cache statistics log
|
||||||
// TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
|
// TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
|
||||||
// every five minutes.
|
// every five minutes.
|
||||||
|
@ -270,6 +271,17 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
persistencePath + ", bucketAllocator=" + this.bucketAllocator);
|
persistencePath + ", bucketAllocator=" + this.bucketAllocator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called by the constructor to start the writer threads. Used by tests that need to override
|
||||||
|
* starting the threads.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
protected void startWriterThreads() {
|
||||||
|
for (WriterThread thread : writerThreads) {
|
||||||
|
thread.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
boolean isCacheEnabled() {
|
boolean isCacheEnabled() {
|
||||||
return this.cacheEnabled;
|
return this.cacheEnabled;
|
||||||
|
|
|
@ -37,22 +37,35 @@ import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import static java.lang.Thread.State;
|
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.hamcrest.core.IsNot.not;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
@Category({IOTests.class, SmallTests.class})
|
@Category({IOTests.class, SmallTests.class})
|
||||||
public class TestBucketWriterThread {
|
public class TestBucketWriterThread {
|
||||||
public static final int MAX_NUMBER_OF_TRIES_BEFORE_TEST_FAILURE = 1000000;
|
|
||||||
private BucketCache bc;
|
private BucketCache bc;
|
||||||
private BucketCache.WriterThread wt;
|
private BucketCache.WriterThread wt;
|
||||||
private BlockingQueue<RAMQueueEntry> q;
|
private BlockingQueue<RAMQueueEntry> q;
|
||||||
private Cacheable plainCacheable;
|
private Cacheable plainCacheable;
|
||||||
private BlockCacheKey plainKey;
|
private BlockCacheKey plainKey;
|
||||||
|
|
||||||
|
/** A BucketCache that does not start its writer threads. */
|
||||||
|
private static class MockBucketCache extends BucketCache {
|
||||||
|
|
||||||
|
public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
|
||||||
|
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
|
||||||
|
throws FileNotFoundException, IOException {
|
||||||
|
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
|
||||||
|
persistencePath, ioErrorsTolerationDuration);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void startWriterThreads() {
|
||||||
|
// intentional noop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up variables and get BucketCache and WriterThread into state where tests can manually
|
* Set up variables and get BucketCache and WriterThread into state where tests can manually
|
||||||
* control the running of WriterThread and BucketCache is empty.
|
* control the running of WriterThread and BucketCache is empty.
|
||||||
|
@ -65,36 +78,20 @@ public class TestBucketWriterThread {
|
||||||
// Run with one writer thread only. Means there will be one writer queue only too. We depend
|
// Run with one writer thread only. Means there will be one writer queue only too. We depend
|
||||||
// on this in below.
|
// on this in below.
|
||||||
final int writerThreadsCount = 1;
|
final int writerThreadsCount = 1;
|
||||||
this.bc = new BucketCache("heap", capacity, 1, new int [] {1}, writerThreadsCount,
|
this.bc = new MockBucketCache("heap", capacity, 1, new int [] {1}, writerThreadsCount,
|
||||||
capacity, null, 100/*Tolerate ioerrors for 100ms*/);
|
capacity, null, 100/*Tolerate ioerrors for 100ms*/);
|
||||||
assertEquals(writerThreadsCount, bc.writerThreads.length);
|
assertEquals(writerThreadsCount, bc.writerThreads.length);
|
||||||
assertEquals(writerThreadsCount, bc.writerQueues.size());
|
assertEquals(writerThreadsCount, bc.writerQueues.size());
|
||||||
// Get reference to our single WriterThread instance.
|
// Get reference to our single WriterThread instance.
|
||||||
this.wt = bc.writerThreads[0];
|
this.wt = bc.writerThreads[0];
|
||||||
this.q = bc.writerQueues.get(0);
|
this.q = bc.writerQueues.get(0);
|
||||||
// On construction bucketcache WriterThread is blocked on the writer queue so it will not
|
|
||||||
// notice the disabling of the writer until after it has processed an entry. Lets pass one
|
|
||||||
// through after setting disable flag on the writer. We want to disable the WriterThread so
|
|
||||||
// we can run the doDrain manually so we can watch it working and assert it doing right thing.
|
|
||||||
for (int i = 0; i != MAX_NUMBER_OF_TRIES_BEFORE_TEST_FAILURE; i++) {
|
|
||||||
if (wt.getThread().getState() == State.RUNNABLE) {
|
|
||||||
Thread.sleep(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assertThat(wt.getThread().getState(), is(not(State.RUNNABLE)));
|
|
||||||
|
|
||||||
wt.disableWriter();
|
wt.disableWriter();
|
||||||
this.plainKey = new BlockCacheKey("f", 0);
|
this.plainKey = new BlockCacheKey("f", 0);
|
||||||
this.plainCacheable = Mockito.mock(Cacheable.class);
|
this.plainCacheable = Mockito.mock(Cacheable.class);
|
||||||
bc.cacheBlock(this.plainKey, plainCacheable);
|
|
||||||
for (int i = 0; i != MAX_NUMBER_OF_TRIES_BEFORE_TEST_FAILURE; i++) {
|
|
||||||
if (!bc.ramCache.isEmpty()) {
|
|
||||||
Thread.sleep(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assertThat(bc.ramCache.isEmpty(), is(true));
|
assertThat(bc.ramCache.isEmpty(), is(true));
|
||||||
assertTrue(q.isEmpty());
|
assertTrue(q.isEmpty());
|
||||||
// Now writer thread should be disabled.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -117,7 +114,7 @@ public class TestBucketWriterThread {
|
||||||
/**
|
/**
|
||||||
* Pass through a too big entry and ensure it is cleared from queues and ramCache.
|
* Pass through a too big entry and ensure it is cleared from queues and ramCache.
|
||||||
* Manually run the WriterThread.
|
* Manually run the WriterThread.
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testTooBigEntry() throws InterruptedException {
|
public void testTooBigEntry() throws InterruptedException {
|
||||||
|
|
Loading…
Reference in New Issue