This commit is contained in:
Karl Wright 2018-09-23 06:45:59 -04:00
commit e6e3dc7ea8
17 changed files with 576 additions and 306 deletions

View File

@ -170,6 +170,9 @@ Optimizations
* LUCENE-8448: Boolean queries now propagates the mininum score to their sub-scorers. * LUCENE-8448: Boolean queries now propagates the mininum score to their sub-scorers.
(Jim Ferenczi, Adrien Grand) (Jim Ferenczi, Adrien Grand)
* LUCENE-8511: MultiFields.getIndexedFields is now optimized; does not call getMergedFieldInfos
(David Smiley)
======================= Lucene 7.6.0 ======================= ======================= Lucene 7.6.0 =======================
Build Build

View File

@ -21,12 +21,13 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.lucene.util.Bits; import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
@ -266,7 +267,8 @@ public final class MultiFields extends Fields {
public static FieldInfos getMergedFieldInfos(IndexReader reader) { public static FieldInfos getMergedFieldInfos(IndexReader reader) {
final String softDeletesField = reader.leaves().stream() final String softDeletesField = reader.leaves().stream()
.map(l -> l.reader().getFieldInfos().getSoftDeletesField()) .map(l -> l.reader().getFieldInfos().getSoftDeletesField())
.filter(Objects::nonNull).findAny().orElse(null); .filter(Objects::nonNull)
.findAny().orElse(null);
final FieldInfos.Builder builder = new FieldInfos.Builder(new FieldInfos.FieldNumbers(softDeletesField)); final FieldInfos.Builder builder = new FieldInfos.Builder(new FieldInfos.FieldNumbers(softDeletesField));
for(final LeafReaderContext ctx : reader.leaves()) { for(final LeafReaderContext ctx : reader.leaves()) {
builder.add(ctx.reader().getFieldInfos()); builder.add(ctx.reader().getFieldInfos());
@ -274,22 +276,13 @@ public final class MultiFields extends Fields {
return builder.finish(); return builder.finish();
} }
/** Call this to get the (merged) FieldInfos representing the /** Returns a set of names of fields that have a terms index. The order is undefined. */
* set of indexed fields <b>only</b> for a composite reader.
* <p>
* NOTE: the returned field numbers will likely not
* correspond to the actual field numbers in the underlying
* readers, and codec metadata ({@link FieldInfo#getAttribute(String)}
* will be unavailable.
*/
public static Collection<String> getIndexedFields(IndexReader reader) { public static Collection<String> getIndexedFields(IndexReader reader) {
final Collection<String> fields = new HashSet<>(); return reader.leaves().stream()
for(final FieldInfo fieldInfo : getMergedFieldInfos(reader)) { .flatMap(l -> StreamSupport.stream(l.reader().getFieldInfos().spliterator(), false)
if (fieldInfo.getIndexOptions() != IndexOptions.NONE) { .filter(fi -> fi.getIndexOptions() != IndexOptions.NONE))
fields.add(fieldInfo.name); .map(fi -> fi.name)
} .collect(Collectors.toSet());
}
return fields;
} }
private static class LeafReaderFields extends Fields { private static class LeafReaderFields extends Fields {

View File

@ -90,12 +90,23 @@ New Features
error. Previously, the collapsing behavior was unreliable and undefined despite no explicit error. error. Previously, the collapsing behavior was unreliable and undefined despite no explicit error.
(Munendra S N, David Smiley) (Munendra S N, David Smiley)
* SOLR-9317: ADDREPLICA command should be able to add more than one replica to a collection,shard at a time.
The API now supports 'nrtReplicas', 'tlogReplicas', 'pullReplicas' parameters as well 'createNodeSet' parameter.
As part of this change, the CREATESHARD API now delegates placing replicas entirely to the ADDREPLICA command
and uses the new parameters to add all the replicas in one API call. (shalin)
Other Changes Other Changes
---------------------- ----------------------
* SOLR-12762: Fix javadoc for SolrCloudTestCase.clusterShape() method and add a method that validates only against * SOLR-12762: Fix javadoc for SolrCloudTestCase.clusterShape() method and add a method that validates only against
Active slices (Anshum Gupta) Active slices (Anshum Gupta)
Bug Fixes
----------------------
* SOLR-11836: FacetStream works with bucketSizeLimit of -1 which will fetch all the buckets.
(Alfonso Muñoz-Pomer Fuentes, Amrit Sarkar via Varun Thacker)
================== 7.5.0 ================== ================== 7.5.0 ==================
@ -352,9 +363,6 @@ Bug Fixes
* SOLR-12733: SolrMetricReporterTest failure (Erick Erickson, David Smiley) * SOLR-12733: SolrMetricReporterTest failure (Erick Erickson, David Smiley)
* SOLR-11836: FacetStream works with bucketSizeLimit of -1 which will fetch all the buckets.
(Alfonso Muñoz-Pomer Fuentes, Amrit Sarkar via Varun Thacker)
* SOLR-12765: Incorrect format of JMX cache stats. (Bojan Smid, ab) * SOLR-12765: Incorrect format of JMX cache stats. (Bojan Smid, ab)
Optimizations Optimizations

View File

@ -20,14 +20,17 @@ package org.apache.solr.cloud.api.collections;
import java.io.IOException; import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.client.solrj.cloud.SolrCloudManager;
@ -41,6 +44,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
@ -54,10 +58,14 @@ import org.apache.solr.handler.component.ShardHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE; import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF; import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION; import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
@ -79,28 +87,116 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
addReplica(state, message, results, null); addReplica(state, message, results, null);
} }
ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete) List<ZkNodeProps> addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
throws IOException, InterruptedException { throws IOException, InterruptedException {
log.debug("addReplica() : {}", Utils.toJSONString(message)); log.debug("addReplica() : {}", Utils.toJSONString(message));
String collectionName = message.getStr(COLLECTION_PROP); String collectionName = message.getStr(COLLECTION_PROP);
String shard = message.getStr(SHARD_ID_PROP);
DocCollection coll = clusterState.getCollection(collectionName); DocCollection coll = clusterState.getCollection(collectionName);
if (coll == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collectionName + " does not exist");
}
if (coll.getSlice(shard) == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Collection: " + collectionName + " shard: " + shard + " does not exist");
}
boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false); boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false); boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
final String asyncId = message.getStr(ASYNC); final String asyncId = message.getStr(ASYNC);
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
message = assignReplicaDetails(ocmh.cloudManager, clusterState, message, sessionWrapper);
String node = message.getStr(CoreAdminParams.NODE); String node = message.getStr(CoreAdminParams.NODE);
String shard = message.getStr(SHARD_ID_PROP); String createNodeSetStr = message.getStr(CREATE_NODE_SET);
String coreName = message.getStr(CoreAdminParams.NAME);
String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME); if (node != null && createNodeSetStr != null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Both 'node' and 'createNodeSet' parameters cannot be specified together.");
}
int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes
Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
boolean parallel = message.getBool("parallel", false); boolean parallel = message.getBool("parallel", false);
Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
EnumMap<Replica.Type, Integer> replicaTypesVsCount = new EnumMap<>(Replica.Type.class);
replicaTypesVsCount.put(Replica.Type.NRT, message.getInt(NRT_REPLICAS, replicaType == Replica.Type.NRT ? 1 : 0));
replicaTypesVsCount.put(Replica.Type.TLOG, message.getInt(TLOG_REPLICAS, replicaType == Replica.Type.TLOG ? 1 : 0));
replicaTypesVsCount.put(Replica.Type.PULL, message.getInt(PULL_REPLICAS, replicaType == Replica.Type.PULL ? 1 : 0));
int totalReplicas = 0;
for (Map.Entry<Replica.Type, Integer> entry : replicaTypesVsCount.entrySet()) {
totalReplicas += entry.getValue();
}
if (totalReplicas > 1) {
if (message.getStr(CoreAdminParams.NAME) != null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'name' parameter is specified");
}
if (message.getStr(CoreAdminParams.CORE_NODE_NAME) != null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'coreNodeName' parameter is specified");
}
}
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
List<CreateReplica> createReplicas = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, replicaTypesVsCount, sessionWrapper)
.stream()
.map(replicaPosition -> assignReplicaDetails(ocmh.cloudManager, clusterState, message, replicaPosition))
.collect(Collectors.toList());
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
ZkStateReader zkStateReader = ocmh.zkStateReader;
// For tracking async calls.
Map<String,String> requestMap = new HashMap<>();
for (CreateReplica createReplica : createReplicas) {
assert createReplica.coreName != null;
ModifiableSolrParams params = getReplicaParams(clusterState, message, results, collectionName, coll, skipCreateReplicaInClusterState, asyncId, shardHandler, createReplica);
ocmh.sendShardRequest(createReplica.node, params, shardHandler, asyncId, requestMap);
}
Runnable runnable = () -> {
ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
for (CreateReplica replica : createReplicas) {
ocmh.waitForCoreNodeName(collectionName, replica.node, replica.coreName);
}
if (sessionWrapper.get() != null) {
sessionWrapper.get().release();
}
if (onComplete != null) onComplete.run();
};
if (!parallel || waitForFinalState) {
if (waitForFinalState) {
SolrCloseableLatch latch = new SolrCloseableLatch(totalReplicas, ocmh);
ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, null,
createReplicas.stream().map(createReplica -> createReplica.coreName).collect(Collectors.toList()), latch);
try {
zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
runnable.run();
if (!latch.await(timeout, TimeUnit.SECONDS)) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active.");
}
} finally {
zkStateReader.removeCollectionStateWatcher(collectionName, watcher);
}
} else {
runnable.run();
}
} else {
ocmh.tpe.submit(runnable);
}
return createReplicas.stream()
.map(createReplica -> new ZkNodeProps(
ZkStateReader.COLLECTION_PROP, createReplica.collectionName,
ZkStateReader.SHARD_ID_PROP, createReplica.sliceName,
ZkStateReader.CORE_NAME_PROP, createReplica.coreName,
ZkStateReader.NODE_NAME_PROP, createReplica.node
))
.collect(Collectors.toList());
}
private ModifiableSolrParams getReplicaParams(ClusterState clusterState, ZkNodeProps message, NamedList results, String collectionName, DocCollection coll, boolean skipCreateReplicaInClusterState, String asyncId, ShardHandler shardHandler, CreateReplica createReplica) throws IOException, InterruptedException {
if (coll.getStr(WITH_COLLECTION) != null) { if (coll.getStr(WITH_COLLECTION) != null) {
String withCollectionName = coll.getStr(WITH_COLLECTION); String withCollectionName = coll.getStr(WITH_COLLECTION);
DocCollection withCollection = clusterState.getCollection(withCollectionName); DocCollection withCollection = clusterState.getCollection(withCollectionName);
@ -109,14 +205,14 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
} }
String withCollectionShard = withCollection.getActiveSlices().iterator().next().getName(); String withCollectionShard = withCollection.getActiveSlices().iterator().next().getName();
List<Replica> replicas = withCollection.getReplicas(node); List<Replica> replicas = withCollection.getReplicas(createReplica.node);
if (replicas == null || replicas.isEmpty()) { if (replicas == null || replicas.isEmpty()) {
// create a replica of withCollection on the identified node before proceeding further // create a replica of withCollection on the identified node before proceeding further
ZkNodeProps props = new ZkNodeProps( ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, ADDREPLICA.toString(), Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
ZkStateReader.COLLECTION_PROP, withCollectionName, ZkStateReader.COLLECTION_PROP, withCollectionName,
ZkStateReader.SHARD_ID_PROP, withCollectionShard, ZkStateReader.SHARD_ID_PROP, withCollectionShard,
"node", node, "node", createReplica.node,
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.TRUE.toString()); // set to true because we want `withCollection` to be ready after this collection is created CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.TRUE.toString()); // set to true because we want `withCollection` to be ready after this collection is created
addReplica(clusterState, props, results, null); addReplica(clusterState, props, results, null);
} }
@ -130,14 +226,14 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
ZkNodeProps props = new ZkNodeProps( ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(), Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
ZkStateReader.COLLECTION_PROP, collectionName, ZkStateReader.COLLECTION_PROP, collectionName,
ZkStateReader.SHARD_ID_PROP, shard, ZkStateReader.SHARD_ID_PROP, createReplica.sliceName,
ZkStateReader.CORE_NAME_PROP, coreName, ZkStateReader.CORE_NAME_PROP, createReplica.coreName,
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node), ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(createReplica.node),
ZkStateReader.NODE_NAME_PROP, node, ZkStateReader.NODE_NAME_PROP, createReplica.node,
ZkStateReader.REPLICA_TYPE, replicaType.name()); ZkStateReader.REPLICA_TYPE, createReplica.replicaType.name());
if (coreNodeName != null) { if (createReplica.coreNodeName != null) {
props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName); props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, createReplica.coreNodeName);
} }
try { try {
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props)); Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
@ -146,7 +242,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
} }
} }
params.set(CoreAdminParams.CORE_NODE_NAME, params.set(CoreAdminParams.CORE_NODE_NAME,
ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(coreName)).get(coreName).getName()); ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(createReplica.coreName)).get(createReplica.coreName).getName());
} }
String configName = zkStateReader.readConfigName(collectionName); String configName = zkStateReader.readConfigName(collectionName);
@ -156,12 +252,12 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR); String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR);
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString()); params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
params.set(CoreAdminParams.NAME, coreName); params.set(CoreAdminParams.NAME, createReplica.coreName);
params.set(COLL_CONF, configName); params.set(COLL_CONF, configName);
params.set(CoreAdminParams.COLLECTION, collectionName); params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.REPLICA_TYPE, replicaType.name()); params.set(CoreAdminParams.REPLICA_TYPE, createReplica.replicaType.name());
if (shard != null) { if (createReplica.sliceName != null) {
params.set(CoreAdminParams.SHARD, shard); params.set(CoreAdminParams.SHARD, createReplica.sliceName);
} else if (routeKey != null) { } else if (routeKey != null) {
Collection<Slice> slices = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll); Collection<Slice> slices = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll);
if (slices.isEmpty()) { if (slices.isEmpty()) {
@ -181,108 +277,34 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
if (instanceDir != null) { if (instanceDir != null) {
params.set(CoreAdminParams.INSTANCE_DIR, instanceDir); params.set(CoreAdminParams.INSTANCE_DIR, instanceDir);
} }
if (coreNodeName != null) { if (createReplica.coreNodeName != null) {
params.set(CoreAdminParams.CORE_NODE_NAME, coreNodeName); params.set(CoreAdminParams.CORE_NODE_NAME, createReplica.coreNodeName);
} }
ocmh.addPropertyParams(message, params); ocmh.addPropertyParams(message, params);
// For tracking async calls. return params;
Map<String,String> requestMap = new HashMap<>();
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
ocmh.sendShardRequest(node, params, shardHandler, asyncId, requestMap);
final String fnode = node;
final String fcoreName = coreName;
Runnable runnable = () -> {
ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
ocmh.waitForCoreNodeName(collectionName, fnode, fcoreName);
if (sessionWrapper.get() != null) {
sessionWrapper.get().release();
}
if (onComplete != null) onComplete.run();
};
if (!parallel || waitForFinalState) {
if (waitForFinalState) {
SolrCloseableLatch latch = new SolrCloseableLatch(1, ocmh);
ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, null, Collections.singletonList(coreName), latch);
try {
zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
runnable.run();
if (!latch.await(timeout, TimeUnit.SECONDS)) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active.");
}
} finally {
zkStateReader.removeCollectionStateWatcher(collectionName, watcher);
}
} else {
runnable.run();
}
} else {
ocmh.tpe.submit(runnable);
}
return new ZkNodeProps(
ZkStateReader.COLLECTION_PROP, collectionName,
ZkStateReader.SHARD_ID_PROP, shard,
ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.NODE_NAME_PROP, node
);
} }
public static ZkNodeProps assignReplicaDetails(SolrCloudManager cloudManager, ClusterState clusterState, public static CreateReplica assignReplicaDetails(SolrCloudManager cloudManager, ClusterState clusterState,
ZkNodeProps message, AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException { ZkNodeProps message, ReplicaPosition replicaPosition) {
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false); boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
String collection = message.getStr(COLLECTION_PROP); String collection = message.getStr(COLLECTION_PROP);
String node = message.getStr(CoreAdminParams.NODE); String node = replicaPosition.node;
String shard = message.getStr(SHARD_ID_PROP); String shard = message.getStr(SHARD_ID_PROP);
String coreName = message.getStr(CoreAdminParams.NAME); String coreName = message.getStr(CoreAdminParams.NAME);
String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME); String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME);
Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT)); Replica.Type replicaType = replicaPosition.type;
if (StringUtils.isBlank(coreName)) { if (StringUtils.isBlank(coreName)) {
coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME); coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
} }
DocCollection coll = clusterState.getCollection(collection);
if (coll == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
}
if (coll.getSlice(shard) == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Collection: " + collection + " shard: " + shard + " does not exist");
}
// Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
if (!skipCreateReplicaInClusterState) {
if (CloudUtil.usePolicyFramework(coll, cloudManager)) {
if (node == null) {
if(coll.getPolicyName() != null) message.getProperties().put(Policy.POLICY, coll.getPolicyName());
node = Assign.identifyNodes(cloudManager,
clusterState,
Collections.emptyList(),
collection,
message,
Collections.singletonList(shard),
replicaType == Replica.Type.NRT ? 1 : 0,
replicaType == Replica.Type.TLOG ? 1 : 0,
replicaType == Replica.Type.PULL ? 1 : 0
).get(0).node;
sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
}
} else {
node = Assign.getNodesForNewReplicas(clusterState, collection, shard, 1, node,
cloudManager).get(0).nodeName;// TODO: use replica type in this logic too
}
}
log.info("Node Identified {} for creating new replica of shard {}", node, shard); log.info("Node Identified {} for creating new replica of shard {}", node, shard);
if (!clusterState.liveNodesContain(node)) { if (!clusterState.liveNodesContain(node)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live"); throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
} }
DocCollection coll = clusterState.getCollection(collection);
if (coreName == null) { if (coreName == null) {
coreName = Assign.buildSolrCoreName(cloudManager.getDistribStateManager(), coll, shard, replicaType); coreName = Assign.buildSolrCoreName(cloudManager.getDistribStateManager(), coll, shard, replicaType);
} else if (!skipCreateReplicaInClusterState) { } else if (!skipCreateReplicaInClusterState) {
@ -297,11 +319,103 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
} }
} }
} }
if (coreNodeName != null) { return new CreateReplica(collection, shard, node, replicaType, coreName, coreNodeName);
message = message.plus(CoreAdminParams.CORE_NODE_NAME, coreNodeName);
}
message = message.plus(CoreAdminParams.NAME, coreName);
message = message.plus(CoreAdminParams.NODE, node);
return message;
} }
public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
String collectionName, ZkNodeProps message,
EnumMap<Replica.Type, Integer> replicaTypeVsCount,
AtomicReference< PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
String sliceName = message.getStr(SHARD_ID_PROP);
DocCollection collection = clusterState.getCollection(collectionName);
int numNrtReplicas = replicaTypeVsCount.get(Replica.Type.NRT);
int numPullReplicas = replicaTypeVsCount.get(Replica.Type.PULL);
int numTlogReplicas = replicaTypeVsCount.get(Replica.Type.TLOG);
int totalReplicas = numNrtReplicas + numPullReplicas + numTlogReplicas;
String node = message.getStr(CoreAdminParams.NODE);
Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
if (createNodeSetStr == null) {
if (node != null) {
message.getProperties().put(OverseerCollectionMessageHandler.CREATE_NODE_SET, node);
createNodeSetStr = node;
}
}
List<ReplicaPosition> positions = null;
if (!skipCreateReplicaInClusterState) {
if (CloudUtil.usePolicyFramework(collection, cloudManager)) {
if (node == null) {
if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName());
positions = Assign.identifyNodes(cloudManager,
clusterState,
Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM),
collection.getName(),
message,
Collections.singletonList(sliceName),
numNrtReplicas,
numTlogReplicas,
numPullReplicas);
sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
}
} else {
List<Assign.ReplicaCount> sortedNodeList = Assign.getNodesForNewReplicas(clusterState, collection.getName(), sliceName, numNrtReplicas,
numTlogReplicas, numPullReplicas, createNodeSetStr, cloudManager);
int i = 0;
positions = new ArrayList<>();
for (Map.Entry<Replica.Type, Integer> e : replicaTypeVsCount.entrySet()) {
for (int j = 0; j < e.getValue(); j++) {
positions.add(new ReplicaPosition(sliceName, j + 1, e.getKey(), sortedNodeList.get(i % sortedNodeList.size()).nodeName));
i++;
}
}
}
}
if (positions == null) {
assert node != null;
if (node == null) {
// in case asserts are disabled
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"A node should have been identified to add replica but wasn't. Please inform solr developers at SOLR-9317");
}
// it is unlikely that multiple replicas have been requested to be created on
// the same node, but we've got to accommodate.
positions = new ArrayList<>(totalReplicas);
int i = 0;
for (Map.Entry<Replica.Type, Integer> entry : replicaTypeVsCount.entrySet()) {
for (int j = 0; j < entry.getValue(); j++) {
positions.add(new ReplicaPosition(sliceName, i++, entry.getKey(), node));
}
}
}
return positions;
}
/**
* A data structure to keep all information required to create a new replica in one place.
* Think of it as a typed ZkNodeProps for replica creation.
*
* This is <b>not</b> a public API and can be changed at any time without notice.
*/
public static class CreateReplica {
public final String collectionName;
public final String sliceName;
public final String node;
public final Replica.Type replicaType;
public String coreName;
public String coreNodeName;
CreateReplica(String collectionName, String sliceName, String node, Replica.Type replicaType, String coreName, String coreNodeName) {
this.collectionName = collectionName;
this.sliceName = sliceName;
this.node = node;
this.replicaType = replicaType;
this.coreName = coreName;
this.coreNodeName = coreNodeName;
}
}
} }

