From 2339bd0218ed5f50983e02c22130bbfc6e5c0fa7 Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Mon, 1 Dec 2014 14:37:04 +0000 Subject: [PATCH] SOLR-6554: Refactored Overseer to improve reuse and put batching logic inside ZkStateWriter. Added a new ZkStateWriterTest. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1642693 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/solr/cloud/Overseer.java | 209 ++++-------------- .../solr/cloud/overseer/ZkStateWriter.java | 68 +++++- .../solr/cloud/overseer/ZkWriteCommand.java | 13 +- .../cloud/overseer/ZkStateWriterTest.java | 119 ++++++++++ 4 files changed, 235 insertions(+), 174 deletions(-) create mode 100644 solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 17391a77d21..862e398916d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -17,32 +17,26 @@ package org.apache.solr.cloud; * the License. */ -import static java.util.Collections.singletonMap; import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE; import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_ACTIVE_NODES; -import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX; import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.ListIterator; import java.util.Locale; import java.util.Map; -import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.collect.ImmutableSet; import org.apache.commons.lang.StringUtils; import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.cloud.overseer.ClusterStateMutator; @@ -55,7 +49,6 @@ import org.apache.solr.cloud.overseer.ZkWriteCommand; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; -import org.apache.solr.common.cloud.ImplicitDocRouter; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.SolrZkClient; @@ -75,7 +68,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Cluster leader. Responsible node assignments, cluster state file? + * Cluster leader. Responsible for processing state updates, node assignments, creating/deleting + * collections, shards, replicas and setting various properties. */ public class Overseer implements Closeable { public static final String QUEUE_OPERATION = "operation"; @@ -98,8 +92,6 @@ public class Overseer implements Closeable { static enum LeaderStatus {DONT_KNOW, NO, YES} - private long lastUpdatedTime = 0; - private class ClusterStateUpdater implements Runnable, Closeable { private final ZkStateReader reader; @@ -122,10 +114,6 @@ public class Overseer implements Closeable { private Map clusterProps; private boolean isClosed = false; - private final Map updateNodes = new LinkedHashMap<>(); - private boolean isClusterStateModified = false; - - public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) { this.zkClient = reader.getZkClient(); this.zkStats = zkStats; @@ -178,35 +166,18 @@ public class Overseer implements Closeable { } else if (LeaderStatus.YES == isLeader) { final ZkNodeProps message = ZkNodeProps.load(head); - final String operation = message.getStr(QUEUE_OPERATION); - final TimerContext timerContext = stats.time(operation); - try { - ZkWriteCommand zkWriteCommand = processMessage(clusterState, message, operation, workQueue.getStats().getQueueLength()); - clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand); - stats.success(operation); - } catch (Exception e) { - // generally there is nothing we can do - in most cases, we have - // an issue that will fail again on retry or we cannot communicate with a - // ZooKeeper in which case another Overseer should take over - // TODO: if ordering for the message is not important, we could - // track retries and put it back on the end of the queue - log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e); - stats.error(operation); - } finally { - timerContext.stop(); - } - if (zkStateWriter.hasPendingUpdates()) { - clusterState = zkStateWriter.writePendingUpdates(); - } + log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message); + clusterState = processQueueItem(message, clusterState, zkStateWriter); workQueue.poll(); // poll-ing removes the element we got by peek-ing } else { log.info("am_i_leader unclear {}", isLeader); // re-peek below in case our 'head' value is out-of-date by now } - head = workQueue.peek(); } + // force flush at the end of the loop + clusterState = zkStateWriter.writePendingUpdates(); } } catch (KeeperException e) { if (e.code() == KeeperException.Code.SESSIONEXPIRED) { @@ -226,8 +197,6 @@ public class Overseer implements Closeable { } log.info("Starting to work on the main queue"); - int lastStateFormat = -1; // sentinel - String lastCollectionName = null; try { ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats); ClusterState clusterState = null; @@ -269,90 +238,37 @@ public class Overseer implements Closeable { // the state queue, items would have been left in the // work queue so let's process those first byte[] data = workQueue.peek(); + boolean hadWorkItems = data != null; while (data != null) { final ZkNodeProps message = ZkNodeProps.load(data); - final String operation = message.getStr(QUEUE_OPERATION); - final TimerContext timerContext = stats.time(operation); - try { - ZkWriteCommand zkWriteCommand = processMessage(clusterState, message, operation, workQueue.getStats().getQueueLength()); - clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand); - stats.success(operation); - } catch (Exception e) { - // generally there is nothing we can do - in most cases, we have - // an issue that will fail again on retry or we cannot communicate with a - // ZooKeeper in which case another Overseer should take over - // TODO: if ordering for the message is not important, we could - // track retries and put it back on the end of the queue - log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e); - stats.error(operation); - } finally { - timerContext.stop(); - } - if (zkStateWriter.hasPendingUpdates()) { - clusterState = zkStateWriter.writePendingUpdates(); - } + log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message); + clusterState = processQueueItem(message, clusterState, zkStateWriter); workQueue.poll(); // poll-ing removes the element we got by peek-ing data = workQueue.peek(); } + // force flush at the end of the loop + if (hadWorkItems) { + clusterState = zkStateWriter.writePendingUpdates(); + } } while (head != null) { final ZkNodeProps message = ZkNodeProps.load(head.getBytes()); - final String operation = message.getStr(QUEUE_OPERATION); - - // we batch updates for the main cluster state together (stateFormat=1) - // but if we encounter a message for a collection with a stateFormat different than the last - // then we stop batching at that point - String collection = message.getStr(ZkStateReader.COLLECTION_PROP); - if (collection == null) collection = message.getStr("name"); - if (collection != null) { - DocCollection docCollection = clusterState.getCollectionOrNull(collection); - if (lastStateFormat != -1 && docCollection != null && docCollection.getStateFormat() != lastStateFormat) { - lastStateFormat = docCollection.getStateFormat(); - // we don't want to mix batching of different state formats together because that makes - // it harder to guarantee atomicity of ZK writes - break; - } - if (docCollection != null) { - lastStateFormat = docCollection.getStateFormat(); - } - } - - final TimerContext timerContext = stats.time(operation); - try { - ZkWriteCommand zkWriteCommand = processMessage(clusterState, message, operation, stateUpdateQueue.getStats().getQueueLength()); - clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand); - stats.success(operation); - } catch (Exception e) { - // generally there is nothing we can do - in most cases, we have - // an issue that will fail again on retry or we cannot communicate with - // ZooKeeper in which case another Overseer should take over - // TODO: if ordering for the message is not important, we could - // track retries and put it back on the end of the queue - log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e); - stats.error(operation); - } finally { - timerContext.stop(); - } + log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion()); + clusterState = processQueueItem(message, clusterState, zkStateWriter); workQueue.offer(head.getBytes()); stateUpdateQueue.poll(); - if (isClosed || System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS)) break; - if (!updateNodes.isEmpty() && !collection.equals(lastCollectionName)) { - lastCollectionName = collection; - break; - } - lastCollectionName = collection; + if (isClosed) break; // if an event comes in the next 100ms batch it together head = stateUpdateQueue.peek(100); } - if (zkStateWriter.hasPendingUpdates()) { - clusterState = zkStateWriter.writePendingUpdates(); - lastUpdatedTime = zkStateWriter.getLastUpdatedTime(); - } + // we should force write all pending updates because the next iteration might sleep until there + // are more items in the main queue + clusterState = zkStateWriter.writePendingUpdates(); // clean work queue - while (workQueue.poll() != null) ; + while (workQueue.poll() != null); } catch (KeeperException e) { if (e.code() == KeeperException.Code.SESSIONEXPIRED) { @@ -383,6 +299,30 @@ public class Overseer implements Closeable { } } + private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter) throws KeeperException, InterruptedException { + final String operation = message.getStr(QUEUE_OPERATION); + ZkWriteCommand zkWriteCommand = null; + final TimerContext timerContext = stats.time(operation); + try { + zkWriteCommand = processMessage(clusterState, message, operation); + stats.success(operation); + } catch (Exception e) { + // generally there is nothing we can do - in most cases, we have + // an issue that will fail again on retry or we cannot communicate with a + // ZooKeeper in which case another Overseer should take over + // TODO: if ordering for the message is not important, we could + // track retries and put it back on the end of the queue + log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e); + stats.error(operation); + } finally { + timerContext.stop(); + } + if (zkWriteCommand != null) { + clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand); + } + return clusterState; + } + private void checkIfIamStillLeader() { if (zkController != null && zkController.getCoreContainer().isShutDown()) return;//shutting down no need to go further org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat(); @@ -423,8 +363,7 @@ public class Overseer implements Closeable { } private ZkWriteCommand processMessage(ClusterState clusterState, - final ZkNodeProps message, final String operation, int queueSize) { - log.info("processMessage: queueSize: {}, message = {}", queueSize, message); + final ZkNodeProps message, final String operation) { CollectionParams.CollectionAction collectionAction = CollectionParams.CollectionAction.get(operation); if (collectionAction != null) { switch (collectionAction) { @@ -445,7 +384,7 @@ public class Overseer implements Closeable { case DELETEREPLICAPROP: return new ReplicaMutator(getZkStateReader()).removeReplicaProperty(clusterState, message); case BALANCESHARDUNIQUE: - ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(this, clusterState, message); + ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(clusterState, message); if (dProp.balanceProperty()) { String collName = message.getStr(ZkStateReader.COLLECTION_PROP); return new ZkWriteCommand(collName, dProp.getDocCollection()); @@ -557,55 +496,6 @@ public class Overseer implements Closeable { return LeaderStatus.NO; } - - - - private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) { - DocCollection newCollection = null; - DocCollection coll = state.getCollectionOrNull(collectionName) ; - Map slices; - - if (coll == null) { - // when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself - // without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router. - slices = new LinkedHashMap<>(1); - slices.put(slice.getName(), slice); - Map props = new HashMap<>(1); - props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name",ImplicitDocRouter.NAME)); - newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter()); - } else { - slices = new LinkedHashMap<>(coll.getSlicesMap()); // make a shallow copy - slices.put(slice.getName(), slice); - newCollection = coll.copyWithSlices(slices); - } - - // System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections)); - - return newState(state, singletonMap(collectionName, newCollection)); - } - - private ClusterState newState(ClusterState state, Map colls) { - for (Entry e : colls.entrySet()) { - DocCollection c = e.getValue(); - if (c == null) { - isClusterStateModified = true; - state = state.copyWith(e.getKey(), null); - updateNodes.put(ZkStateReader.getCollectionPath(e.getKey()) ,null); - continue; - } - - if (c.getStateFormat() > 1) { - updateNodes.put(ZkStateReader.getCollectionPath(c.getName()), - new ClusterState(-1, Collections.emptySet(), singletonMap(c.getName(), c))); - } else { - isClusterStateModified = true; - } - state = state.copyWith(e.getKey(), c); - - } - return state; - } - @Override public void close() { this.isClosed = true; @@ -614,7 +504,6 @@ public class Overseer implements Closeable { } // Class to encapsulate processing replica properties that have at most one replica hosting a property per slice. private class ExclusiveSliceProperty { - private ClusterStateUpdater updater; private ClusterState clusterState; private final boolean onlyActiveNodes; private final String property; @@ -636,8 +525,7 @@ public class Overseer implements Closeable { private int assigned = 0; - ExclusiveSliceProperty(ClusterStateUpdater updater, ClusterState clusterState, ZkNodeProps message) { - this.updater = updater; + ExclusiveSliceProperty(ClusterState clusterState, ZkNodeProps message) { this.clusterState = clusterState; String tmp = message.getStr(ZkStateReader.PROPERTY_PROP); if (StringUtils.startsWith(tmp, OverseerCollectionProcessor.COLL_PROP_PREFIX) == false) { @@ -898,7 +786,8 @@ public class Overseer implements Closeable { balanceUnassignedReplicas(); for (Slice newSlice : changedSlices.values()) { - clusterState = updater.updateSlice(clusterState, collectionName, newSlice); + DocCollection docCollection = CollectionMutator.updateSlice(collectionName, clusterState.getCollection(collectionName), newSlice); + clusterState = ClusterStateMutator.newState(clusterState, collectionName, docCollection); } return true; } diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java index 369c6158407..edfd50676dd 100644 --- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.solr.cloud.Overseer; import org.apache.solr.common.cloud.ClusterState; @@ -37,7 +38,7 @@ import static java.util.Collections.singletonMap; public class ZkStateWriter { private static Logger log = LoggerFactory.getLogger(ZkStateWriter.class); - public static ZkWriteCommand NO_OP = new ZkWriteCommand(); + public static ZkWriteCommand NO_OP = ZkWriteCommand.noop(); protected final ZkStateReader reader; protected final Overseer.Stats stats; @@ -45,7 +46,11 @@ public class ZkStateWriter { protected Map updates = new HashMap<>(); protected ClusterState clusterState = null; protected boolean isClusterStateModified = false; - protected long lastUpdatedTime = -1; + protected long lastUpdatedTime = 0; + + // state information which helps us batch writes + protected int lastStateFormat = -1; // sentinel value + protected String lastCollectionName = null; public ZkStateWriter(ZkStateReader zkStateReader, Overseer.Stats stats) { assert zkStateReader != null; @@ -54,12 +59,17 @@ public class ZkStateWriter { this.stats = stats; } - public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd) { + public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd) throws KeeperException, InterruptedException { if (cmd == NO_OP) return prevState; + if (maybeFlushBefore(cmd)) { + // we must update the prev state to the new one + prevState = clusterState = writePendingUpdates(); + } + if (cmd.collection == null) { isClusterStateModified = true; - clusterState = prevState.copyWith(cmd.name, (DocCollection) null); + clusterState = prevState.copyWith(cmd.name, null); updates.put(cmd.name, null); } else { if (cmd.collection.getStateFormat() > 1) { @@ -69,15 +79,54 @@ public class ZkStateWriter { } clusterState = prevState.copyWith(cmd.name, cmd.collection); } + + if (maybeFlushAfter(cmd)) { + return writePendingUpdates(); + } + return clusterState; } + /** + * Logic to decide a flush before processing a ZkWriteCommand + * + * @param cmd the ZkWriteCommand instance + * @return true if a flush is required, false otherwise + */ + protected boolean maybeFlushBefore(ZkWriteCommand cmd) { + if (lastUpdatedTime == 0) { + // first update, make sure we go through + return false; + } + if (cmd.collection == null) { + return false; + } + if (cmd.collection.getStateFormat() != lastStateFormat) { + return true; + } + return cmd.collection.getStateFormat() > 1 && !cmd.name.equals(lastCollectionName); + } + + /** + * Logic to decide a flush after processing a ZkWriteCommand + * + * @param cmd the ZkWriteCommand instance + * @return true if a flush to ZK is required, false otherwise + */ + protected boolean maybeFlushAfter(ZkWriteCommand cmd) { + if (cmd.collection == null) + return false; + lastCollectionName = cmd.name; + lastStateFormat = cmd.collection.getStateFormat(); + return System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS); + } + public boolean hasPendingUpdates() { return !updates.isEmpty() || isClusterStateModified; } public ClusterState writePendingUpdates() throws KeeperException, InterruptedException { - if (!hasPendingUpdates()) throw new IllegalStateException("No queued updates to execute"); + if (!hasPendingUpdates()) return clusterState; TimerContext timerContext = stats.time("update_state"); boolean success = false; try { @@ -94,7 +143,7 @@ public class ZkStateWriter { byte[] data = ZkStateReader.toJSON(new ClusterState(-1, Collections.emptySet(), singletonMap(c.getName(), c))); if (reader.getZkClient().exists(path, true)) { assert c.getZNodeVersion() >= 0; - log.info("going to update_collection {}", path); + log.info("going to update_collection {} version: {}", path, c.getZNodeVersion()); Stat stat = reader.getZkClient().setData(path, data, c.getZNodeVersion(), true); DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), stat.getVersion(), path); clusterState = clusterState.copyWith(name, newCollection); @@ -140,10 +189,17 @@ public class ZkStateWriter { return clusterState; } + /** + * @return time returned by System.nanoTime at which the main cluster state was last written to ZK or 0 if + * never + */ public long getLastUpdatedTime() { return lastUpdatedTime; } + /** + * @return the most up-to-date cluster state until the last enqueueUpdate operation + */ public ClusterState getClusterState() { return clusterState; } diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java index 1ffc5cd3677..2ef2998bed9 100644 --- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java @@ -17,33 +17,30 @@ package org.apache.solr.cloud.overseer; * limitations under the License. */ -import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; -/** -* Created by shalin on 29/10/14. -*/ public class ZkWriteCommand { public final String name; public final DocCollection collection; -// public final ClusterState state; public final boolean noop; public ZkWriteCommand(String name, DocCollection collection) { this.name = name; this.collection = collection; -// this.state = state; this.noop = false; } /** * Returns a no-op */ - public ZkWriteCommand() { + protected ZkWriteCommand() { this.noop = true; this.name = null; this.collection = null; -// this.state = null; + } + + public static ZkWriteCommand noop() { + return new ZkWriteCommand(); } } diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java new file mode 100644 index 00000000000..e7e9716ac53 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java @@ -0,0 +1,119 @@ +package org.apache.solr.cloud.overseer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.HashMap; + +import org.apache.lucene.util.IOUtils; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.cloud.AbstractZkTestCase; +import org.apache.solr.cloud.Overseer; +import org.apache.solr.cloud.OverseerTest; +import org.apache.solr.cloud.ZkTestServer; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.DocRouter; +import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkStateReader; + +public class ZkStateWriterTest extends SolrTestCaseJ4 { + + public void testZkStateWriterBatching() throws Exception { + String zkDir = createTempDir("testZkStateWriterBatching").toFile().getAbsolutePath(); + + ZkTestServer server = new ZkTestServer(zkDir); + + SolrZkClient zkClient = null; + + try { + server.run(); + AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost()); + AbstractZkTestCase.makeSolrZkNode(server.getZkHost()); + + zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); + zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true); + + ZkStateReader reader = new ZkStateReader(zkClient); + reader.createClusterStateWatchersAndUpdate(); + + ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats()); + + assertFalse("Deletes can always be batched", writer.maybeFlushBefore(new ZkWriteCommand("xyz", null))); + assertFalse("Deletes can always be batched", writer.maybeFlushAfter(new ZkWriteCommand("xyz", null))); + + zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); + zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true); + + // create new collection with stateFormat = 2 + ZkWriteCommand c1 = new ZkWriteCommand("c1", + new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1")); + assertFalse("First requests can always be batched", writer.maybeFlushBefore(c1)); + + ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1); + + ZkWriteCommand c2 = new ZkWriteCommand("c2", + new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c2")); + assertTrue("Different (new) collection create cannot be batched together with another create", writer.maybeFlushBefore(c2)); + + // simulate three state changes on same collection, all should be batched together before + assertFalse(writer.maybeFlushBefore(c1)); + assertFalse(writer.maybeFlushBefore(c1)); + assertFalse(writer.maybeFlushBefore(c1)); + // and after too + assertFalse(writer.maybeFlushAfter(c1)); + assertFalse(writer.maybeFlushAfter(c1)); + assertFalse(writer.maybeFlushAfter(c1)); + + // simulate three state changes on two different collections with stateFormat=2, none should be batched + assertFalse(writer.maybeFlushBefore(c1)); + // flushAfter has to be called as it updates the internal batching related info + assertFalse(writer.maybeFlushAfter(c1)); + assertTrue(writer.maybeFlushBefore(c2)); + assertFalse(writer.maybeFlushAfter(c2)); + assertTrue(writer.maybeFlushBefore(c1)); + assertFalse(writer.maybeFlushAfter(c1)); + + // create a collection in stateFormat = 1 i.e. inside the main cluster state + ZkWriteCommand c3 = new ZkWriteCommand("c3", + new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE)); + clusterState = writer.enqueueUpdate(clusterState, c3); + + // simulate three state changes in c3, all should be batched + for (int i=0; i<3; i++) { + assertFalse(writer.maybeFlushBefore(c3)); + assertFalse(writer.maybeFlushAfter(c3)); + } + + // simulate state change in c3 (stateFormat=1) interleaved with state changes from c1,c2 (stateFormat=2) + // none should be batched together + assertFalse(writer.maybeFlushBefore(c3)); + assertFalse(writer.maybeFlushAfter(c3)); + assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c1)); + assertFalse(writer.maybeFlushAfter(c1)); + assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c3)); + assertFalse(writer.maybeFlushAfter(c3)); + assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c2)); + assertFalse(writer.maybeFlushAfter(c2)); + + } finally { + IOUtils.close(zkClient); + server.shutdown(); + } + } + +}