HBASE-26480 Close NamedQueueRecorder to allow HMaster/RS to shutdown gracefully (#3871)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Rushabh Shah 2021-11-24 02:48:26 -05:00 committed by GitHub
parent 6d9da1314b
commit 13787ae03c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 11 deletions

View File

@ -23,15 +23,22 @@ import com.google.common.base.Preconditions;
import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.dsl.ProducerType;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* NamedQueue recorder that maintains various named queues. * NamedQueue recorder that maintains various named queues.
@ -41,14 +48,16 @@ import org.apache.hadoop.hbase.util.Threads;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public final class NamedQueueRecorder { public final class NamedQueueRecorder implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(NamedQueueRecorder.class);
private final Disruptor<RingBufferEnvelope> disruptor; private final Disruptor<RingBufferEnvelope> disruptor;
private final LogEventHandler logEventHandler; private final LogEventHandler logEventHandler;
private final ExecutorService executorService;
private static NamedQueueRecorder namedQueueRecorder; private static NamedQueueRecorder namedQueueRecorder;
private static boolean isInit = false; private static boolean isInit = false;
private static final Object LOCK = new Object(); private static final Object LOCK = new Object();
private volatile boolean closed = false;
/** /**
* Initialize disruptor with configurable ringbuffer size * Initialize disruptor with configurable ringbuffer size
@ -61,10 +70,10 @@ public final class NamedQueueRecorder {
int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024); int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024);
this.executorService = Executors.newSingleThreadExecutor(Threads.getNamedThreadFactory(
hostingThreadName + ".slowlog.append-pool"));
// disruptor initialization with BlockingWaitStrategy // disruptor initialization with BlockingWaitStrategy
this.disruptor = new Disruptor<>(getEventFactory(), getEventCount(eventCount), Executors. this.disruptor = new Disruptor<>(getEventFactory(), getEventCount(eventCount), executorService,
newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName
+ ".slowlog.append-pool")),
ProducerType.MULTI, new BlockingWaitStrategy()); ProducerType.MULTI, new BlockingWaitStrategy());
this.disruptor.handleExceptionsWith(new DisruptorExceptionHandler()); this.disruptor.handleExceptionsWith(new DisruptorExceptionHandler());
@ -142,12 +151,14 @@ public final class NamedQueueRecorder {
* service * service
*/ */
public void addRecord(NamedQueuePayload namedQueuePayload) { public void addRecord(NamedQueuePayload namedQueuePayload) {
RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer(); if (!closed) {
long seqId = ringBuffer.next(); RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
try { long seqId = ringBuffer.next();
ringBuffer.get(seqId).load(namedQueuePayload); try {
} finally { ringBuffer.get(seqId).load(namedQueuePayload);
ringBuffer.publish(seqId); } finally {
ringBuffer.publish(seqId);
}
} }
} }
@ -161,4 +172,25 @@ public final class NamedQueueRecorder {
} }
} }
@Override
public void close() throws IOException {
// Setting closed flag to true so that we don't add more events to RingBuffer.
this.closed = true;
LOG.info("Closing NamedQueueRecorder");
if (this.disruptor != null) {
long timeoutms = 5000;
try {
this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
LOG.warn("Timed out bringing down disruptor after " + timeoutms + " ms; forcing halt", e);
this.disruptor.halt();
this.disruptor.shutdown();
}
}
// With disruptor down, this is safe to let go.
if (this.executorService != null) {
// This will close the executor threads.
this.executorService.shutdownNow();
}
}
} }

View File

@ -1231,6 +1231,14 @@ public class HRegionServer extends HasThread implements
stopServiceThreads(); stopServiceThreads();
} }
try {
if (this.namedQueueRecorder != null) {
namedQueueRecorder.close();
}
} catch (IOException ioe) {
LOG.warn("Attempt to close NamedQueueRecorder failed", ioe);
}
if (this.rpcServices != null) { if (this.rpcServices != null) {
this.rpcServices.stop(); this.rpcServices.stop();
} }