HBASE-14922 Delayed flush doesn't work causing flush storms.

This commit is contained in:
Elliott Clark 2015-12-03 10:40:40 -08:00
parent c6b8e6f1ac
commit cd5ddc5fec
4 changed files with 157 additions and 13 deletions

View File

@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
@ -87,11 +88,21 @@ public class ChoreService implements ChoreServicer {
private final String coreThreadPoolPrefix;
/**
*
* @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
* spawned by this service
*/
@VisibleForTesting
public ChoreService(final String coreThreadPoolPrefix) {
this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE);
this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false);
}
/**
* Creates a chore service that uses the minimum core pool size (MIN_CORE_POOL_SIZE) and
* optionally jitters the scheduled chores.
*
* @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
* spawned by this service
* @param jitter Should chore service add some jitter for all of the scheduled chores. When set
* to true this will add -10% to 10% jitter.
*/
public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) {
// Delegate to the full constructor, fixing the pool size at the minimum.
this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter);
}
/**
@ -101,11 +112,19 @@ public class ChoreService implements ChoreServicer {
* to during initialization. The default size is 1, but specifying a larger size may be
* beneficial if you know that 1 thread will not be enough.
*/
public ChoreService(final String coreThreadPoolPrefix, int corePoolSize) {
public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) {
this.coreThreadPoolPrefix = coreThreadPoolPrefix;
if (corePoolSize < MIN_CORE_POOL_SIZE) corePoolSize = MIN_CORE_POOL_SIZE;
if (corePoolSize < MIN_CORE_POOL_SIZE) {
corePoolSize = MIN_CORE_POOL_SIZE;
}
final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix);
scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
if (jitter) {
scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, threadFactory, 0.1);
} else {
scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
scheduler.setRemoveOnCancelPolicy(true);
scheduledChores = new HashMap<ScheduledChore, ScheduledFuture<?>>();
choresMissingStartTime = new HashMap<ScheduledChore, Boolean>();
@ -127,7 +146,9 @@ public class ChoreService implements ChoreServicer {
* (typically occurs when a chore is scheduled during shutdown of service)
*/
public synchronized boolean scheduleChore(ScheduledChore chore) {
if (chore == null) return false;
if (chore == null) {
return false;
}
try {
chore.setChoreServicer(this);

View File

@ -0,0 +1,123 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * ScheduledThreadPoolExecutor that adds some jitter to the value reported by
 * RunnableScheduledFuture.getDelay for every task it schedules.
 *
 * This will spread out things on a distributed cluster.
 */
public class JitterScheduledThreadPoolExecutorImpl extends ScheduledThreadPoolExecutor {
  /** Fraction of the base delay that may be added or subtracted as jitter (0.1 means +/-10%). */
  private final double spread;

  /**
   * Main constructor.
   * @param corePoolSize The number of threads to keep in the pool; passed to the superclass.
   * @param threadFactory The factory used to create new threads; passed to the superclass.
   * @param spread The percent up and down that RunnableScheduledFuture.getDelay should be jittered.
   */
  public JitterScheduledThreadPoolExecutorImpl(int corePoolSize,
                                               ThreadFactory threadFactory,
                                               double spread) {
    super(corePoolSize, threadFactory);
    this.spread = spread;
  }

  /**
   * Wraps the task created for a Runnable so that its reported delay is jittered.
   * @param runnable The submitted runnable (unused).
   * @param task The task the executor created for the runnable.
   * @return A jittering wrapper around {@code task}.
   */
  @Override
  protected <V> RunnableScheduledFuture<V> decorateTask(
      Runnable runnable, RunnableScheduledFuture<V> task) {
    return new JitteredRunnableScheduledFuture<>(task);
  }

  /**
   * Wraps the task created for a Callable so that its reported delay is jittered.
   * @param callable The submitted callable (unused).
   * @param task The task the executor created for the callable.
   * @return A jittering wrapper around {@code task}.
   */
  @Override
  protected <V> RunnableScheduledFuture<V> decorateTask(
      Callable<V> callable, RunnableScheduledFuture<V> task) {
    return new JitteredRunnableScheduledFuture<>(task);
  }

  /**
   * Class that basically just defers to the wrapped future.
   * The only exception is getDelay
   */
  protected class JitteredRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> {
    private final RunnableScheduledFuture<V> wrapped;

    JitteredRunnableScheduledFuture(RunnableScheduledFuture<V> wrapped) {
      this.wrapped = wrapped;
    }

    @Override
    public boolean isPeriodic() {
      return wrapped.isPeriodic();
    }

    /**
     * Returns the wrapped future's delay moved by a random amount within +/- spread of it.
     *
     * When the jitter window is empty (the base delay is zero, negative, or so small that
     * {@code baseDelay * spread} truncates to zero) the base delay is returned unchanged.
     * @param unit The time unit of the result.
     * @return The jittered delay, or the base delay when no jitter can be applied.
     */
    @Override
    public long getDelay(TimeUnit unit) {
      long baseDelay = wrapped.getDelay(unit);
      long spreadTime = (long) (baseDelay * spread);
      // ThreadLocalRandom.nextLong(origin, bound) requires origin < bound; with a zero or
      // negative spreadTime the call would throw IllegalArgumentException, so skip the jitter.
      if (spreadTime <= 0) {
        return baseDelay;
      }
      long delay = baseDelay + ThreadLocalRandom.current().nextLong(-spreadTime, spreadTime);
      // Ensure that we don't roll over for nanoseconds.
      return (delay < 0) ? baseDelay : delay;
    }

    @Override
    public int compareTo(Delayed o) {
      return wrapped.compareTo(o);
    }

    @Override
    public void run() {
      wrapped.run();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      return wrapped.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
      return wrapped.isCancelled();
    }

    @Override
    public boolean isDone() {
      return wrapped.isDone();
    }

    @Override
    public V get() throws InterruptedException, ExecutionException {
      return wrapped.get();
    }

    @Override
    public V get(long timeout,
                 TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
      return wrapped.get(timeout, unit);
    }
  }
}

View File

@ -315,7 +315,7 @@ public class TestChoreService {
final int corePoolSize = 10;
final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE;
ChoreService customInit = new ChoreService("testChoreServiceConstruction_custom", corePoolSize);
ChoreService customInit = new ChoreService("testChoreServiceConstruction_custom", corePoolSize, false);
try {
assertEquals(corePoolSize, customInit.getCorePoolSize());
} finally {
@ -329,11 +329,11 @@ public class TestChoreService {
shutdownService(defaultInit);
}
ChoreService invalidInit = new ChoreService("testChoreServiceConstruction_invalid", -10);
ChoreService invalidInit = new ChoreService("testChoreServiceConstruction_invalid", -10, false);
try {
assertEquals(defaultCorePoolSize, invalidInit.getCorePoolSize());
} finally {
shutdownService(invalidInit);
shutdownService(invalidInit);
}
}
@ -403,7 +403,7 @@ public class TestChoreService {
@Test (timeout=20000)
public void testCorePoolIncrease() throws InterruptedException {
final int initialCorePoolSize = 3;
ChoreService service = new ChoreService("testCorePoolIncrease", initialCorePoolSize);
ChoreService service = new ChoreService("testCorePoolIncrease", initialCorePoolSize, false);
try {
assertEquals("Should have a core pool of size: " + initialCorePoolSize, initialCorePoolSize,
@ -443,7 +443,7 @@ public class TestChoreService {
@Test(timeout = 30000)
public void testCorePoolDecrease() throws InterruptedException {
final int initialCorePoolSize = 3;
ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize);
ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize, false);
final int chorePeriod = 100;
try {
// Slow chores always miss their start time and thus the core pool size should be at least as

View File

@ -605,7 +605,7 @@ public class HRegionServer extends HasThread implements
rpcServices.start();
putUpWebUI();
this.walRoller = new LogRoller(this, this);
this.choreService = new ChoreService(getServerName().toString());
this.choreService = new ChoreService(getServerName().toString(), true);
if (!SystemUtils.IS_OS_WINDOWS) {
Signal.handle(new Signal("HUP"), new SignalHandler() {
@ -1574,8 +1574,8 @@ public class HRegionServer extends HasThread implements
static class PeriodicMemstoreFlusher extends ScheduledChore {
final HRegionServer server;
final static int RANGE_OF_DELAY = 20000; //millisec
final static int MIN_DELAY_TIME = 3000; //millisec
final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
final static int MIN_DELAY_TIME = 0; // millisec
public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
this.server = server;