From cfc5348b5eb60daad812e362c90c74341f10d3e6 Mon Sep 17 00:00:00 2001 From: tedyu Date: Wed, 4 Nov 2015 07:20:59 -0800 Subject: [PATCH] HBASE-14758 Add UT case for unchecked error/exception thrown in AsyncProcess#sendMultiAction (Yu Li) --- .../hadoop/hbase/client/TestAsyncProcess.java | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 28cff104e2f..067f2ad1e15 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -29,7 +29,10 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -1049,4 +1052,41 @@ public class TestAsyncProcess { return p; } + + static class MyThreadPoolExecutor extends ThreadPoolExecutor { + public MyThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime, + TimeUnit timeunit, BlockingQueue blockingqueue) { + super(coreThreads, maxThreads, keepAliveTime, timeunit, blockingqueue); + } + + @Override + public Future submit(Runnable runnable) { + throw new OutOfMemoryError("OutOfMemory error thrown by means"); + } + } + + static class AsyncProcessForThrowableCheck extends AsyncProcess { + public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf, + ExecutorService pool) { + super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory( + conf)); + } + } + + @Test + public void testUncheckedException() throws Exception { + // Test the case pool.submit throws unchecked exception + ClusterConnection hc = createHConnection(); + MyThreadPoolExecutor myPool = + new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue(200)); + AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, conf, myPool); + + List puts = new ArrayList(); + puts.add(createPut(1, true)); + + ap.submit(DUMMY_TABLE, puts, false, null, false); + Assert.assertTrue(puts.isEmpty()); + } + }