From 13787ae03ccd408a24fba67be14b5c929874214a Mon Sep 17 00:00:00 2001 From: Rushabh Shah Date: Wed, 24 Nov 2021 02:48:26 -0500 Subject: [PATCH] HBASE-26480 Close NamedQueueRecorder to allow HMaster/RS to shutdown gracefully (#3871) Signed-off-by: Duo Zhang --- .../hbase/namequeues/NamedQueueRecorder.java | 54 +++++++++++++++---- .../hbase/regionserver/HRegionServer.java | 8 +++ 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java index 9101ce640ea..c9d1729ba39 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java @@ -23,15 +23,22 @@ import com.google.common.base.Preconditions; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.hadoop.hbase.util.Threads; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * NamedQueue recorder that maintains various named queues. @@ -41,14 +48,16 @@ import org.apache.hadoop.hbase.util.Threads; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public final class NamedQueueRecorder { - +public final class NamedQueueRecorder implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(NamedQueueRecorder.class); private final Disruptor disruptor; private final LogEventHandler logEventHandler; + private final ExecutorService executorService; private static NamedQueueRecorder namedQueueRecorder; private static boolean isInit = false; private static final Object LOCK = new Object(); + private volatile boolean closed = false; /** * Initialize disruptor with configurable ringbuffer size @@ -61,10 +70,10 @@ public final class NamedQueueRecorder { int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024); + this.executorService = Executors.newSingleThreadExecutor(Threads.getNamedThreadFactory( + hostingThreadName + ".slowlog.append-pool")); // disruptor initialization with BlockingWaitStrategy - this.disruptor = new Disruptor<>(getEventFactory(), getEventCount(eventCount), Executors. - newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName - + ".slowlog.append-pool")), + this.disruptor = new Disruptor<>(getEventFactory(), getEventCount(eventCount), executorService, ProducerType.MULTI, new BlockingWaitStrategy()); this.disruptor.handleExceptionsWith(new DisruptorExceptionHandler()); @@ -142,12 +151,14 @@ public final class NamedQueueRecorder { * service */ public void addRecord(NamedQueuePayload namedQueuePayload) { - RingBuffer ringBuffer = this.disruptor.getRingBuffer(); - long seqId = ringBuffer.next(); - try { - ringBuffer.get(seqId).load(namedQueuePayload); - } finally { - ringBuffer.publish(seqId); + if (!closed) { + RingBuffer ringBuffer = this.disruptor.getRingBuffer(); + long seqId = ringBuffer.next(); + try { + ringBuffer.get(seqId).load(namedQueuePayload); + } finally { + ringBuffer.publish(seqId); + } } } @@ -161,4 +172,25 @@ public final class NamedQueueRecorder { } } + @Override + public void close() throws IOException { + // Setting closed flag to true so that we don't add more events to RingBuffer. + this.closed = true; + LOG.info("Closing NamedQueueRecorder"); + if (this.disruptor != null) { + long timeoutms = 5000; + try { + this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + LOG.warn("Timed out bringing down disruptor after " + timeoutms + " ms; forcing halt", e); + this.disruptor.halt(); + this.disruptor.shutdown(); + } + } + // With disruptor down, this is safe to let go. + if (this.executorService != null) { + // This will close the executor threads. + this.executorService.shutdownNow(); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index a5500a1f542..0a423d62610 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1231,6 +1231,14 @@ public class HRegionServer extends HasThread implements stopServiceThreads(); } + try { + if (this.namedQueueRecorder != null) { + namedQueueRecorder.close(); + } + } catch (IOException ioe) { + LOG.warn("Attempt to close NamedQueueRecorder failed", ioe); + } + if (this.rpcServices != null) { this.rpcServices.stop(); }