HBASE-11798 TestBucketWriterThread may hang due to WriterThread stopping prematurely (Sergey Soldatov and Alex Newman)
This commit is contained in:
parent
6df18a1b65
commit
3b57f28dad
|
@ -18,8 +18,16 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
||||
import org.apache.hadoop.hbase.io.hfile.Cacheable;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
@ -28,20 +36,16 @@ import java.util.List;
|
|||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
||||
import org.apache.hadoop.hbase.io.hfile.Cacheable;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
import static java.lang.Thread.State;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.core.IsNot.not;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestBucketWriterThread {
|
||||
public static final int MAX_NUMBER_OF_TRIES_BEFORE_TEST_FAILURE = 1000000;
|
||||
private BucketCache bc;
|
||||
private BucketCache.WriterThread wt;
|
||||
private BlockingQueue<RAMQueueEntry> q;
|
||||
|
@ -71,11 +75,23 @@ public class TestBucketWriterThread {
|
|||
// notice the disabling of the writer until after it has processed an entry. Lets pass one
|
||||
// through after setting disable flag on the writer. We want to disable the WriterThread so
|
||||
// we can run the doDrain manually so we can watch it working and assert it doing right thing.
|
||||
for (int i = 0; i != MAX_NUMBER_OF_TRIES_BEFORE_TEST_FAILURE; i++) {
|
||||
if (wt.getThread().getState() == State.RUNNABLE) {
|
||||
Thread.sleep(1);
|
||||
}
|
||||
}
|
||||
assertThat(wt.getThread().getState(), is(not(State.RUNNABLE)));
|
||||
|
||||
wt.disableWriter();
|
||||
this.plainKey = new BlockCacheKey("f", 0);
|
||||
this.plainCacheable = Mockito.mock(Cacheable.class);
|
||||
bc.cacheBlock(this.plainKey, plainCacheable);
|
||||
while(!bc.ramCache.isEmpty()) Threads.sleep(1);
|
||||
for (int i = 0; i != MAX_NUMBER_OF_TRIES_BEFORE_TEST_FAILURE; i++) {
|
||||
if (!bc.ramCache.isEmpty()) {
|
||||
Thread.sleep(1);
|
||||
}
|
||||
}
|
||||
assertThat(bc.ramCache.isEmpty(), is(true));
|
||||
assertTrue(q.isEmpty());
|
||||
// Now writer thread should be disabled.
|
||||
}
|
||||
|
@ -92,8 +108,7 @@ public class TestBucketWriterThread {
|
|||
* @throws InterruptedException
|
||||
*/
|
||||
@Test (timeout=30000)
|
||||
public void testNonErrorCase()
|
||||
throws FileNotFoundException, IOException, InterruptedException {
|
||||
public void testNonErrorCase() throws IOException, InterruptedException {
|
||||
bc.cacheBlock(this.plainKey, this.plainCacheable);
|
||||
doDrainOfOneEntry(this.bc, this.wt, this.q);
|
||||
}
|
||||
|
@ -114,15 +129,12 @@ public class TestBucketWriterThread {
|
|||
/**
|
||||
* Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then
|
||||
* put it back and process it.
|
||||
* @throws IOException
|
||||
* @throws BucketAllocatorException
|
||||
* @throws CacheFullException
|
||||
* @throws InterruptedException
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test (timeout=30000)
|
||||
public void testIOE()
|
||||
throws CacheFullException, BucketAllocatorException, IOException, InterruptedException {
|
||||
public void testIOE() throws IOException, InterruptedException {
|
||||
this.bc.cacheBlock(this.plainKey, plainCacheable);
|
||||
RAMQueueEntry rqe = q.remove();
|
||||
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
|
||||
|
@ -137,14 +149,12 @@ public class TestBucketWriterThread {
|
|||
|
||||
/**
|
||||
* Do Cache full exception
|
||||
* @throws IOException
|
||||
* @throws BucketAllocatorException
|
||||
* @throws CacheFullException
|
||||
* @throws InterruptedException
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test (timeout=30000)
|
||||
public void testCacheFullException()
|
||||
throws CacheFullException, BucketAllocatorException, IOException, InterruptedException {
|
||||
throws IOException, InterruptedException {
|
||||
this.bc.cacheBlock(this.plainKey, plainCacheable);
|
||||
RAMQueueEntry rqe = q.remove();
|
||||
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
|
||||
|
@ -167,4 +177,4 @@ public class TestBucketWriterThread {
|
|||
assertTrue(bc.ramCache.isEmpty());
|
||||
assertEquals(0, bc.heapSize());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue