diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index adae407dafa..e28ba9e2968 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -135,6 +135,10 @@ Bug Fixes * SOLR-7084: FreeTextSuggester: Better error message when doing a lookup during dictionary build. Used to be nullpointer (janhoy) +* SOLR-6956: OverseerCollectionProcessor and replicas on the overseer node can sometimes + operate on stale cluster state due to overseer holding the state update lock for a + long time. (Mark Miller, shalin) + Optimizations ---------------------- * SOLR-7049: Move work done by the LIST Collections API call to the Collections diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 7918ed3f4e5..6f48d5ceca1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -143,59 +143,7 @@ public class Overseer implements Closeable { log.debug("am_i_leader unclear {}", isLeader); isLeader = amILeader(); // not a no, not a yes, try ask again } - if (!this.isClosed && LeaderStatus.YES == isLeader) { - // see if there's something left from the previous Overseer and re - // process all events that were not persisted into cloud state - synchronized (reader.getUpdateLock()) { // XXX this only protects - // against edits inside single - // node - try { - byte[] head = workQueue.peek(); - - if (head != null) { - reader.updateClusterState(true); - ClusterState clusterState = reader.getClusterState(); - log.info("Replaying operations from work queue."); - ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats); - - while (head != null) { - isLeader = amILeader(); - if (LeaderStatus.NO == isLeader) { - break; - } - else if (LeaderStatus.YES == isLeader) { - final ZkNodeProps message = ZkNodeProps.load(head); - log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message); - clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null); - workQueue.poll(); // poll-ing removes the element we got by peek-ing - } - else { - log.info("am_i_leader unclear {}", isLeader); - // re-peek below in case our 'head' value is out-of-date by now - } - head = workQueue.peek(); - } - // force flush at the end of the loop - clusterState = zkStateWriter.writePendingUpdates(); - } - } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED) { - log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e); - return; - } - log.error("Exception in Overseer work queue loop", e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - - } catch (Exception e) { - log.error("Exception in Overseer work queue loop", e); - } - } - - } - log.info("Starting to work on the main queue"); try { ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats); @@ -210,6 +158,45 @@ public class Overseer implements Closeable { log.debug("am_i_leader unclear {}", isLeader); continue; // not a no, not a yes, try ask again } + + if (refreshClusterState) { + try { + reader.updateClusterState(true); + clusterState = reader.getClusterState(); + refreshClusterState = false; + + // if there were any errors while processing + // the state queue, items would have been left in the + // work queue so let's process those first + byte[] data = workQueue.peek(); + boolean hadWorkItems = data != null; + while (data != null) { + final ZkNodeProps message = ZkNodeProps.load(data); + log.info("processMessage: workQueueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message); + // force flush to ZK after each message because there is no fallback if workQueue items + // are removed from workQueue but fail to be written to ZK + clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null); + workQueue.poll(); // poll-ing removes the element we got by peek-ing + data = workQueue.peek(); + } + // force flush at the end of the loop + if (hadWorkItems) { + clusterState = zkStateWriter.writePendingUpdates(); + } + } catch (KeeperException e) { + if (e.code() == KeeperException.Code.SESSIONEXPIRED) { + log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e); + return; + } + log.error("Exception in Overseer work queue loop", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } catch (Exception e) { + log.error("Exception in Overseer work queue loop", e); + } + } + DistributedQueue.QueueEvent head = null; try { head = stateUpdateQueue.peek(true); @@ -227,81 +214,53 @@ public class Overseer implements Closeable { } catch (Exception e) { log.error("Exception in Overseer main queue loop", e); } - synchronized (reader.getUpdateLock()) { - try { - if (refreshClusterState) { - reader.updateClusterState(true); - clusterState = reader.getClusterState(); - refreshClusterState = false; - - // if there were any errors while processing - // the state queue, items would have been left in the - // work queue so let's process those first - byte[] data = workQueue.peek(); - boolean hadWorkItems = data != null; - while (data != null) { - final ZkNodeProps message = ZkNodeProps.load(data); - log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message); - // force flush to ZK after each message because there is no fallback if workQueue items - // are removed from workQueue but fail to be written to ZK - clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null); - workQueue.poll(); // poll-ing removes the element we got by peek-ing - data = workQueue.peek(); + try { + while (head != null) { + final byte[] data = head.getBytes(); + final ZkNodeProps message = ZkNodeProps.load(head.getBytes()); + log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion()); + // we can batch here because workQueue is our fallback in case a ZK write failed + clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() { + @Override + public void onEnqueue() throws Exception { + workQueue.offer(data); } - // force flush at the end of the loop - if (hadWorkItems) { - clusterState = zkStateWriter.writePendingUpdates(); + + @Override + public void onWrite() throws Exception { + // remove everything from workQueue + while (workQueue.poll() != null); } - } + }); - while (head != null) { - final byte[] data = head.getBytes(); - final ZkNodeProps message = ZkNodeProps.load(head.getBytes()); - log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion()); - // we can batch here because workQueue is our fallback in case a ZK write failed - clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() { - @Override - public void onEnqueue() throws Exception { - workQueue.offer(data); - } + // it is safer to keep this poll here because an invalid message might never be queued + // and therefore we can't rely on the ZkWriteCallback to remove the item + stateUpdateQueue.poll(); - @Override - public void onWrite() throws Exception { - // remove everything from workQueue - while (workQueue.poll() != null); - } - }); - - // it is safer to keep this poll here because an invalid message might never be queued - // and therefore we can't rely on the ZkWriteCallback to remove the item - stateUpdateQueue.poll(); - - if (isClosed) break; - // if an event comes in the next 100ms batch it together - head = stateUpdateQueue.peek(100); - } - // we should force write all pending updates because the next iteration might sleep until there - // are more items in the main queue - clusterState = zkStateWriter.writePendingUpdates(); - // clean work queue - while (workQueue.poll() != null); - - } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED) { - log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e); - return; - } - log.error("Exception in Overseer main queue loop", e); - refreshClusterState = true; // it might have been a bad version error - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } catch (Exception e) { - log.error("Exception in Overseer main queue loop", e); - refreshClusterState = true; // it might have been a bad version error + if (isClosed) break; + // if an event comes in the next 100ms batch it together + head = stateUpdateQueue.peek(100); } - } + // we should force write all pending updates because the next iteration might sleep until there + // are more items in the main queue + clusterState = zkStateWriter.writePendingUpdates(); + // clean work queue + while (workQueue.poll() != null); + } catch (KeeperException e) { + if (e.code() == KeeperException.Code.SESSIONEXPIRED) { + log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e); + return; + } + log.error("Exception in Overseer main queue loop", e); + refreshClusterState = true; // it might have been a bad version error + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } catch (Exception e) { + log.error("Exception in Overseer main queue loop", e); + refreshClusterState = true; // it might have been a bad version error + } } } finally { log.info("Overseer Loop exiting : {}", LeaderElector.getNodeName(myId)); diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java index 9bf918f1c78..b353d50adfb 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java @@ -580,6 +580,8 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { NamedList results = new NamedList(); try { + // force update the cluster state + zkStateReader.updateClusterState(true); CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(operation); if (action == null) { // back-compat because we used strings different than enum values before SOLR-6115