diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 5db3af3b5a6..85649b466a1 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -307,6 +307,8 @@ Bug Fixes * SOLR-10879: DELETEREPLICA and DELETENODE commands should prevent data loss when replicationFactor is 1. (ab) +* SOLR-10983: Fix DOWNNODE -> queue-work explosion (Scott Blum, Joshua Humphries) + Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java index 64120ed2421..cfd31445b63 100644 --- a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java +++ b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java @@ -228,11 +228,8 @@ public class DistributedQueue { } /** - * Inserts data into queue. Successfully calling this method does NOT guarantee - * that the element will be immediately available in the in-memory queue. In particular, - * calling this method on an empty queue will not necessarily cause {@link #poll()} to - * return the offered element. Use a blocking method if you must wait for the offered - * element to become visible. + * Inserts data into queue. If there are no other queue consumers, the offered element + * will be immediately visible when this method returns. */ public void offer(byte[] data) throws KeeperException, InterruptedException { Timer.Context time = stats.time(dir + "_offer"); diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 7e1f8c40e38..05fce30a68e 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -196,7 +196,9 @@ public class Overseer implements Closeable { log.error("Exception in Overseer main queue loop", e); } try { + boolean[] itemWasMoved = new boolean[1]; while (head != null) { + itemWasMoved[0] = false; byte[] data = head; final ZkNodeProps message = ZkNodeProps.load(data); log.debug("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion()); @@ -204,7 +206,11 @@ public class Overseer implements Closeable { clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() { @Override public void onEnqueue() throws Exception { - workQueue.offer(data); + if (!itemWasMoved[0]) { + stateUpdateQueue.poll(); + itemWasMoved[0] = true; + workQueue.offer(data); + } } @Override @@ -214,9 +220,10 @@ public class Overseer implements Closeable { } }); - // it is safer to keep this poll here because an invalid message might never be queued - // and therefore we can't rely on the ZkWriteCallback to remove the item - stateUpdateQueue.poll(); + // If the ZkWriteCallback never fired, just dump the item, it might be an invalid message. + if (!itemWasMoved[0]) { + stateUpdateQueue.poll(); + } if (isClosed) break; // if an event comes in the next 100ms batch it together