From 0e147a9d6e53e71ad2e57f512b4d3e1eeeac0b78 Mon Sep 17 00:00:00 2001 From: tedyu Date: Wed, 9 Dec 2015 07:18:08 -0800 Subject: [PATCH] HBASE-14954 IllegalArgumentException was thrown when doing online configuration change in CompactSplitThread (Victor Xu) --- .../regionserver/CompactSplitThread.java | 46 ++++++-- .../regionserver/TestCompactSplitThread.java | 104 ++++++++++++++++++ 2 files changed, 141 insertions(+), 9 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 6ce90bcff0b..f54f008d1d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -618,8 +618,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + " from " + this.longCompactions.getCorePoolSize() + " to " + largeThreads); - this.longCompactions.setMaximumPoolSize(largeThreads); - this.longCompactions.setCorePoolSize(largeThreads); + if(this.longCompactions.getCorePoolSize() < largeThreads) { + this.longCompactions.setMaximumPoolSize(largeThreads); + this.longCompactions.setCorePoolSize(largeThreads); + } else { + this.longCompactions.setCorePoolSize(largeThreads); + this.longCompactions.setMaximumPoolSize(largeThreads); + } } int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, @@ -628,8 +633,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + " from " + this.shortCompactions.getCorePoolSize() + " to " + smallThreads); - this.shortCompactions.setMaximumPoolSize(smallThreads); - this.shortCompactions.setCorePoolSize(smallThreads); + if(this.shortCompactions.getCorePoolSize() < smallThreads) { + this.shortCompactions.setMaximumPoolSize(smallThreads); + this.shortCompactions.setCorePoolSize(smallThreads); + } else { + this.shortCompactions.setCorePoolSize(smallThreads); + this.shortCompactions.setMaximumPoolSize(smallThreads); + } } int splitThreads = newConf.getInt(SPLIT_THREADS, @@ -638,8 +648,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi LOG.info("Changing the value of " + SPLIT_THREADS + " from " + this.splits.getCorePoolSize() + " to " + splitThreads); - this.splits.setMaximumPoolSize(smallThreads); - this.splits.setCorePoolSize(smallThreads); + if(this.splits.getCorePoolSize() < splitThreads) { + this.splits.setMaximumPoolSize(splitThreads); + this.splits.setCorePoolSize(splitThreads); + } else { + this.splits.setCorePoolSize(splitThreads); + this.splits.setMaximumPoolSize(splitThreads); + } } int mergeThreads = newConf.getInt(MERGE_THREADS, @@ -648,8 +663,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi LOG.info("Changing the value of " + MERGE_THREADS + " from " + this.mergePool.getCorePoolSize() + " to " + mergeThreads); - this.mergePool.setMaximumPoolSize(smallThreads); - this.mergePool.setCorePoolSize(smallThreads); + if(this.mergePool.getCorePoolSize() < mergeThreads) { + this.mergePool.setMaximumPoolSize(mergeThreads); + this.mergePool.setCorePoolSize(mergeThreads); + } else { + this.mergePool.setCorePoolSize(mergeThreads); + this.mergePool.setMaximumPoolSize(mergeThreads); + } } CompactionThroughputController old = this.compactionThroughputController; @@ -668,10 +688,18 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi return this.shortCompactions.getCorePoolSize(); } - public int getLargeCompactionThreadNum() { + protected int getLargeCompactionThreadNum() { return this.longCompactions.getCorePoolSize(); } + protected int getSplitThreadNum() { + return this.splits.getCorePoolSize(); + } + + protected int getMergeThreadNum() { + return this.mergePool.getCorePoolSize(); + } + /** * {@inheritDoc} */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java new file mode 100644 index 00000000000..022279a7a21 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; +import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@Category(MediumTests.class) +public class TestCompactSplitThread { + private static final Log LOG = LogFactory.getLog(TestCompactSplitThread.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final TableName tableName = TableName.valueOf(getClass().getSimpleName()); + private final byte[] family = Bytes.toBytes("f"); + + @Test + public void testThreadPoolSizeTuning() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 3); + conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 4); + conf.setInt(CompactSplitThread.SPLIT_THREADS, 5); + conf.setInt(CompactSplitThread.MERGE_THREADS, 6); + TEST_UTIL.startMiniCluster(1); + Connection conn = ConnectionFactory.createConnection(conf); + try { + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(family)); + htd.setCompactionEnabled(false); + TEST_UTIL.getHBaseAdmin().createTable(htd); + TEST_UTIL.waitTableAvailable(tableName); + HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); + + // check initial configuration of thread pool sizes + assertEquals(3, regionServer.compactSplitThread.getLargeCompactionThreadNum()); + assertEquals(4, regionServer.compactSplitThread.getSmallCompactionThreadNum()); + assertEquals(5, regionServer.compactSplitThread.getSplitThreadNum()); + assertEquals(6, regionServer.compactSplitThread.getMergeThreadNum()); + + // change bigger configurations and do online update + conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 4); + conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 5); + conf.setInt(CompactSplitThread.SPLIT_THREADS, 6); + conf.setInt(CompactSplitThread.MERGE_THREADS, 7); + try { + regionServer.compactSplitThread.onConfigurationChange(conf); + } catch (IllegalArgumentException iae) { + Assert.fail("Update bigger configuration failed!"); + } + + // check again after online update + assertEquals(4, regionServer.compactSplitThread.getLargeCompactionThreadNum()); + assertEquals(5, regionServer.compactSplitThread.getSmallCompactionThreadNum()); + assertEquals(6, regionServer.compactSplitThread.getSplitThreadNum()); + assertEquals(7, regionServer.compactSplitThread.getMergeThreadNum()); + + // change smaller configurations and do online update + conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 2); + conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 3); + conf.setInt(CompactSplitThread.SPLIT_THREADS, 4); + conf.setInt(CompactSplitThread.MERGE_THREADS, 5); + try { + regionServer.compactSplitThread.onConfigurationChange(conf); + } catch (IllegalArgumentException iae) { + Assert.fail("Update smaller configuration failed!"); + } + + // check again after online update + assertEquals(2, regionServer.compactSplitThread.getLargeCompactionThreadNum()); + assertEquals(3, regionServer.compactSplitThread.getSmallCompactionThreadNum()); + assertEquals(4, regionServer.compactSplitThread.getSplitThreadNum()); + assertEquals(5, regionServer.compactSplitThread.getMergeThreadNum()); + } finally { + conn.close(); + TEST_UTIL.shutdownMiniCluster(); + } + } +}