diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 1c88eb03b9c..d954eefa81f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -26,7 +26,6 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; @@ -45,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputCont import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.StealJobQueue; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; @@ -117,8 +117,9 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi final String n = Thread.currentThread().getName(); + StealJobQueue stealJobQueue = new StealJobQueue<>(); this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, - 60, TimeUnit.SECONDS, new PriorityBlockingQueue(), + 60, TimeUnit.SECONDS, stealJobQueue, new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -128,8 +129,9 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } }); this.longCompactions.setRejectedExecutionHandler(new Rejection()); + this.longCompactions.prestartAllCoreThreads(); this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, - 60, TimeUnit.SECONDS, new PriorityBlockingQueue(), + 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -500,7 +502,10 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi assert this.compaction.hasSelection(); ThreadPoolExecutor pool = store.throttleCompaction( compaction.getRequest().getSize()) ? longCompactions : shortCompactions; - if (this.parent != pool) { + + // Long compaction pool can process small job + // Short compaction pool should not process large job + if (this.parent == shortCompactions && pool == longCompactions) { this.store.cancelRequestedCompaction(this.compaction); this.compaction = null; this.parent = pool; @@ -673,4 +678,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi return compactionThroughputController; } + @VisibleForTesting + /** + * Shutdown the long compaction thread pool. + * Should only be used in unit test to prevent long compaction thread pool from stealing job + * from short compaction queue + */ + void shutdownLongCompactions(){ + this.longCompactions.shutdown(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java new file mode 100644 index 00000000000..74f07477b90 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java @@ -0,0 +1,123 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * This queue allows a ThreadPoolExecutor to steal jobs from another ThreadPoolExecutor. + * This queue also acts as the factory for creating the PriorityBlockingQueue to be used in the + * steal-from ThreadPoolExecutor. The behavior of this queue is the same as a normal + * PriorityBlockingQueue except the take/poll(long,TimeUnit) methods would also check whether there + * are jobs in the steal-from queue if this q ueue is empty. + * + * Note the workers in ThreadPoolExecutor must be pre-started so that they can steal job from the + * other queue, otherwise the worker will only be started after there are jobs submitted to main + * queue. + */ +@InterfaceAudience.Private +public class StealJobQueue extends PriorityBlockingQueue { + + private BlockingQueue stealFromQueue; + + private final Lock lock = new ReentrantLock(); + private final Condition notEmpty = lock.newCondition(); + + public StealJobQueue() { + this.stealFromQueue = new PriorityBlockingQueue() { + @Override + public boolean offer(T t) { + lock.lock(); + try { + notEmpty.signal(); + return super.offer(t); + } finally { + lock.unlock(); + } + } + }; + } + + public BlockingQueue getStealFromQueue() { + return stealFromQueue; + } + + @Override + public boolean offer(T t) { + lock.lock(); + try { + notEmpty.signal(); + return super.offer(t); + } finally { + lock.unlock(); + } + } + + + @Override + public T take() throws InterruptedException { + lock.lockInterruptibly(); + try { + while (true) { + T retVal = this.poll(); + if (retVal == null) { + retVal = stealFromQueue.poll(); + } + if (retVal == null) { + notEmpty.await(); + } else { + return retVal; + } + } + } finally { + lock.unlock(); + } + } + + @Override + public T poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + while (true) { + T retVal = this.poll(); + if (retVal == null) { + retVal = stealFromQueue.poll(); + } + if (retVal == null) { + if (nanos <= 0) + return null; + nanos = notEmpty.awaitNanos(nanos); + } else { + return retVal; + } + } + } finally { + lock.unlock(); + } + } +} + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 9fa854b78b7..fcc9fc32a19 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -494,7 +494,8 @@ public class TestCompaction { when(mockServer.getChoreService()).thenReturn(new ChoreService("test")); CompactSplitThread cst = new CompactSplitThread(mockServer); when(mockServer.getCompactSplitThread()).thenReturn(cst); - + //prevent large compaction thread pool stealing job from small compaction queue. + cst.shutdownLongCompactions(); // Set up the region mock that redirects compactions. HRegion r = mock(HRegion.class); when( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java new file mode 100644 index 00000000000..b35f6f4772b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java @@ -0,0 +1,229 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.StealJobQueue; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + + +@Category({MiscTests.class, SmallTests.class}) +public class TestStealJobQueue { + + StealJobQueue stealJobQueue; + BlockingQueue stealFromQueue; + + @Before + public void setup() { + stealJobQueue = new StealJobQueue<>(); + stealFromQueue = stealJobQueue.getStealFromQueue(); + + } + + + @Test + public void testTake() throws InterruptedException { + stealJobQueue.offer(3); + stealFromQueue.offer(10); + stealJobQueue.offer(15); + stealJobQueue.offer(4); + assertEquals(3, stealJobQueue.take().intValue()); + assertEquals(4, stealJobQueue.take().intValue()); + assertEquals("always take from the main queue before trying to steal", 15, + stealJobQueue.take().intValue()); + assertEquals(10, stealJobQueue.take().intValue()); + assertTrue(stealFromQueue.isEmpty()); + assertTrue(stealJobQueue.isEmpty()); + } + + @Test + public void testOfferInStealQueueFromShouldUnblock() throws InterruptedException { + final AtomicInteger taken = new AtomicInteger(); + Thread consumer = new Thread() { + @Override + public void run() { + try { + Integer n = stealJobQueue.take(); + taken.set(n); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + consumer.start(); + stealFromQueue.offer(3); + consumer.join(1000); + assertEquals(3, taken.get()); + consumer.interrupt(); //Ensure the consumer thread will stop. + } + + + @Test + public void testOfferInStealJobQueueShouldUnblock() throws InterruptedException { + final AtomicInteger taken = new AtomicInteger(); + Thread consumer = new Thread() { + @Override + public void run() { + try { + Integer n = stealJobQueue.take(); + taken.set(n); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + consumer.start(); + stealJobQueue.offer(3); + consumer.join(1000); + assertEquals(3, taken.get()); + consumer.interrupt(); //Ensure the consumer thread will stop. + } + + + @Test + public void testPoll() throws InterruptedException { + stealJobQueue.offer(3); + stealFromQueue.offer(10); + stealJobQueue.offer(15); + stealJobQueue.offer(4); + assertEquals(3, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue()); + assertEquals(4, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue()); + assertEquals("always take from the main queue before trying to steal", 15, + stealJobQueue.poll(1, TimeUnit.SECONDS).intValue()); + assertEquals(10, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue()); + assertTrue(stealFromQueue.isEmpty()); + assertTrue(stealJobQueue.isEmpty()); + assertNull(stealJobQueue.poll(10, TimeUnit.MILLISECONDS)); + } + + @Test + public void testPutInStealQueueFromShouldUnblockPoll() throws InterruptedException { + final AtomicInteger taken = new AtomicInteger(); + Thread consumer = new Thread() { + @Override + public void run() { + try { + Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS); + taken.set(n); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + consumer.start(); + stealFromQueue.put(3); + consumer.join(1000); + assertEquals(3, taken.get()); + consumer.interrupt(); //Ensure the consumer thread will stop. + + } + + + @Test + public void testAddInStealJobQueueShouldUnblockPoll() throws InterruptedException { + final AtomicInteger taken = new AtomicInteger(); + Thread consumer = new Thread() { + @Override + public void run() { + try { + Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS); + taken.set(n); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + consumer.start(); + stealJobQueue.add(3); + consumer.join(1000); + assertEquals(3, taken.get()); + consumer.interrupt(); //Ensure the consumer thread will stop. + } + + + @Test + public void testInteractWithThreadPool() throws InterruptedException { + StealJobQueue stealTasksQueue = new StealJobQueue<>(); + final CountDownLatch stealJobCountDown = new CountDownLatch(3); + final CountDownLatch stealFromCountDown = new CountDownLatch(3); + ThreadPoolExecutor stealPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, stealTasksQueue) { + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + stealJobCountDown.countDown(); + } + + }; + + //This is necessary otherwise no worker will be running and stealing job + stealPool.prestartAllCoreThreads(); + + ThreadPoolExecutor stealFromPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, + stealTasksQueue.getStealFromQueue()) { + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + stealFromCountDown.countDown(); + } + }; + + for (int i = 0; i < 4; i++) { + TestTask task = new TestTask(); + stealFromPool.execute(task); + } + + for (int i = 0; i < 2; i++) { + TestTask task = new TestTask(); + stealPool.execute(task); + } + + stealJobCountDown.await(1, TimeUnit.SECONDS); + stealFromCountDown.await(1, TimeUnit.SECONDS); + assertEquals(0, stealFromCountDown.getCount()); + assertEquals(0, stealJobCountDown.getCount()); + } + + class TestTask extends Thread implements Comparable { + @Override + public int compareTo(TestTask o) { + return 0; + } + + @Override + public void run() { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + +} \ No newline at end of file