diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index e63471285df..75d0630966b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -256,8 +256,9 @@ public class BucketCache implements BlockCache, HeapSize { writerThreads[i] = new WriterThread(writerQueues.get(i), i); writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); writerThreads[i].setDaemon(true); - writerThreads[i].start(); } + startWriterThreads(); + // Run the statistics thread periodically to print the cache statistics log // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log // every five minutes. @@ -270,6 +271,17 @@ public class BucketCache implements BlockCache, HeapSize { persistencePath + ", bucketAllocator=" + this.bucketAllocator); } + /** + * Called by the constructor to start the writer threads. Used by tests that need to override + * starting the threads. + */ + @VisibleForTesting + protected void startWriterThreads() { + for (WriterThread thread : writerThreads) { + thread.start(); + } + } + @VisibleForTesting boolean isCacheEnabled() { return this.cacheEnabled; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java index 8a6296aa6a5..91f453f6a7f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java @@ -36,22 +36,35 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicLong; -import static java.lang.Thread.State; import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.core.IsNot.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @Category(SmallTests.class) public class TestBucketWriterThread { - public static final int MAX_NUMBER_OF_TRIES_BEFORE_TEST_FAILURE = 1000000; private BucketCache bc; private BucketCache.WriterThread wt; private BlockingQueue q; private Cacheable plainCacheable; private BlockCacheKey plainKey; + /** A BucketCache that does not start its writer threads. */ + private static class MockBucketCache extends BucketCache { + + public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, + int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration) + throws FileNotFoundException, IOException { + super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, + persistencePath, ioErrorsTolerationDuration); + } + + @Override + protected void startWriterThreads() { + // intentional noop + } + } + /** * Set up variables and get BucketCache and WriterThread into state where tests can manually * control the running of WriterThread and BucketCache is empty. @@ -64,36 +77,20 @@ public class TestBucketWriterThread { // Run with one writer thread only. Means there will be one writer queue only too. We depend // on this in below. final int writerThreadsCount = 1; - this.bc = new BucketCache("heap", capacity, 1, new int [] {1}, writerThreadsCount, + this.bc = new MockBucketCache("heap", capacity, 1, new int [] {1}, writerThreadsCount, capacity, null, 100/*Tolerate ioerrors for 100ms*/); assertEquals(writerThreadsCount, bc.writerThreads.length); assertEquals(writerThreadsCount, bc.writerQueues.size()); // Get reference to our single WriterThread instance. this.wt = bc.writerThreads[0]; this.q = bc.writerQueues.get(0); - // On construction bucketcache WriterThread is blocked on the writer queue so it will not - // notice the disabling of the writer until after it has processed an entry. Lets pass one - // through after setting disable flag on the writer. We want to disable the WriterThread so - // we can run the doDrain manually so we can watch it working and assert it doing right thing. - for (int i = 0; i != MAX_NUMBER_OF_TRIES_BEFORE_TEST_FAILURE; i++) { - if (wt.getThread().getState() == State.RUNNABLE) { - Thread.sleep(1); - } - } - assertThat(wt.getThread().getState(), is(not(State.RUNNABLE))); wt.disableWriter(); this.plainKey = new BlockCacheKey("f", 0); this.plainCacheable = Mockito.mock(Cacheable.class); - bc.cacheBlock(this.plainKey, plainCacheable); - for (int i = 0; i != MAX_NUMBER_OF_TRIES_BEFORE_TEST_FAILURE; i++) { - if (!bc.ramCache.isEmpty()) { - Thread.sleep(1); - } - } + assertThat(bc.ramCache.isEmpty(), is(true)); assertTrue(q.isEmpty()); - // Now writer thread should be disabled. } @After @@ -116,7 +113,7 @@ public class TestBucketWriterThread { /** * Pass through a too big entry and ensure it is cleared from queues and ramCache. * Manually run the WriterThread. - * @throws InterruptedException + * @throws InterruptedException */ @Test public void testTooBigEntry() throws InterruptedException {