From e78002bdc165188e219171f81a7a38cda592b5b7 Mon Sep 17 00:00:00 2001 From: markrmiller Date: Mon, 8 Feb 2016 23:24:43 -0500 Subject: [PATCH] SOLR-7281: Add an overseer action to publish an entire node as 'down'. --- solr/CHANGES.txt | 2 + .../apache/solr/cloud/ElectionContext.java | 16 ++-- .../java/org/apache/solr/cloud/Overseer.java | 50 ++++++----- .../org/apache/solr/cloud/ZkController.java | 50 ++++++----- .../solr/cloud/overseer/NodeMutator.java | 85 +++++++++++++++++++ .../solr/cloud/overseer/OverseerAction.java | 3 +- .../org/apache/solr/core/CoreContainer.java | 2 +- 7 files changed, 155 insertions(+), 53 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index ff4acd1afd9..4767aa5ba8a 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -488,6 +488,8 @@ Optimizations * SOLR-8615: Just like creating cores, we should use multiple threads when closing cores. (Mark Miller) +* SOLR-7281: Add an overseer action to publish an entire node as 'down'. (Mark Miller, shalin) + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index 30db6f192e2..10ac105f5ef 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -485,13 +485,15 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { } else { try (SolrCore core = cc.getCore(coreName)) { - final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId, - core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName()); - if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERING) { - log.warn("The previous leader marked me " + core.getName() - + " as " + lirState.toString() + " and I haven't recovered yet, so I shouldn't be the leader."); - - throw new SolrException(ErrorCode.SERVER_ERROR, "Leader Initiated Recovery prevented leadership"); + if (core != null) { + final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId, + core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName()); + if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERING) { + log.warn("The previous leader marked me " + core.getName() + + " as " + lirState.toString() + " and I haven't recovered yet, so I shouldn't be the leader."); + + throw new SolrException(ErrorCode.SERVER_ERROR, "Leader Initiated Recovery prevented leadership"); + } } } } diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 53611c09176..3fe2e5c3c21 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -37,6 +38,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.cloud.overseer.ClusterStateMutator; import org.apache.solr.cloud.overseer.CollectionMutator; +import org.apache.solr.cloud.overseer.NodeMutator; import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.cloud.overseer.ReplicaMutator; import org.apache.solr.cloud.overseer.SliceMutator; @@ -270,10 +272,10 @@ public class Overseer implements Closeable { private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception { final String operation = message.getStr(QUEUE_OPERATION); - ZkWriteCommand zkWriteCommand = null; + List zkWriteCommands = null; final TimerContext timerContext = stats.time(operation); try { - zkWriteCommand = processMessage(clusterState, message, operation); + zkWriteCommands = processMessage(clusterState, message, operation); stats.success(operation); } catch (Exception e) { // generally there is nothing we can do - in most cases, we have @@ -286,8 +288,10 @@ public class Overseer implements Closeable { } finally { timerContext.stop(); } - if (zkWriteCommand != null) { - clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand, callback); + if (zkWriteCommands != null) { + for (ZkWriteCommand zkWriteCommand : zkWriteCommands) { + clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand, callback); + } if (!enableBatching) { clusterState = zkStateWriter.writePendingUpdates(); } @@ -334,37 +338,37 @@ public class Overseer implements Closeable { } } - private ZkWriteCommand processMessage(ClusterState clusterState, + private List processMessage(ClusterState clusterState, final ZkNodeProps message, final String operation) { CollectionParams.CollectionAction collectionAction = CollectionParams.CollectionAction.get(operation); if (collectionAction != null) { switch (collectionAction) { case CREATE: - return new ClusterStateMutator(getZkStateReader()).createCollection(clusterState, message); + return Collections.singletonList(new ClusterStateMutator(getZkStateReader()).createCollection(clusterState, message)); case DELETE: - return new ClusterStateMutator(getZkStateReader()).deleteCollection(clusterState, message); + return Collections.singletonList(new ClusterStateMutator(getZkStateReader()).deleteCollection(clusterState, message)); case CREATESHARD: - return new CollectionMutator(getZkStateReader()).createShard(clusterState, message); + return Collections.singletonList(new CollectionMutator(getZkStateReader()).createShard(clusterState, message)); case DELETESHARD: - return new CollectionMutator(getZkStateReader()).deleteShard(clusterState, message); + return Collections.singletonList(new CollectionMutator(getZkStateReader()).deleteShard(clusterState, message)); case ADDREPLICA: - return new SliceMutator(getZkStateReader()).addReplica(clusterState, message); + return Collections.singletonList(new SliceMutator(getZkStateReader()).addReplica(clusterState, message)); case ADDREPLICAPROP: - return new ReplicaMutator(getZkStateReader()).addReplicaProperty(clusterState, message); + return Collections.singletonList(new ReplicaMutator(getZkStateReader()).addReplicaProperty(clusterState, message)); case DELETEREPLICAPROP: - return new ReplicaMutator(getZkStateReader()).deleteReplicaProperty(clusterState, message); + return Collections.singletonList(new ReplicaMutator(getZkStateReader()).deleteReplicaProperty(clusterState, message)); case BALANCESHARDUNIQUE: ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(clusterState, message); if (dProp.balanceProperty()) { String collName = message.getStr(ZkStateReader.COLLECTION_PROP); - return new ZkWriteCommand(collName, dProp.getDocCollection()); + return Collections.singletonList(new ZkWriteCommand(collName, dProp.getDocCollection())); } break; case MODIFYCOLLECTION: CollectionsHandler.verifyRuleParams(zkController.getCoreContainer() ,message.getProperties()); - return new CollectionMutator(reader).modifyCollection(clusterState,message); + return Collections.singletonList(new CollectionMutator(reader).modifyCollection(clusterState,message)); case MIGRATESTATEFORMAT: - return new ClusterStateMutator(reader).migrateStateFormat(clusterState, message); + return Collections.singletonList(new ClusterStateMutator(reader).migrateStateFormat(clusterState, message)); default: throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties()); @@ -376,17 +380,17 @@ public class Overseer implements Closeable { } switch (overseerAction) { case STATE: - return new ReplicaMutator(getZkStateReader()).setState(clusterState, message); + return Collections.singletonList(new ReplicaMutator(getZkStateReader()).setState(clusterState, message)); case LEADER: - return new SliceMutator(getZkStateReader()).setShardLeader(clusterState, message); + return Collections.singletonList(new SliceMutator(getZkStateReader()).setShardLeader(clusterState, message)); case DELETECORE: - return new SliceMutator(getZkStateReader()).removeReplica(clusterState, message); + return Collections.singletonList(new SliceMutator(getZkStateReader()).removeReplica(clusterState, message)); case ADDROUTINGRULE: - return new SliceMutator(getZkStateReader()).addRoutingRule(clusterState, message); + return Collections.singletonList(new SliceMutator(getZkStateReader()).addRoutingRule(clusterState, message)); case REMOVEROUTINGRULE: - return new SliceMutator(getZkStateReader()).removeRoutingRule(clusterState, message); + return Collections.singletonList(new SliceMutator(getZkStateReader()).removeRoutingRule(clusterState, message)); case UPDATESHARDSTATE: - return new SliceMutator(getZkStateReader()).updateShardState(clusterState, message); + return Collections.singletonList(new SliceMutator(getZkStateReader()).updateShardState(clusterState, message)); case QUIT: if (myId.equals(message.get("id"))) { log.info("Quit command received {}", LeaderElector.getNodeName(myId)); @@ -396,12 +400,14 @@ public class Overseer implements Closeable { log.warn("Overseer received wrong QUIT message {}", message); } break; + case DOWNNODE: + return new NodeMutator(getZkStateReader()).downNode(clusterState, message); default: throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties()); } } - return ZkStateWriter.NO_OP; + return Collections.singletonList(ZkStateWriter.NO_OP); } private LeaderStatus amILeader() { diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 44af3f18ee0..bfd9e7608dd 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -387,28 +387,7 @@ public final class ZkController { if (descriptors != null) { // before registering as live, make sure everyone is in a // down state - for (CoreDescriptor descriptor : descriptors) { - try { - descriptor.getCloudDescriptor().setLeader(false); - publish(descriptor, Replica.State.DOWN, updateLastPublished); - } catch (Exception e) { - if (isClosed) { - return; - } - try { - Thread.sleep(1000); - } catch (InterruptedException e1) { - Thread.currentThread().interrupt(); - } - try { - publish(descriptor, Replica.State.DOWN); - } catch (Exception e2) { - SolrException.log(log, "", e2); - continue; - } - } - } - + publishNodeAsDown(getNodeName()); for (CoreDescriptor descriptor : descriptors) { // if it looks like we are going to be the leader, we don't // want to wait for the following stuff @@ -2504,4 +2483,31 @@ public final class ZkController { } return false; } + + + /** + * Best effort to set DOWN state for all replicas on node. + * + * @param nodeName to operate on + */ + public void publishNodeAsDown(String nodeName) { + log.info("Publish node={} as DOWN", nodeName); + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(), + ZkStateReader.NODE_NAME_PROP, nodeName); + try { + Overseer.getInQueue(getZkClient()).offer(Utils.toJSON(m)); + } catch (KeeperException e) { + log.info("Could not publish node as down: " + e.getMessage()); + } catch (RuntimeException e) { + Throwable rootCause = SolrException.getRootCause(e); + if (rootCause instanceof KeeperException) { + log.info("Could not publish node as down: " + e.getMessage()); + } else { + throw e; + } + } catch (InterruptedException e) { + Thread.interrupted(); + log.info("", e); + } + } } diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java new file mode 100644 index 00000000000..0784cd4e2f1 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud.overseer; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; + +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NodeMutator { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public NodeMutator(ZkStateReader zkStateReader) { + + } + + public List downNode(ClusterState clusterState, ZkNodeProps message) { + List zkWriteCommands = new ArrayList(); + String nodeName = message.getStr(ZkStateReader.NODE_NAME_PROP); + + log.info("DownNode state invoked for node: " + nodeName); + + Set collections = clusterState.getCollections(); + for (String collection : collections) { + + Map slicesCopy = new LinkedHashMap<>(clusterState.getSlicesMap(collection)); + + Set> entries = slicesCopy.entrySet(); + for (Entry entry : entries) { + Slice slice = clusterState.getSlice(collection, entry.getKey()); + Map newReplicas = new HashMap(); + + Collection replicas = slice.getReplicas(); + for (Replica replica : replicas) { + Map props = replica.shallowCopy(); + String rNodeName = replica.getNodeName(); + if (rNodeName.equals(nodeName)) { + log.info("Update replica state for " + replica + " to " + Replica.State.DOWN.toString()); + props.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString()); + } + + Replica newReplica = new Replica(replica.getName(), props); + newReplicas.put(replica.getName(), newReplica); + } + + Slice newSlice = new Slice(slice.getName(), newReplicas, slice.shallowCopy()); + slicesCopy.put(slice.getName(), newSlice); + + } + + zkWriteCommands.add(new ZkWriteCommand(collection, clusterState.getCollection(collection).copyWithSlices(slicesCopy))); + } + + return zkWriteCommands; + } +} + diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java b/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java index ad766a3473e..ea0080690b2 100644 --- a/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java @@ -33,7 +33,8 @@ public enum OverseerAction { REMOVEROUTINGRULE, UPDATESHARDSTATE, STATE, - QUIT; + QUIT, + DOWNNODE; public static OverseerAction get(String p) { if (p != null) { diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index a14ba866919..426a493a2b1 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -554,7 +554,7 @@ public class CoreContainer { if (isZooKeeperAware()) { cancelCoreRecoveries(); - zkSys.publishCoresAsDown(solrCores.getCores()); + zkSys.zkController.publishNodeAsDown(zkSys.zkController.getNodeName()); } try {