View File

@ -273,7 +273,7 @@ public class Assign {
} else { } else {
if (numTlogReplicas + numPullReplicas != 0 && rulesMap != null) { if (numTlogReplicas + numPullReplicas != 0 && rulesMap != null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules or cluster policies"); Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules");
} }
} }
@ -322,11 +322,11 @@ public class Assign {
// Gets a list of candidate nodes to put the required replica(s) on. Throws errors if not enough replicas // Gets a list of candidate nodes to put the required replica(s) on. Throws errors if not enough replicas
// could be created on live nodes given maxShardsPerNode, Replication factor (if from createShard) etc. // could be created on live nodes given maxShardsPerNode, Replication factor (if from createShard) etc.
public static List<ReplicaCount> getNodesForNewReplicas(ClusterState clusterState, String collectionName, public static List<ReplicaCount> getNodesForNewReplicas(ClusterState clusterState, String collectionName,
String shard, int nrtReplicas, String shard, int nrtReplicas, int tlogReplicas, int pullReplicas,
Object createNodeSet, SolrCloudManager cloudManager) throws IOException, InterruptedException, AssignmentException { Object createNodeSet, SolrCloudManager cloudManager) throws IOException, InterruptedException, AssignmentException {
log.debug("getNodesForNewReplicas() shard: {} , replicas : {} , createNodeSet {}", shard, nrtReplicas, createNodeSet ); log.debug("getNodesForNewReplicas() shard: {} , nrtReplicas : {} , tlogReplicas: {} , pullReplicas: {} , createNodeSet {}", shard, nrtReplicas, tlogReplicas, pullReplicas, createNodeSet );
DocCollection coll = clusterState.getCollection(collectionName); DocCollection coll = clusterState.getCollection(collectionName);
Integer maxShardsPerNode = coll.getMaxShardsPerNode(); Integer maxShardsPerNode = coll.getMaxShardsPerNode() == -1 ? Integer.MAX_VALUE : coll.getMaxShardsPerNode();
List<String> createNodeList = null; List<String> createNodeList = null;
if (createNodeSet instanceof List) { if (createNodeSet instanceof List) {
@ -338,15 +338,15 @@ public class Assign {
HashMap<String, ReplicaCount> nodeNameVsShardCount = getNodeNameVsShardCount(collectionName, clusterState, createNodeList); HashMap<String, ReplicaCount> nodeNameVsShardCount = getNodeNameVsShardCount(collectionName, clusterState, createNodeList);
if (createNodeList == null) { // We only care if we haven't been told to put new replicas on specific nodes. if (createNodeList == null) { // We only care if we haven't been told to put new replicas on specific nodes.
int availableSlots = 0; long availableSlots = 0;
for (Map.Entry<String, ReplicaCount> ent : nodeNameVsShardCount.entrySet()) { for (Map.Entry<String, ReplicaCount> ent : nodeNameVsShardCount.entrySet()) {
//ADDREPLICA can put more than maxShardsPerNode on an instance, so this test is necessary. //ADDREPLICA can put more than maxShardsPerNode on an instance, so this test is necessary.
if (maxShardsPerNode > ent.getValue().thisCollectionNodes) { if (maxShardsPerNode > ent.getValue().thisCollectionNodes) {
availableSlots += (maxShardsPerNode - ent.getValue().thisCollectionNodes); availableSlots += (maxShardsPerNode - ent.getValue().thisCollectionNodes);
} }
} }
if (availableSlots < nrtReplicas) { if (availableSlots < nrtReplicas + tlogReplicas + pullReplicas) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, throw new AssignmentException(
String.format(Locale.ROOT, "Cannot create %d new replicas for collection %s given the current number of live nodes and a maxShardsPerNode of %d", String.format(Locale.ROOT, "Cannot create %d new replicas for collection %s given the current number of live nodes and a maxShardsPerNode of %d",
nrtReplicas, collectionName, maxShardsPerNode)); nrtReplicas, collectionName, maxShardsPerNode));
} }
@ -355,13 +355,17 @@ public class Assign {
List l = (List) coll.get(DocCollection.RULE); List l = (List) coll.get(DocCollection.RULE);
List<ReplicaPosition> replicaPositions = null; List<ReplicaPosition> replicaPositions = null;
if (l != null) { if (l != null) {
if (tlogReplicas + pullReplicas > 0) {
throw new AssignmentException(Replica.Type.TLOG + " or " + Replica.Type.PULL +
" replica types not supported with placement rules");
}
// TODO: make it so that this method doesn't require access to CC // TODO: make it so that this method doesn't require access to CC
replicaPositions = getNodesViaRules(clusterState, shard, nrtReplicas, cloudManager, coll, createNodeList, l); replicaPositions = getNodesViaRules(clusterState, shard, nrtReplicas, cloudManager, coll, createNodeList, l);
} }
String policyName = coll.getStr(POLICY); String policyName = coll.getStr(POLICY);
AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig(); AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
if (policyName != null || !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) { if (policyName != null || !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), nrtReplicas, 0, 0, replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), nrtReplicas, tlogReplicas, pullReplicas,
policyName, cloudManager, createNodeList); policyName, cloudManager, createNodeList);
} }
@ -461,7 +465,7 @@ public class Assign {
return nodeNameVsShardCount; return nodeNameVsShardCount;
} }
DocCollection coll = clusterState.getCollection(collectionName); DocCollection coll = clusterState.getCollection(collectionName);
Integer maxShardsPerNode = coll.getMaxShardsPerNode(); int maxShardsPerNode = coll.getMaxShardsPerNode() == -1 ? Integer.MAX_VALUE : coll.getMaxShardsPerNode();
Map<String, DocCollection> collections = clusterState.getCollectionsMap(); Map<String, DocCollection> collections = clusterState.getCollectionsMap();
for (Map.Entry<String, DocCollection> entry : collections.entrySet()) { for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
DocCollection c = entry.getValue(); DocCollection c = entry.getValue();

View File

@ -17,32 +17,17 @@
package org.apache.solr.cloud.api.collections; package org.apache.solr.cloud.api.collections;
import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.ImmutableMap;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.cloud.CloudUtil;
import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.SolrCloseableLatch;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonAdminParams; import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap; import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.Utils; import org.apache.solr.common.util.Utils;
@ -77,114 +62,60 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
DocCollection collection = clusterState.getCollection(collectionName); DocCollection collection = clusterState.getCollection(collectionName);
ZkStateReader zkStateReader = ocmh.zkStateReader;
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
SolrCloseableLatch countDownLatch;
try {
List<ReplicaPosition> positions = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, sessionWrapper);
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
// wait for a while until we see the shard
ocmh.waitForNewShard(collectionName, sliceName);
String async = message.getStr(ASYNC);
countDownLatch = new SolrCloseableLatch(positions.size(), ocmh);
for (ReplicaPosition position : positions) {
String nodeName = position.node;
String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(), collection, sliceName, position.type);
log.info("Creating replica " + coreName + " as part of slice " + sliceName + " of collection " + collectionName
+ " on " + nodeName);
// Need to create new params for each request
ZkNodeProps addReplicasProps = new ZkNodeProps(
COLLECTION_PROP, collectionName,
SHARD_ID_PROP, sliceName,
ZkStateReader.REPLICA_TYPE, position.type.name(),
CoreAdminParams.NODE, nodeName,
CoreAdminParams.NAME, coreName,
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
Map<String, Object> propertyParams = new HashMap<>();
ocmh.addPropertyParams(message, propertyParams);
addReplicasProps = addReplicasProps.plus(propertyParams);
if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
final NamedList addResult = new NamedList();
ocmh.addReplica(zkStateReader.getClusterState(), addReplicasProps, addResult, () -> {
countDownLatch.countDown();
Object addResultFailure = addResult.get("failure");
if (addResultFailure != null) {
SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
if (failure == null) {
failure = new SimpleOrderedMap();
results.add("failure", failure);
}
failure.addAll((NamedList) addResultFailure);
} else {
SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
if (success == null) {
success = new SimpleOrderedMap();
results.add("success", success);
}
success.addAll((NamedList) addResult.get("success"));
}
});
}
} finally {
if (sessionWrapper.get() != null) sessionWrapper.get().release();
}
log.debug("Waiting for create shard action to complete");
countDownLatch.await(5, TimeUnit.MINUTES);
log.debug("Finished waiting for create shard action to complete");
log.info("Finished create command on all shards for collection: " + collectionName);
}
public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
String collectionName, ZkNodeProps message, AtomicReference< PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
String sliceName = message.getStr(SHARD_ID_PROP);
DocCollection collection = clusterState.getCollection(collectionName);
int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1)))); int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1))));
int numPullReplicas = message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0)); int numPullReplicas = message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0));
int numTlogReplicas = message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0)); int numTlogReplicas = message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0));
int totalReplicas = numNrtReplicas + numPullReplicas + numTlogReplicas;
if (numNrtReplicas + numTlogReplicas <= 0) { if (numNrtReplicas + numTlogReplicas <= 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0"); throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
} }
Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET); ZkStateReader zkStateReader = ocmh.zkStateReader;
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
// wait for a while until we see the shard
ocmh.waitForNewShard(collectionName, sliceName);
String async = message.getStr(ASYNC);
ZkNodeProps addReplicasProps = new ZkNodeProps(
COLLECTION_PROP, collectionName,
SHARD_ID_PROP, sliceName,
ZkStateReader.NRT_REPLICAS, String.valueOf(numNrtReplicas),
ZkStateReader.TLOG_REPLICAS, String.valueOf(numTlogReplicas),
ZkStateReader.PULL_REPLICAS, String.valueOf(numPullReplicas),
OverseerCollectionMessageHandler.CREATE_NODE_SET, message.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET),
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
boolean usePolicyFramework = CloudUtil.usePolicyFramework(collection, cloudManager); Map<String, Object> propertyParams = new HashMap<>();
List<ReplicaPosition> positions; ocmh.addPropertyParams(message, propertyParams);
if (usePolicyFramework) { addReplicasProps = addReplicasProps.plus(propertyParams);
if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName()); if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
positions = Assign.identifyNodes(cloudManager, final NamedList addResult = new NamedList();
clusterState, try {
Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM), ocmh.addReplica(zkStateReader.getClusterState(), addReplicasProps, addResult, () -> {
collection.getName(), Object addResultFailure = addResult.get("failure");
message, if (addResultFailure != null) {
Collections.singletonList(sliceName), SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
numNrtReplicas, if (failure == null) {
numTlogReplicas, failure = new SimpleOrderedMap();
numPullReplicas); results.add("failure", failure);
sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true)); }
} else { failure.addAll((NamedList) addResultFailure);
List<Assign.ReplicaCount> sortedNodeList = Assign.getNodesForNewReplicas(clusterState, collection.getName(), sliceName, totalReplicas, } else {
createNodeSetStr, cloudManager); SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
int i = 0; if (success == null) {
positions = new ArrayList<>(); success = new SimpleOrderedMap();
for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas, results.add("success", success);
Replica.Type.TLOG, numTlogReplicas, }
Replica.Type.PULL, numPullReplicas success.addAll((NamedList) addResult.get("success"));
).entrySet()) {
for (int j = 0; j < e.getValue(); j++) {
positions.add(new ReplicaPosition(sliceName, j + 1, e.getKey(), sortedNodeList.get(i % sortedNodeList.size()).nodeName));
i++;
} }
} });
} catch (Assign.AssignmentException e) {
// clean up the slice that we created
ZkNodeProps deleteShard = new ZkNodeProps(COLLECTION_PROP, collectionName, SHARD_ID_PROP, sliceName, ASYNC, async);
new DeleteShardCmd(ocmh).call(clusterState, deleteShard, results);
throw e;
} }
return positions;
log.info("Finished create command on all shards for collection: " + collectionName);
} }
} }

