From 210301b424a4d68d1bb8f352b13f528b0cf9b5a7 Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Sun, 29 Jan 2012 02:22:55 +0000 Subject: [PATCH] SOLR-3065: Let overseer process cluster state changes asynchronously git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1237194 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/solr/cloud/ElectionContext.java | 21 +- .../java/org/apache/solr/cloud/Overseer.java | 400 +++++++++++------- 2 files changed, 261 insertions(+), 160 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index 387b1e8d5ce..a773a5b88e5 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -209,14 +209,31 @@ final class OverseerElectionContext extends ElectionContext { private final ZkStateReader stateReader; public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, ZkStateReader stateReader) { - super(zkNodeName, "/overseer_elect", null, null); + super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null); this.zkClient = zkClient; this.stateReader = stateReader; } @Override void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement) throws KeeperException, InterruptedException { - new Overseer(zkClient, stateReader); + + final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1); + ZkNodeProps myProps = new ZkNodeProps("id", id); + + try { + zkClient.makePath(leaderPath, + ZkStateReader.toJSON(myProps), + CreateMode.EPHEMERAL, true); + } catch (NodeExistsException e) { + // if a previous leader ephemeral still exists for some reason, try and + // remove it + zkClient.delete(leaderPath, -1, true); + zkClient.makePath(leaderPath, + ZkStateReader.toJSON(myProps), + CreateMode.EPHEMERAL, true); + } + + new Overseer(zkClient, stateReader, id); } } diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 357e04b4619..4a252e78d9b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -22,9 +22,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.LinkedBlockingQueue; import java.util.Set; import org.apache.solr.cloud.NodeStateWatcher.NodeStateChangeListener; @@ -51,13 +53,32 @@ import org.slf4j.LoggerFactory; * Cluster leader. Responsible node assignments, cluster state file? */ public class Overseer implements NodeStateChangeListener, ShardLeaderListener { + + private static final int STATE_UPDATE_DELAY = 500; // delay between cloud state updates + + static enum Op { + LeaderChange, StateChange; + } + + private final class CloudStateUpdateRequest { + + final Op operation; + final Object[] args; + + CloudStateUpdateRequest(final Op operation, final Object... args) { + this.operation = operation; + this.args = args; + } + } public static final String ASSIGNMENTS_NODE = "/node_assignments"; public static final String STATES_NODE = "/node_states"; private static Logger log = LoggerFactory.getLogger(Overseer.class); private final SolrZkClient zkClient; - private final ZkStateReader reader; + + // pooled updates + private final LinkedBlockingQueue fifo = new LinkedBlockingQueue(); // node stateWatches private HashMap nodeStateWatches = new HashMap(); @@ -66,12 +87,222 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener { private HashMap> shardLeaderWatches = new HashMap>(); private ZkCmdExecutor zkCmdExecutor; - public Overseer(final SolrZkClient zkClient, final ZkStateReader reader) throws KeeperException, InterruptedException { - log.info("Constructing new Overseer"); + private static class CloudStateUpdater implements Runnable { + + private final LinkedBlockingQueue fifo; + private final ZkStateReader reader; + private final SolrZkClient zkClient; + private final String myId; + + public CloudStateUpdater(final LinkedBlockingQueue fifo, final ZkStateReader reader, final SolrZkClient zkClient, final String myId) { + this.fifo = fifo; + this.myId = myId; + this.reader = reader; + this.zkClient = zkClient; + } + @Override + public void run() { + while (amILeader()) { + + + LinkedList requests = new LinkedList(); + while (!fifo.isEmpty()) { + // collect all queued requests + CloudStateUpdateRequest req; + req = fifo.poll(); + if (req == null) { + break; + } + requests.add(req); + } + + if (requests.size() > 0) { + // process updates + synchronized (reader.getUpdateLock()) { + try { + reader.updateCloudState(true); + CloudState cloudState = reader.getCloudState(); + for (CloudStateUpdateRequest request : requests) { + + switch (request.operation) { + case LeaderChange: + cloudState = setShardLeader(cloudState, + (String) request.args[0], (String) request.args[1], + (String) request.args[2]); + + break; + case StateChange: + cloudState = updateState(cloudState, + (String) request.args[0], (CoreState) request.args[1]); + break; + + } + } + + log.info("Announcing new cluster state"); + zkClient.setData(ZkStateReader.CLUSTER_STATE, + ZkStateReader.toJSON(cloudState), true); + + } catch (KeeperException e) { + // XXX stop processing, exit + return; + } catch (InterruptedException e) { + // XXX stop processing, exit + return; + } + } + } + + try { + Thread.sleep(STATE_UPDATE_DELAY); + } catch (InterruptedException e) { + // + } + } + } + + private boolean amILeader() { + try { + ZkNodeProps props = ZkNodeProps.load(zkClient.getData("/overseer_elect/leader", null, null, false)); + if(myId.equals(props.get("id"))) { + return true; + } + } catch (KeeperException e) { + // assume we're dead + } catch (InterruptedException e) { + // assume we're dead + } + log.info("According to ZK I (id=" + myId + ") am no longer a leader."); + return false; + } + /** + * Try to assign core to the cluster + * @throws KeeperException + * @throws InterruptedException + */ + private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException { + String collection = coreState.getCollectionName(); + String zkCoreNodeName = coreState.getCoreNodeName(); + + String shardId; + if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) { + shardId = AssignShard.assignShard(collection, state); + } else { + shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP); + } + + Map props = new HashMap(); + for (Entry entry : coreState.getProperties().entrySet()) { + props.put(entry.getKey(), entry.getValue()); + } + ZkNodeProps zkProps = new ZkNodeProps(props); + Slice slice = state.getSlice(collection, shardId); + Map shardProps; + if (slice == null) { + shardProps = new HashMap(); + } else { + shardProps = state.getSlice(collection, shardId).getShardsCopy(); + } + shardProps.put(zkCoreNodeName, zkProps); + + slice = new Slice(shardId, shardProps); + CloudState newCloudState = updateSlice(state, collection, slice); + return newCloudState; + } + + private CloudState updateSlice(CloudState state, String collection, Slice slice) { + + final Map> newStates = new LinkedHashMap>(); + newStates.putAll(state.getCollectionStates()); + + if (!newStates.containsKey(collection)) { + newStates.put(collection, new LinkedHashMap()); + } + + final Map slices = newStates.get(collection); + if (!slices.containsKey(slice.getName())) { + slices.put(slice.getName(), slice); + } else { + final Map shards = new LinkedHashMap(); + final Slice existingSlice = slices.get(slice.getName()); + shards.putAll(existingSlice.getShards()); + //XXX preserve existing leader + for(Entry edit: slice.getShards().entrySet()) { + if(existingSlice.getShards().get(edit.getKey())!=null && existingSlice.getShards().get(edit.getKey()).containsKey(ZkStateReader.LEADER_PROP)) { + HashMap newProps = new HashMap(); + newProps.putAll(edit.getValue().getProperties()); + newProps.put(ZkStateReader.LEADER_PROP, existingSlice.getShards().get(edit.getKey()).get(ZkStateReader.LEADER_PROP)); + shards.put(edit.getKey(), new ZkNodeProps(newProps)); + } else { + shards.put(edit.getKey(), edit.getValue()); + } + } + final Slice updatedSlice = new Slice(slice.getName(), shards); + slices.put(slice.getName(), updatedSlice); + } + return new CloudState(state.getLiveNodes(), newStates); + } + + private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) { + + boolean updated = false; + final Map> newStates = new LinkedHashMap>(); + newStates.putAll(state.getCollectionStates()); + + final Map slices = newStates.get(collection); + + if(slices==null) { + log.error("Could not mark shard leader for non existing collection."); + return state; + } + + if (!slices.containsKey(sliceName)) { + log.error("Could not mark leader for non existing slice."); + return state; + } else { + final Map newShards = new LinkedHashMap(); + for(Entry shard: slices.get(sliceName).getShards().entrySet()) { + Map newShardProps = new LinkedHashMap(); + newShardProps.putAll(shard.getValue().getProperties()); + + String wasLeader = newShardProps.remove(ZkStateReader.LEADER_PROP); //clean any previously existed flag + + ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps)); + if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) { + newShardProps.put(ZkStateReader.LEADER_PROP,"true"); + if (wasLeader == null) { + updated = true; + } + } else { + if (wasLeader != null) { + updated = true; + } + } + newShards.put(shard.getKey(), new ZkNodeProps(newShardProps)); + } + Slice slice = new Slice(sliceName, newShards); + slices.put(sliceName, slice); + } + if (updated) { + return new CloudState(state.getLiveNodes(), newStates); + } else { + return state; + } + } + + } + + public Overseer(final SolrZkClient zkClient, final ZkStateReader reader, String id) throws KeeperException, InterruptedException { + log.info("Constructing new Overseer id=" + id); this.zkClient = zkClient; this.zkCmdExecutor = new ZkCmdExecutor(); - this.reader = reader; createWatches(); + + //launch cluster state updater thread + ThreadGroup tg = new ThreadGroup("Overseer delayed state updater"); + Thread updaterThread = new Thread(tg, new CloudStateUpdater(fifo, reader, zkClient, id)); + updaterThread.setDaemon(true); + updaterThread.start(); } public synchronized void createWatches() @@ -267,41 +498,6 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener { } } - /** - * Try to assign core to the cluster - * @throws KeeperException - * @throws InterruptedException - */ - private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException { - String collection = coreState.getCollectionName(); - String zkCoreNodeName = coreState.getCoreNodeName(); - - String shardId; - if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) { - shardId = AssignShard.assignShard(collection, state); - } else { - shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP); - } - - Map props = new HashMap(); - for (Entry entry : coreState.getProperties().entrySet()) { - props.put(entry.getKey(), entry.getValue()); - } - ZkNodeProps zkProps = new ZkNodeProps(props); - Slice slice = state.getSlice(collection, shardId); - Map shardProps; - if (slice == null) { - shardProps = new HashMap(); - } else { - shardProps = state.getSlice(collection, shardId).getShardsCopy(); - } - shardProps.put(zkCoreNodeName, zkProps); - - slice = new Slice(shardId, shardProps); - CloudState newCloudState = updateSlice(state, collection, slice); - return newCloudState; - } - private Set complement(Collection next, Collection prev) { Set downCollections = new HashSet(); @@ -311,23 +507,11 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener { } @Override - public void coreChanged(final String nodeName, final Set states) throws KeeperException, InterruptedException { - log.debug("Cores changed: " + nodeName + " states:" + states); - synchronized(reader.getUpdateLock()) { - reader.updateCloudState(true); - CloudState cloudState = reader.getCloudState(); - for (CoreState state : states) { - cloudState = updateState(cloudState, nodeName, state); - } - - try { - zkClient.setData(ZkStateReader.CLUSTER_STATE, - ZkStateReader.toJSON(cloudState), true); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - "Interrupted while publishing new state", e); - } + public void coreChanged(final String nodeName, final Set states) + throws KeeperException, InterruptedException { + log.info("Core change pooled: " + nodeName + " states:" + states); + for (CoreState state : states) { + fifo.add(new CloudStateUpdateRequest(Op.StateChange, nodeName, state)); } } @@ -340,111 +524,11 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener { ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor(); zkCmdExecutor.ensureExists(node, zkClient); } - - private CloudState updateSlice(CloudState state, String collection, Slice slice) { - - final Map> newStates = new LinkedHashMap>(); - newStates.putAll(state.getCollectionStates()); - - if (!newStates.containsKey(collection)) { - newStates.put(collection, new LinkedHashMap()); - } - - final Map slices = newStates.get(collection); - if (!slices.containsKey(slice.getName())) { - slices.put(slice.getName(), slice); - } else { - final Map shards = new LinkedHashMap(); - final Slice existingSlice = slices.get(slice.getName()); - shards.putAll(existingSlice.getShards()); - //XXX preserve existing leader - for(Entry edit: slice.getShards().entrySet()) { - if(existingSlice.getShards().get(edit.getKey())!=null && existingSlice.getShards().get(edit.getKey()).containsKey(ZkStateReader.LEADER_PROP)) { - HashMap newProps = new HashMap(); - newProps.putAll(edit.getValue().getProperties()); - newProps.put(ZkStateReader.LEADER_PROP, existingSlice.getShards().get(edit.getKey()).get(ZkStateReader.LEADER_PROP)); - shards.put(edit.getKey(), new ZkNodeProps(newProps)); - } else { - shards.put(edit.getKey(), edit.getValue()); - } - } - final Slice updatedSlice = new Slice(slice.getName(), shards); - slices.put(slice.getName(), updatedSlice); - } - return new CloudState(state.getLiveNodes(), newStates); - } - - private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) { - - boolean updated = false; - final Map> newStates = new LinkedHashMap>(); - newStates.putAll(state.getCollectionStates()); - - final Map slices = newStates.get(collection); - - if(slices==null) { - log.error("Could not mark shard leader for non existing collection."); - return state; - } - - if (!slices.containsKey(sliceName)) { - log.error("Could not mark leader for non existing slice."); - return state; - } else { - final Map newShards = new LinkedHashMap(); - for(Entry shard: slices.get(sliceName).getShards().entrySet()) { - Map newShardProps = new LinkedHashMap(); - newShardProps.putAll(shard.getValue().getProperties()); - - String wasLeader = newShardProps.remove(ZkStateReader.LEADER_PROP); //clean any previously existed flag - - ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps)); - if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) { - newShardProps.put(ZkStateReader.LEADER_PROP,"true"); - if (wasLeader == null) { - updated = true; - } - } else { - if (wasLeader != null) { - updated = true; - } - } - newShards.put(shard.getKey(), new ZkNodeProps(newShardProps)); - } - Slice slice = new Slice(sliceName, newShards); - slices.put(sliceName, slice); - } - if (updated) { - return new CloudState(state.getLiveNodes(), newStates); - } else { - return state; - } - } @Override public void announceLeader(String collection, String shardId, ZkCoreNodeProps props) { - synchronized (reader.getUpdateLock()) { - try { - reader.updateCloudState(true); // get fresh copy of the state - final CloudState state = reader.getCloudState(); - final CloudState newState = setShardLeader(state, collection, shardId, - props.getCoreUrl()); - if (state != newState) { // if same instance was returned no need to - // update state - log.info("Announcing new leader: coll: " + collection + " shard: " + shardId + " props:" + props); - zkClient.setData(ZkStateReader.CLUSTER_STATE, - ZkStateReader.toJSON(newState), true); - - } else { - log.debug("State was not changed."); - } - } catch (KeeperException e) { - log.warn("Could not announce new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.warn("Could not promote new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass()); - } - } + log.info("Leader change pooled."); + fifo.add(new CloudStateUpdateRequest(Op.LeaderChange, collection, shardId, props.getCoreUrl())); } } \ No newline at end of file