mirror of https://github.com/apache/lucene.git
SOLR-4291: Harden the Overseer work queue thread loop.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1432256 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0ec7cb6d71
commit
83fe02894f
|
@ -503,6 +503,8 @@ Bug Fixes
|
|||
import works fine with SolrCloud clusters (Deniz Durmus, James Dyer,
|
||||
Erick Erickson, shalin)
|
||||
|
||||
* SOLR-4291: Harden the Overseer work queue thread loop. (Mark Miller)
|
||||
|
||||
Other Changes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -22,14 +22,12 @@ import java.util.HashMap;
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.ClosableThread;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.common.cloud.ImplicitDocRouter;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
|
@ -37,7 +35,6 @@ import org.apache.solr.common.cloud.SolrZkClient;
|
|||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.cloud.ZooKeeperException;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
@ -78,46 +75,48 @@ public class Overseer {
|
|||
@Override
|
||||
public void run() {
|
||||
|
||||
if(!this.isClosed && amILeader()) {
|
||||
if (!this.isClosed && amILeader()) {
|
||||
// see if there's something left from the previous Overseer and re
|
||||
// process all events that were not persisted into cloud state
|
||||
synchronized (reader.getUpdateLock()) { //XXX this only protects against edits inside single node
|
||||
try {
|
||||
byte[] head = workQueue.peek();
|
||||
synchronized (reader.getUpdateLock()) { // XXX this only protects
|
||||
// against edits inside single
|
||||
// node
|
||||
try {
|
||||
byte[] head = workQueue.peek();
|
||||
|
||||
if (head != null) {
|
||||
reader.updateClusterState(true);
|
||||
ClusterState clusterState = reader.getClusterState();
|
||||
log.info("Replaying operations from work queue.");
|
||||
|
||||
if (head != null) {
|
||||
reader.updateClusterState(true);
|
||||
ClusterState clusterState = reader.getClusterState();
|
||||
log.info("Replaying operations from work queue.");
|
||||
while (head != null && amILeader()) {
|
||||
final ZkNodeProps message = ZkNodeProps.load(head);
|
||||
final String operation = message.getStr(QUEUE_OPERATION);
|
||||
clusterState = processMessage(clusterState, message, operation);
|
||||
zkClient.setData(ZkStateReader.CLUSTER_STATE,
|
||||
ZkStateReader.toJSON(clusterState), true);
|
||||
|
||||
while (head != null && amILeader()) {
|
||||
final ZkNodeProps message = ZkNodeProps.load(head);
|
||||
final String operation = message
|
||||
.getStr(QUEUE_OPERATION);
|
||||
clusterState = processMessage(clusterState, message, operation);
|
||||
zkClient.setData(ZkStateReader.CLUSTER_STATE,
|
||||
ZkStateReader.toJSON(clusterState), true);
|
||||
|
||||
workQueue.poll();
|
||||
|
||||
head = workQueue.peek();
|
||||
}
|
||||
workQueue.poll();
|
||||
|
||||
head = workQueue.peek();
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|
||||
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
|
||||
log.warn("Solr cannot talk to ZK");
|
||||
return;
|
||||
}
|
||||
SolrException.log(log, "", e);
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"", e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
|
||||
log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
|
||||
return;
|
||||
}
|
||||
log.error("Exception in Overseer work queue loop", e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Exception in Overseer work queue loop", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
log.info("Starting to work on the main queue");
|
||||
while (!this.isClosed && amILeader()) {
|
||||
|
@ -146,17 +145,17 @@ public class Overseer {
|
|||
while (workQueue.poll() != null);
|
||||
|
||||
} catch (KeeperException e) {
|
||||
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|
||||
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
|
||||
log.warn("Overseer cannot talk to ZK");
|
||||
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
|
||||
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
|
||||
return;
|
||||
}
|
||||
SolrException.log(log, "", e);
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"", e);
|
||||
log.error("Exception in Overseer main queue loop", e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Exception in Overseer main queue loop", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue