SOLR-12120: Fix bug in draining queue before shutdown

This commit is contained in:
Jan Høydahl 2019-05-02 23:26:09 +02:00
parent 8908205b42
commit cdd130ccb6
2 changed files with 8 additions and 2 deletions

View File

@ -32,6 +32,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
@ -68,6 +69,7 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
private static final int DEFAULT_NUM_THREADS = Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
BlockingQueue<AuditEvent> queue;
AtomicInteger auditsInFlight = new AtomicInteger(0);
boolean async;
boolean blockAsync;
int blockingQueueSize;
@ -197,6 +199,7 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
while (!closed && !Thread.currentThread().isInterrupted()) {
try {
AuditEvent event = queue.poll(1000, TimeUnit.MILLISECONDS);
auditsInFlight.incrementAndGet();
if (event == null) continue;
if (event.getDate() != null) {
queuedTime.update(new Date().getTime() - event.getDate().getTime(), TimeUnit.MILLISECONDS);
@ -211,6 +214,8 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
} catch (Exception ex) {
log.error("Exception when attempting to audit log asynchronously", ex);
numErrors.mark();
} finally {
auditsInFlight.decrementAndGet();
}
}
}
@ -331,9 +336,9 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
protected void waitForQueueToDrain(int timeoutSeconds) {
if (async && executorService != null) {
int timeSlept = 0;
while (!queue.isEmpty() && timeSlept < timeoutSeconds) {
while ((!queue.isEmpty() || auditsInFlight.get() > 0) && timeSlept < timeoutSeconds) {
try {
log.info("Async auditlogger queue still has {} elements, sleeping to let it drain...", queue.size());
log.info("Async auditlogger queue still has {} elements and {} audits in-flight, sleeping to drain...", queue.size(), auditsInFlight.get());
Thread.sleep(1000);
timeSlept ++;
} catch (InterruptedException ignored) {}

View File

@ -48,6 +48,7 @@ public class CallbackAuditLoggerPlugin extends AuditLoggerPlugin {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
log.warn("audit() interrupted while waiting to send callback, should not happen");
}
}
out.write(formatter.formatEvent(event) + "\n");