mirror of https://github.com/apache/lucene.git
SOLR-7281: Add an overseer action to publish an entire node as 'down'.
This commit is contained in:
parent
6af3d8ffd6
commit
2e65a90eba
|
@ -267,6 +267,8 @@ Optimizations
|
||||||
* SOLR-8615: Just like creating cores, we should use multiple threads when closing cores.
|
* SOLR-8615: Just like creating cores, we should use multiple threads when closing cores.
|
||||||
(Mark Miller)
|
(Mark Miller)
|
||||||
|
|
||||||
|
* SOLR-7281: Add an overseer action to publish an entire node as 'down'. (Mark Miller, shalin)
|
||||||
|
|
||||||
Other Changes
|
Other Changes
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -486,6 +486,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
try (SolrCore core = cc.getCore(coreName)) {
|
try (SolrCore core = cc.getCore(coreName)) {
|
||||||
|
if (core != null) {
|
||||||
final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId,
|
final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId,
|
||||||
core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
|
core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
|
||||||
if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERING) {
|
if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERING) {
|
||||||
|
@ -497,6 +498,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void startLeaderInitiatedRecoveryOnReplicas(String coreName) throws Exception {
|
private void startLeaderInitiatedRecoveryOnReplicas(String coreName) throws Exception {
|
||||||
try (SolrCore core = cc.getCore(coreName)) {
|
try (SolrCore core = cc.getCore(coreName)) {
|
||||||
|
|
|
@ -20,6 +20,7 @@ import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
@ -37,6 +38,7 @@ import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.solr.client.solrj.SolrResponse;
|
import org.apache.solr.client.solrj.SolrResponse;
|
||||||
import org.apache.solr.cloud.overseer.ClusterStateMutator;
|
import org.apache.solr.cloud.overseer.ClusterStateMutator;
|
||||||
import org.apache.solr.cloud.overseer.CollectionMutator;
|
import org.apache.solr.cloud.overseer.CollectionMutator;
|
||||||
|
import org.apache.solr.cloud.overseer.NodeMutator;
|
||||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||||
import org.apache.solr.cloud.overseer.ReplicaMutator;
|
import org.apache.solr.cloud.overseer.ReplicaMutator;
|
||||||
import org.apache.solr.cloud.overseer.SliceMutator;
|
import org.apache.solr.cloud.overseer.SliceMutator;
|
||||||
|
@ -270,10 +272,10 @@ public class Overseer implements Closeable {
|
||||||
|
|
||||||
private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception {
|
private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception {
|
||||||
final String operation = message.getStr(QUEUE_OPERATION);
|
final String operation = message.getStr(QUEUE_OPERATION);
|
||||||
ZkWriteCommand zkWriteCommand = null;
|
List<ZkWriteCommand> zkWriteCommands = null;
|
||||||
final TimerContext timerContext = stats.time(operation);
|
final TimerContext timerContext = stats.time(operation);
|
||||||
try {
|
try {
|
||||||
zkWriteCommand = processMessage(clusterState, message, operation);
|
zkWriteCommands = processMessage(clusterState, message, operation);
|
||||||
stats.success(operation);
|
stats.success(operation);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// generally there is nothing we can do - in most cases, we have
|
// generally there is nothing we can do - in most cases, we have
|
||||||
|
@ -286,8 +288,10 @@ public class Overseer implements Closeable {
|
||||||
} finally {
|
} finally {
|
||||||
timerContext.stop();
|
timerContext.stop();
|
||||||
}
|
}
|
||||||
if (zkWriteCommand != null) {
|
if (zkWriteCommands != null) {
|
||||||
|
for (ZkWriteCommand zkWriteCommand : zkWriteCommands) {
|
||||||
clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand, callback);
|
clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand, callback);
|
||||||
|
}
|
||||||
if (!enableBatching) {
|
if (!enableBatching) {
|
||||||
clusterState = zkStateWriter.writePendingUpdates();
|
clusterState = zkStateWriter.writePendingUpdates();
|
||||||
}
|
}
|
||||||
|
@ -334,37 +338,37 @@ public class Overseer implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ZkWriteCommand processMessage(ClusterState clusterState,
|
private List<ZkWriteCommand> processMessage(ClusterState clusterState,
|
||||||
final ZkNodeProps message, final String operation) {
|
final ZkNodeProps message, final String operation) {
|
||||||
CollectionParams.CollectionAction collectionAction = CollectionParams.CollectionAction.get(operation);
|
CollectionParams.CollectionAction collectionAction = CollectionParams.CollectionAction.get(operation);
|
||||||
if (collectionAction != null) {
|
if (collectionAction != null) {
|
||||||
switch (collectionAction) {
|
switch (collectionAction) {
|
||||||
case CREATE:
|
case CREATE:
|
||||||
return new ClusterStateMutator(getZkStateReader()).createCollection(clusterState, message);
|
return Collections.singletonList(new ClusterStateMutator(getZkStateReader()).createCollection(clusterState, message));
|
||||||
case DELETE:
|
case DELETE:
|
||||||
return new ClusterStateMutator(getZkStateReader()).deleteCollection(clusterState, message);
|
return Collections.singletonList(new ClusterStateMutator(getZkStateReader()).deleteCollection(clusterState, message));
|
||||||
case CREATESHARD:
|
case CREATESHARD:
|
||||||
return new CollectionMutator(getZkStateReader()).createShard(clusterState, message);
|
return Collections.singletonList(new CollectionMutator(getZkStateReader()).createShard(clusterState, message));
|
||||||
case DELETESHARD:
|
case DELETESHARD:
|
||||||
return new CollectionMutator(getZkStateReader()).deleteShard(clusterState, message);
|
return Collections.singletonList(new CollectionMutator(getZkStateReader()).deleteShard(clusterState, message));
|
||||||
case ADDREPLICA:
|
case ADDREPLICA:
|
||||||
return new SliceMutator(getZkStateReader()).addReplica(clusterState, message);
|
return Collections.singletonList(new SliceMutator(getZkStateReader()).addReplica(clusterState, message));
|
||||||
case ADDREPLICAPROP:
|
case ADDREPLICAPROP:
|
||||||
return new ReplicaMutator(getZkStateReader()).addReplicaProperty(clusterState, message);
|
return Collections.singletonList(new ReplicaMutator(getZkStateReader()).addReplicaProperty(clusterState, message));
|
||||||
case DELETEREPLICAPROP:
|
case DELETEREPLICAPROP:
|
||||||
return new ReplicaMutator(getZkStateReader()).deleteReplicaProperty(clusterState, message);
|
return Collections.singletonList(new ReplicaMutator(getZkStateReader()).deleteReplicaProperty(clusterState, message));
|
||||||
case BALANCESHARDUNIQUE:
|
case BALANCESHARDUNIQUE:
|
||||||
ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(clusterState, message);
|
ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(clusterState, message);
|
||||||
if (dProp.balanceProperty()) {
|
if (dProp.balanceProperty()) {
|
||||||
String collName = message.getStr(ZkStateReader.COLLECTION_PROP);
|
String collName = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||||
return new ZkWriteCommand(collName, dProp.getDocCollection());
|
return Collections.singletonList(new ZkWriteCommand(collName, dProp.getDocCollection()));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case MODIFYCOLLECTION:
|
case MODIFYCOLLECTION:
|
||||||
CollectionsHandler.verifyRuleParams(zkController.getCoreContainer() ,message.getProperties());
|
CollectionsHandler.verifyRuleParams(zkController.getCoreContainer() ,message.getProperties());
|
||||||
return new CollectionMutator(reader).modifyCollection(clusterState,message);
|
return Collections.singletonList(new CollectionMutator(reader).modifyCollection(clusterState,message));
|
||||||
case MIGRATESTATEFORMAT:
|
case MIGRATESTATEFORMAT:
|
||||||
return new ClusterStateMutator(reader).migrateStateFormat(clusterState, message);
|
return Collections.singletonList(new ClusterStateMutator(reader).migrateStateFormat(clusterState, message));
|
||||||
default:
|
default:
|
||||||
throw new RuntimeException("unknown operation:" + operation
|
throw new RuntimeException("unknown operation:" + operation
|
||||||
+ " contents:" + message.getProperties());
|
+ " contents:" + message.getProperties());
|
||||||
|
@ -376,17 +380,17 @@ public class Overseer implements Closeable {
|
||||||
}
|
}
|
||||||
switch (overseerAction) {
|
switch (overseerAction) {
|
||||||
case STATE:
|
case STATE:
|
||||||
return new ReplicaMutator(getZkStateReader()).setState(clusterState, message);
|
return Collections.singletonList(new ReplicaMutator(getZkStateReader()).setState(clusterState, message));
|
||||||
case LEADER:
|
case LEADER:
|
||||||
return new SliceMutator(getZkStateReader()).setShardLeader(clusterState, message);
|
return Collections.singletonList(new SliceMutator(getZkStateReader()).setShardLeader(clusterState, message));
|
||||||
case DELETECORE:
|
case DELETECORE:
|
||||||
return new SliceMutator(getZkStateReader()).removeReplica(clusterState, message);
|
return Collections.singletonList(new SliceMutator(getZkStateReader()).removeReplica(clusterState, message));
|
||||||
case ADDROUTINGRULE:
|
case ADDROUTINGRULE:
|
||||||
return new SliceMutator(getZkStateReader()).addRoutingRule(clusterState, message);
|
return Collections.singletonList(new SliceMutator(getZkStateReader()).addRoutingRule(clusterState, message));
|
||||||
case REMOVEROUTINGRULE:
|
case REMOVEROUTINGRULE:
|
||||||
return new SliceMutator(getZkStateReader()).removeRoutingRule(clusterState, message);
|
return Collections.singletonList(new SliceMutator(getZkStateReader()).removeRoutingRule(clusterState, message));
|
||||||
case UPDATESHARDSTATE:
|
case UPDATESHARDSTATE:
|
||||||
return new SliceMutator(getZkStateReader()).updateShardState(clusterState, message);
|
return Collections.singletonList(new SliceMutator(getZkStateReader()).updateShardState(clusterState, message));
|
||||||
case QUIT:
|
case QUIT:
|
||||||
if (myId.equals(message.get("id"))) {
|
if (myId.equals(message.get("id"))) {
|
||||||
log.info("Quit command received {}", LeaderElector.getNodeName(myId));
|
log.info("Quit command received {}", LeaderElector.getNodeName(myId));
|
||||||
|
@ -396,12 +400,14 @@ public class Overseer implements Closeable {
|
||||||
log.warn("Overseer received wrong QUIT message {}", message);
|
log.warn("Overseer received wrong QUIT message {}", message);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
case DOWNNODE:
|
||||||
|
return new NodeMutator(getZkStateReader()).downNode(clusterState, message);
|
||||||
default:
|
default:
|
||||||
throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
|
throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return ZkStateWriter.NO_OP;
|
return Collections.singletonList(ZkStateWriter.NO_OP);
|
||||||
}
|
}
|
||||||
|
|
||||||
private LeaderStatus amILeader() {
|
private LeaderStatus amILeader() {
|
||||||
|
|
|
@ -387,28 +387,7 @@ public final class ZkController {
|
||||||
if (descriptors != null) {
|
if (descriptors != null) {
|
||||||
// before registering as live, make sure everyone is in a
|
// before registering as live, make sure everyone is in a
|
||||||
// down state
|
// down state
|
||||||
for (CoreDescriptor descriptor : descriptors) {
|
publishNodeAsDown(getNodeName());
|
||||||
try {
|
|
||||||
descriptor.getCloudDescriptor().setLeader(false);
|
|
||||||
publish(descriptor, Replica.State.DOWN, updateLastPublished);
|
|
||||||
} catch (Exception e) {
|
|
||||||
if (isClosed) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
Thread.sleep(1000);
|
|
||||||
} catch (InterruptedException e1) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
publish(descriptor, Replica.State.DOWN);
|
|
||||||
} catch (Exception e2) {
|
|
||||||
SolrException.log(log, "", e2);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (CoreDescriptor descriptor : descriptors) {
|
for (CoreDescriptor descriptor : descriptors) {
|
||||||
// if it looks like we are going to be the leader, we don't
|
// if it looks like we are going to be the leader, we don't
|
||||||
// want to wait for the following stuff
|
// want to wait for the following stuff
|
||||||
|
@ -2519,4 +2498,31 @@ public final class ZkController {
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Best effort to set DOWN state for all replicas on node.
|
||||||
|
*
|
||||||
|
* @param nodeName to operate on
|
||||||
|
*/
|
||||||
|
public void publishNodeAsDown(String nodeName) {
|
||||||
|
log.info("Publish node={} as DOWN", nodeName);
|
||||||
|
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
|
||||||
|
ZkStateReader.NODE_NAME_PROP, nodeName);
|
||||||
|
try {
|
||||||
|
Overseer.getInQueue(getZkClient()).offer(Utils.toJSON(m));
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
log.info("Could not publish node as down: " + e.getMessage());
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
Throwable rootCause = SolrException.getRootCause(e);
|
||||||
|
if (rootCause instanceof KeeperException) {
|
||||||
|
log.info("Could not publish node as down: " + e.getMessage());
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.interrupted();
|
||||||
|
log.info("", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,85 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.solr.cloud.overseer;
|
||||||
|
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
|
||||||
|
import org.apache.solr.common.cloud.ClusterState;
|
||||||
|
import org.apache.solr.common.cloud.Replica;
|
||||||
|
import org.apache.solr.common.cloud.Slice;
|
||||||
|
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||||
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class NodeMutator {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
public NodeMutator(ZkStateReader zkStateReader) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<ZkWriteCommand> downNode(ClusterState clusterState, ZkNodeProps message) {
|
||||||
|
List<ZkWriteCommand> zkWriteCommands = new ArrayList<ZkWriteCommand>();
|
||||||
|
String nodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
|
||||||
|
|
||||||
|
log.info("DownNode state invoked for node: " + nodeName);
|
||||||
|
|
||||||
|
Set<String> collections = clusterState.getCollections();
|
||||||
|
for (String collection : collections) {
|
||||||
|
|
||||||
|
Map<String,Slice> slicesCopy = new LinkedHashMap<>(clusterState.getSlicesMap(collection));
|
||||||
|
|
||||||
|
Set<Entry<String,Slice>> entries = slicesCopy.entrySet();
|
||||||
|
for (Entry<String,Slice> entry : entries) {
|
||||||
|
Slice slice = clusterState.getSlice(collection, entry.getKey());
|
||||||
|
Map<String,Replica> newReplicas = new HashMap<String,Replica>();
|
||||||
|
|
||||||
|
Collection<Replica> replicas = slice.getReplicas();
|
||||||
|
for (Replica replica : replicas) {
|
||||||
|
Map<String,Object> props = replica.shallowCopy();
|
||||||
|
String rNodeName = replica.getNodeName();
|
||||||
|
if (rNodeName.equals(nodeName)) {
|
||||||
|
log.info("Update replica state for " + replica + " to " + Replica.State.DOWN.toString());
|
||||||
|
props.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
Replica newReplica = new Replica(replica.getName(), props);
|
||||||
|
newReplicas.put(replica.getName(), newReplica);
|
||||||
|
}
|
||||||
|
|
||||||
|
Slice newSlice = new Slice(slice.getName(), newReplicas, slice.shallowCopy());
|
||||||
|
slicesCopy.put(slice.getName(), newSlice);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
zkWriteCommands.add(new ZkWriteCommand(collection, clusterState.getCollection(collection).copyWithSlices(slicesCopy)));
|
||||||
|
}
|
||||||
|
|
||||||
|
return zkWriteCommands;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -33,7 +33,8 @@ public enum OverseerAction {
|
||||||
REMOVEROUTINGRULE,
|
REMOVEROUTINGRULE,
|
||||||
UPDATESHARDSTATE,
|
UPDATESHARDSTATE,
|
||||||
STATE,
|
STATE,
|
||||||
QUIT;
|
QUIT,
|
||||||
|
DOWNNODE;
|
||||||
|
|
||||||
public static OverseerAction get(String p) {
|
public static OverseerAction get(String p) {
|
||||||
if (p != null) {
|
if (p != null) {
|
||||||
|
|
|
@ -554,7 +554,7 @@ public class CoreContainer {
|
||||||
|
|
||||||
if (isZooKeeperAware()) {
|
if (isZooKeeperAware()) {
|
||||||
cancelCoreRecoveries();
|
cancelCoreRecoveries();
|
||||||
zkSys.publishCoresAsDown(solrCores.getCores());
|
zkSys.zkController.publishNodeAsDown(zkSys.zkController.getNodeName());
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue