HBASE-7437 Improve CompactSelection (Hiroshi Ikeda)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1470754 13f79535-47bb-0310-9956-ffa450edef68
sershe 2013-04-22 23:45:18 +00:00
parent d995e62b56
commit b4dd49f55d
6 changed files with 263 additions and 188 deletions

Changed file: org/apache/hadoop/hbase/regionserver/HStore.java

@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -67,7 +68,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
-import org.apache.hadoop.hbase.regionserver.compactions.OffPeakCompactions;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -147,7 +148,8 @@ public class HStore implements Store {
   final StoreEngine<?, ?, ?> storeEngine;
-  private OffPeakCompactions offPeakCompactions;
+  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
+  private final OffPeakHours offPeakHours;
   private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
   private static int flush_retries_number;
@@ -199,7 +201,7 @@ public class HStore implements Store {
     // to clone it?
     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
     this.memstore = new MemStore(conf, this.comparator);
-    this.offPeakCompactions = new OffPeakCompactions(conf);
+    this.offPeakHours = OffPeakHours.getInstance(conf);
     // Setting up cache configuration for this family
     this.cacheConf = new CacheConfig(conf, family);
@@ -1183,13 +1185,21 @@ public class HStore implements Store {
       // Normal case - coprocessor is not overriding file selection.
       if (!compaction.hasSelection()) {
         boolean isUserCompaction = priority == Store.PRIORITY_USER;
-        boolean mayUseOffPeak = this.offPeakCompactions.tryStartOffPeakRequest();
-        compaction.select(this.filesCompacting, isUserCompaction,
+        boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
+            offPeakCompactionTracker.compareAndSet(false, true);
+        try {
+          compaction.select(this.filesCompacting, isUserCompaction,
             mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
+        } catch (IOException e) {
+          if (mayUseOffPeak) {
+            offPeakCompactionTracker.set(false);
+          }
+          throw e;
+        }
         assert compaction.hasSelection();
         if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
           // Compaction policy doesn't want to take advantage of off-peak.
-          this.offPeakCompactions.endOffPeakRequest();
+          offPeakCompactionTracker.set(false);
         }
       }
       if (this.getCoprocessorHost() != null) {
@@ -1249,7 +1259,7 @@
   private void finishCompactionRequest(CompactionRequest cr) {
     this.region.reportCompactionRequestEnd(cr.isMajor());
     if (cr.isOffPeak()) {
-      this.offPeakCompactions.endOffPeakRequest();
+      offPeakCompactionTracker.set(false);
       cr.setOffPeak(false);
     }
     synchronized (filesCompacting) {
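
The hunks above drop the per-store OffPeakCompactions object, whose global counter was guarded by a static lock, and instead claim the single server-wide off-peak slot with offPeakCompactionTracker.compareAndSet(false, true); the flag is released with set(false) if selection throws, if the policy declines the off-peak settings, or when the compaction finishes. A minimal, self-contained sketch of that claim/release pattern follows; the class and method names are illustrative and not part of the patch.

// Illustrative sketch only -- OffPeakSlotDemo is not part of the patch.
import java.util.concurrent.atomic.AtomicBoolean;

public class OffPeakSlotDemo {
  // One flag for the whole server: at most one compaction holds off-peak status.
  private static final AtomicBoolean offPeakTracker = new AtomicBoolean();

  // Claim the slot; succeeds only during off-peak hours and only for one caller.
  static boolean tryClaimOffPeak(boolean isOffPeakHour) {
    return isOffPeakHour && offPeakTracker.compareAndSet(false, true);
  }

  // Release the slot once the off-peak compaction completes or fails.
  static void releaseOffPeak() {
    offPeakTracker.set(false);
  }

  public static void main(String[] args) {
    System.out.println(tryClaimOffPeak(true));  // true: slot was free
    System.out.println(tryClaimOffPeak(true));  // false: slot already taken
    releaseOffPeak();                           // mirrors finishCompactionRequest()
    System.out.println(tryClaimOffPeak(true));  // true again after release
  }
}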

New file: org/apache/hadoop/hbase/regionserver/compactions/CurrentHourProvider.java

@@ -0,0 +1,64 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.util.Calendar;
import java.util.GregorianCalendar;

import org.apache.hadoop.classification.InterfaceAudience;

@InterfaceAudience.Private
public class CurrentHourProvider {
  private CurrentHourProvider() { throw new AssertionError(); }

  private static final class Tick {
    final int currentHour;
    final long expirationTimeInMillis;

    Tick(int currentHour, long expirationTimeInMillis) {
      this.currentHour = currentHour;
      this.expirationTimeInMillis = expirationTimeInMillis;
    }
  }

  private static Tick nextTick() {
    Calendar calendar = new GregorianCalendar();
    int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
    moveToNextHour(calendar);
    return new Tick(currentHour, calendar.getTimeInMillis());
  }

  private static void moveToNextHour(Calendar calendar) {
    calendar.add(Calendar.HOUR_OF_DAY, 1);
    calendar.set(Calendar.MINUTE, 0);
    calendar.set(Calendar.SECOND, 0);
    calendar.set(Calendar.MILLISECOND, 0);
  }

  private static volatile Tick tick = nextTick();

  public static int getCurrentHour() {
    Tick tick = CurrentHourProvider.tick;
    if(System.currentTimeMillis() < tick.expirationTimeInMillis) {
      return tick.currentHour;
    }
    CurrentHourProvider.tick = tick = nextTick();
    return tick.currentHour;
  }
}
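
CurrentHourProvider caches the hour of day in an immutable Tick together with the timestamp at which it expires (the top of the next hour), so repeated calls within the same hour read a volatile field instead of building a new GregorianCalendar; because Tick is immutable, a racing refresh at the hour boundary simply rebuilds the tick. A small hedged usage example; the wrapping class here is hypothetical, only getCurrentHour() comes from the patch.

// Hypothetical caller; not part of the patch.
import org.apache.hadoop.hbase.regionserver.compactions.CurrentHourProvider;

public class CurrentHourDemo {
  public static void main(String[] args) {
    // Within a single hour both calls return the cached Tick.currentHour.
    int first = CurrentHourProvider.getCurrentHour();
    int second = CurrentHourProvider.getCurrentHour();
    System.out.println("hour of day (0-23): " + first + ", cached again: " + second);
  }
}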

Deleted file: org/apache/hadoop/hbase/regionserver/compactions/OffPeakCompactions.java

@@ -1,111 +0,0 @@
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.util.Calendar;
import java.util.GregorianCalendar;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;

/**
 * The class used to track off-peak hours and compactions. Off-peak compaction counter
 * is global for the entire server, hours can be different per instance of this class,
 * based on the configuration of the corresponding store.
 */
@InterfaceAudience.Private
public class OffPeakCompactions {
  private static final Log LOG = LogFactory.getLog(OffPeakCompactions.class);
  private final static Calendar calendar = new GregorianCalendar();
  private int offPeakStartHour;
  private int offPeakEndHour;

  // TODO: replace with AtomicLong, see HBASE-7437.
  /**
   * Number of off peak compactions either in the compaction queue or
   * happening now. Please lock compactionCountLock before modifying.
   */
  private static long numOutstanding = 0;

  /**
   * Lock object for numOutstandingOffPeakCompactions
   */
  private static final Object compactionCountLock = new Object();

  public OffPeakCompactions(Configuration conf) {
    offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
    offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
    if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) {
      if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) {
        LOG.warn("Ignoring invalid start/end hour for peak hour : start = " +
            this.offPeakStartHour + " end = " + this.offPeakEndHour +
            ". Valid numbers are [0-23]");
      }
      this.offPeakStartHour = this.offPeakEndHour = -1;
    }
  }

  /**
   * Tries making the compaction off-peak.
   * @return Whether the compaction can be made off-peak.
   */
  public boolean tryStartOffPeakRequest() {
    if (!isOffPeakHour()) return false;
    synchronized(compactionCountLock) {
      if (numOutstanding == 0) {
        numOutstanding++;
        return true;
      }
    }
    return false;
  }

  /**
   * The current compaction finished, so reset the off peak compactions count
   * if this was an off peak compaction.
   */
  public void endOffPeakRequest() {
    long newValueToLog = -1;
    synchronized(compactionCountLock) {
      newValueToLog = --numOutstanding;
    }
    LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " + newValueToLog);
  }

  /**
   * @return whether this is off-peak hour
   */
  private boolean isOffPeakHour() {
    int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
    // If offpeak time checking is disabled just return false.
    if (this.offPeakStartHour == this.offPeakEndHour) {
      return false;
    }
    if (this.offPeakStartHour < this.offPeakEndHour) {
      return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour);
    }
    return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour);
  }

  private static boolean isValidHour(int hour) {
    return (hour >= 0 && hour <= 23);
  }
}

New file: org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java

@@ -0,0 +1,105 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;

@InterfaceAudience.Private
public abstract class OffPeakHours {
  private static final Log LOG = LogFactory.getLog(OffPeakHours.class);

  public static final OffPeakHours DISABLED = new OffPeakHours() {
    @Override public boolean isOffPeakHour() { return false; }
    @Override public boolean isOffPeakHour(int targetHour) { return false; }
  };

  public static OffPeakHours getInstance(Configuration conf) {
    int startHour = conf.getInt("hbase.offpeak.start.hour", -1);
    int endHour = conf.getInt("hbase.offpeak.end.hour", -1);
    return getInstance(startHour, endHour);
  }

  /**
   * @param startHour inclusive
   * @param endHour exclusive
   */
  public static OffPeakHours getInstance(int startHour, int endHour) {
    if (startHour == -1 && endHour == -1) {
      return DISABLED;
    }
    if (! isValidHour(startHour) || ! isValidHour(endHour)) {
      if (LOG.isWarnEnabled()) {
        LOG.warn("Ignoring invalid start/end hour for peak hour : start = " +
            startHour + " end = " + endHour +
            ". Valid numbers are [0-23]");
      }
      return DISABLED;
    }
    if (startHour == endHour) {
      return DISABLED;
    }
    return new OffPeakHoursImpl(startHour, endHour);
  }

  private static boolean isValidHour(int hour) {
    return 0 <= hour && hour <= 23;
  }

  /**
   * @return whether {@code targetHour} is off-peak hour
   */
  public abstract boolean isOffPeakHour(int targetHour);

  /**
   * @return whether it is off-peak hour
   */
  public abstract boolean isOffPeakHour();

  private static class OffPeakHoursImpl extends OffPeakHours {
    final int startHour;
    final int endHour;

    /**
     * @param startHour inclusive
     * @param endHour exclusive
     */
    OffPeakHoursImpl(int startHour, int endHour) {
      this.startHour = startHour;
      this.endHour = endHour;
    }

    @Override
    public boolean isOffPeakHour() {
      return isOffPeakHour(CurrentHourProvider.getCurrentHour());
    }

    @Override
    public boolean isOffPeakHour(int targetHour) {
      if (startHour <= endHour) {
        return startHour <= targetHour && targetHour < endHour;
      }
      return targetHour < endHour || startHour <= targetHour;
    }
  }
}
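
OffPeakHours.getInstance treats the start hour as inclusive and the end hour as exclusive, falls back to the DISABLED singleton when the hours are unset, equal, or out of range, and supports windows that wrap past midnight. A short usage sketch; the wrapper class name is hypothetical.

// Illustrative only; OffPeakHoursUsage is not part of the patch.
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;

public class OffPeakHoursUsage {
  public static void main(String[] args) {
    // Window that wraps past midnight: 22:00 (inclusive) to 06:00 (exclusive).
    OffPeakHours night = OffPeakHours.getInstance(22, 6);
    System.out.println(night.isOffPeakHour(23)); // true
    System.out.println(night.isOffPeakHour(3));  // true
    System.out.println(night.isOffPeakHour(6));  // false, end hour is exclusive
    System.out.println(night.isOffPeakHour(12)); // false

    // Unconfigured (-1/-1) or invalid hours fall back to the DISABLED singleton.
    OffPeakHours disabled = OffPeakHours.getInstance(-1, -1);
    System.out.println(disabled.isOffPeakHour(23)); // false
  }
}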

Deleted file: org/apache/hadoop/hbase/regionserver/TestOffPeakCompactions.java

@@ -1,70 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.*;

import java.io.IOException;
import java.util.Calendar;
import java.util.GregorianCalendar;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakCompactions;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(SmallTests.class)
public class TestOffPeakCompactions {
  private final static Log LOG = LogFactory.getLog(TestDefaultCompactSelection.class);
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @Test
  public void testOffPeakHours() throws IOException {
    Calendar calendar = new GregorianCalendar();
    int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
    LOG.debug("Hour of day = " + hourOfDay);
    int hourPlusOne = ((hourOfDay+1)%24);
    int hourMinusOne = ((hourOfDay-1+24)%24);
    int hourMinusTwo = ((hourOfDay-2+24)%24);
    Configuration conf = TEST_UTIL.getConfiguration();

    OffPeakCompactions opc = new OffPeakCompactions(conf);
    LOG.debug("Testing without off-peak settings...");
    assertFalse(opc.tryStartOffPeakRequest());

    // set peak hour to current time and check compact selection
    conf.setLong("hbase.offpeak.start.hour", hourMinusOne);
    conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
    opc = new OffPeakCompactions(conf);
    LOG.debug("Testing compact selection with off-peak settings (" +
        hourMinusOne + ", " + hourPlusOne + ")");
    assertTrue(opc.tryStartOffPeakRequest());
    opc.endOffPeakRequest();

    // set peak hour outside current selection and check compact selection
    conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
    conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
    opc = new OffPeakCompactions(conf);
    assertFalse(opc.tryStartOffPeakRequest());
  }
}

New file: org/apache/hadoop/hbase/regionserver/compactions/TestOffPeakHours.java

@@ -0,0 +1,77 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.SmallTests;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(SmallTests.class)
public class TestOffPeakHours {
  private static HBaseTestingUtility testUtil;

  @BeforeClass
  public static void setUpClass() {
    testUtil = new HBaseTestingUtility();
  }

  private int hourOfDay;
  private int hourPlusOne;
  private int hourMinusOne;
  private int hourMinusTwo;
  private Configuration conf;

  @Before
  public void setUp() {
    hourOfDay = 15;
    hourPlusOne = ((hourOfDay+1)%24);
    hourMinusOne = ((hourOfDay-1+24)%24);
    hourMinusTwo = ((hourOfDay-2+24)%24);
    conf = testUtil.getConfiguration();
  }

  @Test
  public void testWithoutSettings() {
    Configuration conf = testUtil.getConfiguration();
    OffPeakHours target = OffPeakHours.getInstance(conf);
    assertFalse(target.isOffPeakHour(hourOfDay));
  }

  @Test
  public void testSetPeakHourToTargetTime() {
    conf.setLong("hbase.offpeak.start.hour", hourMinusOne);
    conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
    OffPeakHours target = OffPeakHours.getInstance(conf);
    assertTrue(target.isOffPeakHour(hourOfDay));
  }

  @Test
  public void testSetPeakHourOutsideCurrentSelection() {
    conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
    conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
    OffPeakHours target = OffPeakHours.getInstance(conf);
    assertFalse(target.isOffPeakHour(hourOfDay));
  }
}
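
Unlike the deleted test, this one pins hourOfDay to 15 rather than reading the wall clock, so the window arithmetic is deterministic: testSetPeakHourToTargetTime configures [14, 16), which contains 15, while testSetPeakHourOutsideCurrentSelection configures [13, 14), which does not. The hypothetical snippet below, not part of the committed test, spells out the inclusive-start / exclusive-end contract those assertions rely on.

// Hypothetical boundary check; not part of the committed test.
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;

public class OffPeakBoundaryCheck {
  public static void main(String[] args) {
    OffPeakHours window = OffPeakHours.getInstance(14, 16);
    System.out.println(window.isOffPeakHour(14)); // true: start hour is inclusive
    System.out.println(window.isOffPeakHour(15)); // true: inside the window
    System.out.println(window.isOffPeakHour(16)); // false: end hour is exclusive
  }
}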