From cdd130ccb69e4483ca4fa488c3e9f0b88631cfcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Thu, 2 May 2019 23:26:09 +0200 Subject: [PATCH] SOLR-12120: Fix bug in draining queue before shutdown --- .../java/org/apache/solr/security/AuditLoggerPlugin.java | 9 +++++++-- .../apache/solr/security/CallbackAuditLoggerPlugin.java | 1 + 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java b/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java index 80fe11e2cd7..544f822cc86 100644 --- a/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java +++ b/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java @@ -32,6 +32,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import com.codahale.metrics.Counter; import com.codahale.metrics.Meter; @@ -68,6 +69,7 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo private static final int DEFAULT_NUM_THREADS = Math.max(2, Runtime.getRuntime().availableProcessors() / 2); BlockingQueue queue; + AtomicInteger auditsInFlight = new AtomicInteger(0); boolean async; boolean blockAsync; int blockingQueueSize; @@ -197,6 +199,7 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo while (!closed && !Thread.currentThread().isInterrupted()) { try { AuditEvent event = queue.poll(1000, TimeUnit.MILLISECONDS); + auditsInFlight.incrementAndGet(); if (event == null) continue; if (event.getDate() != null) { queuedTime.update(new Date().getTime() - event.getDate().getTime(), TimeUnit.MILLISECONDS); @@ -211,6 +214,8 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo } catch (Exception ex) { log.error("Exception when attempting to audit log asynchronously", ex); numErrors.mark(); + } finally { + auditsInFlight.decrementAndGet(); } } } @@ -331,9 +336,9 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo protected void waitForQueueToDrain(int timeoutSeconds) { if (async && executorService != null) { int timeSlept = 0; - while (!queue.isEmpty() && timeSlept < timeoutSeconds) { + while ((!queue.isEmpty() || auditsInFlight.get() > 0) && timeSlept < timeoutSeconds) { try { - log.info("Async auditlogger queue still has {} elements, sleeping to let it drain...", queue.size()); + log.info("Async auditlogger queue still has {} elements and {} audits in-flight, sleeping to drain...", queue.size(), auditsInFlight.get()); Thread.sleep(1000); timeSlept ++; } catch (InterruptedException ignored) {} diff --git a/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java b/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java index 22cfbe2d9e0..756817c93d4 100644 --- a/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java +++ b/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java @@ -48,6 +48,7 @@ public class CallbackAuditLoggerPlugin extends AuditLoggerPlugin { try { Thread.sleep(delay); } catch (InterruptedException e) { + log.warn("audit() interrupted while waiting to send callback, should not happen"); } } out.write(formatter.formatEvent(event) + "\n");