From 1cf2511b30a8d90e9ae55396813a3b068e4468e4 Mon Sep 17 00:00:00 2001 From: sershe Date: Tue, 9 Jul 2013 19:49:54 +0000 Subject: [PATCH] HBASE-8329 Limit compaction speed (binlijin) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1501496 13f79535-47bb-0310-9956-ffa450edef68 --- .../regionserver/compactions/Compactor.java | 6 + .../compactions/OffPeakHours.java | 7 + .../compactions/PeakCompactionsThrottle.java | 138 ++++++++++++++++++ .../TestPeakCompactionsThrottle.java | 70 +++++++++ 4 files changed, 221 insertions(+) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PeakCompactionsThrottle.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPeakCompactionsThrottle.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index ddbcb89e61b..8cbe05bf430 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -58,6 +58,7 @@ public abstract class Compactor { private int compactionKVMax; protected Compression.Algorithm compactionCompression; + private PeakCompactionsThrottle peakCompactionsThrottle; //TODO: depending on Store is not good but, realistically, all compactors currently do. Compactor(final Configuration conf, final Store store) { @@ -66,6 +67,7 @@ public abstract class Compactor { this.compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, 10); this.compactionCompression = (this.store.getFamily() == null) ? Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression(); + this.peakCompactionsThrottle = new PeakCompactionsThrottle(conf); } /** @@ -201,6 +203,7 @@ public abstract class Compactor { // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME int closeCheckInterval = HStore.getCloseCheckInterval(); boolean hasMore; + peakCompactionsThrottle.startCompaction(); do { hasMore = scanner.next(kvs, compactionKVMax); // output to writer: @@ -222,9 +225,12 @@ public abstract class Compactor { } } } + peakCompactionsThrottle.throttle(kv.getLength()); } kvs.clear(); } while (hasMore); + peakCompactionsThrottle.finishCompaction(this.store.getRegionInfo() + .getRegionNameAsString(), this.store.getFamily().getNameAsString()); progress.complete(); return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java index e4cf97c6a9a..05518a6b2e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java @@ -37,6 +37,13 @@ public abstract class OffPeakHours { return getInstance(startHour, endHour); } + public static OffPeakHours getInstance(Configuration conf, String start, + String end) { + int startHour = conf.getInt(start, -1); + int endHour = conf.getInt(end, -1); + return getInstance(startHour, endHour); + } + /** * @param startHour inclusive * @param endHour exclusive diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PeakCompactionsThrottle.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PeakCompactionsThrottle.java new file mode 100644 index 00000000000..0aa93610539 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PeakCompactionsThrottle.java @@ -0,0 +1,138 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.io.IOException; +import java.io.InterruptedIOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; + +/** + * The class used to track peak hours and compactions. peak compaction speed + * should be limit. + * + */ +@InterfaceAudience.Private +public class PeakCompactionsThrottle { + private static final Log LOG = LogFactory.getLog(PeakCompactionsThrottle.class); + + public static final String PEAK_START_HOUR = "hbase.peak.start.hour"; + public static final String PEAK_END_HOUR = "hbase.peak.end.hour"; + public static final String PEAK_COMPACTION_SPEED_ALLOWED = + "hbase.regionserver.compaction.peak.maxspeed"; + public static final String PEAK_COMPACTION_SPEED_CHECK_INTERVAL = + "hbase.regionserver.compaction.speed.check.interval"; + + OffPeakHours offPeakHours; + private long start; + private long end; + private long maxSpeedInPeak; + private int sleepNumber = 0; + private int sleepTimeTotal = 0; + int bytesWritten = 0; + int checkInterval = 0; + + public PeakCompactionsThrottle(Configuration conf) { + offPeakHours = OffPeakHours.getInstance(conf, PEAK_START_HOUR, PEAK_END_HOUR); + maxSpeedInPeak = conf.getInt(PEAK_COMPACTION_SPEED_ALLOWED, 30 * 1024 * 1024 /* 30 MB/s */); + checkInterval = conf.getInt(PEAK_COMPACTION_SPEED_CHECK_INTERVAL, 30 * 1024 * 1024 /* 30 MB */); + } + + /** + * start compaction + */ + public void startCompaction() { + start = System.currentTimeMillis(); + } + + /** + * finish compaction + */ + public void finishCompaction(String region, String family) { + if (sleepNumber > 0) { + LOG.info("Region '" + region + "' family '" + family + "' 's maxSpeedInPeak is " + + StringUtils.humanReadableInt(maxSpeedInPeak) + "/s compaction throttle: sleep number " + + sleepNumber + " sleep time " + sleepTimeTotal + "(ms)"); + } + } + + /** + * reset start time + */ + void resetStartTime() { + start = System.currentTimeMillis(); + } + + /** + * Peak compaction throttle, if it is peak time and the compaction speed is too fast, sleep for a + * while to slow down. + */ + public void throttle(long numOfBytes) throws IOException { + bytesWritten += numOfBytes; + if (bytesWritten >= checkInterval) { + checkAndSlowFastCompact(bytesWritten); + bytesWritten = 0; + } + } + + private void checkAndSlowFastCompact(long numOfBytes) throws IOException { + if (!isPeakHour()) { + // not peak hour, just return. + return; + } + if (maxSpeedInPeak <= 0) { + return; + } + end = System.currentTimeMillis(); + long minTimeAllowed = numOfBytes * 1000 / maxSpeedInPeak; // ms + long elapsed = end - start; + if (elapsed < minTimeAllowed) { + // too fast + try { + // sleep for a while to slow down. + Thread.sleep(minTimeAllowed - elapsed); + sleepNumber++; + sleepTimeTotal += (minTimeAllowed - elapsed); + } catch (InterruptedException ie) { + IOException iie = new InterruptedIOException(); + iie.initCause(ie); + throw iie; + } + } + resetStartTime(); + } + + /** + * @return whether this is peak hour + */ + private boolean isPeakHour() { + return offPeakHours.isOffPeakHour(); + } + + /** + * For test + */ + public int getSleepNumber() { + return sleepNumber; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPeakCompactionsThrottle.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPeakCompactionsThrottle.java new file mode 100644 index 00000000000..68cfd34940f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPeakCompactionsThrottle.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestPeakCompactionsThrottle { + private static HBaseTestingUtility testUtil; + private Configuration conf; + + @BeforeClass + public static void setUpClass() { + testUtil = new HBaseTestingUtility(); + } + + @Before + public void setUp() { + conf = testUtil.getConfiguration(); + } + + @Test + public void testSetPeakHourToTargetTime() throws IOException { + conf.set(PeakCompactionsThrottle.PEAK_START_HOUR, "0"); + conf.set(PeakCompactionsThrottle.PEAK_END_HOUR, "23"); + PeakCompactionsThrottle peakCompactionsThrottle = new PeakCompactionsThrottle(conf); + peakCompactionsThrottle.startCompaction(); + long numOfBytes = 60 * 1024 * 1024; + peakCompactionsThrottle.throttle(numOfBytes); + peakCompactionsThrottle.finishCompaction("region", "family"); + assertTrue(peakCompactionsThrottle.getSleepNumber() > 0); + } + + @Test + public void testSetPeakHourOutsideCurrentSelection() throws IOException { + conf.set(PeakCompactionsThrottle.PEAK_START_HOUR, "-1"); + conf.set(PeakCompactionsThrottle.PEAK_END_HOUR, "-1"); + PeakCompactionsThrottle peakCompactionsThrottle = new PeakCompactionsThrottle(conf); + peakCompactionsThrottle.startCompaction(); + long numOfBytes = 30 * 1024 * 1024; + peakCompactionsThrottle.throttle(numOfBytes); + peakCompactionsThrottle.finishCompaction("region", "family"); + assertTrue(peakCompactionsThrottle.getSleepNumber() == 0); + } +}