mirror of https://github.com/apache/lucene.git
SOLR-9421: Refactored out OverseerCollectionMessageHandler to smaller classes
This commit is contained in:
parent
43aed486a9
commit
292644f9a5
|
@ -247,6 +247,8 @@ Other Changes
|
|||
|
||||
* SOLR-9404: Refactor move/renames in JSON FacetProcessor and FacetFieldProcessor. (David Smiley)
|
||||
|
||||
* SOLR-9421: Refactored out OverseerCollectionMessageHandler to smaller classes (noble)
|
||||
|
||||
================== 6.1.0 ==================
|
||||
|
||||
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
|
||||
|
|
|
@ -0,0 +1,192 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.ShardParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.Assign.getNodesForNewReplicas;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
|
||||
public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public AddReplicaCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
|
||||
addReplica(ocmh.zkStateReader.getClusterState(), message, results, null);
|
||||
}
|
||||
|
||||
ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
|
||||
throws KeeperException, InterruptedException {
|
||||
log.info("addReplica() : {}", Utils.toJSONString(message));
|
||||
String collection = message.getStr(COLLECTION_PROP);
|
||||
String node = message.getStr(CoreAdminParams.NODE);
|
||||
String shard = message.getStr(SHARD_ID_PROP);
|
||||
String coreName = message.getStr(CoreAdminParams.NAME);
|
||||
boolean parallel = message.getBool("parallel", false);
|
||||
if (StringUtils.isBlank(coreName)) {
|
||||
coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
|
||||
}
|
||||
|
||||
final String asyncId = message.getStr(ASYNC);
|
||||
|
||||
DocCollection coll = clusterState.getCollection(collection);
|
||||
if (coll == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
|
||||
}
|
||||
if (coll.getSlice(shard) == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Collection: " + collection + " shard: " + shard + " does not exist");
|
||||
}
|
||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
|
||||
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
|
||||
|
||||
// Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
|
||||
if (!skipCreateReplicaInClusterState) {
|
||||
node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
|
||||
ocmh.overseer.getZkController().getCoreContainer()).get(0).nodeName;
|
||||
}
|
||||
log.info("Node Identified {} for creating new replica", node);
|
||||
|
||||
if (!clusterState.liveNodesContain(node)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
|
||||
}
|
||||
if (coreName == null) {
|
||||
coreName = Assign.buildCoreName(coll, shard);
|
||||
} else if (!skipCreateReplicaInClusterState) {
|
||||
//Validate that the core name is unique in that collection
|
||||
for (Slice slice : coll.getSlices()) {
|
||||
for (Replica replica : slice.getReplicas()) {
|
||||
String replicaCoreName = replica.getStr(CORE_NAME_PROP);
|
||||
if (coreName.equals(replicaCoreName)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Another replica with the same core name already exists" +
|
||||
" for this collection");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
|
||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
if (!Overseer.isLegacy(zkStateReader)) {
|
||||
if (!skipCreateReplicaInClusterState) {
|
||||
ZkNodeProps props = new ZkNodeProps(
|
||||
Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
|
||||
ZkStateReader.COLLECTION_PROP, collection,
|
||||
ZkStateReader.SHARD_ID_PROP, shard,
|
||||
ZkStateReader.CORE_NAME_PROP, coreName,
|
||||
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
|
||||
ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node),
|
||||
ZkStateReader.NODE_NAME_PROP, node);
|
||||
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
|
||||
}
|
||||
params.set(CoreAdminParams.CORE_NODE_NAME,
|
||||
ocmh.waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName());
|
||||
}
|
||||
|
||||
String configName = zkStateReader.readConfigName(collection);
|
||||
String routeKey = message.getStr(ShardParams._ROUTE_);
|
||||
String dataDir = message.getStr(CoreAdminParams.DATA_DIR);
|
||||
String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR);
|
||||
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
|
||||
params.set(CoreAdminParams.NAME, coreName);
|
||||
params.set(COLL_CONF, configName);
|
||||
params.set(CoreAdminParams.COLLECTION, collection);
|
||||
if (shard != null) {
|
||||
params.set(CoreAdminParams.SHARD, shard);
|
||||
} else if (routeKey != null) {
|
||||
Collection<Slice> slices = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll);
|
||||
if (slices.isEmpty()) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No active shard serving _route_=" + routeKey + " found");
|
||||
} else {
|
||||
params.set(CoreAdminParams.SHARD, slices.iterator().next().getName());
|
||||
}
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Specify either 'shard' or _route_ param");
|
||||
}
|
||||
if (dataDir != null) {
|
||||
params.set(CoreAdminParams.DATA_DIR, dataDir);
|
||||
}
|
||||
if (instanceDir != null) {
|
||||
params.set(CoreAdminParams.INSTANCE_DIR, instanceDir);
|
||||
}
|
||||
ocmh.addPropertyParams(message, params);
|
||||
|
||||
// For tracking async calls.
|
||||
Map<String,String> requestMap = new HashMap<>();
|
||||
ocmh.sendShardRequest(node, params, shardHandler, asyncId, requestMap);
|
||||
|
||||
final String fnode = node;
|
||||
final String fcoreName = coreName;
|
||||
|
||||
Runnable runnable = () -> {
|
||||
ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
|
||||
ocmh.waitForCoreNodeName(collection, fnode, fcoreName);
|
||||
if (onComplete != null) onComplete.run();
|
||||
};
|
||||
|
||||
if (!parallel) {
|
||||
runnable.run();
|
||||
} else {
|
||||
ocmh.tpe.submit(runnable);
|
||||
}
|
||||
|
||||
|
||||
return new ZkNodeProps(
|
||||
ZkStateReader.COLLECTION_PROP, collection,
|
||||
ZkStateReader.SHARD_ID_PROP, shard,
|
||||
ZkStateReader.CORE_NAME_PROP, coreName,
|
||||
ZkStateReader.NODE_NAME_PROP, node
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,132 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.net.URI;
|
||||
import java.time.Instant;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.backup.BackupManager;
|
||||
import org.apache.solr.core.backup.repository.BackupRepository;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
|
||||
public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public BackupCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
|
||||
String collectionName = message.getStr(COLLECTION_PROP);
|
||||
String backupName = message.getStr(NAME);
|
||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
|
||||
String asyncId = message.getStr(ASYNC);
|
||||
String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
|
||||
String location = message.getStr(CoreAdminParams.BACKUP_LOCATION);
|
||||
|
||||
Map<String, String> requestMap = new HashMap<>();
|
||||
Instant startTime = Instant.now();
|
||||
|
||||
CoreContainer cc = ocmh.overseer.getZkController().getCoreContainer();
|
||||
BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
|
||||
BackupManager backupMgr = new BackupManager(repository, ocmh.zkStateReader, collectionName);
|
||||
|
||||
// Backup location
|
||||
URI backupPath = repository.createURI(location, backupName);
|
||||
|
||||
//Validating if the directory already exists.
|
||||
if (repository.exists(backupPath)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The backup directory already exists: " + backupPath);
|
||||
}
|
||||
|
||||
// Create a directory to store backup details.
|
||||
repository.createDirectory(backupPath);
|
||||
|
||||
log.info("Starting backup of collection={} with backupName={} at location={}", collectionName, backupName,
|
||||
backupPath);
|
||||
|
||||
for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices()) {
|
||||
Replica replica = slice.getLeader();
|
||||
|
||||
String coreName = replica.getStr(CORE_NAME_PROP);
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.BACKUPCORE.toString());
|
||||
params.set(NAME, slice.getName());
|
||||
params.set(CoreAdminParams.BACKUP_REPOSITORY, repo);
|
||||
params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.getPath()); // note: index dir will be here then the "snapshot." + slice name
|
||||
params.set(CORE_NAME_PROP, coreName);
|
||||
|
||||
ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
|
||||
log.debug("Sent backup request to core={} for backupName={}", coreName, backupName);
|
||||
}
|
||||
log.debug("Sent backup requests to all shard leaders for backupName={}", backupName);
|
||||
|
||||
ocmh.processResponses(results, shardHandler, true, "Could not backup all replicas", asyncId, requestMap);
|
||||
|
||||
log.info("Starting to backup ZK data for backupName={}", backupName);
|
||||
|
||||
//Download the configs
|
||||
String configName = ocmh.zkStateReader.readConfigName(collectionName);
|
||||
backupMgr.downloadConfigDir(location, backupName, configName);
|
||||
|
||||
//Save the collection's state. Can be part of the monolithic clusterstate.json or a individual state.json
|
||||
//Since we don't want to distinguish we extract the state and back it up as a separate json
|
||||
DocCollection collectionState = ocmh.zkStateReader.getClusterState().getCollection(collectionName);
|
||||
backupMgr.writeCollectionState(location, backupName, collectionName, collectionState);
|
||||
|
||||
Properties properties = new Properties();
|
||||
|
||||
properties.put(BackupManager.BACKUP_NAME_PROP, backupName);
|
||||
properties.put(BackupManager.COLLECTION_NAME_PROP, collectionName);
|
||||
properties.put(COLL_CONF, configName);
|
||||
properties.put(BackupManager.START_TIME_PROP, startTime.toString());
|
||||
//TODO: Add MD5 of the configset. If during restore the same name configset exists then we can compare checksums to see if they are the same.
|
||||
//if they are not the same then we can throw an error or have an 'overwriteConfig' flag
|
||||
//TODO save numDocs for the shardLeader. We can use it to sanity check the restore.
|
||||
|
||||
backupMgr.writeBackupProperties(location, backupName, properties);
|
||||
|
||||
log.info("Completed backing up ZK data for backupName={}", backupName);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.Aliases;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
|
||||
|
||||
public class CreateAliasCmd implements Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public CreateAliasCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(ClusterState state, ZkNodeProps message, NamedList results)
|
||||
throws Exception {
|
||||
String aliasName = message.getStr(NAME);
|
||||
String collections = message.getStr("collections");
|
||||
|
||||
Map<String, Map<String, String>> newAliasesMap = new HashMap<>();
|
||||
Map<String, String> newCollectionAliasesMap = new HashMap<>();
|
||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
Map<String, String> prevColAliases = zkStateReader.getAliases().getCollectionAliasMap();
|
||||
if (prevColAliases != null) {
|
||||
newCollectionAliasesMap.putAll(prevColAliases);
|
||||
}
|
||||
newCollectionAliasesMap.put(aliasName, collections);
|
||||
newAliasesMap.put("collection", newCollectionAliasesMap);
|
||||
Aliases newAliases = new Aliases(newAliasesMap);
|
||||
byte[] jsonBytes = null;
|
||||
if (newAliases.collectionAliasSize() > 0) { // only sub map right now
|
||||
jsonBytes = Utils.toJSON(newAliases.getAliasMap());
|
||||
}
|
||||
try {
|
||||
zkStateReader.getZkClient().setData(ZkStateReader.ALIASES, jsonBytes, true);
|
||||
|
||||
checkForAlias(aliasName, collections);
|
||||
// some fudge for other nodes
|
||||
Thread.sleep(100);
|
||||
} catch (KeeperException e) {
|
||||
log.error("", e);
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("", e);
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void checkForAlias(String name, String value) {
|
||||
|
||||
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
|
||||
boolean success = false;
|
||||
Aliases aliases;
|
||||
while (!timeout.hasTimedOut()) {
|
||||
aliases = ocmh.zkStateReader.getAliases();
|
||||
String collections = aliases.getCollectionAlias(name);
|
||||
if (collections != null && collections.equals(value)) {
|
||||
success = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!success) {
|
||||
log.warn("Timeout waiting to be notified of Alias change...");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,291 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
|
||||
import org.apache.solr.cloud.overseer.ClusterStateMutator;
|
||||
import org.apache.solr.cloud.rule.ReplicaAssigner;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.common.cloud.ImplicitDocRouter;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.ZkConfigManager;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.SimpleOrderedMap;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.solr.handler.component.ShardRequest;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.RANDOM;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
import static org.apache.solr.common.util.StrUtils.formatString;
|
||||
|
||||
public class CreateCollectionCmd implements Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public CreateCollectionCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
|
||||
final String collectionName = message.getStr(NAME);
|
||||
log.info("Create collection {}", collectionName);
|
||||
if (clusterState.hasCollection(collectionName)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
|
||||
}
|
||||
|
||||
String configName = getConfigName(collectionName, message);
|
||||
if (configName == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No config set found to associate with the collection.");
|
||||
}
|
||||
|
||||
ocmh.validateConfigOrThrowSolrException(configName);
|
||||
|
||||
|
||||
try {
|
||||
// look at the replication factor and see if it matches reality
|
||||
// if it does not, find best nodes to create more cores
|
||||
|
||||
int repFactor = message.getInt(REPLICATION_FACTOR, 1);
|
||||
|
||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
|
||||
final String async = message.getStr(ASYNC);
|
||||
|
||||
Integer numSlices = message.getInt(NUM_SLICES, null);
|
||||
String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
|
||||
List<String> shardNames = new ArrayList<>();
|
||||
if(ImplicitDocRouter.NAME.equals(router)){
|
||||
ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
|
||||
numSlices = shardNames.size();
|
||||
} else {
|
||||
if (numSlices == null ) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NUM_SLICES + " is a required param (when using CompositeId router).");
|
||||
}
|
||||
ClusterStateMutator.getShardNames(numSlices, shardNames);
|
||||
}
|
||||
|
||||
int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
|
||||
|
||||
if (repFactor <= 0) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, REPLICATION_FACTOR + " must be greater than 0");
|
||||
}
|
||||
|
||||
if (numSlices <= 0) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NUM_SLICES + " must be > 0");
|
||||
}
|
||||
|
||||
// we need to look at every node and see how many cores it serves
|
||||
// add our new cores to existing nodes serving the least number of cores
|
||||
// but (for now) require that each core goes on a distinct node.
|
||||
|
||||
final List<String> nodeList = OverseerCollectionMessageHandler.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM);
|
||||
Map<ReplicaAssigner.Position, String> positionVsNodes;
|
||||
if (nodeList.isEmpty()) {
|
||||
log.warn("It is unusual to create a collection ("+collectionName+") without cores.");
|
||||
|
||||
positionVsNodes = new HashMap<>();
|
||||
} else {
|
||||
if (repFactor > nodeList.size()) {
|
||||
log.warn("Specified "
|
||||
+ REPLICATION_FACTOR
|
||||
+ " of "
|
||||
+ repFactor
|
||||
+ " on collection "
|
||||
+ collectionName
|
||||
+ " is higher than or equal to the number of Solr instances currently live or live and part of your " + CREATE_NODE_SET + "("
|
||||
+ nodeList.size()
|
||||
+ "). It's unusual to run two replica of the same slice on the same Solr-instance.");
|
||||
}
|
||||
|
||||
int maxShardsAllowedToCreate = maxShardsPerNode * nodeList.size();
|
||||
int requestedShardsToCreate = numSlices * repFactor;
|
||||
if (maxShardsAllowedToCreate < requestedShardsToCreate) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + ". Value of "
|
||||
+ MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
|
||||
+ ", and the number of nodes currently live or live and part of your "+CREATE_NODE_SET+" is " + nodeList.size()
|
||||
+ ". This allows a maximum of " + maxShardsAllowedToCreate
|
||||
+ " to be created. Value of " + NUM_SLICES + " is " + numSlices
|
||||
+ " and value of " + REPLICATION_FACTOR + " is " + repFactor
|
||||
+ ". This requires " + requestedShardsToCreate
|
||||
+ " shards to be created (higher than the allowed number)");
|
||||
}
|
||||
|
||||
positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, message, shardNames, repFactor);
|
||||
}
|
||||
|
||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
boolean isLegacyCloud = Overseer.isLegacy(zkStateReader);
|
||||
|
||||
ocmh.createConfNode(configName, collectionName, isLegacyCloud);
|
||||
|
||||
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
|
||||
|
||||
// wait for a while until we don't see the collection
|
||||
TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS);
|
||||
boolean created = false;
|
||||
while (! waitUntil.hasTimedOut()) {
|
||||
Thread.sleep(100);
|
||||
created = zkStateReader.getClusterState().hasCollection(collectionName);
|
||||
if(created) break;
|
||||
}
|
||||
if (!created)
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
|
||||
|
||||
if (nodeList.isEmpty()) {
|
||||
log.info("Finished create command for collection: {}", collectionName);
|
||||
return;
|
||||
}
|
||||
|
||||
// For tracking async calls.
|
||||
Map<String, String> requestMap = new HashMap<>();
|
||||
|
||||
|
||||
log.info(formatString("Creating SolrCores for new collection {0}, shardNames {1} , replicationFactor : {2}",
|
||||
collectionName, shardNames, repFactor));
|
||||
Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
|
||||
for (Map.Entry<ReplicaAssigner.Position, String> e : positionVsNodes.entrySet()) {
|
||||
ReplicaAssigner.Position position = e.getKey();
|
||||
String nodeName = e.getValue();
|
||||
String coreName = collectionName + "_" + position.shard + "_replica" + (position.index + 1);
|
||||
log.info(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
|
||||
, coreName, position.shard, collectionName, nodeName));
|
||||
|
||||
|
||||
String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
|
||||
//in the new mode, create the replica in clusterstate prior to creating the core.
|
||||
// Otherwise the core creation fails
|
||||
if (!isLegacyCloud) {
|
||||
ZkNodeProps props = new ZkNodeProps(
|
||||
Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
|
||||
ZkStateReader.COLLECTION_PROP, collectionName,
|
||||
ZkStateReader.SHARD_ID_PROP, position.shard,
|
||||
ZkStateReader.CORE_NAME_PROP, coreName,
|
||||
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
|
||||
ZkStateReader.BASE_URL_PROP, baseUrl);
|
||||
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
|
||||
}
|
||||
|
||||
// Need to create new params for each request
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
|
||||
|
||||
params.set(CoreAdminParams.NAME, coreName);
|
||||
params.set(COLL_CONF, configName);
|
||||
params.set(CoreAdminParams.COLLECTION, collectionName);
|
||||
params.set(CoreAdminParams.SHARD, position.shard);
|
||||
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
|
||||
|
||||
if (async != null) {
|
||||
String coreAdminAsyncId = async + Math.abs(System.nanoTime());
|
||||
params.add(ASYNC, coreAdminAsyncId);
|
||||
requestMap.put(nodeName, coreAdminAsyncId);
|
||||
}
|
||||
ocmh.addPropertyParams(message, params);
|
||||
|
||||
ShardRequest sreq = new ShardRequest();
|
||||
sreq.nodeName = nodeName;
|
||||
params.set("qt", ocmh.adminPath);
|
||||
sreq.purpose = 1;
|
||||
sreq.shards = new String[]{baseUrl};
|
||||
sreq.actualShards = sreq.shards;
|
||||
sreq.params = params;
|
||||
|
||||
if (isLegacyCloud) {
|
||||
shardHandler.submit(sreq, sreq.shards[0], sreq.params);
|
||||
} else {
|
||||
coresToCreate.put(coreName, sreq);
|
||||
}
|
||||
}
|
||||
|
||||
if(!isLegacyCloud) {
|
||||
// wait for all replica entries to be created
|
||||
Map<String, Replica> replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
|
||||
for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
|
||||
ShardRequest sreq = e.getValue();
|
||||
sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
|
||||
shardHandler.submit(sreq, sreq.shards[0], sreq.params);
|
||||
}
|
||||
}
|
||||
|
||||
ocmh.processResponses(results, shardHandler, false, null, async, requestMap, Collections.emptySet());
|
||||
if(results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0) {
|
||||
// Let's cleanup as we hit an exception
|
||||
// We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
|
||||
// element, which may be interpreted by the user as a positive ack
|
||||
ocmh.cleanupCollection(collectionName, new NamedList());
|
||||
log.info("Cleaned up artifacts for failed create collection for [" + collectionName + "]");
|
||||
} else {
|
||||
log.debug("Finished create command on all shards for collection: "
|
||||
+ collectionName);
|
||||
}
|
||||
} catch (SolrException ex) {
|
||||
throw ex;
|
||||
} catch (Exception ex) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, ex);
|
||||
}
|
||||
}
|
||||
String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException {
|
||||
String configName = message.getStr(COLL_CONF);
|
||||
|
||||
if (configName == null) {
|
||||
// if there is only one conf, use that
|
||||
List<String> configNames = null;
|
||||
try {
|
||||
configNames = ocmh.zkStateReader.getZkClient().getChildren(ZkConfigManager.CONFIGS_ZKNODE, null, true);
|
||||
if (configNames != null && configNames.size() == 1) {
|
||||
configName = configNames.get(0);
|
||||
// no config set named, but there is only 1 - use it
|
||||
log.info("Only one config set found in zk - using it:" + configName);
|
||||
} else if (configNames.contains(coll)) {
|
||||
configName = coll;
|
||||
}
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
|
||||
}
|
||||
}
|
||||
return configName;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.Assign.getNodesForNewReplicas;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
|
||||
public class CreateShardCmd implements Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public CreateShardCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
|
||||
String collectionName = message.getStr(COLLECTION_PROP);
|
||||
String sliceName = message.getStr(SHARD_ID_PROP);
|
||||
|
||||
log.info("Create shard invoked: {}", message);
|
||||
if (collectionName == null || sliceName == null)
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters");
|
||||
int numSlices = 1;
|
||||
|
||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
|
||||
DocCollection collection = clusterState.getCollection(collectionName);
|
||||
int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
|
||||
String createNodeSetStr = message.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET);
|
||||
List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, repFactor,
|
||||
createNodeSetStr, ocmh.overseer.getZkController().getCoreContainer());
|
||||
|
||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
|
||||
// wait for a while until we see the shard
|
||||
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
|
||||
boolean created = false;
|
||||
while (!timeout.hasTimedOut()) {
|
||||
Thread.sleep(100);
|
||||
created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(sliceName) != null;
|
||||
if (created) break;
|
||||
}
|
||||
if (!created)
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr(NAME));
|
||||
|
||||
String configName = message.getStr(COLL_CONF);
|
||||
|
||||
String async = message.getStr(ASYNC);
|
||||
Map<String, String> requestMap = null;
|
||||
if (async != null) {
|
||||
requestMap = new HashMap<>(repFactor, 1.0f);
|
||||
}
|
||||
|
||||
for (int j = 1; j <= repFactor; j++) {
|
||||
String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName;
|
||||
String shardName = collectionName + "_" + sliceName + "_replica" + j;
|
||||
log.info("Creating shard " + shardName + " as part of slice " + sliceName + " of collection " + collectionName
|
||||
+ " on " + nodeName);
|
||||
|
||||
// Need to create new params for each request
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
|
||||
params.set(CoreAdminParams.NAME, shardName);
|
||||
params.set(COLL_CONF, configName);
|
||||
params.set(CoreAdminParams.COLLECTION, collectionName);
|
||||
params.set(CoreAdminParams.SHARD, sliceName);
|
||||
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
|
||||
ocmh.addPropertyParams(message, params);
|
||||
|
||||
ocmh.sendShardRequest(nodeName, params, shardHandler, async, requestMap);
|
||||
}
|
||||
|
||||
ocmh.processResponses(results, shardHandler, true, "Failed to create shard", async, requestMap, Collections.emptySet());
|
||||
|
||||
log.info("Finished create command on all shards for collection: " + collectionName);
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,95 @@
|
|||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.Aliases;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
|
||||
public class DeleteAliasCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public DeleteAliasCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
|
||||
String aliasName = message.getStr(NAME);
|
||||
|
||||
Map<String,Map<String,String>> newAliasesMap = new HashMap<>();
|
||||
Map<String,String> newCollectionAliasesMap = new HashMap<>();
|
||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
newCollectionAliasesMap.putAll(zkStateReader.getAliases().getCollectionAliasMap());
|
||||
newCollectionAliasesMap.remove(aliasName);
|
||||
newAliasesMap.put("collection", newCollectionAliasesMap);
|
||||
Aliases newAliases = new Aliases(newAliasesMap);
|
||||
byte[] jsonBytes = null;
|
||||
if (newAliases.collectionAliasSize() > 0) { // only sub map right now
|
||||
jsonBytes = Utils.toJSON(newAliases.getAliasMap());
|
||||
}
|
||||
try {
|
||||
zkStateReader.getZkClient().setData(ZkStateReader.ALIASES,
|
||||
jsonBytes, true);
|
||||
checkForAliasAbsence(aliasName);
|
||||
// some fudge for other nodes
|
||||
Thread.sleep(100);
|
||||
} catch (KeeperException e) {
|
||||
log.error("", e);
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("", e);
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
|
||||
}
|
||||
|
||||
}
|
||||
private void checkForAliasAbsence(String name) {
|
||||
|
||||
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
|
||||
boolean success = false;
|
||||
Aliases aliases = null;
|
||||
while (! timeout.hasTimedOut()) {
|
||||
aliases = ocmh.zkStateReader.getAliases();
|
||||
String collections = aliases.getCollectionAlias(name);
|
||||
if (collections == null) {
|
||||
success = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!success) {
|
||||
log.warn("Timeout waiting to be notified of Alias change...");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,121 @@
|
|||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.common.NonExistentCoreException;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
|
||||
public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public DeleteCollectionCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
|
||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
final String collection = message.getStr(NAME);
|
||||
try {
|
||||
if (zkStateReader.getClusterState().getCollectionOrNull(collection) == null) {
|
||||
if (zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
|
||||
// if the collection is not in the clusterstate, but is listed in zk, do nothing, it will just
|
||||
// be removed in the finally - we cannot continue, because the below code will error if the collection
|
||||
// is not in the clusterstate
|
||||
return;
|
||||
}
|
||||
}
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
|
||||
params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
|
||||
params.set(CoreAdminParams.DELETE_DATA_DIR, true);
|
||||
|
||||
String asyncId = message.getStr(ASYNC);
|
||||
Map<String, String> requestMap = null;
|
||||
if (asyncId != null) {
|
||||
requestMap = new HashMap<>();
|
||||
}
|
||||
|
||||
Set<String> okayExceptions = new HashSet<>(1);
|
||||
okayExceptions.add(NonExistentCoreException.class.getName());
|
||||
|
||||
ocmh.collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions);
|
||||
|
||||
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
|
||||
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
|
||||
|
||||
// wait for a while until we don't see the collection
|
||||
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
|
||||
boolean removed = false;
|
||||
while (! timeout.hasTimedOut()) {
|
||||
Thread.sleep(100);
|
||||
removed = !zkStateReader.getClusterState().hasCollection(collection);
|
||||
if (removed) {
|
||||
Thread.sleep(500); // just a bit of time so it's more likely other
|
||||
// readers see on return
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!removed) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Could not fully remove collection: " + collection);
|
||||
}
|
||||
|
||||
} finally {
|
||||
|
||||
try {
|
||||
if (zkStateReader.getZkClient().exists(
|
||||
ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
|
||||
zkStateReader.getZkClient().clean(
|
||||
ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
SolrException.log(log, "Cleaning up collection in zk was interrupted:"
|
||||
+ collection, e);
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (KeeperException e) {
|
||||
SolrException.log(log, "Problem cleaning up collection in zk:"
|
||||
+ collection, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
|
||||
|
||||
public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
@ -64,7 +65,7 @@ public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
log.info("Deleting replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), node);
|
||||
NamedList deleteResult = new NamedList();
|
||||
try {
|
||||
ocmh.deleteReplica(clusterState, sourceReplica.plus("parallel", "true"), deleteResult, () -> {
|
||||
((DeleteReplicaCmd)ocmh.commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, sourceReplica.plus("parallel", "true"), deleteResult, () -> {
|
||||
cleanupLatch.countDown();
|
||||
if (deleteResult.get("failure") != null) {
|
||||
synchronized (results) {
|
||||
|
|
|
@ -0,0 +1,155 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_IF_DOWN;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
|
||||
|
||||
public class DeleteReplicaCmd implements Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public DeleteReplicaCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
|
||||
public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
|
||||
deleteReplica(clusterState, message, results,null);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
|
||||
throws KeeperException, InterruptedException {
|
||||
ocmh.checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP);
|
||||
String collectionName = message.getStr(COLLECTION_PROP);
|
||||
String shard = message.getStr(SHARD_ID_PROP);
|
||||
String replicaName = message.getStr(REPLICA_PROP);
|
||||
boolean parallel = message.getBool("parallel", false);
|
||||
|
||||
DocCollection coll = clusterState.getCollection(collectionName);
|
||||
Slice slice = coll.getSlice(shard);
|
||||
if (slice == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Invalid shard name : " + shard + " in collection : " + collectionName);
|
||||
}
|
||||
Replica replica = slice.getReplica(replicaName);
|
||||
if (replica == null) {
|
||||
ArrayList<String> l = new ArrayList<>();
|
||||
for (Replica r : slice.getReplicas())
|
||||
l.add(r.getName());
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : "
|
||||
+ shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ','));
|
||||
}
|
||||
|
||||
// If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true
|
||||
// on the command.
|
||||
if (Boolean.parseBoolean(message.getStr(ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName
|
||||
+ " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
|
||||
}
|
||||
|
||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
|
||||
String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
|
||||
String asyncId = message.getStr(ASYNC);
|
||||
AtomicReference<Map<String, String>> requestMap = new AtomicReference<>(null);
|
||||
if (asyncId != null) {
|
||||
requestMap.set(new HashMap<>(1, 1.0f));
|
||||
}
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.add(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
|
||||
params.add(CoreAdminParams.CORE, core);
|
||||
|
||||
params.set(CoreAdminParams.DELETE_INDEX, message.getBool(CoreAdminParams.DELETE_INDEX, true));
|
||||
params.set(CoreAdminParams.DELETE_INSTANCE_DIR, message.getBool(CoreAdminParams.DELETE_INSTANCE_DIR, true));
|
||||
params.set(CoreAdminParams.DELETE_DATA_DIR, message.getBool(CoreAdminParams.DELETE_DATA_DIR, true));
|
||||
|
||||
boolean isLive = ocmh.zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName());
|
||||
if (isLive) {
|
||||
ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap.get());
|
||||
}
|
||||
|
||||
Callable<Boolean> callable = () -> {
|
||||
try {
|
||||
if (isLive) {
|
||||
ocmh.processResponses(results, shardHandler, false, null, asyncId, requestMap.get());
|
||||
|
||||
//check if the core unload removed the corenode zk entry
|
||||
if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return Boolean.TRUE;
|
||||
}
|
||||
|
||||
// try and ensure core info is removed from cluster state
|
||||
ocmh.deleteCoreNode(collectionName, replicaName, replica, core);
|
||||
if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return Boolean.TRUE;
|
||||
return Boolean.FALSE;
|
||||
} catch (Exception e) {
|
||||
results.add("failure", "Could not complete delete " + e.getMessage());
|
||||
throw e;
|
||||
} finally {
|
||||
if (onComplete != null) onComplete.run();
|
||||
}
|
||||
};
|
||||
|
||||
if (!parallel) {
|
||||
try {
|
||||
if (!callable.call())
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
|
||||
} catch (InterruptedException | KeeperException e) {
|
||||
throw e;
|
||||
} catch (Exception ex) {
|
||||
throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Error waiting for corenode gone", ex);
|
||||
}
|
||||
|
||||
} else {
|
||||
ocmh.tpe.submit(callable);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
|
||||
public class DeleteShardCmd implements Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public DeleteShardCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
|
||||
String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||
String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
||||
|
||||
log.info("Delete shard invoked");
|
||||
Slice slice = clusterState.getSlice(collectionName, sliceId);
|
||||
|
||||
if (slice == null) {
|
||||
if (clusterState.hasCollection(collectionName)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"No shard with name " + sliceId + " exists for collection " + collectionName);
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collectionName);
|
||||
}
|
||||
}
|
||||
// For now, only allow for deletions of Inactive slices or custom hashes (range==null).
|
||||
// TODO: Add check for range gaps on Slice deletion
|
||||
final Slice.State state = slice.getState();
|
||||
if (!(slice.getRange() == null || state == Slice.State.INACTIVE || state == Slice.State.RECOVERY
|
||||
|| state == Slice.State.CONSTRUCTION)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The slice: " + slice.getName() + " is currently " + state
|
||||
+ ". Only non-active (or custom-hashed) slices can be deleted.");
|
||||
}
|
||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
|
||||
|
||||
String asyncId = message.getStr(ASYNC);
|
||||
Map<String, String> requestMap = null;
|
||||
if (asyncId != null) {
|
||||
requestMap = new HashMap<>(slice.getReplicas().size(), 1.0f);
|
||||
}
|
||||
|
||||
try {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
|
||||
params.set(CoreAdminParams.DELETE_INDEX, message.getBool(CoreAdminParams.DELETE_INDEX, true));
|
||||
params.set(CoreAdminParams.DELETE_INSTANCE_DIR, message.getBool(CoreAdminParams.DELETE_INSTANCE_DIR, true));
|
||||
params.set(CoreAdminParams.DELETE_DATA_DIR, message.getBool(CoreAdminParams.DELETE_DATA_DIR, true));
|
||||
|
||||
ocmh.sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap);
|
||||
|
||||
ocmh.processResponses(results, shardHandler, true, "Failed to delete shard", asyncId, requestMap, Collections.emptySet());
|
||||
|
||||
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
|
||||
collectionName, ZkStateReader.SHARD_ID_PROP, sliceId);
|
||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
|
||||
|
||||
// wait for a while until we don't see the shard
|
||||
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
|
||||
boolean removed = false;
|
||||
while (! timeout.hasTimedOut()) {
|
||||
Thread.sleep(100);
|
||||
DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
|
||||
removed = collection.getSlice(sliceId) == null;
|
||||
if (removed) {
|
||||
Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!removed) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Could not fully remove collection: " + collectionName + " shard: " + sliceId);
|
||||
}
|
||||
|
||||
log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId);
|
||||
|
||||
} catch (SolrException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Error executing delete operation for collection: " + collectionName + " shard: " + sliceId, e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,333 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest;
|
||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.CompositeIdRouter;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.RoutingRule;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.solr.handler.component.ShardHandlerFactory;
|
||||
import org.apache.solr.update.SolrIndexSplitter;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
import static org.apache.solr.common.util.Utils.makeMap;
|
||||
|
||||
public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public MigrateCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
|
||||
String sourceCollectionName = message.getStr("collection");
|
||||
String splitKey = message.getStr("split.key");
|
||||
String targetCollectionName = message.getStr("target.collection");
|
||||
int timeout = message.getInt("forward.timeout", 10 * 60) * 1000;
|
||||
|
||||
DocCollection sourceCollection = clusterState.getCollection(sourceCollectionName);
|
||||
if (sourceCollection == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown source collection: " + sourceCollectionName);
|
||||
}
|
||||
DocCollection targetCollection = clusterState.getCollection(targetCollectionName);
|
||||
if (targetCollection == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown target collection: " + sourceCollectionName);
|
||||
}
|
||||
if (!(sourceCollection.getRouter() instanceof CompositeIdRouter)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source collection must use a compositeId router");
|
||||
}
|
||||
if (!(targetCollection.getRouter() instanceof CompositeIdRouter)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target collection must use a compositeId router");
|
||||
}
|
||||
CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
|
||||
CompositeIdRouter targetRouter = (CompositeIdRouter) targetCollection.getRouter();
|
||||
Collection<Slice> sourceSlices = sourceRouter.getSearchSlicesSingle(splitKey, null, sourceCollection);
|
||||
if (sourceSlices.isEmpty()) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"No active slices available in source collection: " + sourceCollection + "for given split.key: " + splitKey);
|
||||
}
|
||||
Collection<Slice> targetSlices = targetRouter.getSearchSlicesSingle(splitKey, null, targetCollection);
|
||||
if (targetSlices.isEmpty()) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"No active slices available in target collection: " + targetCollection + "for given split.key: " + splitKey);
|
||||
}
|
||||
|
||||
String asyncId = null;
|
||||
if (message.containsKey(ASYNC) && message.get(ASYNC) != null)
|
||||
asyncId = message.getStr(ASYNC);
|
||||
|
||||
for (Slice sourceSlice : sourceSlices) {
|
||||
for (Slice targetSlice : targetSlices) {
|
||||
log.info("Migrating source shard: {} to target shard: {} for split.key = " + splitKey, sourceSlice, targetSlice);
|
||||
migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey,
|
||||
timeout, results, asyncId, message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice,
|
||||
DocCollection targetCollection, Slice targetSlice,
|
||||
String splitKey, int timeout,
|
||||
NamedList results, String asyncId, ZkNodeProps message) throws Exception {
|
||||
String tempSourceCollectionName = "split_" + sourceSlice.getName() + "_temp_" + targetSlice.getName();
|
||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
if (clusterState.hasCollection(tempSourceCollectionName)) {
|
||||
log.info("Deleting temporary collection: " + tempSourceCollectionName);
|
||||
Map<String, Object> props = makeMap(
|
||||
Overseer.QUEUE_OPERATION, DELETE.toLower(),
|
||||
NAME, tempSourceCollectionName);
|
||||
|
||||
try {
|
||||
ocmh.commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
|
||||
clusterState = zkStateReader.getClusterState();
|
||||
} catch (Exception e) {
|
||||
log.warn("Unable to clean up existing temporary collection: " + tempSourceCollectionName, e);
|
||||
}
|
||||
}
|
||||
|
||||
CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
|
||||
DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey);
|
||||
|
||||
ShardHandlerFactory shardHandlerFactory = ocmh.shardHandlerFactory;
|
||||
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||
|
||||
log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange);
|
||||
// intersect source range, keyHashRange and target range
|
||||
// this is the range that has to be split from source and transferred to target
|
||||
DocRouter.Range splitRange = ocmh.intersect(targetSlice.getRange(), ocmh.intersect(sourceSlice.getRange(), keyHashRange));
|
||||
if (splitRange == null) {
|
||||
log.info("No common hashes between source shard: {} and target shard: {}", sourceSlice.getName(), targetSlice.getName());
|
||||
return;
|
||||
}
|
||||
log.info("Common hash range between source shard: {} and target shard: {} = " + splitRange, sourceSlice.getName(), targetSlice.getName());
|
||||
|
||||
Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
|
||||
// For tracking async calls.
|
||||
Map<String, String> requestMap = new HashMap<>();
|
||||
|
||||
log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
|
||||
+ targetLeader.getStr("core") + " to buffer updates");
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTBUFFERUPDATES.toString());
|
||||
params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
|
||||
|
||||
ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
|
||||
|
||||
ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates", asyncId, requestMap);
|
||||
|
||||
ZkNodeProps m = new ZkNodeProps(
|
||||
Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
|
||||
COLLECTION_PROP, sourceCollection.getName(),
|
||||
SHARD_ID_PROP, sourceSlice.getName(),
|
||||
"routeKey", SolrIndexSplitter.getRouteKey(splitKey) + "!",
|
||||
"range", splitRange.toString(),
|
||||
"targetCollection", targetCollection.getName(),
|
||||
"expireAt", RoutingRule.makeExpiryAt(timeout));
|
||||
log.info("Adding routing rule: " + m);
|
||||
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
|
||||
|
||||
// wait for a while until we see the new rule
|
||||
log.info("Waiting to see routing rule updated in clusterstate");
|
||||
TimeOut waitUntil = new TimeOut(60, TimeUnit.SECONDS);
|
||||
boolean added = false;
|
||||
while (!waitUntil.hasTimedOut()) {
|
||||
Thread.sleep(100);
|
||||
sourceCollection = zkStateReader.getClusterState().getCollection(sourceCollection.getName());
|
||||
sourceSlice = sourceCollection.getSlice(sourceSlice.getName());
|
||||
Map<String, RoutingRule> rules = sourceSlice.getRoutingRules();
|
||||
if (rules != null) {
|
||||
RoutingRule rule = rules.get(SolrIndexSplitter.getRouteKey(splitKey) + "!");
|
||||
if (rule != null && rule.getRouteRanges().contains(splitRange)) {
|
||||
added = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!added) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not add routing rule: " + m);
|
||||
}
|
||||
|
||||
log.info("Routing rule added successfully");
|
||||
|
||||
// Create temp core on source shard
|
||||
Replica sourceLeader = zkStateReader.getLeaderRetry(sourceCollection.getName(), sourceSlice.getName(), 10000);
|
||||
|
||||
// create a temporary collection with just one node on the shard leader
|
||||
String configName = zkStateReader.readConfigName(sourceCollection.getName());
|
||||
Map<String, Object> props = makeMap(
|
||||
Overseer.QUEUE_OPERATION, CREATE.toLower(),
|
||||
NAME, tempSourceCollectionName,
|
||||
REPLICATION_FACTOR, 1,
|
||||
NUM_SLICES, 1,
|
||||
COLL_CONF, configName,
|
||||
CREATE_NODE_SET, sourceLeader.getNodeName());
|
||||
if (asyncId != null) {
|
||||
String internalAsyncId = asyncId + Math.abs(System.nanoTime());
|
||||
props.put(ASYNC, internalAsyncId);
|
||||
}
|
||||
|
||||
log.info("Creating temporary collection: " + props);
|
||||
ocmh.commandMap.get(CREATE).call(clusterState, new ZkNodeProps(props), results);
|
||||
// refresh cluster state
|
||||
clusterState = zkStateReader.getClusterState();
|
||||
Slice tempSourceSlice = clusterState.getCollection(tempSourceCollectionName).getSlices().iterator().next();
|
||||
Replica tempSourceLeader = zkStateReader.getLeaderRetry(tempSourceCollectionName, tempSourceSlice.getName(), 120000);
|
||||
|
||||
String tempCollectionReplica1 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica1";
|
||||
String coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
|
||||
sourceLeader.getNodeName(), tempCollectionReplica1);
|
||||
// wait for the replicas to be seen as active on temp source leader
|
||||
log.info("Asking source leader to wait for: " + tempCollectionReplica1 + " to be alive on: " + sourceLeader.getNodeName());
|
||||
CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
|
||||
cmd.setCoreName(tempCollectionReplica1);
|
||||
cmd.setNodeName(sourceLeader.getNodeName());
|
||||
cmd.setCoreNodeName(coreNodeName);
|
||||
cmd.setState(Replica.State.ACTIVE);
|
||||
cmd.setCheckLive(true);
|
||||
cmd.setOnlyIfLeader(true);
|
||||
// we don't want this to happen asynchronously
|
||||
ocmh.sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler, null, null);
|
||||
|
||||
ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection leader" +
|
||||
" or timed out waiting for it to come up", asyncId, requestMap);
|
||||
|
||||
log.info("Asking source leader to split index");
|
||||
params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
|
||||
params.set(CoreAdminParams.CORE, sourceLeader.getStr("core"));
|
||||
params.add(CoreAdminParams.TARGET_CORE, tempSourceLeader.getStr("core"));
|
||||
params.set(CoreAdminParams.RANGES, splitRange.toString());
|
||||
params.set("split.key", splitKey);
|
||||
|
||||
String tempNodeName = sourceLeader.getNodeName();
|
||||
|
||||
ocmh.sendShardRequest(tempNodeName, params, shardHandler, asyncId, requestMap);
|
||||
ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command", asyncId, requestMap);
|
||||
|
||||
log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
|
||||
tempSourceCollectionName, targetLeader.getNodeName());
|
||||
String tempCollectionReplica2 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica2";
|
||||
props = new HashMap<>();
|
||||
props.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
|
||||
props.put(COLLECTION_PROP, tempSourceCollectionName);
|
||||
props.put(SHARD_ID_PROP, tempSourceSlice.getName());
|
||||
props.put("node", targetLeader.getNodeName());
|
||||
props.put(CoreAdminParams.NAME, tempCollectionReplica2);
|
||||
// copy over property params:
|
||||
for (String key : message.keySet()) {
|
||||
if (key.startsWith(COLL_PROP_PREFIX)) {
|
||||
props.put(key, message.getStr(key));
|
||||
}
|
||||
}
|
||||
// add async param
|
||||
if (asyncId != null) {
|
||||
props.put(ASYNC, asyncId);
|
||||
}
|
||||
((AddReplicaCmd)ocmh.commandMap.get(ADDREPLICA)).addReplica(clusterState, new ZkNodeProps(props), results, null);
|
||||
|
||||
ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
|
||||
"temporary collection in target leader node.", asyncId, requestMap);
|
||||
|
||||
coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
|
||||
targetLeader.getNodeName(), tempCollectionReplica2);
|
||||
// wait for the replicas to be seen as active on temp source leader
|
||||
log.info("Asking temp source leader to wait for: " + tempCollectionReplica2 + " to be alive on: " + targetLeader.getNodeName());
|
||||
cmd = new CoreAdminRequest.WaitForState();
|
||||
cmd.setCoreName(tempSourceLeader.getStr("core"));
|
||||
cmd.setNodeName(targetLeader.getNodeName());
|
||||
cmd.setCoreNodeName(coreNodeName);
|
||||
cmd.setState(Replica.State.ACTIVE);
|
||||
cmd.setCheckLive(true);
|
||||
cmd.setOnlyIfLeader(true);
|
||||
params = new ModifiableSolrParams(cmd.getParams());
|
||||
|
||||
ocmh.sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
|
||||
|
||||
ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" +
|
||||
" replica or timed out waiting for them to come up", asyncId, requestMap);
|
||||
|
||||
log.info("Successfully created replica of temp source collection on target leader node");
|
||||
|
||||
log.info("Requesting merge of temp source collection replica to target leader");
|
||||
params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.MERGEINDEXES.toString());
|
||||
params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
|
||||
params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
|
||||
|
||||
ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
|
||||
String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to "
|
||||
+ targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName();
|
||||
ocmh.processResponses(results, shardHandler, true, msg, asyncId, requestMap);
|
||||
|
||||
log.info("Asking target leader to apply buffered updates");
|
||||
params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
|
||||
params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
|
||||
|
||||
ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
|
||||
ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates",
|
||||
asyncId, requestMap);
|
||||
|
||||
try {
|
||||
log.info("Deleting temporary collection: " + tempSourceCollectionName);
|
||||
props = makeMap(
|
||||
Overseer.QUEUE_OPERATION, DELETE.toLower(),
|
||||
NAME, tempSourceCollectionName);
|
||||
ocmh.commandMap.get(DELETE). call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
|
||||
} catch (Exception e) {
|
||||
log.error("Unable to delete temporary collection: " + tempSourceCollectionName
|
||||
+ ". Please remove it manually", e);
|
||||
}
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CollectionParams.CollectionAction;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
|
||||
|
||||
public class OverseerRoleCmd implements Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
private final CollectionAction operation;
|
||||
private final OverseerNodePrioritizer overseerPrioritizer;
|
||||
|
||||
|
||||
|
||||
public OverseerRoleCmd(OverseerCollectionMessageHandler ocmh, CollectionAction operation, OverseerNodePrioritizer prioritizer) {
|
||||
this.ocmh = ocmh;
|
||||
this.operation = operation;
|
||||
this.overseerPrioritizer = prioritizer;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
|
||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
SolrZkClient zkClient = zkStateReader.getZkClient();
|
||||
Map roles = null;
|
||||
String node = message.getStr("node");
|
||||
|
||||
String roleName = message.getStr("role");
|
||||
boolean nodeExists = false;
|
||||
if (nodeExists = zkClient.exists(ZkStateReader.ROLES, true)) {
|
||||
roles = (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.ROLES, null, new Stat(), true));
|
||||
} else {
|
||||
roles = new LinkedHashMap(1);
|
||||
}
|
||||
|
||||
List nodeList = (List) roles.get(roleName);
|
||||
if (nodeList == null) roles.put(roleName, nodeList = new ArrayList());
|
||||
if (ADDROLE == operation) {
|
||||
log.info("Overseer role added to {}", node);
|
||||
if (!nodeList.contains(node)) nodeList.add(node);
|
||||
} else if (REMOVEROLE == operation) {
|
||||
log.info("Overseer role removed from {}", node);
|
||||
nodeList.remove(node);
|
||||
}
|
||||
|
||||
if (nodeExists) {
|
||||
zkClient.setData(ZkStateReader.ROLES, Utils.toJSON(roles), true);
|
||||
} else {
|
||||
zkClient.create(ZkStateReader.ROLES, Utils.toJSON(roles), CreateMode.PERSISTENT, true);
|
||||
}
|
||||
//if there are too many nodes this command may time out. And most likely dedicated
|
||||
// overseers are created when there are too many nodes . So , do this operation in a separate thread
|
||||
new Thread(() -> {
|
||||
try {
|
||||
overseerPrioritizer.prioritizeOverseerNodes(ocmh.myId);
|
||||
} catch (Exception e) {
|
||||
log.error("Error in prioritizing Overseer", e);
|
||||
}
|
||||
|
||||
}).start();
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,122 @@
|
|||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.SimpleOrderedMap;
|
||||
import org.apache.solr.util.stats.Snapshot;
|
||||
import org.apache.solr.util.stats.Timer;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class OverseerStatusCmd implements Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public OverseerStatusCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
|
||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
String leaderNode = OverseerTaskProcessor.getLeaderNode(zkStateReader.getZkClient());
|
||||
results.add("leader", leaderNode);
|
||||
Stat stat = new Stat();
|
||||
zkStateReader.getZkClient().getData("/overseer/queue",null, stat, true);
|
||||
results.add("overseer_queue_size", stat.getNumChildren());
|
||||
stat = new Stat();
|
||||
zkStateReader.getZkClient().getData("/overseer/queue-work",null, stat, true);
|
||||
results.add("overseer_work_queue_size", stat.getNumChildren());
|
||||
stat = new Stat();
|
||||
zkStateReader.getZkClient().getData("/overseer/collection-queue-work",null, stat, true);
|
||||
results.add("overseer_collection_queue_size", stat.getNumChildren());
|
||||
|
||||
NamedList overseerStats = new NamedList();
|
||||
NamedList collectionStats = new NamedList();
|
||||
NamedList stateUpdateQueueStats = new NamedList();
|
||||
NamedList workQueueStats = new NamedList();
|
||||
NamedList collectionQueueStats = new NamedList();
|
||||
Overseer.Stats stats = ocmh.stats;
|
||||
for (Map.Entry<String, Overseer.Stat> entry : stats.getStats().entrySet()) {
|
||||
String key = entry.getKey();
|
||||
NamedList<Object> lst = new SimpleOrderedMap<>();
|
||||
if (key.startsWith("collection_")) {
|
||||
collectionStats.add(key.substring(11), lst);
|
||||
int successes = stats.getSuccessCount(entry.getKey());
|
||||
int errors = stats.getErrorCount(entry.getKey());
|
||||
lst.add("requests", successes);
|
||||
lst.add("errors", errors);
|
||||
List<Overseer.FailedOp> failureDetails = stats.getFailureDetails(key);
|
||||
if (failureDetails != null) {
|
||||
List<SimpleOrderedMap<Object>> failures = new ArrayList<>();
|
||||
for (Overseer.FailedOp failedOp : failureDetails) {
|
||||
SimpleOrderedMap<Object> fail = new SimpleOrderedMap<>();
|
||||
fail.add("request", failedOp.req.getProperties());
|
||||
fail.add("response", failedOp.resp.getResponse());
|
||||
failures.add(fail);
|
||||
}
|
||||
lst.add("recent_failures", failures);
|
||||
}
|
||||
} else if (key.startsWith("/overseer/queue_")) {
|
||||
stateUpdateQueueStats.add(key.substring(16), lst);
|
||||
} else if (key.startsWith("/overseer/queue-work_")) {
|
||||
workQueueStats.add(key.substring(21), lst);
|
||||
} else if (key.startsWith("/overseer/collection-queue-work_")) {
|
||||
collectionQueueStats.add(key.substring(32), lst);
|
||||
} else {
|
||||
// overseer stats
|
||||
overseerStats.add(key, lst);
|
||||
int successes = stats.getSuccessCount(entry.getKey());
|
||||
int errors = stats.getErrorCount(entry.getKey());
|
||||
lst.add("requests", successes);
|
||||
lst.add("errors", errors);
|
||||
}
|
||||
Timer timer = entry.getValue().requestTime;
|
||||
Snapshot snapshot = timer.getSnapshot();
|
||||
lst.add("totalTime", timer.getSum());
|
||||
lst.add("avgRequestsPerMinute", timer.getMeanRate());
|
||||
lst.add("5minRateRequestsPerMinute", timer.getFiveMinuteRate());
|
||||
lst.add("15minRateRequestsPerMinute", timer.getFifteenMinuteRate());
|
||||
lst.add("avgTimePerRequest", timer.getMean());
|
||||
lst.add("medianRequestTime", snapshot.getMedian());
|
||||
lst.add("75thPctlRequestTime", snapshot.get75thPercentile());
|
||||
lst.add("95thPctlRequestTime", snapshot.get95thPercentile());
|
||||
lst.add("99thPctlRequestTime", snapshot.get99thPercentile());
|
||||
lst.add("999thPctlRequestTime", snapshot.get999thPercentile());
|
||||
}
|
||||
results.add("overseer_operations", overseerStats);
|
||||
results.add("collection_operations", collectionStats);
|
||||
results.add("overseer_queue", stateUpdateQueueStats);
|
||||
results.add("overseer_internal_queue", workQueueStats);
|
||||
results.add("collection_queue", collectionQueueStats);
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,243 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.net.URI;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.ImplicitDocRouter;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.backup.BackupManager;
|
||||
import org.apache.solr.core.backup.repository.BackupRepository;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROPS;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
|
||||
|
||||
public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public RestoreCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
|
||||
// TODO maybe we can inherit createCollection's options/code
|
||||
String restoreCollectionName = message.getStr(COLLECTION_PROP);
|
||||
String backupName = message.getStr(NAME); // of backup
|
||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
|
||||
String asyncId = message.getStr(ASYNC);
|
||||
String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
|
||||
String location = message.getStr(CoreAdminParams.BACKUP_LOCATION);
|
||||
Map<String, String> requestMap = new HashMap<>();
|
||||
|
||||
CoreContainer cc = ocmh.overseer.getZkController().getCoreContainer();
|
||||
BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
|
||||
|
||||
URI backupPath = repository.createURI(location, backupName);
|
||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
BackupManager backupMgr = new BackupManager(repository, zkStateReader, restoreCollectionName);
|
||||
|
||||
Properties properties = backupMgr.readBackupProperties(location, backupName);
|
||||
String backupCollection = properties.getProperty(BackupManager.COLLECTION_NAME_PROP);
|
||||
DocCollection backupCollectionState = backupMgr.readCollectionState(location, backupName, backupCollection);
|
||||
|
||||
//Upload the configs
|
||||
String configName = (String) properties.get(COLL_CONF);
|
||||
String restoreConfigName = message.getStr(COLL_CONF, configName);
|
||||
if (zkStateReader.getConfigManager().configExists(restoreConfigName)) {
|
||||
log.info("Using existing config {}", restoreConfigName);
|
||||
//TODO add overwrite option?
|
||||
} else {
|
||||
log.info("Uploading config {}", restoreConfigName);
|
||||
backupMgr.uploadConfigDir(location, backupName, configName, restoreConfigName);
|
||||
}
|
||||
|
||||
log.info("Starting restore into collection={} with backup_name={} at location={}", restoreCollectionName, backupName,
|
||||
location);
|
||||
|
||||
//Create core-less collection
|
||||
{
|
||||
Map<String, Object> propMap = new HashMap<>();
|
||||
propMap.put(Overseer.QUEUE_OPERATION, CREATE.toString());
|
||||
propMap.put("fromApi", "true"); // mostly true. Prevents autoCreated=true in the collection state.
|
||||
|
||||
// inherit settings from input API, defaulting to the backup's setting. Ex: replicationFactor
|
||||
for (String collProp : COLL_PROPS.keySet()) {
|
||||
Object val = message.getProperties().getOrDefault(collProp, backupCollectionState.get(collProp));
|
||||
if (val != null) {
|
||||
propMap.put(collProp, val);
|
||||
}
|
||||
}
|
||||
|
||||
propMap.put(NAME, restoreCollectionName);
|
||||
propMap.put(CREATE_NODE_SET, CREATE_NODE_SET_EMPTY); //no cores
|
||||
propMap.put(COLL_CONF, restoreConfigName);
|
||||
|
||||
// router.*
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> routerProps = (Map<String, Object>) backupCollectionState.getProperties().get(DocCollection.DOC_ROUTER);
|
||||
for (Map.Entry<String, Object> pair : routerProps.entrySet()) {
|
||||
propMap.put(DocCollection.DOC_ROUTER + "." + pair.getKey(), pair.getValue());
|
||||
}
|
||||
|
||||
Set<String> sliceNames = backupCollectionState.getActiveSlicesMap().keySet();
|
||||
if (backupCollectionState.getRouter() instanceof ImplicitDocRouter) {
|
||||
propMap.put(SHARDS_PROP, StrUtils.join(sliceNames, ','));
|
||||
} else {
|
||||
propMap.put(NUM_SLICES, sliceNames.size());
|
||||
// ClusterStateMutator.createCollection detects that "slices" is in fact a slice structure instead of a
|
||||
// list of names, and if so uses this instead of building it. We clear the replica list.
|
||||
Collection<Slice> backupSlices = backupCollectionState.getActiveSlices();
|
||||
Map<String, Slice> newSlices = new LinkedHashMap<>(backupSlices.size());
|
||||
for (Slice backupSlice : backupSlices) {
|
||||
newSlices.put(backupSlice.getName(),
|
||||
new Slice(backupSlice.getName(), Collections.emptyMap(), backupSlice.getProperties()));
|
||||
}
|
||||
propMap.put(SHARDS_PROP, newSlices);
|
||||
}
|
||||
|
||||
ocmh.commandMap.get(CREATE).call(zkStateReader.getClusterState(), new ZkNodeProps(propMap), new NamedList());
|
||||
// note: when createCollection() returns, the collection exists (no race)
|
||||
}
|
||||
|
||||
DocCollection restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
|
||||
|
||||
DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
|
||||
|
||||
//Mark all shards in CONSTRUCTION STATE while we restore the data
|
||||
{
|
||||
//TODO might instead createCollection accept an initial state? Is there a race?
|
||||
Map<String, Object> propMap = new HashMap<>();
|
||||
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
|
||||
for (Slice shard : restoreCollection.getSlices()) {
|
||||
propMap.put(shard.getName(), Slice.State.CONSTRUCTION.toString());
|
||||
}
|
||||
propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollectionName);
|
||||
inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
|
||||
}
|
||||
|
||||
// TODO how do we leverage the CREATE_NODE_SET / RULE / SNITCH logic in createCollection?
|
||||
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
//Create one replica per shard and copy backed up data to it
|
||||
for (Slice slice : restoreCollection.getSlices()) {
|
||||
log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
|
||||
HashMap<String, Object> propMap = new HashMap<>();
|
||||
propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD);
|
||||
propMap.put(COLLECTION_PROP, restoreCollectionName);
|
||||
propMap.put(SHARD_ID_PROP, slice.getName());
|
||||
// add async param
|
||||
if (asyncId != null) {
|
||||
propMap.put(ASYNC, asyncId);
|
||||
}
|
||||
ocmh.addPropertyParams(message, propMap);
|
||||
|
||||
ocmh.addReplica(clusterState, new ZkNodeProps(propMap), new NamedList(), null);
|
||||
}
|
||||
|
||||
//refresh the location copy of collection state
|
||||
restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
|
||||
|
||||
//Copy data from backed up index to each replica
|
||||
for (Slice slice : restoreCollection.getSlices()) {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.RESTORECORE.toString());
|
||||
params.set(NAME, "snapshot." + slice.getName());
|
||||
params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.getPath());
|
||||
params.set(CoreAdminParams.BACKUP_REPOSITORY, repo);
|
||||
|
||||
ocmh.sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap);
|
||||
}
|
||||
ocmh.processResponses(new NamedList(), shardHandler, true, "Could not restore core", asyncId, requestMap);
|
||||
|
||||
//Mark all shards in ACTIVE STATE
|
||||
{
|
||||
HashMap<String, Object> propMap = new HashMap<>();
|
||||
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
|
||||
propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollectionName);
|
||||
for (Slice shard : restoreCollection.getSlices()) {
|
||||
propMap.put(shard.getName(), Slice.State.ACTIVE.toString());
|
||||
}
|
||||
inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
|
||||
}
|
||||
|
||||
//refresh the location copy of collection state
|
||||
restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
|
||||
|
||||
//Add the remaining replicas for each shard
|
||||
Integer numReplicas = restoreCollection.getReplicationFactor();
|
||||
if (numReplicas != null && numReplicas > 1) {
|
||||
log.info("Adding replicas to restored collection={}", restoreCollection);
|
||||
|
||||
for (Slice slice : restoreCollection.getSlices()) {
|
||||
for (int i = 1; i < numReplicas; i++) {
|
||||
log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
|
||||
HashMap<String, Object> propMap = new HashMap<>();
|
||||
propMap.put(COLLECTION_PROP, restoreCollectionName);
|
||||
propMap.put(SHARD_ID_PROP, slice.getName());
|
||||
// add async param
|
||||
if (asyncId != null) {
|
||||
propMap.put(ASYNC, asyncId);
|
||||
}
|
||||
ocmh.addPropertyParams(message, propMap);
|
||||
|
||||
ocmh.addReplica(zkStateReader.getClusterState(), new ZkNodeProps(propMap), results, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Completed restoring collection={} backupName={}", restoreCollection, backupName);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,458 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest;
|
||||
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
|
||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||
import org.apache.solr.cloud.rule.ReplicaAssigner;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.CompositeIdRouter;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.common.cloud.PlainIdRouter;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
|
||||
|
||||
public class SplitShardCmd implements Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public SplitShardCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
|
||||
split(state, message, results);
|
||||
}
|
||||
|
||||
public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
|
||||
String collectionName = message.getStr("collection");
|
||||
String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
||||
|
||||
log.info("Split shard invoked");
|
||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
|
||||
String splitKey = message.getStr("split.key");
|
||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
|
||||
|
||||
DocCollection collection = clusterState.getCollection(collectionName);
|
||||
DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
|
||||
|
||||
Slice parentSlice;
|
||||
|
||||
if (slice == null) {
|
||||
if (router instanceof CompositeIdRouter) {
|
||||
Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection);
|
||||
if (searchSlices.isEmpty()) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey);
|
||||
}
|
||||
if (searchSlices.size() > 1) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Splitting a split.key: " + splitKey + " which spans multiple shards is not supported");
|
||||
}
|
||||
parentSlice = searchSlices.iterator().next();
|
||||
slice = parentSlice.getName();
|
||||
log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice);
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Split by route key can only be used with CompositeIdRouter or subclass. Found router: "
|
||||
+ router.getClass().getName());
|
||||
}
|
||||
} else {
|
||||
parentSlice = collection.getSlice(slice);
|
||||
}
|
||||
|
||||
if (parentSlice == null) {
|
||||
// no chance of the collection being null because ClusterState#getCollection(String) would have thrown
|
||||
// an exception already
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
|
||||
}
|
||||
|
||||
// find the leader for the shard
|
||||
Replica parentShardLeader = null;
|
||||
try {
|
||||
parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice, 10000);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
DocRouter.Range range = parentSlice.getRange();
|
||||
if (range == null) {
|
||||
range = new PlainIdRouter().fullRange();
|
||||
}
|
||||
|
||||
List<DocRouter.Range> subRanges = null;
|
||||
String rangesStr = message.getStr(CoreAdminParams.RANGES);
|
||||
if (rangesStr != null) {
|
||||
String[] ranges = rangesStr.split(",");
|
||||
if (ranges.length == 0 || ranges.length == 1) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard");
|
||||
} else {
|
||||
subRanges = new ArrayList<>(ranges.length);
|
||||
for (int i = 0; i < ranges.length; i++) {
|
||||
String r = ranges[i];
|
||||
try {
|
||||
subRanges.add(DocRouter.DEFAULT.fromString(r));
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e);
|
||||
}
|
||||
if (!subRanges.get(i).isSubsetOf(range)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString());
|
||||
}
|
||||
}
|
||||
List<DocRouter.Range> temp = new ArrayList<>(subRanges); // copy to preserve original order
|
||||
Collections.sort(temp);
|
||||
if (!range.equals(new DocRouter.Range(temp.get(0).min, temp.get(temp.size() - 1).max))) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Specified hash ranges: " + rangesStr + " do not cover the entire range of parent shard: " + range);
|
||||
}
|
||||
for (int i = 1; i < temp.size(); i++) {
|
||||
if (temp.get(i - 1).max + 1 != temp.get(i).min) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Specified hash ranges: " + rangesStr
|
||||
+ " either overlap with each other or " + "do not cover the entire range of parent shard: " + range);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (splitKey != null) {
|
||||
if (router instanceof CompositeIdRouter) {
|
||||
CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
|
||||
subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range);
|
||||
if (subRanges.size() == 1) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The split.key: " + splitKey
|
||||
+ " has a hash range that is exactly equal to hash range of shard: " + slice);
|
||||
}
|
||||
for (DocRouter.Range subRange : subRanges) {
|
||||
if (subRange.min == subRange.max) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId");
|
||||
}
|
||||
}
|
||||
log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges);
|
||||
rangesStr = "";
|
||||
for (int i = 0; i < subRanges.size(); i++) {
|
||||
DocRouter.Range subRange = subRanges.get(i);
|
||||
rangesStr += subRange.toString();
|
||||
if (i < subRanges.size() - 1) rangesStr += ',';
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// todo: fixed to two partitions?
|
||||
subRanges = router.partitionRange(2, range);
|
||||
}
|
||||
|
||||
try {
|
||||
List<String> subSlices = new ArrayList<>(subRanges.size());
|
||||
List<String> subShardNames = new ArrayList<>(subRanges.size());
|
||||
String nodeName = parentShardLeader.getNodeName();
|
||||
for (int i = 0; i < subRanges.size(); i++) {
|
||||
String subSlice = slice + "_" + i;
|
||||
subSlices.add(subSlice);
|
||||
String subShardName = collectionName + "_" + subSlice + "_replica1";
|
||||
subShardNames.add(subShardName);
|
||||
|
||||
Slice oSlice = collection.getSlice(subSlice);
|
||||
if (oSlice != null) {
|
||||
final Slice.State state = oSlice.getState();
|
||||
if (state == Slice.State.ACTIVE) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
|
||||
} else if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
|
||||
// delete the shards
|
||||
for (String sub : subSlices) {
|
||||
log.info("Sub-shard: {} already exists therefore requesting its deletion", sub);
|
||||
Map<String, Object> propMap = new HashMap<>();
|
||||
propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
|
||||
propMap.put(COLLECTION_PROP, collectionName);
|
||||
propMap.put(SHARD_ID_PROP, sub);
|
||||
ZkNodeProps m = new ZkNodeProps(propMap);
|
||||
try {
|
||||
ocmh.commandMap.get(DELETESHARD).call(clusterState, m, new NamedList());
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to delete already existing sub shard: " + sub,
|
||||
e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final String asyncId = message.getStr(ASYNC);
|
||||
Map<String, String> requestMap = new HashMap<>();
|
||||
|
||||
for (int i = 0; i < subRanges.size(); i++) {
|
||||
String subSlice = subSlices.get(i);
|
||||
String subShardName = subShardNames.get(i);
|
||||
DocRouter.Range subRange = subRanges.get(i);
|
||||
|
||||
log.info("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName);
|
||||
|
||||
Map<String, Object> propMap = new HashMap<>();
|
||||
propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower());
|
||||
propMap.put(ZkStateReader.SHARD_ID_PROP, subSlice);
|
||||
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
|
||||
propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString());
|
||||
propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.CONSTRUCTION.toString());
|
||||
propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
|
||||
DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
|
||||
inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
|
||||
|
||||
// wait until we are able to see the new shard in cluster state
|
||||
ocmh.waitForNewShard(collectionName, subSlice);
|
||||
|
||||
// refresh cluster state
|
||||
clusterState = zkStateReader.getClusterState();
|
||||
|
||||
log.info("Adding replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
|
||||
+ " on " + nodeName);
|
||||
propMap = new HashMap<>();
|
||||
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
|
||||
propMap.put(COLLECTION_PROP, collectionName);
|
||||
propMap.put(SHARD_ID_PROP, subSlice);
|
||||
propMap.put("node", nodeName);
|
||||
propMap.put(CoreAdminParams.NAME, subShardName);
|
||||
// copy over property params:
|
||||
for (String key : message.keySet()) {
|
||||
if (key.startsWith(COLL_PROP_PREFIX)) {
|
||||
propMap.put(key, message.getStr(key));
|
||||
}
|
||||
}
|
||||
// add async param
|
||||
if (asyncId != null) {
|
||||
propMap.put(ASYNC, asyncId);
|
||||
}
|
||||
ocmh.addReplica(clusterState, new ZkNodeProps(propMap), results, null);
|
||||
}
|
||||
|
||||
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap);
|
||||
|
||||
for (String subShardName : subShardNames) {
|
||||
// wait for parent leader to acknowledge the sub-shard core
|
||||
log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
|
||||
String coreNodeName = ocmh.waitForCoreNodeName(collectionName, nodeName, subShardName);
|
||||
CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
|
||||
cmd.setCoreName(subShardName);
|
||||
cmd.setNodeName(nodeName);
|
||||
cmd.setCoreNodeName(coreNodeName);
|
||||
cmd.setState(Replica.State.ACTIVE);
|
||||
cmd.setCheckLive(true);
|
||||
cmd.setOnlyIfLeader(true);
|
||||
|
||||
ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
|
||||
ocmh.sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
|
||||
}
|
||||
|
||||
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up",
|
||||
asyncId, requestMap);
|
||||
|
||||
log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
|
||||
+ " on: " + parentShardLeader);
|
||||
|
||||
log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
|
||||
+ collectionName + " on " + parentShardLeader);
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
|
||||
params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
|
||||
for (int i = 0; i < subShardNames.size(); i++) {
|
||||
String subShardName = subShardNames.get(i);
|
||||
params.add(CoreAdminParams.TARGET_CORE, subShardName);
|
||||
}
|
||||
params.set(CoreAdminParams.RANGES, rangesStr);
|
||||
|
||||
ocmh.sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
|
||||
|
||||
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command", asyncId,
|
||||
requestMap);
|
||||
|
||||
log.info("Index on shard: " + nodeName + " split into two successfully");
|
||||
|
||||
// apply buffered updates on sub-shards
|
||||
for (int i = 0; i < subShardNames.size(); i++) {
|
||||
String subShardName = subShardNames.get(i);
|
||||
|
||||
log.info("Applying buffered updates on : " + subShardName);
|
||||
|
||||
params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
|
||||
params.set(CoreAdminParams.NAME, subShardName);
|
||||
|
||||
ocmh.sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
|
||||
}
|
||||
|
||||
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed while asking sub shard leaders" +
|
||||
" to apply buffered updates", asyncId, requestMap);
|
||||
|
||||
log.info("Successfully applied buffered updates on : " + subShardNames);
|
||||
|
||||
// Replica creation for the new Slices
|
||||
|
||||
// look at the replication factor and see if it matches reality
|
||||
// if it does not, find best nodes to create more cores
|
||||
|
||||
// TODO: Have replication factor decided in some other way instead of numShards for the parent
|
||||
|
||||
int repFactor = parentSlice.getReplicas().size();
|
||||
|
||||
// we need to look at every node and see how many cores it serves
|
||||
// add our new cores to existing nodes serving the least number of cores
|
||||
// but (for now) require that each core goes on a distinct node.
|
||||
|
||||
// TODO: add smarter options that look at the current number of cores per
|
||||
// node?
|
||||
// for now we just go random
|
||||
Set<String> nodes = clusterState.getLiveNodes();
|
||||
List<String> nodeList = new ArrayList<>(nodes.size());
|
||||
nodeList.addAll(nodes);
|
||||
|
||||
// TODO: Have maxShardsPerNode param for this operation?
|
||||
|
||||
// Remove the node that hosts the parent shard for replica creation.
|
||||
nodeList.remove(nodeName);
|
||||
|
||||
// TODO: change this to handle sharding a slice into > 2 sub-shards.
|
||||
|
||||
|
||||
Map<ReplicaAssigner.Position, String> nodeMap = ocmh.identifyNodes(clusterState,
|
||||
new ArrayList<>(clusterState.getLiveNodes()),
|
||||
new ZkNodeProps(collection.getProperties()),
|
||||
subSlices, repFactor - 1);
|
||||
|
||||
List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
|
||||
|
||||
for (Map.Entry<ReplicaAssigner.Position, String> entry : nodeMap.entrySet()) {
|
||||
String sliceName = entry.getKey().shard;
|
||||
String subShardNodeName = entry.getValue();
|
||||
String shardName = collectionName + "_" + sliceName + "_replica" + (entry.getKey().index);
|
||||
|
||||
log.info("Creating replica shard " + shardName + " as part of slice " + sliceName + " of collection "
|
||||
+ collectionName + " on " + subShardNodeName);
|
||||
|
||||
ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
|
||||
ZkStateReader.COLLECTION_PROP, collectionName,
|
||||
ZkStateReader.SHARD_ID_PROP, sliceName,
|
||||
ZkStateReader.CORE_NAME_PROP, shardName,
|
||||
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
|
||||
ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(subShardNodeName),
|
||||
ZkStateReader.NODE_NAME_PROP, subShardNodeName);
|
||||
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
|
||||
|
||||
HashMap<String, Object> propMap = new HashMap<>();
|
||||
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
|
||||
propMap.put(COLLECTION_PROP, collectionName);
|
||||
propMap.put(SHARD_ID_PROP, sliceName);
|
||||
propMap.put("node", subShardNodeName);
|
||||
propMap.put(CoreAdminParams.NAME, shardName);
|
||||
// copy over property params:
|
||||
for (String key : message.keySet()) {
|
||||
if (key.startsWith(COLL_PROP_PREFIX)) {
|
||||
propMap.put(key, message.getStr(key));
|
||||
}
|
||||
}
|
||||
// add async param
|
||||
if (asyncId != null) {
|
||||
propMap.put(ASYNC, asyncId);
|
||||
}
|
||||
// special flag param to instruct addReplica not to create the replica in cluster state again
|
||||
propMap.put(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, "true");
|
||||
|
||||
replicas.add(propMap);
|
||||
}
|
||||
|
||||
// we must set the slice state into recovery before actually creating the replica cores
|
||||
// this ensures that the logic inside Overseer to update sub-shard state to 'active'
|
||||
// always gets a chance to execute. See SOLR-7673
|
||||
|
||||
if (repFactor == 1) {
|
||||
// switch sub shard states to 'active'
|
||||
log.info("Replication factor is 1 so switching shard states");
|
||||
DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
|
||||
Map<String, Object> propMap = new HashMap<>();
|
||||
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
|
||||
propMap.put(slice, Slice.State.INACTIVE.toString());
|
||||
for (String subSlice : subSlices) {
|
||||
propMap.put(subSlice, Slice.State.ACTIVE.toString());
|
||||
}
|
||||
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
|
||||
ZkNodeProps m = new ZkNodeProps(propMap);
|
||||
inQueue.offer(Utils.toJSON(m));
|
||||
} else {
|
||||
log.info("Requesting shard state be set to 'recovery'");
|
||||
DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
|
||||
Map<String, Object> propMap = new HashMap<>();
|
||||
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
|
||||
for (String subSlice : subSlices) {
|
||||
propMap.put(subSlice, Slice.State.RECOVERY.toString());
|
||||
}
|
||||
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
|
||||
ZkNodeProps m = new ZkNodeProps(propMap);
|
||||
inQueue.offer(Utils.toJSON(m));
|
||||
}
|
||||
|
||||
// now actually create replica cores on sub shard nodes
|
||||
for (Map<String, Object> replica : replicas) {
|
||||
ocmh.addReplica(clusterState, new ZkNodeProps(replica), results, null);
|
||||
}
|
||||
|
||||
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap);
|
||||
|
||||
log.info("Successfully created all replica shards for all sub-slices " + subSlices);
|
||||
|
||||
ocmh.commit(results, slice, parentShardLeader);
|
||||
|
||||
return true;
|
||||
} catch (SolrException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, e);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue