SOLR-6554: Fix work queue handling

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1646474 13f79535-47bb-0310-9956-ffa450edef68
Shalin Shekhar Mangar 2014-12-18 15:13:41 +00:00
parent 9ac8383fb7
commit 2950159a2c
3 changed files with 52 additions and 10 deletions

Overseer.java

@@ -167,7 +167,7 @@ public class Overseer implements Closeable {
         else if (LeaderStatus.YES == isLeader) {
           final ZkNodeProps message = ZkNodeProps.load(head);
           log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
-          clusterState = processQueueItem(message, clusterState, zkStateWriter);
+          clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
           workQueue.poll(); // poll-ing removes the element we got by peek-ing
         }
         else {
@@ -242,7 +242,9 @@ public class Overseer implements Closeable {
         while (data != null) {
           final ZkNodeProps message = ZkNodeProps.load(data);
           log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
-          clusterState = processQueueItem(message, clusterState, zkStateWriter);
+          // force flush to ZK after each message because there is no fallback if workQueue items
+          // are removed from workQueue but fail to be written to ZK
+          clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
           workQueue.poll(); // poll-ing removes the element we got by peek-ing
           data = workQueue.peek();
         }
@@ -253,11 +255,25 @@ public class Overseer implements Closeable {
         }
         while (head != null) {
+          final byte[] data = head.getBytes();
           final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
           log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
-          clusterState = processQueueItem(message, clusterState, zkStateWriter);
-          workQueue.offer(head.getBytes());
+          // we can batch here because workQueue is our fallback in case a ZK write failed
+          clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
+            @Override
+            public void onEnqueue() throws Exception {
+              workQueue.offer(data);
+            }
+
+            @Override
+            public void onWrite() throws Exception {
+              // remove everything from workQueue
+              while (workQueue.poll() != null);
+            }
+          });
+
+          // it is safer to keep this poll here because an invalid message might never be queued
+          // and therefore we can't rely on the ZkWriteCallback to remove the item
           stateUpdateQueue.poll();
           if (isClosed) break;
@@ -299,7 +315,7 @@ public class Overseer implements Closeable {
         }
       }

-      private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter) throws KeeperException, InterruptedException {
+      private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception {
        final String operation = message.getStr(QUEUE_OPERATION);
        ZkWriteCommand zkWriteCommand = null;
        final TimerContext timerContext = stats.time(operation);
@@ -318,7 +334,10 @@ public class Overseer implements Closeable {
          timerContext.stop();
        }
        if (zkWriteCommand != null) {
-         clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand);
+         clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand, callback);
+         if (!enableBatching) {
+           clusterState = zkStateWriter.writePendingUpdates();
+         }
        }
        return clusterState;
      }
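
Note on the pattern above: each item is copied into workQueue (onEnqueue) before it is removed from stateUpdateQueue, and workQueue is only drained (onWrite) once a batched update has actually reached ZooKeeper. Below is a minimal stand-alone sketch of that write-ahead/fallback pattern, using plain java.util queues and invented names rather than the Solr classes:

import java.util.ArrayDeque;
import java.util.Queue;

// Sketch only: generic version of the fallback pattern used by the Overseer loop above.
public class FallbackQueueSketch {
  interface WriteCallback { void onEnqueue() throws Exception; void onWrite() throws Exception; }
  interface BatchingWriter { void enqueue(byte[] data, WriteCallback callback) throws Exception; }

  private final Queue<byte[]> primaryQueue = new ArrayDeque<>();  // stands in for stateUpdateQueue
  private final Queue<byte[]> fallbackQueue = new ArrayDeque<>(); // stands in for workQueue

  public void drain(BatchingWriter writer) throws Exception {
    byte[] head;
    while ((head = primaryQueue.peek()) != null) {
      final byte[] data = head;
      writer.enqueue(data, new WriteCallback() {
        @Override public void onEnqueue() { fallbackQueue.offer(data); } // write-ahead copy
        @Override public void onWrite()   { fallbackQueue.clear(); }     // batch is durable, fallback no longer needed
      });
      primaryQueue.poll(); // safe: the item is either written or parked in fallbackQueue
    }
  }
}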

ZkStateWriter.java

@@ -59,12 +59,19 @@ public class ZkStateWriter {
     this.stats = stats;
   }

-  public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd) throws KeeperException, InterruptedException {
+  public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws Exception {
     if (cmd == NO_OP) return prevState;

     if (maybeFlushBefore(cmd)) {
       // we must update the prev state to the new one
       prevState = clusterState = writePendingUpdates();
+      if (callback != null) {
+        callback.onWrite();
+      }
     }

+    if (callback != null) {
+      callback.onEnqueue();
+    }
+
     if (cmd.collection == null) {
@@ -81,7 +88,11 @@ public class ZkStateWriter {
     }

     if (maybeFlushAfter(cmd)) {
-      return writePendingUpdates();
+      ClusterState state = writePendingUpdates();
+      if (callback != null) {
+        callback.onWrite();
+      }
+      return state;
     }

     return clusterState;
@@ -203,5 +214,17 @@ public class ZkStateWriter {
   public ClusterState getClusterState() {
     return clusterState;
   }
+
+  public interface ZkWriteCallback {
+    /**
+     * Called by ZkStateWriter if a ZkWriteCommand is queued
+     */
+    public void onEnqueue() throws Exception;
+
+    /**
+     * Called by ZkStateWriter if state is flushed to ZK
+     */
+    public void onWrite() throws Exception;
+  }
 }
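
For reference, the non-batching path in Overseer.processQueueItem above amounts to enqueueing with a null callback and then forcing the write out immediately. A two-line sketch, assuming a ZkStateWriter named writer, a ClusterState named clusterState and a ZkWriteCommand named cmd already in scope:

clusterState = writer.enqueueUpdate(clusterState, cmd, null); // null callback: no fallback bookkeeping needed
clusterState = writer.writePendingUpdates();                  // force the pending update out to ZK right away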

ZkStateWriterTest.java

@@ -64,7 +64,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
           new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1"));
       assertFalse("First requests can always be batched", writer.maybeFlushBefore(c1));

-      ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1);
+      ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);

       ZkWriteCommand c2 = new ZkWriteCommand("c2",
           new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c2"));
@@ -91,7 +91,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
       // create a collection in stateFormat = 1 i.e. inside the main cluster state
       ZkWriteCommand c3 = new ZkWriteCommand("c3",
           new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
-      clusterState = writer.enqueueUpdate(clusterState, c3);
+      clusterState = writer.enqueueUpdate(clusterState, c3, null);

       // simulate three state changes in c3, all should be batched
       for (int i=0; i<3; i++) {
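
A possible follow-up test, sketched here only (it is not part of this commit), reusing the scaffolding of the existing tests above (an initialized ZkStateWriter named writer, a ZkStateReader named reader, plus java.util.concurrent.atomic.AtomicInteger): it passes a non-null callback and checks that onEnqueue fires once per accepted command, while making no assumption about when the writer chooses to flush.

      // Sketch only, not part of this commit: exercises the new callback parameter.
      final AtomicInteger enqueued = new AtomicInteger();
      final AtomicInteger written = new AtomicInteger();
      ZkStateWriter.ZkWriteCallback callback = new ZkStateWriter.ZkWriteCallback() {
        @Override
        public void onEnqueue() throws Exception { enqueued.incrementAndGet(); } // command accepted into the pending batch
        @Override
        public void onWrite() throws Exception { written.incrementAndGet(); }    // pending batch written to ZK
      };

      ZkWriteCommand c1 = new ZkWriteCommand("c1",
          new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1"));
      ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, callback);
      assertEquals("onEnqueue should fire once per accepted command", 1, enqueued.get());
      // onWrite only fires when pending updates are flushed to ZK, so 'written' is not asserted here.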