SOLR-3065: Let overseer process cluster state changes asynchronously

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1237194 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2012-01-29 02:22:55 +00:00
parent 9a1222efc5
commit 210301b424
2 changed files with 261 additions and 160 deletions

View File

@ -209,14 +209,31 @@ final class OverseerElectionContext extends ElectionContext {
private final ZkStateReader stateReader;
public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, ZkStateReader stateReader) {
super(zkNodeName, "/overseer_elect", null, null);
super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null);
this.zkClient = zkClient;
this.stateReader = stateReader;
}
@Override
void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement) throws KeeperException, InterruptedException {
new Overseer(zkClient, stateReader);
final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1);
ZkNodeProps myProps = new ZkNodeProps("id", id);
try {
zkClient.makePath(leaderPath,
ZkStateReader.toJSON(myProps),
CreateMode.EPHEMERAL, true);
} catch (NodeExistsException e) {
// if a previous leader ephemeral still exists for some reason, try and
// remove it
zkClient.delete(leaderPath, -1, true);
zkClient.makePath(leaderPath,
ZkStateReader.toJSON(myProps),
CreateMode.EPHEMERAL, true);
}
new Overseer(zkClient, stateReader, id);
}
}

View File

@ -22,9 +22,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.Set;
import org.apache.solr.cloud.NodeStateWatcher.NodeStateChangeListener;
@ -51,13 +53,32 @@ import org.slf4j.LoggerFactory;
* Cluster leader. Responsible node assignments, cluster state file?
*/
public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
private static final int STATE_UPDATE_DELAY = 500; // delay between cloud state updates
static enum Op {
LeaderChange, StateChange;
}
private final class CloudStateUpdateRequest {
final Op operation;
final Object[] args;
CloudStateUpdateRequest(final Op operation, final Object... args) {
this.operation = operation;
this.args = args;
}
}
public static final String ASSIGNMENTS_NODE = "/node_assignments";
public static final String STATES_NODE = "/node_states";
private static Logger log = LoggerFactory.getLogger(Overseer.class);
private final SolrZkClient zkClient;
private final ZkStateReader reader;
// pooled updates
private final LinkedBlockingQueue<CloudStateUpdateRequest> fifo = new LinkedBlockingQueue<CloudStateUpdateRequest>();
// node stateWatches
private HashMap<String,NodeStateWatcher> nodeStateWatches = new HashMap<String,NodeStateWatcher>();
@ -66,12 +87,222 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
private HashMap<String, HashMap<String,ShardLeaderWatcher>> shardLeaderWatches = new HashMap<String,HashMap<String,ShardLeaderWatcher>>();
private ZkCmdExecutor zkCmdExecutor;
public Overseer(final SolrZkClient zkClient, final ZkStateReader reader) throws KeeperException, InterruptedException {
log.info("Constructing new Overseer");
private static class CloudStateUpdater implements Runnable {
private final LinkedBlockingQueue<CloudStateUpdateRequest> fifo;
private final ZkStateReader reader;
private final SolrZkClient zkClient;
private final String myId;
public CloudStateUpdater(final LinkedBlockingQueue<CloudStateUpdateRequest> fifo, final ZkStateReader reader, final SolrZkClient zkClient, final String myId) {
this.fifo = fifo;
this.myId = myId;
this.reader = reader;
this.zkClient = zkClient;
}
@Override
public void run() {
while (amILeader()) {
LinkedList<CloudStateUpdateRequest> requests = new LinkedList<Overseer.CloudStateUpdateRequest>();
while (!fifo.isEmpty()) {
// collect all queued requests
CloudStateUpdateRequest req;
req = fifo.poll();
if (req == null) {
break;
}
requests.add(req);
}
if (requests.size() > 0) {
// process updates
synchronized (reader.getUpdateLock()) {
try {
reader.updateCloudState(true);
CloudState cloudState = reader.getCloudState();
for (CloudStateUpdateRequest request : requests) {
switch (request.operation) {
case LeaderChange:
cloudState = setShardLeader(cloudState,
(String) request.args[0], (String) request.args[1],
(String) request.args[2]);
break;
case StateChange:
cloudState = updateState(cloudState,
(String) request.args[0], (CoreState) request.args[1]);
break;
}
}
log.info("Announcing new cluster state");
zkClient.setData(ZkStateReader.CLUSTER_STATE,
ZkStateReader.toJSON(cloudState), true);
} catch (KeeperException e) {
// XXX stop processing, exit
return;
} catch (InterruptedException e) {
// XXX stop processing, exit
return;
}
}
}
try {
Thread.sleep(STATE_UPDATE_DELAY);
} catch (InterruptedException e) {
//
}
}
}
private boolean amILeader() {
try {
ZkNodeProps props = ZkNodeProps.load(zkClient.getData("/overseer_elect/leader", null, null, false));
if(myId.equals(props.get("id"))) {
return true;
}
} catch (KeeperException e) {
// assume we're dead
} catch (InterruptedException e) {
// assume we're dead
}
log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
return false;
}
/**
* Try to assign core to the cluster
* @throws KeeperException
* @throws InterruptedException
*/
private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
String collection = coreState.getCollectionName();
String zkCoreNodeName = coreState.getCoreNodeName();
String shardId;
if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
shardId = AssignShard.assignShard(collection, state);
} else {
shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
}
Map<String,String> props = new HashMap<String,String>();
for (Entry<String,String> entry : coreState.getProperties().entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
ZkNodeProps zkProps = new ZkNodeProps(props);
Slice slice = state.getSlice(collection, shardId);
Map<String,ZkNodeProps> shardProps;
if (slice == null) {
shardProps = new HashMap<String,ZkNodeProps>();
} else {
shardProps = state.getSlice(collection, shardId).getShardsCopy();
}
shardProps.put(zkCoreNodeName, zkProps);
slice = new Slice(shardId, shardProps);
CloudState newCloudState = updateSlice(state, collection, slice);
return newCloudState;
}
private CloudState updateSlice(CloudState state, String collection, Slice slice) {
final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
newStates.putAll(state.getCollectionStates());
if (!newStates.containsKey(collection)) {
newStates.put(collection, new LinkedHashMap<String,Slice>());
}
final Map<String, Slice> slices = newStates.get(collection);
if (!slices.containsKey(slice.getName())) {
slices.put(slice.getName(), slice);
} else {
final Map<String,ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>();
final Slice existingSlice = slices.get(slice.getName());
shards.putAll(existingSlice.getShards());
//XXX preserve existing leader
for(Entry<String, ZkNodeProps> edit: slice.getShards().entrySet()) {
if(existingSlice.getShards().get(edit.getKey())!=null && existingSlice.getShards().get(edit.getKey()).containsKey(ZkStateReader.LEADER_PROP)) {
HashMap<String, String> newProps = new HashMap<String,String>();
newProps.putAll(edit.getValue().getProperties());
newProps.put(ZkStateReader.LEADER_PROP, existingSlice.getShards().get(edit.getKey()).get(ZkStateReader.LEADER_PROP));
shards.put(edit.getKey(), new ZkNodeProps(newProps));
} else {
shards.put(edit.getKey(), edit.getValue());
}
}
final Slice updatedSlice = new Slice(slice.getName(), shards);
slices.put(slice.getName(), updatedSlice);
}
return new CloudState(state.getLiveNodes(), newStates);
}
private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) {
boolean updated = false;
final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
newStates.putAll(state.getCollectionStates());
final Map<String, Slice> slices = newStates.get(collection);
if(slices==null) {
log.error("Could not mark shard leader for non existing collection.");
return state;
}
if (!slices.containsKey(sliceName)) {
log.error("Could not mark leader for non existing slice.");
return state;
} else {
final Map<String,ZkNodeProps> newShards = new LinkedHashMap<String,ZkNodeProps>();
for(Entry<String, ZkNodeProps> shard: slices.get(sliceName).getShards().entrySet()) {
Map<String, String> newShardProps = new LinkedHashMap<String,String>();
newShardProps.putAll(shard.getValue().getProperties());
String wasLeader = newShardProps.remove(ZkStateReader.LEADER_PROP); //clean any previously existed flag
ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps));
if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) {
newShardProps.put(ZkStateReader.LEADER_PROP,"true");
if (wasLeader == null) {
updated = true;
}
} else {
if (wasLeader != null) {
updated = true;
}
}
newShards.put(shard.getKey(), new ZkNodeProps(newShardProps));
}
Slice slice = new Slice(sliceName, newShards);
slices.put(sliceName, slice);
}
if (updated) {
return new CloudState(state.getLiveNodes(), newStates);
} else {
return state;
}
}
}
public Overseer(final SolrZkClient zkClient, final ZkStateReader reader, String id) throws KeeperException, InterruptedException {
log.info("Constructing new Overseer id=" + id);
this.zkClient = zkClient;
this.zkCmdExecutor = new ZkCmdExecutor();
this.reader = reader;
createWatches();
//launch cluster state updater thread
ThreadGroup tg = new ThreadGroup("Overseer delayed state updater");
Thread updaterThread = new Thread(tg, new CloudStateUpdater(fifo, reader, zkClient, id));
updaterThread.setDaemon(true);
updaterThread.start();
}
public synchronized void createWatches()
@ -267,41 +498,6 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
}
}
/**
* Try to assign core to the cluster
* @throws KeeperException
* @throws InterruptedException
*/
private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
String collection = coreState.getCollectionName();
String zkCoreNodeName = coreState.getCoreNodeName();
String shardId;
if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
shardId = AssignShard.assignShard(collection, state);
} else {
shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
}
Map<String,String> props = new HashMap<String,String>();
for (Entry<String,String> entry : coreState.getProperties().entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
ZkNodeProps zkProps = new ZkNodeProps(props);
Slice slice = state.getSlice(collection, shardId);
Map<String,ZkNodeProps> shardProps;
if (slice == null) {
shardProps = new HashMap<String,ZkNodeProps>();
} else {
shardProps = state.getSlice(collection, shardId).getShardsCopy();
}
shardProps.put(zkCoreNodeName, zkProps);
slice = new Slice(shardId, shardProps);
CloudState newCloudState = updateSlice(state, collection, slice);
return newCloudState;
}
private Set<String> complement(Collection<String> next,
Collection<String> prev) {
Set<String> downCollections = new HashSet<String>();
@ -311,23 +507,11 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
}
@Override
public void coreChanged(final String nodeName, final Set<CoreState> states) throws KeeperException, InterruptedException {
log.debug("Cores changed: " + nodeName + " states:" + states);
synchronized(reader.getUpdateLock()) {
reader.updateCloudState(true);
CloudState cloudState = reader.getCloudState();
for (CoreState state : states) {
cloudState = updateState(cloudState, nodeName, state);
}
try {
zkClient.setData(ZkStateReader.CLUSTER_STATE,
ZkStateReader.toJSON(cloudState), true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Interrupted while publishing new state", e);
}
public void coreChanged(final String nodeName, final Set<CoreState> states)
throws KeeperException, InterruptedException {
log.info("Core change pooled: " + nodeName + " states:" + states);
for (CoreState state : states) {
fifo.add(new CloudStateUpdateRequest(Op.StateChange, nodeName, state));
}
}
@ -340,111 +524,11 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
zkCmdExecutor.ensureExists(node, zkClient);
}
private CloudState updateSlice(CloudState state, String collection, Slice slice) {
final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
newStates.putAll(state.getCollectionStates());
if (!newStates.containsKey(collection)) {
newStates.put(collection, new LinkedHashMap<String,Slice>());
}
final Map<String, Slice> slices = newStates.get(collection);
if (!slices.containsKey(slice.getName())) {
slices.put(slice.getName(), slice);
} else {
final Map<String,ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>();
final Slice existingSlice = slices.get(slice.getName());
shards.putAll(existingSlice.getShards());
//XXX preserve existing leader
for(Entry<String, ZkNodeProps> edit: slice.getShards().entrySet()) {
if(existingSlice.getShards().get(edit.getKey())!=null && existingSlice.getShards().get(edit.getKey()).containsKey(ZkStateReader.LEADER_PROP)) {
HashMap<String, String> newProps = new HashMap<String,String>();
newProps.putAll(edit.getValue().getProperties());
newProps.put(ZkStateReader.LEADER_PROP, existingSlice.getShards().get(edit.getKey()).get(ZkStateReader.LEADER_PROP));
shards.put(edit.getKey(), new ZkNodeProps(newProps));
} else {
shards.put(edit.getKey(), edit.getValue());
}
}
final Slice updatedSlice = new Slice(slice.getName(), shards);
slices.put(slice.getName(), updatedSlice);
}
return new CloudState(state.getLiveNodes(), newStates);
}
private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) {
boolean updated = false;
final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
newStates.putAll(state.getCollectionStates());
final Map<String, Slice> slices = newStates.get(collection);
if(slices==null) {
log.error("Could not mark shard leader for non existing collection.");
return state;
}
if (!slices.containsKey(sliceName)) {
log.error("Could not mark leader for non existing slice.");
return state;
} else {
final Map<String,ZkNodeProps> newShards = new LinkedHashMap<String,ZkNodeProps>();
for(Entry<String, ZkNodeProps> shard: slices.get(sliceName).getShards().entrySet()) {
Map<String, String> newShardProps = new LinkedHashMap<String,String>();
newShardProps.putAll(shard.getValue().getProperties());
String wasLeader = newShardProps.remove(ZkStateReader.LEADER_PROP); //clean any previously existed flag
ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps));
if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) {
newShardProps.put(ZkStateReader.LEADER_PROP,"true");
if (wasLeader == null) {
updated = true;
}
} else {
if (wasLeader != null) {
updated = true;
}
}
newShards.put(shard.getKey(), new ZkNodeProps(newShardProps));
}
Slice slice = new Slice(sliceName, newShards);
slices.put(sliceName, slice);
}
if (updated) {
return new CloudState(state.getLiveNodes(), newStates);
} else {
return state;
}
}
@Override
public void announceLeader(String collection, String shardId, ZkCoreNodeProps props) {
synchronized (reader.getUpdateLock()) {
try {
reader.updateCloudState(true); // get fresh copy of the state
final CloudState state = reader.getCloudState();
final CloudState newState = setShardLeader(state, collection, shardId,
props.getCoreUrl());
if (state != newState) { // if same instance was returned no need to
// update state
log.info("Announcing new leader: coll: " + collection + " shard: " + shardId + " props:" + props);
zkClient.setData(ZkStateReader.CLUSTER_STATE,
ZkStateReader.toJSON(newState), true);
} else {
log.debug("State was not changed.");
}
} catch (KeeperException e) {
log.warn("Could not announce new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Could not promote new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
}
}
log.info("Leader change pooled.");
fifo.add(new CloudStateUpdateRequest(Op.LeaderChange, collection, shardId, props.getCoreUrl()));
}
}