SOLR-6554: Refactored Overseer to improve reuse and put batching logic inside ZkStateWriter. Added a new ZkStateWriterTest.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1642693 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2014-12-01 14:37:04 +00:00
parent 193c1be0b4
commit 2339bd0218
4 changed files with 235 additions and 174 deletions

View File

@ -17,32 +17,26 @@ package org.apache.solr.cloud;
* the License.
*/
import static java.util.Collections.singletonMap;
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_ACTIVE_NODES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
@ -55,7 +49,6 @@ import org.apache.solr.cloud.overseer.ZkWriteCommand;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
@ -75,7 +68,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Cluster leader. Responsible node assignments, cluster state file?
* Cluster leader. Responsible for processing state updates, node assignments, creating/deleting
* collections, shards, replicas and setting various properties.
*/
public class Overseer implements Closeable {
public static final String QUEUE_OPERATION = "operation";
@ -98,8 +92,6 @@ public class Overseer implements Closeable {
static enum LeaderStatus {DONT_KNOW, NO, YES}
private long lastUpdatedTime = 0;
private class ClusterStateUpdater implements Runnable, Closeable {
private final ZkStateReader reader;
@ -122,10 +114,6 @@ public class Overseer implements Closeable {
private Map clusterProps;
private boolean isClosed = false;
private final Map<String, Object> updateNodes = new LinkedHashMap<>();
private boolean isClusterStateModified = false;
public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
this.zkClient = reader.getZkClient();
this.zkStats = zkStats;
@ -178,35 +166,18 @@ public class Overseer implements Closeable {
}
else if (LeaderStatus.YES == isLeader) {
final ZkNodeProps message = ZkNodeProps.load(head);
final String operation = message.getStr(QUEUE_OPERATION);
final TimerContext timerContext = stats.time(operation);
try {
ZkWriteCommand zkWriteCommand = processMessage(clusterState, message, operation, workQueue.getStats().getQueueLength());
clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand);
stats.success(operation);
} catch (Exception e) {
// generally there is nothing we can do - in most cases, we have
// an issue that will fail again on retry or we cannot communicate with a
// ZooKeeper in which case another Overseer should take over
// TODO: if ordering for the message is not important, we could
// track retries and put it back on the end of the queue
log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
stats.error(operation);
} finally {
timerContext.stop();
}
if (zkStateWriter.hasPendingUpdates()) {
clusterState = zkStateWriter.writePendingUpdates();
}
log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
clusterState = processQueueItem(message, clusterState, zkStateWriter);
workQueue.poll(); // poll-ing removes the element we got by peek-ing
}
else {
log.info("am_i_leader unclear {}", isLeader);
// re-peek below in case our 'head' value is out-of-date by now
}
head = workQueue.peek();
}
// force flush at the end of the loop
clusterState = zkStateWriter.writePendingUpdates();
}
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
@ -226,8 +197,6 @@ public class Overseer implements Closeable {
}
log.info("Starting to work on the main queue");
int lastStateFormat = -1; // sentinel
String lastCollectionName = null;
try {
ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
ClusterState clusterState = null;
@ -269,90 +238,37 @@ public class Overseer implements Closeable {
// the state queue, items would have been left in the
// work queue so let's process those first
byte[] data = workQueue.peek();
boolean hadWorkItems = data != null;
while (data != null) {
final ZkNodeProps message = ZkNodeProps.load(data);
final String operation = message.getStr(QUEUE_OPERATION);
final TimerContext timerContext = stats.time(operation);
try {
ZkWriteCommand zkWriteCommand = processMessage(clusterState, message, operation, workQueue.getStats().getQueueLength());
clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand);
stats.success(operation);
} catch (Exception e) {
// generally there is nothing we can do - in most cases, we have
// an issue that will fail again on retry or we cannot communicate with a
// ZooKeeper in which case another Overseer should take over
// TODO: if ordering for the message is not important, we could
// track retries and put it back on the end of the queue
log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
stats.error(operation);
} finally {
timerContext.stop();
}
if (zkStateWriter.hasPendingUpdates()) {
clusterState = zkStateWriter.writePendingUpdates();
}
log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
clusterState = processQueueItem(message, clusterState, zkStateWriter);
workQueue.poll(); // poll-ing removes the element we got by peek-ing
data = workQueue.peek();
}
// force flush at the end of the loop
if (hadWorkItems) {
clusterState = zkStateWriter.writePendingUpdates();
}
}
while (head != null) {
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
final String operation = message.getStr(QUEUE_OPERATION);
// we batch updates for the main cluster state together (stateFormat=1)
// but if we encounter a message for a collection with a stateFormat different than the last
// then we stop batching at that point
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
if (collection == null) collection = message.getStr("name");
if (collection != null) {
DocCollection docCollection = clusterState.getCollectionOrNull(collection);
if (lastStateFormat != -1 && docCollection != null && docCollection.getStateFormat() != lastStateFormat) {
lastStateFormat = docCollection.getStateFormat();
// we don't want to mix batching of different state formats together because that makes
// it harder to guarantee atomicity of ZK writes
break;
}
if (docCollection != null) {
lastStateFormat = docCollection.getStateFormat();
}
}
final TimerContext timerContext = stats.time(operation);
try {
ZkWriteCommand zkWriteCommand = processMessage(clusterState, message, operation, stateUpdateQueue.getStats().getQueueLength());
clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand);
stats.success(operation);
} catch (Exception e) {
// generally there is nothing we can do - in most cases, we have
// an issue that will fail again on retry or we cannot communicate with
// ZooKeeper in which case another Overseer should take over
// TODO: if ordering for the message is not important, we could
// track retries and put it back on the end of the queue
log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
stats.error(operation);
} finally {
timerContext.stop();
}
log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
clusterState = processQueueItem(message, clusterState, zkStateWriter);
workQueue.offer(head.getBytes());
stateUpdateQueue.poll();
if (isClosed || System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS)) break;
if (!updateNodes.isEmpty() && !collection.equals(lastCollectionName)) {
lastCollectionName = collection;
break;
}
lastCollectionName = collection;
if (isClosed) break;
// if an event comes in the next 100ms batch it together
head = stateUpdateQueue.peek(100);
}
if (zkStateWriter.hasPendingUpdates()) {
clusterState = zkStateWriter.writePendingUpdates();
lastUpdatedTime = zkStateWriter.getLastUpdatedTime();
}
// we should force write all pending updates because the next iteration might sleep until there
// are more items in the main queue
clusterState = zkStateWriter.writePendingUpdates();
// clean work queue
while (workQueue.poll() != null) ;
while (workQueue.poll() != null);
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
@ -383,6 +299,30 @@ public class Overseer implements Closeable {
}
}
private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter) throws KeeperException, InterruptedException {
final String operation = message.getStr(QUEUE_OPERATION);
ZkWriteCommand zkWriteCommand = null;
final TimerContext timerContext = stats.time(operation);
try {
zkWriteCommand = processMessage(clusterState, message, operation);
stats.success(operation);
} catch (Exception e) {
// generally there is nothing we can do - in most cases, we have
// an issue that will fail again on retry or we cannot communicate with a
// ZooKeeper in which case another Overseer should take over
// TODO: if ordering for the message is not important, we could
// track retries and put it back on the end of the queue
log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
stats.error(operation);
} finally {
timerContext.stop();
}
if (zkWriteCommand != null) {
clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand);
}
return clusterState;
}
private void checkIfIamStillLeader() {
if (zkController != null && zkController.getCoreContainer().isShutDown()) return;//shutting down no need to go further
org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
@ -423,8 +363,7 @@ public class Overseer implements Closeable {
}
private ZkWriteCommand processMessage(ClusterState clusterState,
final ZkNodeProps message, final String operation, int queueSize) {
log.info("processMessage: queueSize: {}, message = {}", queueSize, message);
final ZkNodeProps message, final String operation) {
CollectionParams.CollectionAction collectionAction = CollectionParams.CollectionAction.get(operation);
if (collectionAction != null) {
switch (collectionAction) {
@ -445,7 +384,7 @@ public class Overseer implements Closeable {
case DELETEREPLICAPROP:
return new ReplicaMutator(getZkStateReader()).removeReplicaProperty(clusterState, message);
case BALANCESHARDUNIQUE:
ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(this, clusterState, message);
ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(clusterState, message);
if (dProp.balanceProperty()) {
String collName = message.getStr(ZkStateReader.COLLECTION_PROP);
return new ZkWriteCommand(collName, dProp.getDocCollection());
@ -557,55 +496,6 @@ public class Overseer implements Closeable {
return LeaderStatus.NO;
}
private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
DocCollection newCollection = null;
DocCollection coll = state.getCollectionOrNull(collectionName) ;
Map<String,Slice> slices;
if (coll == null) {
// when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself
// without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router.
slices = new LinkedHashMap<>(1);
slices.put(slice.getName(), slice);
Map<String,Object> props = new HashMap<>(1);
props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name",ImplicitDocRouter.NAME));
newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter());
} else {
slices = new LinkedHashMap<>(coll.getSlicesMap()); // make a shallow copy
slices.put(slice.getName(), slice);
newCollection = coll.copyWithSlices(slices);
}
// System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections));
return newState(state, singletonMap(collectionName, newCollection));
}
private ClusterState newState(ClusterState state, Map<String, DocCollection> colls) {
for (Entry<String, DocCollection> e : colls.entrySet()) {
DocCollection c = e.getValue();
if (c == null) {
isClusterStateModified = true;
state = state.copyWith(e.getKey(), null);
updateNodes.put(ZkStateReader.getCollectionPath(e.getKey()) ,null);
continue;
}
if (c.getStateFormat() > 1) {
updateNodes.put(ZkStateReader.getCollectionPath(c.getName()),
new ClusterState(-1, Collections.<String>emptySet(), singletonMap(c.getName(), c)));
} else {
isClusterStateModified = true;
}
state = state.copyWith(e.getKey(), c);
}
return state;
}
@Override
public void close() {
this.isClosed = true;
@ -614,7 +504,6 @@ public class Overseer implements Closeable {
}
// Class to encapsulate processing replica properties that have at most one replica hosting a property per slice.
private class ExclusiveSliceProperty {
private ClusterStateUpdater updater;
private ClusterState clusterState;
private final boolean onlyActiveNodes;
private final String property;
@ -636,8 +525,7 @@ public class Overseer implements Closeable {
private int assigned = 0;
ExclusiveSliceProperty(ClusterStateUpdater updater, ClusterState clusterState, ZkNodeProps message) {
this.updater = updater;
ExclusiveSliceProperty(ClusterState clusterState, ZkNodeProps message) {
this.clusterState = clusterState;
String tmp = message.getStr(ZkStateReader.PROPERTY_PROP);
if (StringUtils.startsWith(tmp, OverseerCollectionProcessor.COLL_PROP_PREFIX) == false) {
@ -898,7 +786,8 @@ public class Overseer implements Closeable {
balanceUnassignedReplicas();
for (Slice newSlice : changedSlices.values()) {
clusterState = updater.updateSlice(clusterState, collectionName, newSlice);
DocCollection docCollection = CollectionMutator.updateSlice(collectionName, clusterState.getCollection(collectionName), newSlice);
clusterState = ClusterStateMutator.newState(clusterState, collectionName, docCollection);
}
return true;
}

View File

@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.cloud.ClusterState;
@ -37,7 +38,7 @@ import static java.util.Collections.singletonMap;
public class ZkStateWriter {
private static Logger log = LoggerFactory.getLogger(ZkStateWriter.class);
public static ZkWriteCommand NO_OP = new ZkWriteCommand();
public static ZkWriteCommand NO_OP = ZkWriteCommand.noop();
protected final ZkStateReader reader;
protected final Overseer.Stats stats;
@ -45,7 +46,11 @@ public class ZkStateWriter {
protected Map<String, DocCollection> updates = new HashMap<>();
protected ClusterState clusterState = null;
protected boolean isClusterStateModified = false;
protected long lastUpdatedTime = -1;
protected long lastUpdatedTime = 0;
// state information which helps us batch writes
protected int lastStateFormat = -1; // sentinel value
protected String lastCollectionName = null;
public ZkStateWriter(ZkStateReader zkStateReader, Overseer.Stats stats) {
assert zkStateReader != null;
@ -54,12 +59,17 @@ public class ZkStateWriter {
this.stats = stats;
}
public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd) {
public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd) throws KeeperException, InterruptedException {
if (cmd == NO_OP) return prevState;
if (maybeFlushBefore(cmd)) {
// we must update the prev state to the new one
prevState = clusterState = writePendingUpdates();
}
if (cmd.collection == null) {
isClusterStateModified = true;
clusterState = prevState.copyWith(cmd.name, (DocCollection) null);
clusterState = prevState.copyWith(cmd.name, null);
updates.put(cmd.name, null);
} else {
if (cmd.collection.getStateFormat() > 1) {
@ -69,15 +79,54 @@ public class ZkStateWriter {
}
clusterState = prevState.copyWith(cmd.name, cmd.collection);
}
if (maybeFlushAfter(cmd)) {
return writePendingUpdates();
}
return clusterState;
}
/**
* Logic to decide a flush before processing a ZkWriteCommand
*
* @param cmd the ZkWriteCommand instance
* @return true if a flush is required, false otherwise
*/
protected boolean maybeFlushBefore(ZkWriteCommand cmd) {
if (lastUpdatedTime == 0) {
// first update, make sure we go through
return false;
}
if (cmd.collection == null) {
return false;
}
if (cmd.collection.getStateFormat() != lastStateFormat) {
return true;
}
return cmd.collection.getStateFormat() > 1 && !cmd.name.equals(lastCollectionName);
}
/**
* Logic to decide a flush after processing a ZkWriteCommand
*
* @param cmd the ZkWriteCommand instance
* @return true if a flush to ZK is required, false otherwise
*/
protected boolean maybeFlushAfter(ZkWriteCommand cmd) {
if (cmd.collection == null)
return false;
lastCollectionName = cmd.name;
lastStateFormat = cmd.collection.getStateFormat();
return System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
}
public boolean hasPendingUpdates() {
return !updates.isEmpty() || isClusterStateModified;
}
public ClusterState writePendingUpdates() throws KeeperException, InterruptedException {
if (!hasPendingUpdates()) throw new IllegalStateException("No queued updates to execute");
if (!hasPendingUpdates()) return clusterState;
TimerContext timerContext = stats.time("update_state");
boolean success = false;
try {
@ -94,7 +143,7 @@ public class ZkStateWriter {
byte[] data = ZkStateReader.toJSON(new ClusterState(-1, Collections.<String>emptySet(), singletonMap(c.getName(), c)));
if (reader.getZkClient().exists(path, true)) {
assert c.getZNodeVersion() >= 0;
log.info("going to update_collection {}", path);
log.info("going to update_collection {} version: {}", path, c.getZNodeVersion());
Stat stat = reader.getZkClient().setData(path, data, c.getZNodeVersion(), true);
DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), stat.getVersion(), path);
clusterState = clusterState.copyWith(name, newCollection);
@ -140,10 +189,17 @@ public class ZkStateWriter {
return clusterState;
}
/**
* @return time returned by System.nanoTime at which the main cluster state was last written to ZK or 0 if
* never
*/
public long getLastUpdatedTime() {
return lastUpdatedTime;
}
/**
* @return the most up-to-date cluster state until the last enqueueUpdate operation
*/
public ClusterState getClusterState() {
return clusterState;
}

View File

@ -17,33 +17,30 @@ package org.apache.solr.cloud.overseer;
* limitations under the License.
*/
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
/**
* Created by shalin on 29/10/14.
*/
public class ZkWriteCommand {
public final String name;
public final DocCollection collection;
// public final ClusterState state;
public final boolean noop;
public ZkWriteCommand(String name, DocCollection collection) {
this.name = name;
this.collection = collection;
// this.state = state;
this.noop = false;
}
/**
* Returns a no-op
*/
public ZkWriteCommand() {
protected ZkWriteCommand() {
this.noop = true;
this.name = null;
this.collection = null;
// this.state = null;
}
public static ZkWriteCommand noop() {
return new ZkWriteCommand();
}
}

View File

@ -0,0 +1,119 @@
package org.apache.solr.cloud.overseer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.HashMap;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerTest;
import org.apache.solr.cloud.ZkTestServer;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
public class ZkStateWriterTest extends SolrTestCaseJ4 {
public void testZkStateWriterBatching() throws Exception {
String zkDir = createTempDir("testZkStateWriterBatching").toFile().getAbsolutePath();
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient zkClient = null;
try {
server.run();
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
ZkStateReader reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
assertFalse("Deletes can always be batched", writer.maybeFlushBefore(new ZkWriteCommand("xyz", null)));
assertFalse("Deletes can always be batched", writer.maybeFlushAfter(new ZkWriteCommand("xyz", null)));
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
// create new collection with stateFormat = 2
ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1"));
assertFalse("First requests can always be batched", writer.maybeFlushBefore(c1));
ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1);
ZkWriteCommand c2 = new ZkWriteCommand("c2",
new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c2"));
assertTrue("Different (new) collection create cannot be batched together with another create", writer.maybeFlushBefore(c2));
// simulate three state changes on same collection, all should be batched together before
assertFalse(writer.maybeFlushBefore(c1));
assertFalse(writer.maybeFlushBefore(c1));
assertFalse(writer.maybeFlushBefore(c1));
// and after too
assertFalse(writer.maybeFlushAfter(c1));
assertFalse(writer.maybeFlushAfter(c1));
assertFalse(writer.maybeFlushAfter(c1));
// simulate three state changes on two different collections with stateFormat=2, none should be batched
assertFalse(writer.maybeFlushBefore(c1));
// flushAfter has to be called as it updates the internal batching related info
assertFalse(writer.maybeFlushAfter(c1));
assertTrue(writer.maybeFlushBefore(c2));
assertFalse(writer.maybeFlushAfter(c2));
assertTrue(writer.maybeFlushBefore(c1));
assertFalse(writer.maybeFlushAfter(c1));
// create a collection in stateFormat = 1 i.e. inside the main cluster state
ZkWriteCommand c3 = new ZkWriteCommand("c3",
new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
clusterState = writer.enqueueUpdate(clusterState, c3);
// simulate three state changes in c3, all should be batched
for (int i=0; i<3; i++) {
assertFalse(writer.maybeFlushBefore(c3));
assertFalse(writer.maybeFlushAfter(c3));
}
// simulate state change in c3 (stateFormat=1) interleaved with state changes from c1,c2 (stateFormat=2)
// none should be batched together
assertFalse(writer.maybeFlushBefore(c3));
assertFalse(writer.maybeFlushAfter(c3));
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c1));
assertFalse(writer.maybeFlushAfter(c1));
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c3));
assertFalse(writer.maybeFlushAfter(c3));
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c2));
assertFalse(writer.maybeFlushAfter(c2));
} finally {
IOUtils.close(zkClient);
server.shutdown();
}
}
}