View File

@ -268,7 +268,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
NamedList addResult = new NamedList(); NamedList addResult = new NamedList();
SolrCloseableLatch countDownLatch = new SolrCloseableLatch(1, ocmh); SolrCloseableLatch countDownLatch = new SolrCloseableLatch(1, ocmh);
ActiveReplicaWatcher watcher = null; ActiveReplicaWatcher watcher = null;
ZkNodeProps props = ocmh.addReplica(clusterState, addReplicasProps, addResult, null); ZkNodeProps props = ocmh.addReplica(clusterState, addReplicasProps, addResult, null).get(0);
log.debug("props " + props); log.debug("props " + props);
if (replica.equals(slice.getLeader()) || waitForFinalState) { if (replica.equals(slice.getLeader()) || waitForFinalState) {
watcher = new ActiveReplicaWatcher(coll.getName(), null, Collections.singletonList(newCoreName), countDownLatch); watcher = new ActiveReplicaWatcher(coll.getName(), null, Collections.singletonList(newCoreName), countDownLatch);

View File

@ -709,7 +709,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
} }
} }
ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete) List<ZkNodeProps> addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
throws Exception { throws Exception {
return ((AddReplicaCmd) commandMap.get(ADDREPLICA)).addReplica(clusterState, message, results, onComplete); return ((AddReplicaCmd) commandMap.get(ADDREPLICA)).addReplica(clusterState, message, results, onComplete);

View File

@ -140,7 +140,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
log.debug("Successfully created replica for collection={} shard={} on node={}", log.debug("Successfully created replica for collection={} shard={} on node={}",
sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target); sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
} }
}); }).get(0);
if (addedReplica != null) { if (addedReplica != null) {
createdReplicas.add(addedReplica); createdReplicas.add(addedReplica);

View File

@ -694,6 +694,9 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections"); throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections");
copy(req.getParams(), map, copy(req.getParams(), map,
REPLICATION_FACTOR, REPLICATION_FACTOR,
NRT_REPLICAS,
TLOG_REPLICAS,
PULL_REPLICAS,
CREATE_NODE_SET, CREATE_NODE_SET,
WAIT_FOR_FINAL_STATE); WAIT_FOR_FINAL_STATE);
return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX); return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
@ -828,7 +831,11 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
DATA_DIR, DATA_DIR,
ULOG_DIR, ULOG_DIR,
REPLICA_TYPE, REPLICA_TYPE,
WAIT_FOR_FINAL_STATE); WAIT_FOR_FINAL_STATE,
NRT_REPLICAS,
TLOG_REPLICAS,
PULL_REPLICAS,
CREATE_NODE_SET);
return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX); return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX);
}), }),
OVERSEERSTATUS_OP(OVERSEERSTATUS, (req, rsp, h) -> (Map) new LinkedHashMap<>()), OVERSEERSTATUS_OP(OVERSEERSTATUS, (req, rsp, h) -> (Map) new LinkedHashMap<>()),

