SOLR-6956: OverseerCollectionProcessor and replicas on the overseer node can sometimes operate on stale cluster state

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1660449 13f79535-47bb-0310-9956-ffa450edef68
Shalin Shekhar Mangar 2015-02-17 17:10:43 +00:00
parent 4d29650682
commit 26fdf474dd
3 changed files with 86 additions and 121 deletions


@@ -135,6 +135,10 @@ Bug Fixes
* SOLR-7084: FreeTextSuggester: Better error message when doing a lookup
  during dictionary build. Used to be nullpointer (janhoy)

* SOLR-6956: OverseerCollectionProcessor and replicas on the overseer node can sometimes
  operate on stale cluster state due to overseer holding the state update lock for a
  long time. (Mark Miller, shalin)

Optimizations
----------------------

* SOLR-7049: Move work done by the LIST Collections API call to the Collections

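As background for the Overseer.java changes below, here is a minimal, self-contained sketch (plain Java; the ArrayDeque and the hypothetical applyMessage method are stand-ins for the ZooKeeper-backed work queue and Overseer.processQueueItem) of the peek-then-poll draining pattern used throughout the diff: an item is removed from the queue only after it has been applied, so a failure between applying and removing leaves it queued for replay on the next pass.

import java.util.ArrayDeque;
import java.util.Deque;

public class PeekThenPollSketch {
  // Hypothetical stand-in for applying one state-update message; in Solr this
  // would be Overseer.processQueueItem(...) writing cluster state to ZooKeeper.
  static void applyMessage(String message) {
    System.out.println("applied: " + message);
  }

  public static void main(String[] args) {
    // In-memory stand-in for the ZooKeeper-backed work queue.
    Deque<String> workQueue = new ArrayDeque<>();
    workQueue.add("state-update-1");
    workQueue.add("state-update-2");

    // Drain the queue with peek-then-poll: the element is removed only after
    // it has been applied, so a crash mid-apply leaves it queued for replay.
    String head = workQueue.peek();
    while (head != null) {
      applyMessage(head);
      workQueue.poll(); // poll-ing removes the element we got by peek-ing
      head = workQueue.peek();
    }
  }
}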

@@ -143,59 +143,7 @@ public class Overseer implements Closeable {
log.debug("am_i_leader unclear {}", isLeader);
isLeader = amILeader(); // not a no, not a yes, try ask again
}
if (!this.isClosed && LeaderStatus.YES == isLeader) {
// see if there's something left from the previous Overseer and re
// process all events that were not persisted into cloud state
synchronized (reader.getUpdateLock()) { // XXX this only protects
// against edits inside single
// node
try {
byte[] head = workQueue.peek();
if (head != null) {
reader.updateClusterState(true);
ClusterState clusterState = reader.getClusterState();
log.info("Replaying operations from work queue.");
ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
while (head != null) {
isLeader = amILeader();
if (LeaderStatus.NO == isLeader) {
break;
}
else if (LeaderStatus.YES == isLeader) {
final ZkNodeProps message = ZkNodeProps.load(head);
log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
workQueue.poll(); // poll-ing removes the element we got by peek-ing
}
else {
log.info("am_i_leader unclear {}", isLeader);
// re-peek below in case our 'head' value is out-of-date by now
}
head = workQueue.peek();
}
// force flush at the end of the loop
clusterState = zkStateWriter.writePendingUpdates();
}
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
return;
}
log.error("Exception in Overseer work queue loop", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
log.error("Exception in Overseer work queue loop", e);
}
}
}
log.info("Starting to work on the main queue");
try {
ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
@@ -210,6 +158,45 @@ public class Overseer implements Closeable {
log.debug("am_i_leader unclear {}", isLeader);
continue; // not a no, not a yes, try ask again
}
if (refreshClusterState) {
try {
reader.updateClusterState(true);
clusterState = reader.getClusterState();
refreshClusterState = false;
// if there were any errors while processing
// the state queue, items would have been left in the
// work queue so let's process those first
byte[] data = workQueue.peek();
boolean hadWorkItems = data != null;
while (data != null) {
final ZkNodeProps message = ZkNodeProps.load(data);
log.info("processMessage: workQueueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
// force flush to ZK after each message because there is no fallback if workQueue items
// are removed from workQueue but fail to be written to ZK
clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
workQueue.poll(); // poll-ing removes the element we got by peek-ing
data = workQueue.peek();
}
// force flush at the end of the loop
if (hadWorkItems) {
clusterState = zkStateWriter.writePendingUpdates();
}
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
return;
}
log.error("Exception in Overseer work queue loop", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
log.error("Exception in Overseer work queue loop", e);
}
}
DistributedQueue.QueueEvent head = null;
try {
head = stateUpdateQueue.peek(true);
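The block added above re-reads the cluster state only when refreshClusterState is set (before the first message, and again after an error such as a bad-version failure), and it now does so without holding reader.getUpdateLock(). Below is a rough, self-contained illustration of that flag-driven refresh pattern; readFreshClusterState and process are hypothetical stand-ins for the ZooKeeper reads (ZkStateReader.updateClusterState(true) plus getClusterState()) and for processQueueItem.

import java.util.List;

public class RefreshOnErrorSketch {
  // Hypothetical stand-in for re-reading the shared cluster state.
  static String readFreshClusterState() {
    return "cluster-state@" + System.nanoTime();
  }

  // Hypothetical stand-in for applying one message; fails like a bad-version write.
  static String process(String message, String clusterState) throws Exception {
    if (message.startsWith("bad")) {
      throw new Exception("bad version while applying " + message);
    }
    return clusterState; // a real implementation would return the updated state
  }

  public static void main(String[] args) {
    boolean refreshClusterState = true; // refresh before the first message
    String clusterState = null;

    for (String message : List.of("m1", "bad-m2", "m3")) {
      if (refreshClusterState) {
        // re-read shared state only when needed, not on every iteration
        clusterState = readFreshClusterState();
        refreshClusterState = false;
      }
      try {
        clusterState = process(message, clusterState);
      } catch (Exception e) {
        // it might have been a bad version error: force a re-read on the next pass
        refreshClusterState = true;
      }
    }
    System.out.println("final state: " + clusterState);
  }
}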
@@ -227,81 +214,53 @@
} catch (Exception e) {
log.error("Exception in Overseer main queue loop", e);
}
synchronized (reader.getUpdateLock()) {
try {
if (refreshClusterState) {
reader.updateClusterState(true);
clusterState = reader.getClusterState();
refreshClusterState = false;
// if there were any errors while processing
// the state queue, items would have been left in the
// work queue so let's process those first
byte[] data = workQueue.peek();
boolean hadWorkItems = data != null;
while (data != null) {
final ZkNodeProps message = ZkNodeProps.load(data);
log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
// force flush to ZK after each message because there is no fallback if workQueue items
// are removed from workQueue but fail to be written to ZK
clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
workQueue.poll(); // poll-ing removes the element we got by peek-ing
data = workQueue.peek();
}
// force flush at the end of the loop
if (hadWorkItems) {
clusterState = zkStateWriter.writePendingUpdates();
}
}
while (head != null) {
final byte[] data = head.getBytes();
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
// we can batch here because workQueue is our fallback in case a ZK write failed
clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
@Override
public void onEnqueue() throws Exception {
workQueue.offer(data);
}
@Override
public void onWrite() throws Exception {
// remove everything from workQueue
while (workQueue.poll() != null);
}
});
// it is safer to keep this poll here because an invalid message might never be queued
// and therefore we can't rely on the ZkWriteCallback to remove the item
stateUpdateQueue.poll();
if (isClosed) break;
// if an event comes in the next 100ms batch it together
head = stateUpdateQueue.peek(100);
}
// we should force write all pending updates because the next iteration might sleep until there
// are more items in the main queue
clusterState = zkStateWriter.writePendingUpdates();
// clean work queue
while (workQueue.poll() != null);
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
return;
}
log.error("Exception in Overseer main queue loop", e);
refreshClusterState = true; // it might have been a bad version error
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
log.error("Exception in Overseer main queue loop", e);
refreshClusterState = true; // it might have been a bad version error
}
}

try {
while (head != null) {
final byte[] data = head.getBytes();
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
// we can batch here because workQueue is our fallback in case a ZK write failed
clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
@Override
public void onEnqueue() throws Exception {
workQueue.offer(data);
}
@Override
public void onWrite() throws Exception {
// remove everything from workQueue
while (workQueue.poll() != null);
}
});
// it is safer to keep this poll here because an invalid message might never be queued
// and therefore we can't rely on the ZkWriteCallback to remove the item
stateUpdateQueue.poll();
if (isClosed) break;
// if an event comes in the next 100ms batch it together
head = stateUpdateQueue.peek(100);
}
// we should force write all pending updates because the next iteration might sleep until there
// are more items in the main queue
clusterState = zkStateWriter.writePendingUpdates();
// clean work queue
while (workQueue.poll() != null);
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
return;
}
log.error("Exception in Overseer main queue loop", e);
refreshClusterState = true; // it might have been a bad version error
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
log.error("Exception in Overseer main queue loop", e);
refreshClusterState = true; // it might have been a bad version error
}
}
} finally {
log.info("Overseer Loop exiting : {}", LeaderElector.getNodeName(myId));
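The main-queue batching above relies on the two ZkWriteCallback hooks: onEnqueue copies each message into the work queue while it sits in the writer's pending batch, and onWrite clears the work queue once the batch has actually been written to ZooKeeper, so a crash in between leaves the messages available for replay. Below is a simplified, self-contained sketch of that hand-off; the WriteCallback interface, PendingWriter class, and in-memory queues are hypothetical stand-ins for ZkStateWriter.ZkWriteCallback, ZkStateWriter, and the ZooKeeper-backed queues (the real callback methods take no arguments).

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class WriteCallbackSketch {
  interface WriteCallback {
    void onEnqueue(String message); // message is now only in the pending batch
    void onWrite();                 // the pending batch has been durably written
  }

  // Hypothetical stand-in for ZkStateWriter: buffers messages and "writes" them in one batch.
  static class PendingWriter {
    private final List<String> pending = new ArrayList<>();

    void enqueue(String message, WriteCallback callback) {
      pending.add(message);
      callback.onEnqueue(message);
    }

    void writePendingUpdates(WriteCallback callback) {
      System.out.println("wrote batch: " + pending);
      pending.clear();
      callback.onWrite();
    }
  }

  public static void main(String[] args) {
    Deque<String> workQueue = new ArrayDeque<>(); // fallback in case the batched write never happens
    PendingWriter writer = new PendingWriter();

    WriteCallback callback = new WriteCallback() {
      @Override
      public void onEnqueue(String message) {
        workQueue.offer(message); // keep a replayable copy until the write succeeds
      }

      @Override
      public void onWrite() {
        while (workQueue.poll() != null); // the batch is durable, drop the copies
      }
    };

    writer.enqueue("state-update-1", callback);
    writer.enqueue("state-update-2", callback);
    writer.writePendingUpdates(callback); // clears both the pending batch and the work queue
  }
}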


@@ -580,6 +580,8 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
NamedList results = new NamedList();
try {
// force update the cluster state
zkStateReader.updateClusterState(true);
CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(operation);
if (action == null) {
// back-compat because we used strings different than enum values before SOLR-6115