From 9543e85460b6d1264857c42b568d4a7f59c06007 Mon Sep 17 00:00:00 2001
From: Cao Manh Dat
Date: Sat, 14 Oct 2017 09:33:17 +0700
Subject: [PATCH] SOLR-11443: Remove the usage of workqueue for Overseer

---
 .../java/org/apache/solr/cloud/Overseer.java  | 83 +++++++++----------
 .../apache/solr/cloud/ZkDistributedQueue.java | 37 +++++++++
 .../solr/cloud/overseer/ZkStateWriter.java    | 11 +--
 ...rseerCollectionConfigSetProcessorTest.java |  2 +-
 .../cloud/overseer/ZkStateWriterTest.java     | 36 +-------
 .../solr/common/cloud/DistributedQueue.java   |  1 +
 6 files changed, 83 insertions(+), 87 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 74a236d4077..fbf901c80b0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -21,10 +21,12 @@
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -46,6 +48,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CloudConfig;
 import org.apache.solr.handler.admin.CollectionsHandler;
@@ -84,6 +87,7 @@ public class Overseer implements Closeable {
   private final String myId;
   //queue where everybody can throw tasks
   private final ZkDistributedQueue stateUpdateQueue;
+  //TODO remove in 9.0, we do not push messages into this queue anymore
   //Internal queue where overseer stores events that have not yet been published into cloudstate
   //If Overseer dies while extracting the main queue a new overseer will start from this queue
   private final ZkDistributedQueue workQueue;
@@ -132,6 +136,10 @@
       ZkStateWriter zkStateWriter = null;
       ClusterState clusterState = null;
       boolean refreshClusterState = true; // let's refresh in the first iteration
+      // we write updates in batches, but if an exception is thrown while writing the new clusterstate,
+      // we are not sure which message is the bad one, therefore we will re-process the nodes one by one
+      int fallbackQueueSize = Integer.MAX_VALUE;
+      ZkDistributedQueue fallbackQueue = workQueue;
       while (!this.isClosed) {
         isLeader = amILeader();
         if (LeaderStatus.NO == isLeader) {
@@ -153,11 +161,10 @@
             // if there were any errors while processing
             // the state queue, items would have been left in the
             // work queue so let's process those first
-            byte[] data = workQueue.peek();
-            boolean hadWorkItems = data != null;
-            while (data != null) {
+            byte[] data = fallbackQueue.peek();
+            while (fallbackQueueSize > 0 && data != null) {
              final ZkNodeProps message = ZkNodeProps.load(data);
-              log.debug("processMessage: workQueueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
+              log.debug("processMessage: fallbackQueueSize: {}, message = {}", fallbackQueue.getStats().getQueueLength(), message);
               // force flush to ZK after each message because there is no fallback if workQueue items
               // are removed from workQueue but fail to be written to ZK
              try {
@@ -165,17 +172,19 @@
              } catch (Exception e) {
                if (isBadMessage(e)) {
                  log.warn("Exception when process message = {}, consider as bad message and poll out from the queue", message);
-                  workQueue.poll();
+                  fallbackQueue.poll();
                }
                throw e;
              }
-              workQueue.poll(); // poll-ing removes the element we got by peek-ing
-              data = workQueue.peek();
-            }
-            // force flush at the end of the loop
-            if (hadWorkItems) {
-              clusterState = zkStateWriter.writePendingUpdates();
+              fallbackQueue.poll(); // poll-ing removes the element we got by peek-ing
+              data = fallbackQueue.peek();
+              fallbackQueueSize--;
             }
+            // force flush at the end of the loop; if there are no pending updates, this is a no-op call
+            clusterState = zkStateWriter.writePendingUpdates();
+            // the workQueue is empty now, use stateUpdateQueue as the fallback queue
+            fallbackQueue = stateUpdateQueue;
+            fallbackQueueSize = 0;
           } catch (KeeperException.SessionExpiredException e) {
             log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
             return;
@@ -189,9 +198,10 @@
           }
         }

-        byte[] head = null;
+        LinkedList<Pair<String, byte[]>> queue = null;
         try {
-          head = stateUpdateQueue.peek(true);
+          // We do not need to filter any nodes here because all processed nodes are removed once we flush the clusterstate
+          queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, Long.MAX_VALUE, (x) -> true));
         } catch (KeeperException.SessionExpiredException e) {
           log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
           return;
@@ -202,45 +212,32 @@
           log.error("Exception in Overseer main queue loop", e);
         }
         try {
-          boolean[] itemWasMoved = new boolean[1];
-          while (head != null) {
-            itemWasMoved[0] = false;
-            byte[] data = head;
-            final ZkNodeProps message = ZkNodeProps.load(data);
-            log.debug("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
-            // we can batch here because workQueue is our fallback in case a ZK write failed
-            clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
-              @Override
-              public void onEnqueue() throws Exception {
-                if (!itemWasMoved[0]) {
-                  workQueue.offer(data);
-                  stateUpdateQueue.poll();
-                  itemWasMoved[0] = true;
-                }
-              }
+          Set<String> processedNodes = new HashSet<>();
+          while (queue != null && !queue.isEmpty()) {
+            for (Pair<String, byte[]> head : queue) {
+              byte[] data = head.second();
+              final ZkNodeProps message = ZkNodeProps.load(data);
+              log.debug("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());

-              @Override
-              public void onWrite() throws Exception {
-                // remove everything from workQueue
-                while (workQueue.poll() != null);
-              }
-            });
-
-            // If the ZkWriteCallback never fired, just dump the item, it might be an invalid message.
-            if (!itemWasMoved[0]) {
-              stateUpdateQueue.poll();
+              processedNodes.add(head.first());
+              fallbackQueueSize = processedNodes.size();
+              // The callback will always be called on this thread
+              clusterState = processQueueItem(message, clusterState, zkStateWriter, true, () -> {
+                stateUpdateQueue.remove(processedNodes);
+                processedNodes.clear();
+              });
             }
-
             if (isClosed) break;
             // if an event comes in the next 100ms batch it together
-            head = stateUpdateQueue.peek(100);
+            queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, 100, node -> !processedNodes.contains(node)));
           }
+          fallbackQueueSize = processedNodes.size();
           // we should force write all pending updates because the next iteration might sleep until there
           // are more items in the main queue
           clusterState = zkStateWriter.writePendingUpdates();
           // clean work queue
-          while (workQueue.poll() != null);
-
+          stateUpdateQueue.remove(processedNodes);
+          processedNodes.clear();
         } catch (KeeperException.SessionExpiredException e) {
           log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
           return;
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index 6a1b8a055c3..c007dd48244 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -39,6 +39,7 @@ import org.apache.solr.common.cloud.ZkCmdExecutor;
 import org.apache.solr.common.util.Pair;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
@@ -224,6 +225,42 @@ public class ZkDistributedQueue implements DistributedQueue {
     }
   }

+  public void remove(Collection<String> paths) throws KeeperException, InterruptedException {
+    if (paths.isEmpty()) return;
+    List<Op> ops = new ArrayList<>();
+    for (String path : paths) {
+      ops.add(Op.delete(dir + "/" + path, -1));
+    }
+    for (int from = 0; from < ops.size(); from += 1000) {
+      int to = Math.min(from + 1000, ops.size());
+      if (from < to) {
+        try {
+          zookeeper.multi(ops.subList(from, to), true);
+        } catch (KeeperException.NoNodeException e) {
+          // we don't know which nodes do not exist, so try to delete them one by one
+          for (int j = from; j < to; j++) {
+            try {
+              zookeeper.delete(ops.get(j).getPath(), -1, true);
+            } catch (KeeperException.NoNodeException e2) {
+              LOG.debug("Cannot remove a node that does not exist: " + ops.get(j).getPath());
+            }
+          }
+        }
+      }
+    }
+
+    int cacheSizeBefore = knownChildren.size();
+    knownChildren.removeAll(paths);
+    if (cacheSizeBefore - paths.size() == knownChildren.size() && knownChildren.size() != 0) {
+      stats.setQueueLength(knownChildren.size());
+    } else {
+      // Some deleted elements were not present in the cache,
+      // so the cache no longer seems valid
+      knownChildren.clear();
+      isDirty = true;
+    }
+  }
+
   /**
    * Removes the head of the queue and returns it, blocks until it succeeds.
    *
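For review context only, and not part of the patch: a minimal caller-side sketch of the batch-consume pattern that the new peekElements/remove pair supports, mirroring the Overseer loop above. The class name, the processPayload helper, and the 1000-element / 100 ms batch parameters are illustrative assumptions.

    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.solr.common.cloud.DistributedQueue;
    import org.apache.solr.common.util.Pair;

    class BatchedQueueConsumerSketch {
      // Hypothetical helper standing in for whatever work the caller does per message.
      void processPayload(byte[] payload) { /* omitted in this sketch */ }

      void drain(DistributedQueue queue) throws Exception {
        Set<String> processedIds = new HashSet<>();
        // Peek up to 1000 elements, waiting at most 100 ms for new ones; skip ids already handled.
        Collection<Pair<String, byte[]>> batch =
            queue.peekElements(1000, 100, id -> !processedIds.contains(id));
        for (Pair<String, byte[]> element : batch) {
          processPayload(element.second()); // payload bytes stored in the znode
          processedIds.add(element.first()); // znode name, kept for the bulk removal below
        }
        // Remove every processed element in one call; ZkDistributedQueue turns this into
        // chunked ZooKeeper multi ops rather than one round trip per element.
        queue.remove(processedIds);
      }
    }

Tracking the znode names from Pair.first() is what lets the single remove(...) call above replace the old poll-per-element cleanup of the work queue.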
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 9c810e803e8..61ac8ae9b2e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -112,10 +112,6 @@ public class ZkStateWriter {
     if (cmds.isEmpty()) return prevState;
     if (isNoOps(cmds)) return prevState;

-    if (callback != null) {
-      callback.onEnqueue();
-    }
-
     for (ZkWriteCommand cmd : cmds) {
       if (cmd == NO_OP) continue;
       if (!isClusterStateModified && clusterStateGetModifiedWith(cmd, prevState)) {
@@ -253,15 +249,10 @@
   }

   public interface ZkWriteCallback {
-    /**
-     * Called by ZkStateWriter if a ZkWriteCommand is queued
-     */
-    public void onEnqueue() throws Exception;
-
     /**
      * Called by ZkStateWriter if state is flushed to ZK
      */
-    public void onWrite() throws Exception;
+    void onWrite() throws Exception;
   }

 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index 6694fc17154..9079e6498e6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -191,7 +191,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
     doAnswer(invocation -> {
       queue.remove(invocation.getArgument(0));
       return null;
-    }).when(workQueueMock).remove(any());
+    }).when(workQueueMock).remove(any(QueueEvent.class));

     when(workQueueMock.poll()).thenAnswer(invocation -> {
       queue.poll();
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
index 436d72e7b5e..bd310504d69 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
@@ -45,17 +45,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {

   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

-  private static final ZkStateWriter.ZkWriteCallback FAIL_ON_WRITE = new ZkStateWriter.ZkWriteCallback() {
-    @Override
-    public void onEnqueue() throws Exception {
-
-    }
-
-    @Override
-    public void onWrite() throws Exception {
-      fail("Got unexpected flush");
-    }
-  };
+  private static final ZkStateWriter.ZkWriteCallback FAIL_ON_WRITE = () -> fail("Got unexpected flush");

   @BeforeClass
   public static void setup() {
@@ -106,31 +96,11 @@

       Thread.sleep(Overseer.STATE_UPDATE_DELAY + 100);
       AtomicBoolean didWrite = new AtomicBoolean(false);
-      clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c3), new ZkStateWriter.ZkWriteCallback() {
-        @Override
-        public void onEnqueue() throws Exception {
-
-        }
-
-        @Override
-        public void onWrite() throws Exception {
-          didWrite.set(true);
-        }
-      });
+      clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c3), () -> didWrite.set(true));
       assertTrue("Exceed the update delay, should be flushed", didWrite.get());

       for (int i = 0; i <= Overseer.STATE_UPDATE_BATCH_SIZE; i++) {
-        clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c3), new ZkStateWriter.ZkWriteCallback() {
-          @Override
-          public void onEnqueue() throws Exception {
-
-          }
-
-          @Override
-          public void onWrite() throws Exception {
-            didWrite.set(true);
-          }
-        });
+        clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c3), () -> didWrite.set(true));
       }
       assertTrue("Exceed the update batch size, should be flushed", didWrite.get());
     }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DistributedQueue.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DistributedQueue.java
index d23acdb6372..6ae2ce138b8 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DistributedQueue.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DistributedQueue.java
@@ -38,4 +38,5 @@ public interface DistributedQueue {

   Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws Exception;

+  void remove(Collection<String> paths) throws Exception;
 }
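A second illustrative sketch, assuming a ZkStateWriter plus prevState and cmds prepared elsewhere: with onEnqueue() removed, ZkWriteCallback has a single onWrite() method, so callers can pass a lambda to enqueueUpdate and force a final flush with writePendingUpdates, as the Overseer loop and the updated test above do. The class and method names below are hypothetical.

    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.solr.cloud.overseer.ZkStateWriter;
    import org.apache.solr.cloud.overseer.ZkWriteCommand;
    import org.apache.solr.common.cloud.ClusterState;

    class ZkWriteCallbackSketch {
      ClusterState enqueueAndFlush(ZkStateWriter writer, ClusterState prevState,
                                   List<ZkWriteCommand> cmds) throws Exception {
        AtomicBoolean flushed = new AtomicBoolean(false);
        // With onEnqueue() gone, the callback only signals that batched state was written to ZK.
        ClusterState state = writer.enqueueUpdate(prevState, cmds, () -> flushed.set(true));
        // Force out anything still buffered; per the Overseer comment above, this is a no-op
        // when there are no pending updates.
        state = writer.writePendingUpdates();
        return state;
      }
    }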