From 2950159a2cee3641d18a470bb832a87e5ed886b9 Mon Sep 17 00:00:00 2001
From: Shalin Shekhar Mangar <shalin@apache.org>
Date: Thu, 18 Dec 2014 15:13:41 +0000
Subject: [PATCH] SOLR-6554: Fix work queue handling

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1646474 13f79535-47bb-0310-9956-ffa450edef68
---
 .../java/org/apache/solr/cloud/Overseer.java  | 31 +++++++++++++++----
 .../solr/cloud/overseer/ZkStateWriter.java    | 27 ++++++++++++++--
 .../cloud/overseer/ZkStateWriterTest.java     |  4 +--
 3 files changed, 52 insertions(+), 10 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 862e398916d..391475d14e3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -167,7 +167,7 @@ public class Overseer implements Closeable {
                 else if (LeaderStatus.YES == isLeader) {
                   final ZkNodeProps message = ZkNodeProps.load(head);
                   log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
-                  clusterState = processQueueItem(message, clusterState, zkStateWriter);
+                  clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
                   workQueue.poll(); // poll-ing removes the element we got by peek-ing
                 }
                 else {
@@ -242,7 +242,9 @@ public class Overseer implements Closeable {
                 while (data != null)  {
                   final ZkNodeProps message = ZkNodeProps.load(data);
                   log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
-                  clusterState = processQueueItem(message, clusterState, zkStateWriter);
+                  // force flush to ZK after each message because there is no fallback if workQueue items
+                  // are removed from workQueue but fail to be written to ZK
+                  clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
                   workQueue.poll(); // poll-ing removes the element we got by peek-ing
                   data = workQueue.peek();
                 }
@@ -253,11 +255,25 @@ public class Overseer implements Closeable {
               }
 
               while (head != null) {
+                final byte[] data = head.getBytes();
                 final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
                 log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
-                clusterState = processQueueItem(message, clusterState, zkStateWriter);
-                workQueue.offer(head.getBytes());
+                // we can batch here because workQueue is our fallback in case a ZK write failed
+                clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
+                  @Override
+                  public void onEnqueue() throws Exception {
+                    workQueue.offer(data);
+                  }
 
+                  @Override
+                  public void onWrite() throws Exception {
+                    // remove everything from workQueue
+                    while (workQueue.poll() != null);
+                  }
+                });
+
+                // it is safer to keep this poll here because an invalid message might never be queued
+                // and therefore we can't rely on the ZkWriteCallback to remove the item
                 stateUpdateQueue.poll();
 
                 if (isClosed) break;
@@ -299,7 +315,7 @@ public class Overseer implements Closeable {
       }
     }
 
-    private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter) throws KeeperException, InterruptedException {
+    private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception {
       final String operation = message.getStr(QUEUE_OPERATION);
       ZkWriteCommand zkWriteCommand = null;
       final TimerContext timerContext = stats.time(operation);
@@ -318,7 +334,10 @@ public class Overseer implements Closeable {
         timerContext.stop();
       }
       if (zkWriteCommand != null) {
-        clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand);
+        clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand, callback);
+        if (!enableBatching)  {
+          clusterState = zkStateWriter.writePendingUpdates();
+        }
       }
       return clusterState;
     }
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index edfd50676dd..b9c07240b03 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -59,12 +59,19 @@ public class ZkStateWriter {
     this.stats = stats;
   }
 
-  public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd) throws KeeperException, InterruptedException {
+  public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws Exception {
     if (cmd == NO_OP) return prevState;
 
     if (maybeFlushBefore(cmd)) {
       // we must update the prev state to the new one
       prevState = clusterState = writePendingUpdates();
+      if (callback != null) {
+        callback.onWrite();
+      }
+    }
+
+    if (callback != null) {
+      callback.onEnqueue();
     }
 
     if (cmd.collection == null) {
@@ -81,7 +88,11 @@ public class ZkStateWriter {
     }
 
     if (maybeFlushAfter(cmd)) {
-      return writePendingUpdates();
+      ClusterState state = writePendingUpdates();
+      if (callback != null) {
+        callback.onWrite();
+      }
+      return state;
     }
 
     return clusterState;
@@ -203,5 +214,17 @@ public class ZkStateWriter {
   public ClusterState getClusterState() {
     return clusterState;
   }
+
+  public interface ZkWriteCallback {
+    /**
+     * Called by ZkStateWriter if a ZkWriteCommand is queued
+     */
+    public void onEnqueue() throws Exception;
+
+    /**
+     * Called by ZkStateWriter if state is flushed to ZK
+     */
+    public void onWrite() throws Exception;
+  }
 }
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
index e7e9716ac53..bb2bb36c64a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
@@ -64,7 +64,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
           new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1"));
       assertFalse("First requests can always be batched", writer.maybeFlushBefore(c1));
 
-      ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1);
+      ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
 
       ZkWriteCommand c2 = new ZkWriteCommand("c2",
           new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c2"));
@@ -91,7 +91,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
       // create a collection in stateFormat = 1 i.e. inside the main cluster state
       ZkWriteCommand c3 = new ZkWriteCommand("c3",
           new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
-      clusterState = writer.enqueueUpdate(clusterState, c3);
+      clusterState = writer.enqueueUpdate(clusterState, c3, null);
 
       // simulate three state changes in c3, all should be batched
       for (int i=0; i<3; i++) {