View File

@ -17,7 +17,10 @@
package org.apache.solr.cloud; package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@ -26,11 +29,15 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
import org.apache.solr.util.LogLevel; import org.apache.solr.util.LogLevel;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.solr.client.solrj.response.RequestStatusState.COMPLETED;
import static org.apache.solr.client.solrj.response.RequestStatusState.FAILED;
/** /**
* *
*/ */
@ -45,10 +52,75 @@ public class AddReplicaTest extends SolrCloudTestCase {
.configure(); .configure();
} }
@Before
public void setUp() throws Exception {
super.setUp();
cluster.deleteAllCollections();
}
@Test
public void testAddMultipleReplicas() throws Exception {
cluster.waitForAllNodes(5);
String collection = "testAddMultipleReplicas";
CloudSolrClient cloudClient = cluster.getSolrClient();
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf1", 1, 1);
create.setMaxShardsPerNode(2);
cloudClient.request(create);
CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(collection, "shard1")
.setNrtReplicas(1)
.setTlogReplicas(1)
.setPullReplicas(1);
RequestStatusState status = addReplica.processAndWait(collection + "_xyz1", cloudClient, 120);
assertEquals(COMPLETED, status);
DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollectionOrNull(collection);
assertNotNull(docCollection);
assertEquals(4, docCollection.getReplicas().size());
assertEquals(2, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
assertEquals(1, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).size());
assertEquals(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
// try to add 5 more replicas which should fail because numNodes(4)*maxShardsPerNode(2)=8 and 4 replicas already exist
addReplica = CollectionAdminRequest.addReplicaToShard(collection, "shard1")
.setNrtReplicas(3)
.setTlogReplicas(1)
.setPullReplicas(1);
status = addReplica.processAndWait(collection + "_xyz1", cloudClient, 120);
assertEquals(FAILED, status);
docCollection = cloudClient.getZkStateReader().getClusterState().getCollectionOrNull(collection);
assertNotNull(docCollection);
// sanity check that everything is as before
assertEquals(4, docCollection.getReplicas().size());
assertEquals(2, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
assertEquals(1, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).size());
assertEquals(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
// but adding any number of replicas is supported if an explicit create node set is specified
// so test that as well
List<String> createNodeSet = new ArrayList<>(2);
createNodeSet.add(cluster.getRandomJetty(random()).getNodeName());
createNodeSet.add(cluster.getRandomJetty(random()).getNodeName());
addReplica = CollectionAdminRequest.addReplicaToShard(collection, "shard1")
.setNrtReplicas(3)
.setTlogReplicas(1)
.setPullReplicas(1)
.setCreateNodeSet(String.join(",", createNodeSet));
status = addReplica.processAndWait(collection + "_xyz1", cloudClient, 120);
assertEquals(COMPLETED, status);
docCollection = cloudClient.getZkStateReader().getClusterState().getCollectionOrNull(collection);
assertNotNull(docCollection);
// sanity check that everything is as before
assertEquals(9, docCollection.getReplicas().size());
assertEquals(5, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
assertEquals(2, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).size());
assertEquals(2, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
}
@Test @Test
//commented 2-Aug-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 09-Apr-2018 //commented 2-Aug-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 09-Apr-2018
public void test() throws Exception { public void test() throws Exception {
cluster.waitForAllNodes(5000); cluster.waitForAllNodes(5);
String collection = "addreplicatest_coll"; String collection = "addreplicatest_coll";
CloudSolrClient cloudClient = cluster.getSolrClient(); CloudSolrClient cloudClient = cluster.getSolrClient();
@ -65,16 +137,16 @@ public class AddReplicaTest extends SolrCloudTestCase {
addReplica.processAsync("000", cloudClient); addReplica.processAsync("000", cloudClient);
CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000"); CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000");
CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient); CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient);
assertTrue(rsp.getRequestStatus() != RequestStatusState.COMPLETED); assertNotSame(rsp.getRequestStatus(), COMPLETED);
// wait for async request success // wait for async request success
boolean success = false; boolean success = false;
for (int i = 0; i < 200; i++) { for (int i = 0; i < 200; i++) {
rsp = requestStatus.process(cloudClient); rsp = requestStatus.process(cloudClient);
if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) { if (rsp.getRequestStatus() == COMPLETED) {
success = true; success = true;
break; break;
} }
assertFalse(rsp.toString(), rsp.getRequestStatus() == RequestStatusState.FAILED); assertNotSame(rsp.toString(), rsp.getRequestStatus(), RequestStatusState.FAILED);
Thread.sleep(500); Thread.sleep(500);
} }
assertTrue(success); assertTrue(success);
@ -82,23 +154,23 @@ public class AddReplicaTest extends SolrCloudTestCase {
replicas2.removeAll(replicas); replicas2.removeAll(replicas);
assertEquals(1, replicas2.size()); assertEquals(1, replicas2.size());
Replica r = replicas2.iterator().next(); Replica r = replicas2.iterator().next();
assertTrue(r.toString(), r.getState() != Replica.State.ACTIVE); assertNotSame(r.toString(), r.getState(), Replica.State.ACTIVE);
// use waitForFinalState // use waitForFinalState
addReplica.setWaitForFinalState(true); addReplica.setWaitForFinalState(true);
addReplica.processAsync("001", cloudClient); addReplica.processAsync("001", cloudClient);
requestStatus = CollectionAdminRequest.requestStatus("001"); requestStatus = CollectionAdminRequest.requestStatus("001");
rsp = requestStatus.process(cloudClient); rsp = requestStatus.process(cloudClient);
assertTrue(rsp.getRequestStatus() != RequestStatusState.COMPLETED); assertNotSame(rsp.getRequestStatus(), COMPLETED);
// wait for async request success // wait for async request success
success = false; success = false;
for (int i = 0; i < 200; i++) { for (int i = 0; i < 200; i++) {
rsp = requestStatus.process(cloudClient); rsp = requestStatus.process(cloudClient);
if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) { if (rsp.getRequestStatus() == COMPLETED) {
success = true; success = true;
break; break;
} }
assertFalse(rsp.toString(), rsp.getRequestStatus() == RequestStatusState.FAILED); assertNotSame(rsp.toString(), rsp.getRequestStatus(), RequestStatusState.FAILED);
Thread.sleep(500); Thread.sleep(500);
} }
assertTrue(success); assertTrue(success);
@ -114,7 +186,7 @@ public class AddReplicaTest extends SolrCloudTestCase {
if (replica.getName().equals(replica2)) { if (replica.getName().equals(replica2)) {
continue; // may be still recovering continue; // may be still recovering
} }
assertTrue(coll.toString() + "\n" + replica.toString(), replica.getState() == Replica.State.ACTIVE); assertSame(coll.toString() + "\n" + replica.toString(), replica.getState(), Replica.State.ACTIVE);
} }
} }
} }

