SOLR-9317: ADDREPLICA command should be able to add more than one replica to a collection and shard at a time.

The API now supports the 'nrtReplicas', 'tlogReplicas' and 'pullReplicas' parameters, as well as the 'createNodeSet' parameter. As part of this change, the CREATESHARD API now delegates placing replicas entirely to the ADDREPLICA command and uses the new parameters to add all the replicas in one API call.
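For illustration, a minimal SolrJ sketch of the new behaviour (collection name, shard name and ZooKeeper address are assumptions, not part of this commit); it relies on the `CollectionAdminRequest.AddReplica` setters and the `processAndWait` call that appear in the diff below:

[source,java]
----
import java.util.Collections;
import java.util.Optional;

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.RequestStatusState;

public class AddMultipleReplicasExample {
  public static void main(String[] args) throws Exception {
    // The ZooKeeper address is an assumption; point this at your own cluster.
    try (CloudSolrClient client = new CloudSolrClient.Builder(
        Collections.singletonList("localhost:9983"), Optional.empty()).build()) {
      // One ADDREPLICA call can now create several replicas, possibly of different types.
      CollectionAdminRequest.AddReplica addReplica =
          CollectionAdminRequest.addReplicaToShard("gettingstarted", "shard1")
              .setNrtReplicas(1)
              .setTlogReplicas(1)
              .setPullReplicas(1);
      // Run asynchronously and wait up to 120 seconds for the request to complete.
      RequestStatusState state = addReplica.processAndWait("add-replicas-1", client, 120);
      System.out.println("ADDREPLICA finished with state: " + state);
    }
  }
}
----

The equivalent HTTP request is shown later in this commit's ref-guide changes.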
Shalin Shekhar Mangar 2018-09-21 15:12:21 +05:30
parent af2de93451
commit 4bcace571e
15 changed files with 557 additions and 286 deletions


@ -90,6 +90,11 @@ New Features
error. Previously, the collapsing behavior was unreliable and undefined despite no explicit error.
(Munendra S N, David Smiley)
* SOLR-9317: ADDREPLICA command should be able to add more than one replica to a collection and shard at a time.
The API now supports the 'nrtReplicas', 'tlogReplicas' and 'pullReplicas' parameters, as well as the 'createNodeSet' parameter.
As part of this change, the CREATESHARD API now delegates placing replicas entirely to the ADDREPLICA command
and uses the new parameters to add all the replicas in one API call. (shalin)
Other Changes
----------------------


@ -20,14 +20,17 @@ package org.apache.solr.cloud.api.collections;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
@ -41,6 +44,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@ -54,10 +58,14 @@ import org.apache.solr.handler.component.ShardHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
@ -79,28 +87,116 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
addReplica(state, message, results, null);
}
ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
List<ZkNodeProps> addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
throws IOException, InterruptedException {
log.debug("addReplica() : {}", Utils.toJSONString(message));
String collectionName = message.getStr(COLLECTION_PROP);
String shard = message.getStr(SHARD_ID_PROP);
DocCollection coll = clusterState.getCollection(collectionName);
if (coll == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collectionName + " does not exist");
}
if (coll.getSlice(shard) == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Collection: " + collectionName + " shard: " + shard + " does not exist");
}
boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
final String asyncId = message.getStr(ASYNC);
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
message = assignReplicaDetails(ocmh.cloudManager, clusterState, message, sessionWrapper);
String node = message.getStr(CoreAdminParams.NODE);
String shard = message.getStr(SHARD_ID_PROP);
String coreName = message.getStr(CoreAdminParams.NAME);
String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME);
String createNodeSetStr = message.getStr(CREATE_NODE_SET);
if (node != null && createNodeSetStr != null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Both 'node' and 'createNodeSet' parameters cannot be specified together.");
}
int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes
Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
boolean parallel = message.getBool("parallel", false);
Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
EnumMap<Replica.Type, Integer> replicaTypesVsCount = new EnumMap<>(Replica.Type.class);
replicaTypesVsCount.put(Replica.Type.NRT, message.getInt(NRT_REPLICAS, replicaType == Replica.Type.NRT ? 1 : 0));
replicaTypesVsCount.put(Replica.Type.TLOG, message.getInt(TLOG_REPLICAS, replicaType == Replica.Type.TLOG ? 1 : 0));
replicaTypesVsCount.put(Replica.Type.PULL, message.getInt(PULL_REPLICAS, replicaType == Replica.Type.PULL ? 1 : 0));
int totalReplicas = 0;
for (Map.Entry<Replica.Type, Integer> entry : replicaTypesVsCount.entrySet()) {
totalReplicas += entry.getValue();
}
if (totalReplicas > 1) {
if (message.getStr(CoreAdminParams.NAME) != null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'name' parameter is specified");
}
if (message.getStr(CoreAdminParams.CORE_NODE_NAME) != null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'coreNodeName' parameter is specified");
}
}
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
List<CreateReplica> createReplicas = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, replicaTypesVsCount, sessionWrapper)
.stream()
.map(replicaPosition -> assignReplicaDetails(ocmh.cloudManager, clusterState, message, replicaPosition))
.collect(Collectors.toList());
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
ZkStateReader zkStateReader = ocmh.zkStateReader;
// For tracking async calls.
Map<String,String> requestMap = new HashMap<>();
for (CreateReplica createReplica : createReplicas) {
assert createReplica.coreName != null;
ModifiableSolrParams params = getReplicaParams(clusterState, message, results, collectionName, coll, skipCreateReplicaInClusterState, asyncId, shardHandler, createReplica);
ocmh.sendShardRequest(createReplica.node, params, shardHandler, asyncId, requestMap);
}
Runnable runnable = () -> {
ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
for (CreateReplica replica : createReplicas) {
ocmh.waitForCoreNodeName(collectionName, replica.node, replica.coreName);
}
if (sessionWrapper.get() != null) {
sessionWrapper.get().release();
}
if (onComplete != null) onComplete.run();
};
if (!parallel || waitForFinalState) {
if (waitForFinalState) {
SolrCloseableLatch latch = new SolrCloseableLatch(totalReplicas, ocmh);
ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, null,
createReplicas.stream().map(createReplica -> createReplica.coreName).collect(Collectors.toList()), latch);
try {
zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
runnable.run();
if (!latch.await(timeout, TimeUnit.SECONDS)) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active.");
}
} finally {
zkStateReader.removeCollectionStateWatcher(collectionName, watcher);
}
} else {
runnable.run();
}
} else {
ocmh.tpe.submit(runnable);
}
return createReplicas.stream()
.map(createReplica -> new ZkNodeProps(
ZkStateReader.COLLECTION_PROP, createReplica.collectionName,
ZkStateReader.SHARD_ID_PROP, createReplica.sliceName,
ZkStateReader.CORE_NAME_PROP, createReplica.coreName,
ZkStateReader.NODE_NAME_PROP, createReplica.node
))
.collect(Collectors.toList());
}
private ModifiableSolrParams getReplicaParams(ClusterState clusterState, ZkNodeProps message, NamedList results, String collectionName, DocCollection coll, boolean skipCreateReplicaInClusterState, String asyncId, ShardHandler shardHandler, CreateReplica createReplica) throws IOException, InterruptedException {
if (coll.getStr(WITH_COLLECTION) != null) {
String withCollectionName = coll.getStr(WITH_COLLECTION);
DocCollection withCollection = clusterState.getCollection(withCollectionName);
@ -109,14 +205,14 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
}
String withCollectionShard = withCollection.getActiveSlices().iterator().next().getName();
List<Replica> replicas = withCollection.getReplicas(node);
List<Replica> replicas = withCollection.getReplicas(createReplica.node);
if (replicas == null || replicas.isEmpty()) {
// create a replica of withCollection on the identified node before proceeding further
ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
ZkStateReader.COLLECTION_PROP, withCollectionName,
ZkStateReader.SHARD_ID_PROP, withCollectionShard,
"node", node,
"node", createReplica.node,
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.TRUE.toString()); // set to true because we want `withCollection` to be ready after this collection is created
addReplica(clusterState, props, results, null);
}
@ -130,14 +226,14 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
ZkStateReader.COLLECTION_PROP, collectionName,
ZkStateReader.SHARD_ID_PROP, shard,
ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.SHARD_ID_PROP, createReplica.sliceName,
ZkStateReader.CORE_NAME_PROP, createReplica.coreName,
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node),
ZkStateReader.NODE_NAME_PROP, node,
ZkStateReader.REPLICA_TYPE, replicaType.name());
if (coreNodeName != null) {
props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(createReplica.node),
ZkStateReader.NODE_NAME_PROP, createReplica.node,
ZkStateReader.REPLICA_TYPE, createReplica.replicaType.name());
if (createReplica.coreNodeName != null) {
props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, createReplica.coreNodeName);
}
try {
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
@ -146,7 +242,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
params.set(CoreAdminParams.CORE_NODE_NAME,
ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(coreName)).get(coreName).getName());
ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(createReplica.coreName)).get(createReplica.coreName).getName());
}
String configName = zkStateReader.readConfigName(collectionName);
@ -156,12 +252,12 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR);
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
params.set(CoreAdminParams.NAME, coreName);
params.set(CoreAdminParams.NAME, createReplica.coreName);
params.set(COLL_CONF, configName);
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.REPLICA_TYPE, replicaType.name());
if (shard != null) {
params.set(CoreAdminParams.SHARD, shard);
params.set(CoreAdminParams.REPLICA_TYPE, createReplica.replicaType.name());
if (createReplica.sliceName != null) {
params.set(CoreAdminParams.SHARD, createReplica.sliceName);
} else if (routeKey != null) {
Collection<Slice> slices = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll);
if (slices.isEmpty()) {
@ -181,108 +277,34 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
if (instanceDir != null) {
params.set(CoreAdminParams.INSTANCE_DIR, instanceDir);
}
if (coreNodeName != null) {
params.set(CoreAdminParams.CORE_NODE_NAME, coreNodeName);
if (createReplica.coreNodeName != null) {
params.set(CoreAdminParams.CORE_NODE_NAME, createReplica.coreNodeName);
}
ocmh.addPropertyParams(message, params);
// For tracking async calls.
Map<String,String> requestMap = new HashMap<>();
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
ocmh.sendShardRequest(node, params, shardHandler, asyncId, requestMap);
final String fnode = node;
final String fcoreName = coreName;
Runnable runnable = () -> {
ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
ocmh.waitForCoreNodeName(collectionName, fnode, fcoreName);
if (sessionWrapper.get() != null) {
sessionWrapper.get().release();
}
if (onComplete != null) onComplete.run();
};
if (!parallel || waitForFinalState) {
if (waitForFinalState) {
SolrCloseableLatch latch = new SolrCloseableLatch(1, ocmh);
ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, null, Collections.singletonList(coreName), latch);
try {
zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
runnable.run();
if (!latch.await(timeout, TimeUnit.SECONDS)) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active.");
}
} finally {
zkStateReader.removeCollectionStateWatcher(collectionName, watcher);
}
} else {
runnable.run();
}
} else {
ocmh.tpe.submit(runnable);
return params;
}
return new ZkNodeProps(
ZkStateReader.COLLECTION_PROP, collectionName,
ZkStateReader.SHARD_ID_PROP, shard,
ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.NODE_NAME_PROP, node
);
}
public static ZkNodeProps assignReplicaDetails(SolrCloudManager cloudManager, ClusterState clusterState,
ZkNodeProps message, AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
public static CreateReplica assignReplicaDetails(SolrCloudManager cloudManager, ClusterState clusterState,
ZkNodeProps message, ReplicaPosition replicaPosition) {
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
String collection = message.getStr(COLLECTION_PROP);
String node = message.getStr(CoreAdminParams.NODE);
String node = replicaPosition.node;
String shard = message.getStr(SHARD_ID_PROP);
String coreName = message.getStr(CoreAdminParams.NAME);
String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME);
Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
Replica.Type replicaType = replicaPosition.type;
if (StringUtils.isBlank(coreName)) {
coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
}
DocCollection coll = clusterState.getCollection(collection);
if (coll == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
}
if (coll.getSlice(shard) == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Collection: " + collection + " shard: " + shard + " does not exist");
}
// Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
if (!skipCreateReplicaInClusterState) {
if (CloudUtil.usePolicyFramework(coll, cloudManager)) {
if (node == null) {
if(coll.getPolicyName() != null) message.getProperties().put(Policy.POLICY, coll.getPolicyName());
node = Assign.identifyNodes(cloudManager,
clusterState,
Collections.emptyList(),
collection,
message,
Collections.singletonList(shard),
replicaType == Replica.Type.NRT ? 1 : 0,
replicaType == Replica.Type.TLOG ? 1 : 0,
replicaType == Replica.Type.PULL ? 1 : 0
).get(0).node;
sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
}
} else {
node = Assign.getNodesForNewReplicas(clusterState, collection, shard, 1, node,
cloudManager).get(0).nodeName;// TODO: use replica type in this logic too
}
}
log.info("Node Identified {} for creating new replica of shard {}", node, shard);
if (!clusterState.liveNodesContain(node)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
}
DocCollection coll = clusterState.getCollection(collection);
if (coreName == null) {
coreName = Assign.buildSolrCoreName(cloudManager.getDistribStateManager(), coll, shard, replicaType);
} else if (!skipCreateReplicaInClusterState) {
@ -297,11 +319,103 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
}
if (coreNodeName != null) {
message = message.plus(CoreAdminParams.CORE_NODE_NAME, coreNodeName);
return new CreateReplica(collection, shard, node, replicaType, coreName, coreNodeName);
}
message = message.plus(CoreAdminParams.NAME, coreName);
message = message.plus(CoreAdminParams.NODE, node);
return message;
public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
String collectionName, ZkNodeProps message,
EnumMap<Replica.Type, Integer> replicaTypeVsCount,
AtomicReference< PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
String sliceName = message.getStr(SHARD_ID_PROP);
DocCollection collection = clusterState.getCollection(collectionName);
int numNrtReplicas = replicaTypeVsCount.get(Replica.Type.NRT);
int numPullReplicas = replicaTypeVsCount.get(Replica.Type.PULL);
int numTlogReplicas = replicaTypeVsCount.get(Replica.Type.TLOG);
int totalReplicas = numNrtReplicas + numPullReplicas + numTlogReplicas;
String node = message.getStr(CoreAdminParams.NODE);
Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
if (createNodeSetStr == null) {
if (node != null) {
message.getProperties().put(OverseerCollectionMessageHandler.CREATE_NODE_SET, node);
createNodeSetStr = node;
}
}
List<ReplicaPosition> positions = null;
if (!skipCreateReplicaInClusterState) {
if (CloudUtil.usePolicyFramework(collection, cloudManager)) {
if (node == null) {
if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName());
positions = Assign.identifyNodes(cloudManager,
clusterState,
Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM),
collection.getName(),
message,
Collections.singletonList(sliceName),
numNrtReplicas,
numTlogReplicas,
numPullReplicas);
sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
}
} else {
List<Assign.ReplicaCount> sortedNodeList = Assign.getNodesForNewReplicas(clusterState, collection.getName(), sliceName, numNrtReplicas,
numTlogReplicas, numPullReplicas, createNodeSetStr, cloudManager);
int i = 0;
positions = new ArrayList<>();
for (Map.Entry<Replica.Type, Integer> e : replicaTypeVsCount.entrySet()) {
for (int j = 0; j < e.getValue(); j++) {
positions.add(new ReplicaPosition(sliceName, j + 1, e.getKey(), sortedNodeList.get(i % sortedNodeList.size()).nodeName));
i++;
}
}
}
}
if (positions == null) {
assert node != null;
if (node == null) {
// in case asserts are disabled
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"A node should have been identified to add replica but wasn't. Please inform solr developers at SOLR-9317");
}
// it is unlikely that multiple replicas have been requested to be created on
// the same node, but we've got to accommodate.
positions = new ArrayList<>(totalReplicas);
int i = 0;
for (Map.Entry<Replica.Type, Integer> entry : replicaTypeVsCount.entrySet()) {
for (int j = 0; j < entry.getValue(); j++) {
positions.add(new ReplicaPosition(sliceName, i++, entry.getKey(), node));
}
}
}
return positions;
}
/**
* A data structure to keep all information required to create a new replica in one place.
* Think of it as a typed ZkNodeProps for replica creation.
*
* This is <b>not</b> a public API and can be changed at any time without notice.
*/
public static class CreateReplica {
public final String collectionName;
public final String sliceName;
public final String node;
public final Replica.Type replicaType;
public String coreName;
public String coreNodeName;
CreateReplica(String collectionName, String sliceName, String node, Replica.Type replicaType, String coreName, String coreNodeName) {
this.collectionName = collectionName;
this.sliceName = sliceName;
this.node = node;
this.replicaType = replicaType;
this.coreName = coreName;
this.coreNodeName = coreNodeName;
}
}
}
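As an editor's aside, the following is a simplified, self-contained sketch (plain JDK only; `Type` is a stand-in for `Replica.Type` and the node list is made up) of the two ideas the new code above combines: each per-type count defaults to 1 only for the legacy `type` parameter's value, and the requested replicas are then assigned round-robin over the candidate nodes:

[source,java]
----
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class ReplicaPlacementSketch {
  // Simplified stand-in for org.apache.solr.common.cloud.Replica.Type.
  enum Type { NRT, TLOG, PULL }

  public static void main(String[] args) {
    // Legacy behaviour: the single 'type' parameter decides which count defaults to 1.
    Type legacyType = Type.NRT;
    EnumMap<Type, Integer> counts = new EnumMap<>(Type.class);
    counts.put(Type.NRT, requested(null, legacyType == Type.NRT ? 1 : 0)); // nrtReplicas not passed
    counts.put(Type.TLOG, requested(2, legacyType == Type.TLOG ? 1 : 0));  // tlogReplicas=2 requested
    counts.put(Type.PULL, requested(1, legacyType == Type.PULL ? 1 : 0));  // pullReplicas=1 requested

    // Hypothetical candidate nodes, standing in for the sorted list a placement helper returns.
    List<String> nodes = Arrays.asList("nodeA:8983_solr", "nodeB:8983_solr");

    // Distribute the requested replicas round-robin over the candidate nodes.
    List<String> positions = new ArrayList<>();
    int i = 0;
    for (Map.Entry<Type, Integer> e : counts.entrySet()) {
      for (int j = 0; j < e.getValue(); j++) {
        positions.add(e.getKey() + " -> " + nodes.get(i % nodes.size()));
        i++;
      }
    }
    // Prints NRT -> nodeA..., TLOG -> nodeB..., TLOG -> nodeA..., PULL -> nodeB...
    positions.forEach(System.out::println);
  }

  // Mirrors message.getInt(param, defaultValue): use the explicit value when present.
  static int requested(Integer explicit, int defaultValue) {
    return explicit != null ? explicit : defaultValue;
  }
}
----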


@ -273,7 +273,7 @@ public class Assign {
} else {
if (numTlogReplicas + numPullReplicas != 0 && rulesMap != null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules or cluster policies");
Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules");
}
}
@ -322,11 +322,11 @@ public class Assign {
// Gets a list of candidate nodes to put the required replica(s) on. Throws errors if not enough replicas
// could be created on live nodes given maxShardsPerNode, Replication factor (if from createShard) etc.
public static List<ReplicaCount> getNodesForNewReplicas(ClusterState clusterState, String collectionName,
String shard, int nrtReplicas,
String shard, int nrtReplicas, int tlogReplicas, int pullReplicas,
Object createNodeSet, SolrCloudManager cloudManager) throws IOException, InterruptedException, AssignmentException {
log.debug("getNodesForNewReplicas() shard: {} , replicas : {} , createNodeSet {}", shard, nrtReplicas, createNodeSet );
log.debug("getNodesForNewReplicas() shard: {} , nrtReplicas : {} , tlogReplicas: {} , pullReplicas: {} , createNodeSet {}", shard, nrtReplicas, tlogReplicas, pullReplicas, createNodeSet );
DocCollection coll = clusterState.getCollection(collectionName);
Integer maxShardsPerNode = coll.getMaxShardsPerNode();
Integer maxShardsPerNode = coll.getMaxShardsPerNode() == -1 ? Integer.MAX_VALUE : coll.getMaxShardsPerNode();
List<String> createNodeList = null;
if (createNodeSet instanceof List) {
@ -338,15 +338,15 @@ public class Assign {
HashMap<String, ReplicaCount> nodeNameVsShardCount = getNodeNameVsShardCount(collectionName, clusterState, createNodeList);
if (createNodeList == null) { // We only care if we haven't been told to put new replicas on specific nodes.
int availableSlots = 0;
long availableSlots = 0;
for (Map.Entry<String, ReplicaCount> ent : nodeNameVsShardCount.entrySet()) {
//ADDREPLICA can put more than maxShardsPerNode on an instance, so this test is necessary.
if (maxShardsPerNode > ent.getValue().thisCollectionNodes) {
availableSlots += (maxShardsPerNode - ent.getValue().thisCollectionNodes);
}
}
if (availableSlots < nrtReplicas) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
if (availableSlots < nrtReplicas + tlogReplicas + pullReplicas) {
throw new AssignmentException(
String.format(Locale.ROOT, "Cannot create %d new replicas for collection %s given the current number of live nodes and a maxShardsPerNode of %d",
nrtReplicas, collectionName, maxShardsPerNode));
}
@ -355,13 +355,17 @@ public class Assign {
List l = (List) coll.get(DocCollection.RULE);
List<ReplicaPosition> replicaPositions = null;
if (l != null) {
if (tlogReplicas + pullReplicas > 0) {
throw new AssignmentException(Replica.Type.TLOG + " or " + Replica.Type.PULL +
" replica types not supported with placement rules");
}
// TODO: make it so that this method doesn't require access to CC
replicaPositions = getNodesViaRules(clusterState, shard, nrtReplicas, cloudManager, coll, createNodeList, l);
}
String policyName = coll.getStr(POLICY);
AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
if (policyName != null || !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), nrtReplicas, 0, 0,
replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), nrtReplicas, tlogReplicas, pullReplicas,
policyName, cloudManager, createNodeList);
}
@ -461,7 +465,7 @@ public class Assign {
return nodeNameVsShardCount;
}
DocCollection coll = clusterState.getCollection(collectionName);
Integer maxShardsPerNode = coll.getMaxShardsPerNode();
int maxShardsPerNode = coll.getMaxShardsPerNode() == -1 ? Integer.MAX_VALUE : coll.getMaxShardsPerNode();
Map<String, DocCollection> collections = clusterState.getCollectionsMap();
for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
DocCollection c = entry.getValue();
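To make the capacity check above concrete, a small self-contained calculation with hypothetical numbers mirroring the scenario exercised in AddReplicaTest later in this commit (4 nodes, `maxShardsPerNode=2`, one existing replica of the collection on each node, then a request for 5 more replicas):

[source,java]
----
import java.util.Arrays;
import java.util.List;

public class AvailableSlotsSketch {
  public static void main(String[] args) {
    // Hypothetical cluster: 4 live nodes, maxShardsPerNode=2, one replica already on each node.
    int maxShardsPerNode = 2;
    List<Integer> replicasOfThisCollectionPerNode = Arrays.asList(1, 1, 1, 1);

    long availableSlots = 0;
    for (int existing : replicasOfThisCollectionPerNode) {
      // Only nodes still below the per-node limit contribute free slots.
      if (maxShardsPerNode > existing) {
        availableSlots += (maxShardsPerNode - existing);
      }
    }

    int nrtReplicas = 3, tlogReplicas = 1, pullReplicas = 1; // the over-sized request in the test
    int requested = nrtReplicas + tlogReplicas + pullReplicas;

    System.out.println("availableSlots=" + availableSlots + ", requested=" + requested);
    if (availableSlots < requested) {
      // The real code rejects the request at this point (AssignmentException).
      System.out.println("Request rejected: not enough slots given maxShardsPerNode");
    }
  }
}
----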


@ -17,32 +17,17 @@
package org.apache.solr.cloud.api.collections;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.ImmutableMap;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.cloud.CloudUtil;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.SolrCloseableLatch;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.Utils;
@ -77,38 +62,35 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
DocCollection collection = clusterState.getCollection(collectionName);
int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1))));
int numPullReplicas = message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0));
int numTlogReplicas = message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0));
if (numNrtReplicas + numTlogReplicas <= 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
}
ZkStateReader zkStateReader = ocmh.zkStateReader;
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
SolrCloseableLatch countDownLatch;
try {
List<ReplicaPosition> positions = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, sessionWrapper);
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
// wait for a while until we see the shard
ocmh.waitForNewShard(collectionName, sliceName);
String async = message.getStr(ASYNC);
countDownLatch = new SolrCloseableLatch(positions.size(), ocmh);
for (ReplicaPosition position : positions) {
String nodeName = position.node;
String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(), collection, sliceName, position.type);
log.info("Creating replica " + coreName + " as part of slice " + sliceName + " of collection " + collectionName
+ " on " + nodeName);
// Need to create new params for each request
ZkNodeProps addReplicasProps = new ZkNodeProps(
COLLECTION_PROP, collectionName,
SHARD_ID_PROP, sliceName,
ZkStateReader.REPLICA_TYPE, position.type.name(),
CoreAdminParams.NODE, nodeName,
CoreAdminParams.NAME, coreName,
ZkStateReader.NRT_REPLICAS, String.valueOf(numNrtReplicas),
ZkStateReader.TLOG_REPLICAS, String.valueOf(numTlogReplicas),
ZkStateReader.PULL_REPLICAS, String.valueOf(numPullReplicas),
OverseerCollectionMessageHandler.CREATE_NODE_SET, message.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET),
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
Map<String, Object> propertyParams = new HashMap<>();
ocmh.addPropertyParams(message, propertyParams);
addReplicasProps = addReplicasProps.plus(propertyParams);
if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
final NamedList addResult = new NamedList();
try {
ocmh.addReplica(zkStateReader.getClusterState(), addReplicasProps, addResult, () -> {
countDownLatch.countDown();
Object addResultFailure = addResult.get("failure");
if (addResultFailure != null) {
SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
@ -126,65 +108,14 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
success.addAll((NamedList) addResult.get("success"));
}
});
} catch (Assign.AssignmentException e) {
// clean up the slice that we created
ZkNodeProps deleteShard = new ZkNodeProps(COLLECTION_PROP, collectionName, SHARD_ID_PROP, sliceName, ASYNC, async);
new DeleteShardCmd(ocmh).call(clusterState, deleteShard, results);
throw e;
}
} finally {
if (sessionWrapper.get() != null) sessionWrapper.get().release();
}
log.debug("Waiting for create shard action to complete");
countDownLatch.await(5, TimeUnit.MINUTES);
log.debug("Finished waiting for create shard action to complete");
log.info("Finished create command on all shards for collection: " + collectionName);
}
public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
String collectionName, ZkNodeProps message, AtomicReference< PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
String sliceName = message.getStr(SHARD_ID_PROP);
DocCollection collection = clusterState.getCollection(collectionName);
int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1))));
int numPullReplicas = message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0));
int numTlogReplicas = message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0));
int totalReplicas = numNrtReplicas + numPullReplicas + numTlogReplicas;
if (numNrtReplicas + numTlogReplicas <= 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
}
Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
boolean usePolicyFramework = CloudUtil.usePolicyFramework(collection, cloudManager);
List<ReplicaPosition> positions;
if (usePolicyFramework) {
if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName());
positions = Assign.identifyNodes(cloudManager,
clusterState,
Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM),
collection.getName(),
message,
Collections.singletonList(sliceName),
numNrtReplicas,
numTlogReplicas,
numPullReplicas);
sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
} else {
List<Assign.ReplicaCount> sortedNodeList = Assign.getNodesForNewReplicas(clusterState, collection.getName(), sliceName, totalReplicas,
createNodeSetStr, cloudManager);
int i = 0;
positions = new ArrayList<>();
for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas,
Replica.Type.TLOG, numTlogReplicas,
Replica.Type.PULL, numPullReplicas
).entrySet()) {
for (int j = 0; j < e.getValue(); j++) {
positions.add(new ReplicaPosition(sliceName, j + 1, e.getKey(), sortedNodeList.get(i % sortedNodeList.size()).nodeName));
i++;
}
}
}
return positions;
}
}


@ -268,7 +268,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
NamedList addResult = new NamedList();
SolrCloseableLatch countDownLatch = new SolrCloseableLatch(1, ocmh);
ActiveReplicaWatcher watcher = null;
ZkNodeProps props = ocmh.addReplica(clusterState, addReplicasProps, addResult, null);
ZkNodeProps props = ocmh.addReplica(clusterState, addReplicasProps, addResult, null).get(0);
log.debug("props " + props);
if (replica.equals(slice.getLeader()) || waitForFinalState) {
watcher = new ActiveReplicaWatcher(coll.getName(), null, Collections.singletonList(newCoreName), countDownLatch);


@ -709,7 +709,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
}
}
ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
List<ZkNodeProps> addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
throws Exception {
return ((AddReplicaCmd) commandMap.get(ADDREPLICA)).addReplica(clusterState, message, results, onComplete);


@ -140,7 +140,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
log.debug("Successfully created replica for collection={} shard={} on node={}",
sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
}
});
}).get(0);
if (addedReplica != null) {
createdReplicas.add(addedReplica);


@ -694,6 +694,9 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections");
copy(req.getParams(), map,
REPLICATION_FACTOR,
NRT_REPLICAS,
TLOG_REPLICAS,
PULL_REPLICAS,
CREATE_NODE_SET,
WAIT_FOR_FINAL_STATE);
return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
@ -828,7 +831,11 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
DATA_DIR,
ULOG_DIR,
REPLICA_TYPE,
WAIT_FOR_FINAL_STATE);
WAIT_FOR_FINAL_STATE,
NRT_REPLICAS,
TLOG_REPLICAS,
PULL_REPLICAS,
CREATE_NODE_SET);
return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX);
}),
OVERSEERSTATUS_OP(OVERSEERSTATUS, (req, rsp, h) -> (Map) new LinkedHashMap<>()),


@ -17,7 +17,10 @@
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@ -26,11 +29,15 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.util.LogLevel;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.client.solrj.response.RequestStatusState.COMPLETED;
import static org.apache.solr.client.solrj.response.RequestStatusState.FAILED;
/**
*
*/
@ -45,10 +52,75 @@ public class AddReplicaTest extends SolrCloudTestCase {
.configure();
}
@Before
public void setUp() throws Exception {
super.setUp();
cluster.deleteAllCollections();
}
@Test
public void testAddMultipleReplicas() throws Exception {
cluster.waitForAllNodes(5);
String collection = "testAddMultipleReplicas";
CloudSolrClient cloudClient = cluster.getSolrClient();
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf1", 1, 1);
create.setMaxShardsPerNode(2);
cloudClient.request(create);
CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(collection, "shard1")
.setNrtReplicas(1)
.setTlogReplicas(1)
.setPullReplicas(1);
RequestStatusState status = addReplica.processAndWait(collection + "_xyz1", cloudClient, 120);
assertEquals(COMPLETED, status);
DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollectionOrNull(collection);
assertNotNull(docCollection);
assertEquals(4, docCollection.getReplicas().size());
assertEquals(2, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
assertEquals(1, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).size());
assertEquals(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
// try to add 5 more replicas which should fail because numNodes(4)*maxShardsPerNode(2)=8 and 4 replicas already exist
addReplica = CollectionAdminRequest.addReplicaToShard(collection, "shard1")
.setNrtReplicas(3)
.setTlogReplicas(1)
.setPullReplicas(1);
status = addReplica.processAndWait(collection + "_xyz1", cloudClient, 120);
assertEquals(FAILED, status);
docCollection = cloudClient.getZkStateReader().getClusterState().getCollectionOrNull(collection);
assertNotNull(docCollection);
// sanity check that everything is as before
assertEquals(4, docCollection.getReplicas().size());
assertEquals(2, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
assertEquals(1, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).size());
assertEquals(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
// but adding any number of replicas is supported if an explicit create node set is specified
// so test that as well
List<String> createNodeSet = new ArrayList<>(2);
createNodeSet.add(cluster.getRandomJetty(random()).getNodeName());
createNodeSet.add(cluster.getRandomJetty(random()).getNodeName());
addReplica = CollectionAdminRequest.addReplicaToShard(collection, "shard1")
.setNrtReplicas(3)
.setTlogReplicas(1)
.setPullReplicas(1)
.setCreateNodeSet(String.join(",", createNodeSet));
status = addReplica.processAndWait(collection + "_xyz1", cloudClient, 120);
assertEquals(COMPLETED, status);
docCollection = cloudClient.getZkStateReader().getClusterState().getCollectionOrNull(collection);
assertNotNull(docCollection);
// check that all requested replicas were created
assertEquals(9, docCollection.getReplicas().size());
assertEquals(5, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
assertEquals(2, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).size());
assertEquals(2, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
}
@Test
//commented 2-Aug-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 09-Apr-2018
public void test() throws Exception {
cluster.waitForAllNodes(5000);
cluster.waitForAllNodes(5);
String collection = "addreplicatest_coll";
CloudSolrClient cloudClient = cluster.getSolrClient();
@ -65,16 +137,16 @@ public class AddReplicaTest extends SolrCloudTestCase {
addReplica.processAsync("000", cloudClient);
CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000");
CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient);
assertTrue(rsp.getRequestStatus() != RequestStatusState.COMPLETED);
assertNotSame(rsp.getRequestStatus(), COMPLETED);
// wait for async request success
boolean success = false;
for (int i = 0; i < 200; i++) {
rsp = requestStatus.process(cloudClient);
if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) {
if (rsp.getRequestStatus() == COMPLETED) {
success = true;
break;
}
assertFalse(rsp.toString(), rsp.getRequestStatus() == RequestStatusState.FAILED);
assertNotSame(rsp.toString(), rsp.getRequestStatus(), RequestStatusState.FAILED);
Thread.sleep(500);
}
assertTrue(success);
@ -82,23 +154,23 @@ public class AddReplicaTest extends SolrCloudTestCase {
replicas2.removeAll(replicas);
assertEquals(1, replicas2.size());
Replica r = replicas2.iterator().next();
assertTrue(r.toString(), r.getState() != Replica.State.ACTIVE);
assertNotSame(r.toString(), r.getState(), Replica.State.ACTIVE);
// use waitForFinalState
addReplica.setWaitForFinalState(true);
addReplica.processAsync("001", cloudClient);
requestStatus = CollectionAdminRequest.requestStatus("001");
rsp = requestStatus.process(cloudClient);
assertTrue(rsp.getRequestStatus() != RequestStatusState.COMPLETED);
assertNotSame(rsp.getRequestStatus(), COMPLETED);
// wait for async request success
success = false;
for (int i = 0; i < 200; i++) {
rsp = requestStatus.process(cloudClient);
if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) {
if (rsp.getRequestStatus() == COMPLETED) {
success = true;
break;
}
assertFalse(rsp.toString(), rsp.getRequestStatus() == RequestStatusState.FAILED);
assertNotSame(rsp.toString(), rsp.getRequestStatus(), RequestStatusState.FAILED);
Thread.sleep(500);
}
assertTrue(success);
@ -114,7 +186,7 @@ public class AddReplicaTest extends SolrCloudTestCase {
if (replica.getName().equals(replica2)) {
continue; // may be still recovering
}
assertTrue(coll.toString() + "\n" + replica.toString(), replica.getState() == Replica.State.ACTIVE);
assertSame(coll.toString() + "\n" + replica.toString(), replica.getState(), Replica.State.ACTIVE);
}
}
}


@ -158,7 +158,7 @@ public class CollectionTooManyReplicasTest extends SolrCloudTestCase {
assertTrue("Should have gotten the right error message back",
e2.getMessage().contains("given the current number of live nodes and a maxShardsPerNode of"));
// And finally, insure that there are all the replcias we expect. We should have shards 1, 2 and 4 and each
// And finally, ensure that there are all the replicas we expect. We should have shards 1, 2 and 4 and each
// should have exactly two replicas
waitForState("Expected shards shardstart, 1, 2 and 4, each with two active replicas", collectionName, (n, c) -> {
return DocCollection.isFullyActive(n, c, 4, 2);


@ -542,7 +542,7 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
@Test
public void testSplitShardWithRule() throws Exception {
doSplitShardWithRule(SolrIndexSplitter.SplitMethod.LINK);
doSplitShardWithRule(SolrIndexSplitter.SplitMethod.REWRITE);
}
@Test


@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -413,23 +414,46 @@ public class SimClusterStateProvider implements ClusterStateProvider {
ClusterState clusterState = getClusterState();
DocCollection coll = clusterState.getCollection(message.getStr(ZkStateReader.COLLECTION_PROP));
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
message = AddReplicaCmd.assignReplicaDetails(cloudManager, clusterState, message, sessionWrapper);
Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
EnumMap<Replica.Type, Integer> replicaTypesVsCount = new EnumMap<>(Replica.Type.class);
replicaTypesVsCount.put(Replica.Type.NRT, message.getInt(NRT_REPLICAS, replicaType == Replica.Type.NRT ? 1 : 0));
replicaTypesVsCount.put(Replica.Type.TLOG, message.getInt(TLOG_REPLICAS, replicaType == Replica.Type.TLOG ? 1 : 0));
replicaTypesVsCount.put(Replica.Type.PULL, message.getInt(PULL_REPLICAS, replicaType == Replica.Type.PULL ? 1 : 0));
int totalReplicas = 0;
for (Map.Entry<Replica.Type, Integer> entry : replicaTypesVsCount.entrySet()) {
totalReplicas += entry.getValue();
}
if (totalReplicas > 1) {
if (message.getStr(CoreAdminParams.NAME) != null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'name' parameter is specified");
}
if (message.getStr(CoreAdminParams.CORE_NODE_NAME) != null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'coreNodeName' parameter is specified");
}
}
List<ReplicaPosition> replicaPositions = AddReplicaCmd.buildReplicaPositions(cloudManager, clusterState, coll.getName(), message, replicaTypesVsCount, sessionWrapper);
for (ReplicaPosition replicaPosition : replicaPositions) {
AddReplicaCmd.CreateReplica createReplica = AddReplicaCmd.assignReplicaDetails(cloudManager, clusterState, message, replicaPosition);
if (message.getStr(CoreAdminParams.CORE_NODE_NAME) == null) {
createReplica.coreNodeName = Assign.assignCoreNodeName(stateManager, coll);
}
ReplicaInfo ri = new ReplicaInfo(
createReplica.coreNodeName,
createReplica.coreName,
createReplica.collectionName,
createReplica.sliceName,
createReplica.replicaType,
createReplica.node,
message.getProperties()
);
simAddReplica(ri.getNode(), ri, true);
}
if (sessionWrapper.get() != null) {
sessionWrapper.get().release();
}
if (message.getStr(CoreAdminParams.CORE_NODE_NAME) == null) {
message = message.plus(CoreAdminParams.CORE_NODE_NAME, Assign.assignCoreNodeName(stateManager, coll));
}
ReplicaInfo ri = new ReplicaInfo(
message.getStr(CoreAdminParams.CORE_NODE_NAME),
message.getStr(CoreAdminParams.NAME),
message.getStr(ZkStateReader.COLLECTION_PROP),
message.getStr(ZkStateReader.SHARD_ID_PROP),
Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT)),
message.getStr(CoreAdminParams.NODE),
message.getProperties()
);
simAddReplica(message.getStr(CoreAdminParams.NODE), ri, true);
results.add("success", "");
}
@ -1015,31 +1039,30 @@ public class SimClusterStateProvider implements ClusterStateProvider {
.filter(e -> !e.getKey().equals("replicas"))
.forEach(e -> props.put(e.getKey(), e.getValue()));
// 2. create new replicas
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
List<ReplicaPosition> positions = CreateShardCmd.buildReplicaPositions(cloudManager, clusterState, collectionName,
message, sessionWrapper);
if (sessionWrapper.get() != null) {
sessionWrapper.get().release();
}
AtomicInteger replicaNum = new AtomicInteger(1);
positions.forEach(pos -> {
Map<String, Object> replicaProps = new HashMap<>();
replicaProps.put(ZkStateReader.SHARD_ID_PROP, pos.shard);
replicaProps.put(ZkStateReader.NODE_NAME_PROP, pos.node);
replicaProps.put(ZkStateReader.REPLICA_TYPE, pos.type.toString());
replicaProps.put(ZkStateReader.BASE_URL_PROP, Utils.getBaseUrlForNodeName(pos.node, "http"));
String coreName = String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, pos.shard, pos.type.name().substring(0,1).toLowerCase(Locale.ROOT),
replicaNum.getAndIncrement());
EnumMap<Replica.Type, Integer> replicaTypesVsCount = new EnumMap<>(Replica.Type.class);
int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1))));
int numTlogReplicas = message.getInt(TLOG_REPLICAS, message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0)));
int numPullReplicas = message.getInt(PULL_REPLICAS, message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0)));
replicaTypesVsCount.put(Replica.Type.NRT, numNrtReplicas);
replicaTypesVsCount.put(Replica.Type.TLOG, numTlogReplicas);
replicaTypesVsCount.put(Replica.Type.PULL, numPullReplicas);
ZkNodeProps addReplicasProps = new ZkNodeProps(
COLLECTION_PROP, collectionName,
SHARD_ID_PROP, sliceName,
ZkStateReader.NRT_REPLICAS, String.valueOf(replicaTypesVsCount.get(Replica.Type.NRT)),
ZkStateReader.TLOG_REPLICAS, String.valueOf(replicaTypesVsCount.get(Replica.Type.TLOG)),
ZkStateReader.PULL_REPLICAS, String.valueOf(replicaTypesVsCount.get(Replica.Type.PULL)),
OverseerCollectionMessageHandler.CREATE_NODE_SET, message.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET)
);
try {
replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
coreName, collectionName, pos.shard, pos.type, pos.node, replicaProps);
simAddReplica(pos.node, ri, false);
simAddReplica(addReplicasProps, results);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
Map<String, Object> colProps = collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>());
collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>());
simRunLeaderElection(Collections.singleton(collectionName), true);
results.add("success", "");


@ -236,7 +236,7 @@ public class TestSimPolicyCloud extends SimSolrCloudTestCase {
String pullNodeName = nodes.get(1);
int pullPort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(pullNodeName, ImplicitSnitch.PORT);
String tlogNodeName = nodes.get(1);
String tlogNodeName = nodes.get(2);
int tlogPort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(tlogNodeName, ImplicitSnitch.PORT);
log.info("NRT {} PULL {} , TLOG {} ", nrtNodeName, pullNodeName, tlogNodeName);


@ -396,6 +396,10 @@ Use SPLITSHARD for collections created with the 'compositeId' router (`router.ke
`/admin/collections?action=CREATESHARD&shard=_shardName_&collection=_name_`
The default values of `replicationFactor` (or `nrtReplicas`), `tlogReplicas` and `pullReplicas` from the collection are used to determine the number of replicas to be created for the new shard. This can be customized by explicitly passing the corresponding parameters to the request.
The API uses the Autoscaling framework to find the best possible nodes in the cluster when Autoscaling preferences or a policy are configured. Refer to the <<solrcloud-autoscaling-policy-preferences.adoc#solrcloud-autoscaling-policy-preferences,Autoscaling Policy and Preferences>> section for more details.
=== CREATESHARD Parameters
`collection`::
@ -409,6 +413,15 @@ Allows defining the nodes to spread the new collection across. If not provided,
+
The format is a comma-separated list of node_names, such as `localhost:8983_solr,localhost:8984_solr,localhost:8985_solr`.
`nrtReplicas`::
The number of `nrt` replicas that should be created for the new shard (optional; the collection's default is used if omitted).
`tlogReplicas`::
The number of `tlog` replicas that should be created for the new shard (optional; the collection's default is used if omitted).
`pullReplicas`::
The number of `pull` replicas that should be created for the new shard (optional; the collection's default is used if omitted).
`property._name_=_value_`::
Set core property _name_ to _value_. See the section <<defining-core-properties.adoc#defining-core-properties,Defining core.properties>> for details on supported properties and values.
@ -1016,9 +1029,9 @@ http://localhost:8983/solr/admin/collections?action=DELETEREPLICA&collection=tes
[[addreplica]]
== ADDREPLICA: Add Replica
Add a replica to a shard in a collection. The node name can be specified if the replica is to be created in a specific node.
Add one or more replicas to a shard in a collection. The node name can be specified if the replica is to be created on a specific node. Otherwise, a set of nodes can be specified and the most suitable ones among them will be chosen to create the replica(s).
The API uses the Autoscaling framework to find nodes that can satisfy the disk requirements for the new replica but only when an Autoscaling policy is configured. Refer to <<solrcloud-autoscaling-policy-preferences.adoc#solrcloud-autoscaling-policy-preferences,Autoscaling Policy and Preferences>> section for more details.
The API uses the Autoscaling framework to find nodes that can satisfy the disk requirements for the new replica(s), but only when Autoscaling preferences or a policy are configured. Refer to the <<solrcloud-autoscaling-policy-preferences.adoc#solrcloud-autoscaling-policy-preferences,Autoscaling Policy and Preferences>> section for more details.
`/admin/collections?action=ADDREPLICA&collection=_collection_&shard=_shard_&node=_nodeName_`
@ -1038,7 +1051,14 @@ If the exact shard name is not known, users may pass the `\_route_` value and th
Ignored if the `shard` parameter is also specified.
`node`::
The name of the node where the replica should be created.
The name of the node where the replica should be created (optional).
`createNodeSet`::
A set of nodes among which the best ones will be chosen to place the replicas (optional).
+
The format is a comma-separated list of node_names, such as `localhost:8983_solr,localhost:8984_solr,localhost:8985_solr`.
If neither `node` nor `createNodeSet` is specified, the best node(s) from among all live nodes in the cluster are chosen.
`instanceDir`::
The instanceDir for the core that will be created.
@ -1057,6 +1077,15 @@ The type of replica to create. These possible values are allowed:
+
See the section <<shards-and-indexing-data-in-solrcloud.adoc#types-of-replicas,Types of Replicas>> for more information about replica type options.
`nrtReplicas`::
The number of `nrt` replicas that should be created (optional; defaults to 1 if `type` is `nrt`, otherwise 0).
`tlogReplicas`::
The number of `tlog` replicas that should be created (optional; defaults to 1 if `type` is `tlog`, otherwise 0).
`pullReplicas`::
The number of `pull` replicas that should be created (optional; defaults to 1 if `type` is `pull`, otherwise 0).
`property._name_=_value_`::
Set core property _name_ to _value_. See <<defining-core-properties.adoc#defining-core-properties,Defining core.properties>> for details about supported properties and values.
@ -1096,6 +1125,39 @@ http://localhost:8983/solr/admin/collections?action=ADDREPLICA&collection=test2&
</response>
----
[source,text]
----
http://localhost:8983/solr/admin/collections?action=addreplica&collection=gettingstarted&shard=shard1&tlogReplicas=1&pullReplicas=1
----
*Output*
[source,json]
----
{
"responseHeader": {
"status": 0,
"QTime": 784
},
"success": {
"127.0.1.1:7574_solr": {
"responseHeader": {
"status": 0,
"QTime": 257
},
"core": "gettingstarted_shard1_replica_p11"
},
"127.0.1.1:8983_solr": {
"responseHeader": {
"status": 0,
"QTime": 295
},
"core": "gettingstarted_shard1_replica_t10"
}
}
}
----
[[clusterprop]]
== CLUSTERPROP: Cluster Properties


@ -57,7 +57,10 @@ import static org.apache.solr.common.cloud.DocCollection.RULE;
import static org.apache.solr.common.cloud.DocCollection.SNITCH;
import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
@ -1648,6 +1651,8 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
protected String ulogDir;
protected Properties properties;
protected Replica.Type type;
protected Integer nrtReplicas, tlogReplicas, pullReplicas;
protected String createNodeSet;
private AddReplica(String collection, String shard, String routeKey, Replica.Type type) {
super(CollectionAction.ADDREPLICA);
@ -1727,6 +1732,42 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
return shard;
}
public Integer getNrtReplicas() {
return nrtReplicas;
}
public AddReplica setNrtReplicas(Integer nrtReplicas) {
this.nrtReplicas = nrtReplicas;
return this;
}
public Integer getTlogReplicas() {
return tlogReplicas;
}
public AddReplica setTlogReplicas(Integer tlogReplicas) {
this.tlogReplicas = tlogReplicas;
return this;
}
public Integer getPullReplicas() {
return pullReplicas;
}
public AddReplica setPullReplicas(Integer pullReplicas) {
this.pullReplicas = pullReplicas;
return this;
}
public String getCreateNodeSet() {
return createNodeSet;
}
public AddReplica setCreateNodeSet(String createNodeSet) {
this.createNodeSet = createNodeSet;
return this;
}
@Override
public SolrParams getParams() {
ModifiableSolrParams params = new ModifiableSolrParams(super.getParams());
@ -1759,6 +1800,18 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
if (properties != null) {
addProperties(params, properties);
}
if (nrtReplicas != null) {
params.add(NRT_REPLICAS, String.valueOf(nrtReplicas));
}
if (tlogReplicas != null) {
params.add(TLOG_REPLICAS, String.valueOf(tlogReplicas));
}
if (pullReplicas != null) {
params.add(PULL_REPLICAS, String.valueOf(pullReplicas));
}
if (createNodeSet != null) {
params.add(CREATE_NODE_SET_PARAM, createNodeSet);
}
return params;
}
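Finally, a hedged usage sketch of the new builder methods (collection, shard and node names are made up); it only illustrates that the setters above are serialized into the corresponding request parameters:

[source,java]
----
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.params.SolrParams;

public class AddReplicaParamsSketch {
  public static void main(String[] args) {
    CollectionAdminRequest.AddReplica addReplica =
        CollectionAdminRequest.addReplicaToShard("techproducts", "shard1")
            .setNrtReplicas(1)
            .setTlogReplicas(1)
            .setPullReplicas(2)
            .setCreateNodeSet("host1:8983_solr,host2:8983_solr");

    // The builder serializes the new options as request parameters,
    // e.g. nrtReplicas=1, tlogReplicas=1, pullReplicas=2, createNodeSet=...
    SolrParams params = addReplica.getParams();
    System.out.println(params);
  }
}
----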