HBASE-27855 Support dynamic adjustment of flusher count (#5247)

Co-authored-by: huiruan <876107431@qq.com>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Ruanhui 2023-06-06 22:00:50 +08:00 committed by GitHub
parent 8c839068e8
commit 40976b0969
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 139 additions and 34 deletions

View File

@ -2074,6 +2074,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
}
// Registering the compactSplitThread object with the ConfigurationManager.
configurationManager.registerObserver(this.compactSplitThread);
configurationManager.registerObserver(this.cacheFlusher);
configurationManager.registerObserver(this.rpcServices);
configurationManager.registerObserver(this);
}
@ -2454,7 +2455,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
bootstrapNodeManager.stop();
}
if (this.cacheFlusher != null) {
this.cacheFlusher.join();
this.cacheFlusher.shutdown();
}
if (this.walRoller != null) {
this.walRoller.close();

View File

@ -22,6 +22,7 @@ import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
@ -36,12 +37,14 @@ import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
@ -63,7 +66,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* @see FlushRequester
*/
@InterfaceAudience.Private
public class MemStoreFlusher implements FlushRequester {
public class MemStoreFlusher implements FlushRequester, ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);
private Configuration conf;
@ -81,7 +84,12 @@ public class MemStoreFlusher implements FlushRequester {
private long blockingWaitTime;
private final LongAdder updatesBlockedMsHighWater = new LongAdder();
private final FlushHandler[] flushHandlers;
private FlushHandler[] flushHandlers;
private final AtomicInteger flusherIdGen = new AtomicInteger();
private ThreadFactory flusherThreadFactory;
private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);
/**
@ -117,14 +125,9 @@ public class MemStoreFlusher implements FlushRequester {
this.server = server;
this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000);
int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
int handlerCount = 0;
if (server != null) {
if (handlerCount < 1) {
LOG.warn(
"hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1",
handlerCount);
handlerCount = 1;
}
handlerCount = getHandlerCount(conf);
LOG.info("globalMemStoreLimit="
+ TraditionalBinaryPrefix
.long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
@ -305,13 +308,15 @@ public class MemStoreFlusher implements FlushRequester {
private class FlushHandler extends Thread {
private final AtomicBoolean running = new AtomicBoolean(true);
private FlushHandler(String name) {
super(name);
}
@Override
public void run() {
while (!server.isStopped()) {
while (!server.isStopped() && running.get()) {
FlushQueueEntry fqe = null;
try {
wakeupPending.set(false); // allow someone to wake us up again
@ -356,15 +361,24 @@ public class MemStoreFlusher implements FlushRequester {
}
}
}
synchronized (regionsInQueue) {
regionsInQueue.clear();
flushQueue.clear();
}
// Signal anyone waiting, so they see the close flag
wakeUpIfBlocking();
if (server.isStopped()) {
synchronized (regionsInQueue) {
regionsInQueue.clear();
flushQueue.clear();
}
// Signal anyone waiting, so they see the close flag
wakeUpIfBlocking();
}
LOG.info(getName() + " exiting");
}
public void shutdown() {
if (!running.compareAndSet(true, false)) {
LOG.warn("{} is already signaled to shutdown", getName());
}
}
}
private void wakeupFlushThread() {
@ -497,8 +511,10 @@ public class MemStoreFlusher implements FlushRequester {
void interruptIfNecessary() {
lock.writeLock().lock();
try {
for (FlushHandler flushHander : flushHandlers) {
if (flushHander != null) flushHander.interrupt();
for (FlushHandler flushHandler : flushHandlers) {
if (flushHandler != null) {
flushHandler.interrupt();
}
}
} finally {
lock.writeLock().unlock();
@ -506,30 +522,40 @@ public class MemStoreFlusher implements FlushRequester {
}
synchronized void start(UncaughtExceptionHandler eh) {
ThreadFactory flusherThreadFactory = new ThreadFactoryBuilder()
.setNameFormat(server.getServerName().toShortString() + "-MemStoreFlusher-pool-%d")
.setDaemon(true).setUncaughtExceptionHandler(eh).build();
for (int i = 0; i < flushHandlers.length; i++) {
flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
flusherThreadFactory.newThread(flushHandlers[i]);
flushHandlers[i].start();
this.flusherThreadFactory =
new ThreadFactoryBuilder().setDaemon(true).setUncaughtExceptionHandler(eh).build();
lock.readLock().lock();
try {
startFlushHandlerThreads(flushHandlers, 0, flushHandlers.length);
} finally {
lock.readLock().unlock();
}
}
boolean isAlive() {
for (FlushHandler flushHander : flushHandlers) {
if (flushHander != null && flushHander.isAlive()) {
return true;
lock.readLock().lock();
try {
for (FlushHandler flushHandler : flushHandlers) {
if (flushHandler != null && flushHandler.isAlive()) {
return true;
}
}
return false;
} finally {
lock.readLock().unlock();
}
return false;
}
void join() {
for (FlushHandler flushHander : flushHandlers) {
if (flushHander != null) {
Threads.shutdown(flushHander);
void shutdown() {
lock.readLock().lock();
try {
for (FlushHandler flushHandler : flushHandlers) {
if (flushHandler != null) {
Threads.shutdown(flushHandler);
}
}
} finally {
lock.readLock().unlock();
}
}
@ -924,4 +950,60 @@ public class MemStoreFlusher implements FlushRequester {
return compareTo(other) == 0;
}
}
private int getHandlerCount(Configuration conf) {
int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
if (handlerCount < 1) {
LOG.warn(
"hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1",
handlerCount);
handlerCount = 1;
}
return handlerCount;
}
@Override
public void onConfigurationChange(Configuration newConf) {
int newHandlerCount = getHandlerCount(newConf);
if (newHandlerCount != flushHandlers.length) {
LOG.info("update hbase.hstore.flusher.count from {} to {}", flushHandlers.length,
newHandlerCount);
lock.writeLock().lock();
try {
FlushHandler[] newFlushHandlers = Arrays.copyOf(flushHandlers, newHandlerCount);
if (newHandlerCount > flushHandlers.length) {
startFlushHandlerThreads(newFlushHandlers, flushHandlers.length, newFlushHandlers.length);
} else {
stopFlushHandlerThreads(flushHandlers, newHandlerCount, flushHandlers.length);
}
flusherIdGen.compareAndSet(flushHandlers.length, newFlushHandlers.length);
this.flushHandlers = newFlushHandlers;
} finally {
lock.writeLock().unlock();
}
}
}
private void startFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) {
if (flusherThreadFactory != null) {
for (int i = start; i < end; i++) {
flushHandlers[i] = new FlushHandler("MemStoreFlusher." + flusherIdGen.getAndIncrement());
flusherThreadFactory.newThread(flushHandlers[i]);
flushHandlers[i].start();
}
}
}
private void stopFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) {
for (int i = start; i < end; i++) {
flushHandlers[i].shutdown();
if (LOG.isDebugEnabled()) {
LOG.debug("send shutdown signal to {}", flushHandlers[i].getName());
}
}
}
public int getFlusherCount() {
return flusherIdGen.get();
}
}

View File

@ -94,4 +94,26 @@ public class TestMemStoreFlusher {
assertEquals(1, msf.getFlushQueueSize());
assertTrue(msf.regionsInQueue.get(r).isDelay());
}
@Test
public void testChangeFlusherCount() {
Configuration conf = new Configuration();
conf.set("hbase.hstore.flusher.count", "0");
HRegionServer rs = mock(HRegionServer.class);
doReturn(false).when(rs).isStopped();
doReturn(new RegionServerAccounting(conf)).when(rs).getRegionServerAccounting();
msf = new MemStoreFlusher(conf, rs);
msf.start(Threads.LOGGING_EXCEPTION_HANDLER);
Configuration newConf = new Configuration();
newConf.set("hbase.hstore.flusher.count", "3");
msf.onConfigurationChange(newConf);
assertEquals(3, msf.getFlusherCount());
newConf.set("hbase.hstore.flusher.count", "0");
msf.onConfigurationChange(newConf);
assertEquals(1, msf.getFlusherCount());
}
}