View File

@ -158,7 +158,7 @@ public class CollectionTooManyReplicasTest extends SolrCloudTestCase {
assertTrue("Should have gotten the right error message back", assertTrue("Should have gotten the right error message back",
e2.getMessage().contains("given the current number of live nodes and a maxShardsPerNode of")); e2.getMessage().contains("given the current number of live nodes and a maxShardsPerNode of"));
// And finally, insure that there are all the replcias we expect. We should have shards 1, 2 and 4 and each // And finally, ensure that there are all the replicas we expect. We should have shards 1, 2 and 4 and each
// should have exactly two replicas // should have exactly two replicas
waitForState("Expected shards shardstart, 1, 2 and 4, each with two active replicas", collectionName, (n, c) -> { waitForState("Expected shards shardstart, 1, 2 and 4, each with two active replicas", collectionName, (n, c) -> {
return DocCollection.isFullyActive(n, c, 4, 2); return DocCollection.isFullyActive(n, c, 4, 2);

View File

@ -542,7 +542,7 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
@Test @Test
public void testSplitShardWithRule() throws Exception { public void testSplitShardWithRule() throws Exception {
doSplitShardWithRule(SolrIndexSplitter.SplitMethod.LINK); doSplitShardWithRule(SolrIndexSplitter.SplitMethod.REWRITE);
} }
@Test @Test

View File

@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
@ -413,23 +414,46 @@ public class SimClusterStateProvider implements ClusterStateProvider {
ClusterState clusterState = getClusterState(); ClusterState clusterState = getClusterState();
DocCollection coll = clusterState.getCollection(message.getStr(ZkStateReader.COLLECTION_PROP)); DocCollection coll = clusterState.getCollection(message.getStr(ZkStateReader.COLLECTION_PROP));
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>(); AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
message = AddReplicaCmd.assignReplicaDetails(cloudManager, clusterState, message, sessionWrapper);
Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
EnumMap<Replica.Type, Integer> replicaTypesVsCount = new EnumMap<>(Replica.Type.class);
replicaTypesVsCount.put(Replica.Type.NRT, message.getInt(NRT_REPLICAS, replicaType == Replica.Type.NRT ? 1 : 0));
replicaTypesVsCount.put(Replica.Type.TLOG, message.getInt(TLOG_REPLICAS, replicaType == Replica.Type.TLOG ? 1 : 0));
replicaTypesVsCount.put(Replica.Type.PULL, message.getInt(PULL_REPLICAS, replicaType == Replica.Type.PULL ? 1 : 0));
int totalReplicas = 0;
for (Map.Entry<Replica.Type, Integer> entry : replicaTypesVsCount.entrySet()) {
totalReplicas += entry.getValue();
}
if (totalReplicas > 1) {
if (message.getStr(CoreAdminParams.NAME) != null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'name' parameter is specified");
}
if (message.getStr(CoreAdminParams.CORE_NODE_NAME) != null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'coreNodeName' parameter is specified");
}
}
List<ReplicaPosition> replicaPositions = AddReplicaCmd.buildReplicaPositions(cloudManager, clusterState, coll.getName(), message, replicaTypesVsCount, sessionWrapper);
for (ReplicaPosition replicaPosition : replicaPositions) {
AddReplicaCmd.CreateReplica createReplica = AddReplicaCmd.assignReplicaDetails(cloudManager, clusterState, message, replicaPosition);
if (message.getStr(CoreAdminParams.CORE_NODE_NAME) == null) {
createReplica.coreNodeName = Assign.assignCoreNodeName(stateManager, coll);
}
ReplicaInfo ri = new ReplicaInfo(
createReplica.coreNodeName,
createReplica.coreName,
createReplica.collectionName,
createReplica.sliceName,
createReplica.replicaType,
createReplica.node,
message.getProperties()
);
simAddReplica(ri.getNode(), ri, true);
}
if (sessionWrapper.get() != null) { if (sessionWrapper.get() != null) {
sessionWrapper.get().release(); sessionWrapper.get().release();
} }
if (message.getStr(CoreAdminParams.CORE_NODE_NAME) == null) {
message = message.plus(CoreAdminParams.CORE_NODE_NAME, Assign.assignCoreNodeName(stateManager, coll));
}
ReplicaInfo ri = new ReplicaInfo(
message.getStr(CoreAdminParams.CORE_NODE_NAME),
message.getStr(CoreAdminParams.NAME),
message.getStr(ZkStateReader.COLLECTION_PROP),
message.getStr(ZkStateReader.SHARD_ID_PROP),
Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT)),
message.getStr(CoreAdminParams.NODE),
message.getProperties()
);
simAddReplica(message.getStr(CoreAdminParams.NODE), ri, true);
results.add("success", ""); results.add("success", "");
} }
@ -1015,31 +1039,30 @@ public class SimClusterStateProvider implements ClusterStateProvider {
.filter(e -> !e.getKey().equals("replicas")) .filter(e -> !e.getKey().equals("replicas"))
.forEach(e -> props.put(e.getKey(), e.getValue())); .forEach(e -> props.put(e.getKey(), e.getValue()));
// 2. create new replicas // 2. create new replicas
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>(); EnumMap<Replica.Type, Integer> replicaTypesVsCount = new EnumMap<>(Replica.Type.class);
List<ReplicaPosition> positions = CreateShardCmd.buildReplicaPositions(cloudManager, clusterState, collectionName, int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1))));
message, sessionWrapper); int numTlogReplicas = message.getInt(TLOG_REPLICAS, message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0)));
if (sessionWrapper.get() != null) { int numPullReplicas = message.getInt(PULL_REPLICAS, message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0)));
sessionWrapper.get().release(); replicaTypesVsCount.put(Replica.Type.NRT, numNrtReplicas);
} replicaTypesVsCount.put(Replica.Type.TLOG, numTlogReplicas);
AtomicInteger replicaNum = new AtomicInteger(1); replicaTypesVsCount.put(Replica.Type.PULL, numPullReplicas);
positions.forEach(pos -> {
Map<String, Object> replicaProps = new HashMap<>(); ZkNodeProps addReplicasProps = new ZkNodeProps(
replicaProps.put(ZkStateReader.SHARD_ID_PROP, pos.shard); COLLECTION_PROP, collectionName,
replicaProps.put(ZkStateReader.NODE_NAME_PROP, pos.node); SHARD_ID_PROP, sliceName,
replicaProps.put(ZkStateReader.REPLICA_TYPE, pos.type.toString()); ZkStateReader.NRT_REPLICAS, String.valueOf(replicaTypesVsCount.get(Replica.Type.NRT)),
replicaProps.put(ZkStateReader.BASE_URL_PROP, Utils.getBaseUrlForNodeName(pos.node, "http")); ZkStateReader.TLOG_REPLICAS, String.valueOf(replicaTypesVsCount.get(Replica.Type.TLOG)),
String coreName = String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, pos.shard, pos.type.name().substring(0,1).toLowerCase(Locale.ROOT), ZkStateReader.PULL_REPLICAS, String.valueOf(replicaTypesVsCount.get(Replica.Type.PULL)),
replicaNum.getAndIncrement()); OverseerCollectionMessageHandler.CREATE_NODE_SET, message.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET)
);
try { try {
replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName); simAddReplica(addReplicasProps, results);
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
coreName, collectionName, pos.shard, pos.type, pos.node, replicaProps);
simAddReplica(pos.node, ri, false);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
});
Map<String, Object> colProps = collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()); collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>());
simRunLeaderElection(Collections.singleton(collectionName), true); simRunLeaderElection(Collections.singleton(collectionName), true);
results.add("success", ""); results.add("success", "");

View File

@ -236,7 +236,7 @@ public class TestSimPolicyCloud extends SimSolrCloudTestCase {
String pullNodeName = nodes.get(1); String pullNodeName = nodes.get(1);
int pullPort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(pullNodeName, ImplicitSnitch.PORT); int pullPort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(pullNodeName, ImplicitSnitch.PORT);
String tlogNodeName = nodes.get(1); String tlogNodeName = nodes.get(2);
int tlogPort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(tlogNodeName, ImplicitSnitch.PORT); int tlogPort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(tlogNodeName, ImplicitSnitch.PORT);
log.info("NRT {} PULL {} , TLOG {} ", nrtNodeName, pullNodeName, tlogNodeName); log.info("NRT {} PULL {} , TLOG {} ", nrtNodeName, pullNodeName, tlogNodeName);

View File

@ -396,6 +396,10 @@ Use SPLITSHARD for collections created with the 'compositeId' router (`router.ke
`/admin/collections?action=CREATESHARD&shard=_shardName_&collection=_name_` `/admin/collections?action=CREATESHARD&shard=_shardName_&collection=_name_`
The default values for `replicationFactor` or `nrtReplicas`, `tlogReplicas`, `pullReplicas` from the collection is used to determine the number of replicas to be created for the new shard. This can be customized by explicitly passing the corresponding parameters to the request.
The API uses the Autoscaling framework to find the best possible nodes in the cluster when an Autoscaling preferences or policy is configured. Refer to <<solrcloud-autoscaling-policy-preferences.adoc#solrcloud-autoscaling-policy-preferences,Autoscaling Policy and Preferences>> section for more details.
=== CREATESHARD Parameters === CREATESHARD Parameters
`collection`:: `collection`::
@ -409,6 +413,15 @@ Allows defining the nodes to spread the new collection across. If not provided,
+ +
The format is a comma-separated list of node_names, such as `localhost:8983_solr,localhost:8984_solr,localhost:8985_solr`. The format is a comma-separated list of node_names, such as `localhost:8983_solr,localhost:8984_solr,localhost:8985_solr`.
`nrtReplicas`::
The number of `nrt` replicas that should be created for the new shard (optional, the defaults for the collection is used if omitted)
`tlogReplicas`::
The number of `tlog` replicas that should be created for the new shard (optional, the defaults for the collection is used if omitted)
`pullReplicas`::
The number of `pull` replicas that should be created for the new shard (optional, the defaults for the collection is used if omitted)
`property._name_=_value_`:: `property._name_=_value_`::
Set core property _name_ to _value_. See the section <<defining-core-properties.adoc#defining-core-properties,Defining core.properties>> for details on supported properties and values. Set core property _name_ to _value_. See the section <<defining-core-properties.adoc#defining-core-properties,Defining core.properties>> for details on supported properties and values.
@ -1016,9 +1029,9 @@ http://localhost:8983/solr/admin/collections?action=DELETEREPLICA&collection=tes
[[addreplica]] [[addreplica]]
== ADDREPLICA: Add Replica == ADDREPLICA: Add Replica
Add a replica to a shard in a collection. The node name can be specified if the replica is to be created in a specific node. Add one or more replicas to a shard in a collection. The node name can be specified if the replica is to be created in a specific node. Otherwise, a set of nodes can be specified and the most suitable ones among them will be chosen to create the replica(s).
The API uses the Autoscaling framework to find nodes that can satisfy the disk requirements for the new replica but only when an Autoscaling policy is configured. Refer to <<solrcloud-autoscaling-policy-preferences.adoc#solrcloud-autoscaling-policy-preferences,Autoscaling Policy and Preferences>> section for more details. The API uses the Autoscaling framework to find nodes that can satisfy the disk requirements for the new replica(s) but only when an Autoscaling preferences or policy is configured. Refer to <<solrcloud-autoscaling-policy-preferences.adoc#solrcloud-autoscaling-policy-preferences,Autoscaling Policy and Preferences>> section for more details.
`/admin/collections?action=ADDREPLICA&collection=_collection_&shard=_shard_&node=_nodeName_` `/admin/collections?action=ADDREPLICA&collection=_collection_&shard=_shard_&node=_nodeName_`
@ -1038,7 +1051,14 @@ If the exact shard name is not known, users may pass the `\_route_` value and th
Ignored if the `shard` parameter is also specified. Ignored if the `shard` parameter is also specified.
`node`:: `node`::
The name of the node where the replica should be created. The name of the node where the replica should be created (optional)
`createNodeSet`::
A comma-separated list of nodes among which the best ones will be chosen to place the replicas (optional)
+
The format is a comma-separated list of node_names, such as `localhost:8983_solr,localhost:8984_solr,localhost:8985_solr`.
If neither `node`, nor `createNodeSet` is specified then the best node(s) from among all the live nodes in the cluster are chosen.
`instanceDir`:: `instanceDir`::
The instanceDir for the core that will be created. The instanceDir for the core that will be created.
@ -1057,6 +1077,15 @@ The type of replica to create. These possible values are allowed:
+ +
See the section <<shards-and-indexing-data-in-solrcloud.adoc#types-of-replicas,Types of Replicas>> for more information about replica type options. See the section <<shards-and-indexing-data-in-solrcloud.adoc#types-of-replicas,Types of Replicas>> for more information about replica type options.
`nrtReplicas`::
The number of `nrt` replicas that should be created (optional, defaults to 1 if `type` is `nrt` otherwise 0).
`tlogReplicas`::
The number of `tlog` replicas that should be created (optional, defaults to 1 if `type` is `tlog` otherwise 0).
`pullReplicas`::
The number of `pull` replicas that should be created (optional, defaults to 1 if `type` is `pull` otherwise 0).
`property._name_=_value_`:: `property._name_=_value_`::
Set core property _name_ to _value_. See <<defining-core-properties.adoc#defining-core-properties,Defining core.properties>> for details about supported properties and values. Set core property _name_ to _value_. See <<defining-core-properties.adoc#defining-core-properties,Defining core.properties>> for details about supported properties and values.
@ -1096,6 +1125,39 @@ http://localhost:8983/solr/admin/collections?action=ADDREPLICA&collection=test2&
</response> </response>
---- ----
[source,text]
----
http://localhost:8983/solr/admin/collections?action=addreplica&collection=gettingstarted&shard=shard1&tlogReplicas=1&pullReplicas=1
----
*Output*
[source,json]
----
{
"responseHeader": {
"status": 0,
"QTime": 784
},
"success": {
"127.0.1.1:7574_solr": {
"responseHeader": {
"status": 0,
"QTime": 257
},
"core": "gettingstarted_shard1_replica_p11"
},
"127.0.1.1:8983_solr": {
"responseHeader": {
"status": 0,
"QTime": 295
},
"core": "gettingstarted_shard1_replica_t10"
}
}
}
----
[[clusterprop]] [[clusterprop]]
== CLUSTERPROP: Cluster Properties == CLUSTERPROP: Cluster Properties

View File

@ -57,7 +57,10 @@ import static org.apache.solr.common.cloud.DocCollection.RULE;
import static org.apache.solr.common.cloud.DocCollection.SNITCH; import static org.apache.solr.common.cloud.DocCollection.SNITCH;
import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS; import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE; import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR; import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF; import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH; import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP; import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
@ -1648,6 +1651,8 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
protected String ulogDir; protected String ulogDir;
protected Properties properties; protected Properties properties;
protected Replica.Type type; protected Replica.Type type;
protected Integer nrtReplicas, tlogReplicas, pullReplicas;
protected String createNodeSet;
private AddReplica(String collection, String shard, String routeKey, Replica.Type type) { private AddReplica(String collection, String shard, String routeKey, Replica.Type type) {
super(CollectionAction.ADDREPLICA); super(CollectionAction.ADDREPLICA);
@ -1727,6 +1732,42 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
return shard; return shard;
} }
public Integer getNrtReplicas() {
return nrtReplicas;
}
public AddReplica setNrtReplicas(Integer nrtReplicas) {
this.nrtReplicas = nrtReplicas;
return this;
}
public Integer getTlogReplicas() {
return tlogReplicas;
}
public AddReplica setTlogReplicas(Integer tlogReplicas) {
this.tlogReplicas = tlogReplicas;
return this;
}
public Integer getPullReplicas() {
return pullReplicas;
}
public AddReplica setPullReplicas(Integer pullReplicas) {
this.pullReplicas = pullReplicas;
return this;
}
public String getCreateNodeSet() {
return createNodeSet;
}
public AddReplica setCreateNodeSet(String createNodeSet) {
this.createNodeSet = createNodeSet;
return this;
}
@Override @Override
public SolrParams getParams() { public SolrParams getParams() {
ModifiableSolrParams params = new ModifiableSolrParams(super.getParams()); ModifiableSolrParams params = new ModifiableSolrParams(super.getParams());
@ -1759,6 +1800,18 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
if (properties != null) { if (properties != null) {
addProperties(params, properties); addProperties(params, properties);
} }
if (nrtReplicas != null) {
params.add(NRT_REPLICAS, String.valueOf(nrtReplicas));
}
if (tlogReplicas != null) {
params.add(TLOG_REPLICAS, String.valueOf(tlogReplicas));
}
if (pullReplicas != null) {
params.add(PULL_REPLICAS, String.valueOf(pullReplicas));
}
if (createNodeSet != null) {
params.add(CREATE_NODE_SET_PARAM, createNodeSet);
}
return params; return params;
} }