From 4bcace571ee1e512b2ca4aa3d93bc7bd522b55fe Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Fri, 21 Sep 2018 15:12:21 +0530 Subject: [PATCH 1/3] SOLR-9317: ADDREPLICA command should be able to add more than one replica to a collection,shard at a time. The API now supports 'nrtReplicas', 'tlogReplicas', 'pullReplicas' parameters as well 'createNodeSet' parameter. As part of this change, the CREATESHARD API now delegates placing replicas entirely to the ADDREPLICA command and uses the new parameters to add all the replicas in one API call. --- solr/CHANGES.txt | 5 + .../cloud/api/collections/AddReplicaCmd.java | 336 ++++++++++++------ .../solr/cloud/api/collections/Assign.java | 22 +- .../cloud/api/collections/CreateShardCmd.java | 155 +++----- .../cloud/api/collections/MoveReplicaCmd.java | 2 +- .../OverseerCollectionMessageHandler.java | 2 +- .../cloud/api/collections/ReplaceNodeCmd.java | 2 +- .../handler/admin/CollectionsHandler.java | 9 +- .../org/apache/solr/cloud/AddReplicaTest.java | 90 ++++- .../CollectionTooManyReplicasTest.java | 2 +- .../cloud/api/collections/ShardSplitTest.java | 2 +- .../sim/SimClusterStateProvider.java | 93 +++-- .../autoscaling/sim/TestSimPolicyCloud.java | 2 +- solr/solr-ref-guide/src/collections-api.adoc | 68 +++- .../solrj/request/CollectionAdminRequest.java | 53 +++ 15 files changed, 557 insertions(+), 286 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 249f68144b0..ee1d7b72b2d 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -90,6 +90,11 @@ New Features error. Previously, the collapsing behavior was unreliable and undefined despite no explicit error. (Munendra S N, David Smiley) +* SOLR-9317: ADDREPLICA command should be able to add more than one replica to a collection,shard at a time. + The API now supports 'nrtReplicas', 'tlogReplicas', 'pullReplicas' parameters as well 'createNodeSet' parameter. + As part of this change, the CREATESHARD API now delegates placing replicas entirely to the ADDREPLICA command + and uses the new parameters to add all the replicas in one API call. (shalin) + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java index c9dbaec509f..f128c2e4362 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java @@ -20,14 +20,17 @@ package org.apache.solr.cloud.api.collections; import java.io.IOException; import java.lang.invoke.MethodHandles; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.EnumMap; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; import org.apache.solr.client.solrj.cloud.SolrCloudManager; @@ -41,6 +44,7 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ReplicaPosition; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; @@ -54,10 +58,14 @@ import org.apache.solr.handler.component.ShardHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET; import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE; import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS; +import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS; import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF; import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION; import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; @@ -79,28 +87,116 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { addReplica(state, message, results, null); } - ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete) + List addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete) throws IOException, InterruptedException { log.debug("addReplica() : {}", Utils.toJSONString(message)); String collectionName = message.getStr(COLLECTION_PROP); + String shard = message.getStr(SHARD_ID_PROP); + DocCollection coll = clusterState.getCollection(collectionName); + if (coll == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collectionName + " does not exist"); + } + if (coll.getSlice(shard) == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Collection: " + collectionName + " shard: " + shard + " does not exist"); + } boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false); boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false); final String asyncId = message.getStr(ASYNC); - AtomicReference sessionWrapper = new AtomicReference<>(); - message = assignReplicaDetails(ocmh.cloudManager, clusterState, message, sessionWrapper); - String node = message.getStr(CoreAdminParams.NODE); - String shard = message.getStr(SHARD_ID_PROP); - String coreName = message.getStr(CoreAdminParams.NAME); - String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME); + String createNodeSetStr = message.getStr(CREATE_NODE_SET); + + if (node != null && createNodeSetStr != null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Both 'node' and 'createNodeSet' parameters cannot be specified together."); + } + int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes - Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT)); boolean parallel = message.getBool("parallel", false); + Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT)); + EnumMap replicaTypesVsCount = new EnumMap<>(Replica.Type.class); + replicaTypesVsCount.put(Replica.Type.NRT, message.getInt(NRT_REPLICAS, replicaType == Replica.Type.NRT ? 1 : 0)); + replicaTypesVsCount.put(Replica.Type.TLOG, message.getInt(TLOG_REPLICAS, replicaType == Replica.Type.TLOG ? 1 : 0)); + replicaTypesVsCount.put(Replica.Type.PULL, message.getInt(PULL_REPLICAS, replicaType == Replica.Type.PULL ? 1 : 0)); + + int totalReplicas = 0; + for (Map.Entry entry : replicaTypesVsCount.entrySet()) { + totalReplicas += entry.getValue(); + } + if (totalReplicas > 1) { + if (message.getStr(CoreAdminParams.NAME) != null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'name' parameter is specified"); + } + if (message.getStr(CoreAdminParams.CORE_NODE_NAME) != null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'coreNodeName' parameter is specified"); + } + } + + AtomicReference sessionWrapper = new AtomicReference<>(); + List createReplicas = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, replicaTypesVsCount, sessionWrapper) + .stream() + .map(replicaPosition -> assignReplicaDetails(ocmh.cloudManager, clusterState, message, replicaPosition)) + .collect(Collectors.toList()); + + ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); + ZkStateReader zkStateReader = ocmh.zkStateReader; + // For tracking async calls. + Map requestMap = new HashMap<>(); + + for (CreateReplica createReplica : createReplicas) { + assert createReplica.coreName != null; + ModifiableSolrParams params = getReplicaParams(clusterState, message, results, collectionName, coll, skipCreateReplicaInClusterState, asyncId, shardHandler, createReplica); + ocmh.sendShardRequest(createReplica.node, params, shardHandler, asyncId, requestMap); + } + + Runnable runnable = () -> { + ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap); + for (CreateReplica replica : createReplicas) { + ocmh.waitForCoreNodeName(collectionName, replica.node, replica.coreName); + } + + if (sessionWrapper.get() != null) { + sessionWrapper.get().release(); + } + if (onComplete != null) onComplete.run(); + }; + + if (!parallel || waitForFinalState) { + if (waitForFinalState) { + SolrCloseableLatch latch = new SolrCloseableLatch(totalReplicas, ocmh); + ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, null, + createReplicas.stream().map(createReplica -> createReplica.coreName).collect(Collectors.toList()), latch); + try { + zkStateReader.registerCollectionStateWatcher(collectionName, watcher); + runnable.run(); + if (!latch.await(timeout, TimeUnit.SECONDS)) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active."); + } + } finally { + zkStateReader.removeCollectionStateWatcher(collectionName, watcher); + } + } else { + runnable.run(); + } + } else { + ocmh.tpe.submit(runnable); + } + + return createReplicas.stream() + .map(createReplica -> new ZkNodeProps( + ZkStateReader.COLLECTION_PROP, createReplica.collectionName, + ZkStateReader.SHARD_ID_PROP, createReplica.sliceName, + ZkStateReader.CORE_NAME_PROP, createReplica.coreName, + ZkStateReader.NODE_NAME_PROP, createReplica.node + )) + .collect(Collectors.toList()); + } + + private ModifiableSolrParams getReplicaParams(ClusterState clusterState, ZkNodeProps message, NamedList results, String collectionName, DocCollection coll, boolean skipCreateReplicaInClusterState, String asyncId, ShardHandler shardHandler, CreateReplica createReplica) throws IOException, InterruptedException { if (coll.getStr(WITH_COLLECTION) != null) { String withCollectionName = coll.getStr(WITH_COLLECTION); DocCollection withCollection = clusterState.getCollection(withCollectionName); @@ -109,14 +205,14 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { } String withCollectionShard = withCollection.getActiveSlices().iterator().next().getName(); - List replicas = withCollection.getReplicas(node); + List replicas = withCollection.getReplicas(createReplica.node); if (replicas == null || replicas.isEmpty()) { // create a replica of withCollection on the identified node before proceeding further ZkNodeProps props = new ZkNodeProps( Overseer.QUEUE_OPERATION, ADDREPLICA.toString(), ZkStateReader.COLLECTION_PROP, withCollectionName, ZkStateReader.SHARD_ID_PROP, withCollectionShard, - "node", node, + "node", createReplica.node, CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.TRUE.toString()); // set to true because we want `withCollection` to be ready after this collection is created addReplica(clusterState, props, results, null); } @@ -130,14 +226,14 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { ZkNodeProps props = new ZkNodeProps( Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(), ZkStateReader.COLLECTION_PROP, collectionName, - ZkStateReader.SHARD_ID_PROP, shard, - ZkStateReader.CORE_NAME_PROP, coreName, + ZkStateReader.SHARD_ID_PROP, createReplica.sliceName, + ZkStateReader.CORE_NAME_PROP, createReplica.coreName, ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), - ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node), - ZkStateReader.NODE_NAME_PROP, node, - ZkStateReader.REPLICA_TYPE, replicaType.name()); - if (coreNodeName != null) { - props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName); + ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(createReplica.node), + ZkStateReader.NODE_NAME_PROP, createReplica.node, + ZkStateReader.REPLICA_TYPE, createReplica.replicaType.name()); + if (createReplica.coreNodeName != null) { + props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, createReplica.coreNodeName); } try { Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props)); @@ -146,7 +242,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { } } params.set(CoreAdminParams.CORE_NODE_NAME, - ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(coreName)).get(coreName).getName()); + ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(createReplica.coreName)).get(createReplica.coreName).getName()); } String configName = zkStateReader.readConfigName(collectionName); @@ -156,12 +252,12 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR); params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString()); - params.set(CoreAdminParams.NAME, coreName); + params.set(CoreAdminParams.NAME, createReplica.coreName); params.set(COLL_CONF, configName); params.set(CoreAdminParams.COLLECTION, collectionName); - params.set(CoreAdminParams.REPLICA_TYPE, replicaType.name()); - if (shard != null) { - params.set(CoreAdminParams.SHARD, shard); + params.set(CoreAdminParams.REPLICA_TYPE, createReplica.replicaType.name()); + if (createReplica.sliceName != null) { + params.set(CoreAdminParams.SHARD, createReplica.sliceName); } else if (routeKey != null) { Collection slices = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll); if (slices.isEmpty()) { @@ -181,108 +277,34 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { if (instanceDir != null) { params.set(CoreAdminParams.INSTANCE_DIR, instanceDir); } - if (coreNodeName != null) { - params.set(CoreAdminParams.CORE_NODE_NAME, coreNodeName); + if (createReplica.coreNodeName != null) { + params.set(CoreAdminParams.CORE_NODE_NAME, createReplica.coreNodeName); } ocmh.addPropertyParams(message, params); - // For tracking async calls. - Map requestMap = new HashMap<>(); - ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); - - ocmh.sendShardRequest(node, params, shardHandler, asyncId, requestMap); - - final String fnode = node; - final String fcoreName = coreName; - - Runnable runnable = () -> { - ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap); - ocmh.waitForCoreNodeName(collectionName, fnode, fcoreName); - if (sessionWrapper.get() != null) { - sessionWrapper.get().release(); - } - if (onComplete != null) onComplete.run(); - }; - - if (!parallel || waitForFinalState) { - if (waitForFinalState) { - SolrCloseableLatch latch = new SolrCloseableLatch(1, ocmh); - ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, null, Collections.singletonList(coreName), latch); - try { - zkStateReader.registerCollectionStateWatcher(collectionName, watcher); - runnable.run(); - if (!latch.await(timeout, TimeUnit.SECONDS)) { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active."); - } - } finally { - zkStateReader.removeCollectionStateWatcher(collectionName, watcher); - } - } else { - runnable.run(); - } - } else { - ocmh.tpe.submit(runnable); - } - - - return new ZkNodeProps( - ZkStateReader.COLLECTION_PROP, collectionName, - ZkStateReader.SHARD_ID_PROP, shard, - ZkStateReader.CORE_NAME_PROP, coreName, - ZkStateReader.NODE_NAME_PROP, node - ); + return params; } - public static ZkNodeProps assignReplicaDetails(SolrCloudManager cloudManager, ClusterState clusterState, - ZkNodeProps message, AtomicReference sessionWrapper) throws IOException, InterruptedException { + public static CreateReplica assignReplicaDetails(SolrCloudManager cloudManager, ClusterState clusterState, + ZkNodeProps message, ReplicaPosition replicaPosition) { boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false); String collection = message.getStr(COLLECTION_PROP); - String node = message.getStr(CoreAdminParams.NODE); + String node = replicaPosition.node; String shard = message.getStr(SHARD_ID_PROP); String coreName = message.getStr(CoreAdminParams.NAME); String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME); - Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT)); + Replica.Type replicaType = replicaPosition.type; + if (StringUtils.isBlank(coreName)) { coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME); } - DocCollection coll = clusterState.getCollection(collection); - if (coll == null) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist"); - } - if (coll.getSlice(shard) == null) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, - "Collection: " + collection + " shard: " + shard + " does not exist"); - } - - // Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place. - if (!skipCreateReplicaInClusterState) { - if (CloudUtil.usePolicyFramework(coll, cloudManager)) { - if (node == null) { - if(coll.getPolicyName() != null) message.getProperties().put(Policy.POLICY, coll.getPolicyName()); - node = Assign.identifyNodes(cloudManager, - clusterState, - Collections.emptyList(), - collection, - message, - Collections.singletonList(shard), - replicaType == Replica.Type.NRT ? 1 : 0, - replicaType == Replica.Type.TLOG ? 1 : 0, - replicaType == Replica.Type.PULL ? 1 : 0 - ).get(0).node; - sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true)); - } - } else { - node = Assign.getNodesForNewReplicas(clusterState, collection, shard, 1, node, - cloudManager).get(0).nodeName;// TODO: use replica type in this logic too - } - } log.info("Node Identified {} for creating new replica of shard {}", node, shard); - if (!clusterState.liveNodesContain(node)) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live"); } + DocCollection coll = clusterState.getCollection(collection); if (coreName == null) { coreName = Assign.buildSolrCoreName(cloudManager.getDistribStateManager(), coll, shard, replicaType); } else if (!skipCreateReplicaInClusterState) { @@ -297,11 +319,103 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { } } } - if (coreNodeName != null) { - message = message.plus(CoreAdminParams.CORE_NODE_NAME, coreNodeName); - } - message = message.plus(CoreAdminParams.NAME, coreName); - message = message.plus(CoreAdminParams.NODE, node); - return message; + return new CreateReplica(collection, shard, node, replicaType, coreName, coreNodeName); } + + public static List buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState, + String collectionName, ZkNodeProps message, + EnumMap replicaTypeVsCount, + AtomicReference< PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException { + boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false); + String sliceName = message.getStr(SHARD_ID_PROP); + DocCollection collection = clusterState.getCollection(collectionName); + + int numNrtReplicas = replicaTypeVsCount.get(Replica.Type.NRT); + int numPullReplicas = replicaTypeVsCount.get(Replica.Type.PULL); + int numTlogReplicas = replicaTypeVsCount.get(Replica.Type.TLOG); + int totalReplicas = numNrtReplicas + numPullReplicas + numTlogReplicas; + + String node = message.getStr(CoreAdminParams.NODE); + Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET); + if (createNodeSetStr == null) { + if (node != null) { + message.getProperties().put(OverseerCollectionMessageHandler.CREATE_NODE_SET, node); + createNodeSetStr = node; + } + } + + List positions = null; + if (!skipCreateReplicaInClusterState) { + if (CloudUtil.usePolicyFramework(collection, cloudManager)) { + if (node == null) { + if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName()); + positions = Assign.identifyNodes(cloudManager, + clusterState, + Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM), + collection.getName(), + message, + Collections.singletonList(sliceName), + numNrtReplicas, + numTlogReplicas, + numPullReplicas); + sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true)); + } + } else { + List sortedNodeList = Assign.getNodesForNewReplicas(clusterState, collection.getName(), sliceName, numNrtReplicas, + numTlogReplicas, numPullReplicas, createNodeSetStr, cloudManager); + int i = 0; + positions = new ArrayList<>(); + for (Map.Entry e : replicaTypeVsCount.entrySet()) { + for (int j = 0; j < e.getValue(); j++) { + positions.add(new ReplicaPosition(sliceName, j + 1, e.getKey(), sortedNodeList.get(i % sortedNodeList.size()).nodeName)); + i++; + } + } + } + } + + if (positions == null) { + assert node != null; + if (node == null) { + // in case asserts are disabled + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "A node should have been identified to add replica but wasn't. Please inform solr developers at SOLR-9317"); + } + // it is unlikely that multiple replicas have been requested to be created on + // the same node, but we've got to accommodate. + positions = new ArrayList<>(totalReplicas); + int i = 0; + for (Map.Entry entry : replicaTypeVsCount.entrySet()) { + for (int j = 0; j < entry.getValue(); j++) { + positions.add(new ReplicaPosition(sliceName, i++, entry.getKey(), node)); + } + } + } + return positions; + } + + /** + * A data structure to keep all information required to create a new replica in one place. + * Think of it as a typed ZkNodeProps for replica creation. + * + * This is not a public API and can be changed at any time without notice. + */ + public static class CreateReplica { + public final String collectionName; + public final String sliceName; + public final String node; + public final Replica.Type replicaType; + public String coreName; + public String coreNodeName; + + CreateReplica(String collectionName, String sliceName, String node, Replica.Type replicaType, String coreName, String coreNodeName) { + this.collectionName = collectionName; + this.sliceName = sliceName; + this.node = node; + this.replicaType = replicaType; + this.coreName = coreName; + this.coreNodeName = coreNodeName; + } + } + } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java index d323510826e..42de84a8628 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java @@ -273,7 +273,7 @@ public class Assign { } else { if (numTlogReplicas + numPullReplicas != 0 && rulesMap != null) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, - Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules or cluster policies"); + Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules"); } } @@ -322,11 +322,11 @@ public class Assign { // Gets a list of candidate nodes to put the required replica(s) on. Throws errors if not enough replicas // could be created on live nodes given maxShardsPerNode, Replication factor (if from createShard) etc. public static List getNodesForNewReplicas(ClusterState clusterState, String collectionName, - String shard, int nrtReplicas, + String shard, int nrtReplicas, int tlogReplicas, int pullReplicas, Object createNodeSet, SolrCloudManager cloudManager) throws IOException, InterruptedException, AssignmentException { - log.debug("getNodesForNewReplicas() shard: {} , replicas : {} , createNodeSet {}", shard, nrtReplicas, createNodeSet ); + log.debug("getNodesForNewReplicas() shard: {} , nrtReplicas : {} , tlogReplicas: {} , pullReplicas: {} , createNodeSet {}", shard, nrtReplicas, tlogReplicas, pullReplicas, createNodeSet ); DocCollection coll = clusterState.getCollection(collectionName); - Integer maxShardsPerNode = coll.getMaxShardsPerNode(); + Integer maxShardsPerNode = coll.getMaxShardsPerNode() == -1 ? Integer.MAX_VALUE : coll.getMaxShardsPerNode(); List createNodeList = null; if (createNodeSet instanceof List) { @@ -338,15 +338,15 @@ public class Assign { HashMap nodeNameVsShardCount = getNodeNameVsShardCount(collectionName, clusterState, createNodeList); if (createNodeList == null) { // We only care if we haven't been told to put new replicas on specific nodes. - int availableSlots = 0; + long availableSlots = 0; for (Map.Entry ent : nodeNameVsShardCount.entrySet()) { //ADDREPLICA can put more than maxShardsPerNode on an instance, so this test is necessary. if (maxShardsPerNode > ent.getValue().thisCollectionNodes) { availableSlots += (maxShardsPerNode - ent.getValue().thisCollectionNodes); } } - if (availableSlots < nrtReplicas) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + if (availableSlots < nrtReplicas + tlogReplicas + pullReplicas) { + throw new AssignmentException( String.format(Locale.ROOT, "Cannot create %d new replicas for collection %s given the current number of live nodes and a maxShardsPerNode of %d", nrtReplicas, collectionName, maxShardsPerNode)); } @@ -355,13 +355,17 @@ public class Assign { List l = (List) coll.get(DocCollection.RULE); List replicaPositions = null; if (l != null) { + if (tlogReplicas + pullReplicas > 0) { + throw new AssignmentException(Replica.Type.TLOG + " or " + Replica.Type.PULL + + " replica types not supported with placement rules"); + } // TODO: make it so that this method doesn't require access to CC replicaPositions = getNodesViaRules(clusterState, shard, nrtReplicas, cloudManager, coll, createNodeList, l); } String policyName = coll.getStr(POLICY); AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig(); if (policyName != null || !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) { - replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), nrtReplicas, 0, 0, + replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), nrtReplicas, tlogReplicas, pullReplicas, policyName, cloudManager, createNodeList); } @@ -461,7 +465,7 @@ public class Assign { return nodeNameVsShardCount; } DocCollection coll = clusterState.getCollection(collectionName); - Integer maxShardsPerNode = coll.getMaxShardsPerNode(); + int maxShardsPerNode = coll.getMaxShardsPerNode() == -1 ? Integer.MAX_VALUE : coll.getMaxShardsPerNode(); Map collections = clusterState.getCollectionsMap(); for (Map.Entry entry : collections.entrySet()) { DocCollection c = entry.getValue(); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java index 802583c7102..e7f35f16006 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java @@ -17,32 +17,17 @@ package org.apache.solr.cloud.api.collections; -import java.io.IOException; import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import com.google.common.collect.ImmutableMap; -import org.apache.solr.client.solrj.cloud.autoscaling.Policy; -import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; -import org.apache.solr.client.solrj.cloud.SolrCloudManager; -import org.apache.solr.cloud.CloudUtil; import org.apache.solr.cloud.Overseer; -import org.apache.solr.common.SolrCloseableLatch; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; -import org.apache.solr.common.cloud.Replica; -import org.apache.solr.common.cloud.ReplicaPosition; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CommonAdminParams; -import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SimpleOrderedMap; import org.apache.solr.common.util.Utils; @@ -77,114 +62,60 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd { DocCollection collection = clusterState.getCollection(collectionName); - ZkStateReader zkStateReader = ocmh.zkStateReader; - AtomicReference sessionWrapper = new AtomicReference<>(); - SolrCloseableLatch countDownLatch; - try { - List positions = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, sessionWrapper); - Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message)); - // wait for a while until we see the shard - ocmh.waitForNewShard(collectionName, sliceName); - - String async = message.getStr(ASYNC); - countDownLatch = new SolrCloseableLatch(positions.size(), ocmh); - for (ReplicaPosition position : positions) { - String nodeName = position.node; - String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(), collection, sliceName, position.type); - log.info("Creating replica " + coreName + " as part of slice " + sliceName + " of collection " + collectionName - + " on " + nodeName); - - // Need to create new params for each request - ZkNodeProps addReplicasProps = new ZkNodeProps( - COLLECTION_PROP, collectionName, - SHARD_ID_PROP, sliceName, - ZkStateReader.REPLICA_TYPE, position.type.name(), - CoreAdminParams.NODE, nodeName, - CoreAdminParams.NAME, coreName, - CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState)); - Map propertyParams = new HashMap<>(); - ocmh.addPropertyParams(message, propertyParams); - addReplicasProps = addReplicasProps.plus(propertyParams); - if (async != null) addReplicasProps.getProperties().put(ASYNC, async); - final NamedList addResult = new NamedList(); - ocmh.addReplica(zkStateReader.getClusterState(), addReplicasProps, addResult, () -> { - countDownLatch.countDown(); - Object addResultFailure = addResult.get("failure"); - if (addResultFailure != null) { - SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure"); - if (failure == null) { - failure = new SimpleOrderedMap(); - results.add("failure", failure); - } - failure.addAll((NamedList) addResultFailure); - } else { - SimpleOrderedMap success = (SimpleOrderedMap) results.get("success"); - if (success == null) { - success = new SimpleOrderedMap(); - results.add("success", success); - } - success.addAll((NamedList) addResult.get("success")); - } - }); - } - } finally { - if (sessionWrapper.get() != null) sessionWrapper.get().release(); - } - - log.debug("Waiting for create shard action to complete"); - countDownLatch.await(5, TimeUnit.MINUTES); - log.debug("Finished waiting for create shard action to complete"); - - log.info("Finished create command on all shards for collection: " + collectionName); - - } - - public static List buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState, - String collectionName, ZkNodeProps message, AtomicReference< PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException { - String sliceName = message.getStr(SHARD_ID_PROP); - DocCollection collection = clusterState.getCollection(collectionName); - int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1)))); int numPullReplicas = message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0)); int numTlogReplicas = message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0)); - int totalReplicas = numNrtReplicas + numPullReplicas + numTlogReplicas; if (numNrtReplicas + numTlogReplicas <= 0) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0"); } - Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET); + ZkStateReader zkStateReader = ocmh.zkStateReader; + Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message)); + // wait for a while until we see the shard + ocmh.waitForNewShard(collectionName, sliceName); + String async = message.getStr(ASYNC); + ZkNodeProps addReplicasProps = new ZkNodeProps( + COLLECTION_PROP, collectionName, + SHARD_ID_PROP, sliceName, + ZkStateReader.NRT_REPLICAS, String.valueOf(numNrtReplicas), + ZkStateReader.TLOG_REPLICAS, String.valueOf(numTlogReplicas), + ZkStateReader.PULL_REPLICAS, String.valueOf(numPullReplicas), + OverseerCollectionMessageHandler.CREATE_NODE_SET, message.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET), + CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState)); - boolean usePolicyFramework = CloudUtil.usePolicyFramework(collection, cloudManager); - List positions; - if (usePolicyFramework) { - if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName()); - positions = Assign.identifyNodes(cloudManager, - clusterState, - Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM), - collection.getName(), - message, - Collections.singletonList(sliceName), - numNrtReplicas, - numTlogReplicas, - numPullReplicas); - sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true)); - } else { - List sortedNodeList = Assign.getNodesForNewReplicas(clusterState, collection.getName(), sliceName, totalReplicas, - createNodeSetStr, cloudManager); - int i = 0; - positions = new ArrayList<>(); - for (Map.Entry e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas, - Replica.Type.TLOG, numTlogReplicas, - Replica.Type.PULL, numPullReplicas - ).entrySet()) { - for (int j = 0; j < e.getValue(); j++) { - positions.add(new ReplicaPosition(sliceName, j + 1, e.getKey(), sortedNodeList.get(i % sortedNodeList.size()).nodeName)); - i++; + Map propertyParams = new HashMap<>(); + ocmh.addPropertyParams(message, propertyParams); + addReplicasProps = addReplicasProps.plus(propertyParams); + if (async != null) addReplicasProps.getProperties().put(ASYNC, async); + final NamedList addResult = new NamedList(); + try { + ocmh.addReplica(zkStateReader.getClusterState(), addReplicasProps, addResult, () -> { + Object addResultFailure = addResult.get("failure"); + if (addResultFailure != null) { + SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure"); + if (failure == null) { + failure = new SimpleOrderedMap(); + results.add("failure", failure); + } + failure.addAll((NamedList) addResultFailure); + } else { + SimpleOrderedMap success = (SimpleOrderedMap) results.get("success"); + if (success == null) { + success = new SimpleOrderedMap(); + results.add("success", success); + } + success.addAll((NamedList) addResult.get("success")); } - } + }); + } catch (Assign.AssignmentException e) { + // clean up the slice that we created + ZkNodeProps deleteShard = new ZkNodeProps(COLLECTION_PROP, collectionName, SHARD_ID_PROP, sliceName, ASYNC, async); + new DeleteShardCmd(ocmh).call(clusterState, deleteShard, results); + throw e; } - return positions; + + log.info("Finished create command on all shards for collection: " + collectionName); } } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java index 2df0f771582..6071b1b337f 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java @@ -268,7 +268,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd { NamedList addResult = new NamedList(); SolrCloseableLatch countDownLatch = new SolrCloseableLatch(1, ocmh); ActiveReplicaWatcher watcher = null; - ZkNodeProps props = ocmh.addReplica(clusterState, addReplicasProps, addResult, null); + ZkNodeProps props = ocmh.addReplica(clusterState, addReplicasProps, addResult, null).get(0); log.debug("props " + props); if (replica.equals(slice.getLeader()) || waitForFinalState) { watcher = new ActiveReplicaWatcher(coll.getName(), null, Collections.singletonList(newCoreName), countDownLatch); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java index e15c3899064..a724bc78f19 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java @@ -709,7 +709,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, } } - ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete) + List addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete) throws Exception { return ((AddReplicaCmd) commandMap.get(ADDREPLICA)).addReplica(clusterState, message, results, onComplete); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java index d08b519a81c..a09eec37e13 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java @@ -140,7 +140,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd { log.debug("Successfully created replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target); } - }); + }).get(0); if (addedReplica != null) { createdReplicas.add(addedReplica); diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index 3a46b2bae12..aef24487b9e 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -694,6 +694,9 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections"); copy(req.getParams(), map, REPLICATION_FACTOR, + NRT_REPLICAS, + TLOG_REPLICAS, + PULL_REPLICAS, CREATE_NODE_SET, WAIT_FOR_FINAL_STATE); return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX); @@ -828,7 +831,11 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission DATA_DIR, ULOG_DIR, REPLICA_TYPE, - WAIT_FOR_FINAL_STATE); + WAIT_FOR_FINAL_STATE, + NRT_REPLICAS, + TLOG_REPLICAS, + PULL_REPLICAS, + CREATE_NODE_SET); return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX); }), OVERSEERSTATUS_OP(OVERSEERSTATUS, (req, rsp, h) -> (Map) new LinkedHashMap<>()), diff --git a/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java index e338cc22800..342a27d2f79 100644 --- a/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java @@ -17,7 +17,10 @@ package org.apache.solr.cloud; import java.lang.invoke.MethodHandles; +import java.util.ArrayList; import java.util.Collection; +import java.util.EnumSet; +import java.util.List; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; @@ -26,11 +29,15 @@ import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.util.LogLevel; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.solr.client.solrj.response.RequestStatusState.COMPLETED; +import static org.apache.solr.client.solrj.response.RequestStatusState.FAILED; + /** * */ @@ -45,10 +52,75 @@ public class AddReplicaTest extends SolrCloudTestCase { .configure(); } + @Before + public void setUp() throws Exception { + super.setUp(); + cluster.deleteAllCollections(); + } + + @Test + public void testAddMultipleReplicas() throws Exception { + cluster.waitForAllNodes(5); + String collection = "testAddMultipleReplicas"; + CloudSolrClient cloudClient = cluster.getSolrClient(); + + CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf1", 1, 1); + create.setMaxShardsPerNode(2); + cloudClient.request(create); + + CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(collection, "shard1") + .setNrtReplicas(1) + .setTlogReplicas(1) + .setPullReplicas(1); + RequestStatusState status = addReplica.processAndWait(collection + "_xyz1", cloudClient, 120); + assertEquals(COMPLETED, status); + DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollectionOrNull(collection); + assertNotNull(docCollection); + assertEquals(4, docCollection.getReplicas().size()); + assertEquals(2, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).size()); + assertEquals(1, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).size()); + assertEquals(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).size()); + + // try to add 5 more replicas which should fail because numNodes(4)*maxShardsPerNode(2)=8 and 4 replicas already exist + addReplica = CollectionAdminRequest.addReplicaToShard(collection, "shard1") + .setNrtReplicas(3) + .setTlogReplicas(1) + .setPullReplicas(1); + status = addReplica.processAndWait(collection + "_xyz1", cloudClient, 120); + assertEquals(FAILED, status); + docCollection = cloudClient.getZkStateReader().getClusterState().getCollectionOrNull(collection); + assertNotNull(docCollection); + // sanity check that everything is as before + assertEquals(4, docCollection.getReplicas().size()); + assertEquals(2, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).size()); + assertEquals(1, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).size()); + assertEquals(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).size()); + + // but adding any number of replicas is supported if an explicit create node set is specified + // so test that as well + List createNodeSet = new ArrayList<>(2); + createNodeSet.add(cluster.getRandomJetty(random()).getNodeName()); + createNodeSet.add(cluster.getRandomJetty(random()).getNodeName()); + addReplica = CollectionAdminRequest.addReplicaToShard(collection, "shard1") + .setNrtReplicas(3) + .setTlogReplicas(1) + .setPullReplicas(1) + .setCreateNodeSet(String.join(",", createNodeSet)); + status = addReplica.processAndWait(collection + "_xyz1", cloudClient, 120); + assertEquals(COMPLETED, status); + docCollection = cloudClient.getZkStateReader().getClusterState().getCollectionOrNull(collection); + assertNotNull(docCollection); + // sanity check that everything is as before + assertEquals(9, docCollection.getReplicas().size()); + assertEquals(5, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).size()); + assertEquals(2, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).size()); + assertEquals(2, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).size()); + } + @Test //commented 2-Aug-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 09-Apr-2018 public void test() throws Exception { - cluster.waitForAllNodes(5000); + cluster.waitForAllNodes(5); String collection = "addreplicatest_coll"; CloudSolrClient cloudClient = cluster.getSolrClient(); @@ -65,16 +137,16 @@ public class AddReplicaTest extends SolrCloudTestCase { addReplica.processAsync("000", cloudClient); CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000"); CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient); - assertTrue(rsp.getRequestStatus() != RequestStatusState.COMPLETED); + assertNotSame(rsp.getRequestStatus(), COMPLETED); // wait for async request success boolean success = false; for (int i = 0; i < 200; i++) { rsp = requestStatus.process(cloudClient); - if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) { + if (rsp.getRequestStatus() == COMPLETED) { success = true; break; } - assertFalse(rsp.toString(), rsp.getRequestStatus() == RequestStatusState.FAILED); + assertNotSame(rsp.toString(), rsp.getRequestStatus(), RequestStatusState.FAILED); Thread.sleep(500); } assertTrue(success); @@ -82,23 +154,23 @@ public class AddReplicaTest extends SolrCloudTestCase { replicas2.removeAll(replicas); assertEquals(1, replicas2.size()); Replica r = replicas2.iterator().next(); - assertTrue(r.toString(), r.getState() != Replica.State.ACTIVE); + assertNotSame(r.toString(), r.getState(), Replica.State.ACTIVE); // use waitForFinalState addReplica.setWaitForFinalState(true); addReplica.processAsync("001", cloudClient); requestStatus = CollectionAdminRequest.requestStatus("001"); rsp = requestStatus.process(cloudClient); - assertTrue(rsp.getRequestStatus() != RequestStatusState.COMPLETED); + assertNotSame(rsp.getRequestStatus(), COMPLETED); // wait for async request success success = false; for (int i = 0; i < 200; i++) { rsp = requestStatus.process(cloudClient); - if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) { + if (rsp.getRequestStatus() == COMPLETED) { success = true; break; } - assertFalse(rsp.toString(), rsp.getRequestStatus() == RequestStatusState.FAILED); + assertNotSame(rsp.toString(), rsp.getRequestStatus(), RequestStatusState.FAILED); Thread.sleep(500); } assertTrue(success); @@ -114,7 +186,7 @@ public class AddReplicaTest extends SolrCloudTestCase { if (replica.getName().equals(replica2)) { continue; // may be still recovering } - assertTrue(coll.toString() + "\n" + replica.toString(), replica.getState() == Replica.State.ACTIVE); + assertSame(coll.toString() + "\n" + replica.toString(), replica.getState(), Replica.State.ACTIVE); } } } diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java index f68fa9e28c4..09a119bf327 100644 --- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java @@ -158,7 +158,7 @@ public class CollectionTooManyReplicasTest extends SolrCloudTestCase { assertTrue("Should have gotten the right error message back", e2.getMessage().contains("given the current number of live nodes and a maxShardsPerNode of")); - // And finally, insure that there are all the replcias we expect. We should have shards 1, 2 and 4 and each + // And finally, ensure that there are all the replicas we expect. We should have shards 1, 2 and 4 and each // should have exactly two replicas waitForState("Expected shards shardstart, 1, 2 and 4, each with two active replicas", collectionName, (n, c) -> { return DocCollection.isFullyActive(n, c, 4, 2); diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java index 39ae618ab29..cd87bb5677c 100644 --- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java @@ -542,7 +542,7 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase { @Test public void testSplitShardWithRule() throws Exception { - doSplitShardWithRule(SolrIndexSplitter.SplitMethod.LINK); + doSplitShardWithRule(SolrIndexSplitter.SplitMethod.REWRITE); } @Test diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java index 17b56d7945b..4b7320039df 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -413,23 +414,46 @@ public class SimClusterStateProvider implements ClusterStateProvider { ClusterState clusterState = getClusterState(); DocCollection coll = clusterState.getCollection(message.getStr(ZkStateReader.COLLECTION_PROP)); AtomicReference sessionWrapper = new AtomicReference<>(); - message = AddReplicaCmd.assignReplicaDetails(cloudManager, clusterState, message, sessionWrapper); + + Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT)); + EnumMap replicaTypesVsCount = new EnumMap<>(Replica.Type.class); + replicaTypesVsCount.put(Replica.Type.NRT, message.getInt(NRT_REPLICAS, replicaType == Replica.Type.NRT ? 1 : 0)); + replicaTypesVsCount.put(Replica.Type.TLOG, message.getInt(TLOG_REPLICAS, replicaType == Replica.Type.TLOG ? 1 : 0)); + replicaTypesVsCount.put(Replica.Type.PULL, message.getInt(PULL_REPLICAS, replicaType == Replica.Type.PULL ? 1 : 0)); + + int totalReplicas = 0; + for (Map.Entry entry : replicaTypesVsCount.entrySet()) { + totalReplicas += entry.getValue(); + } + if (totalReplicas > 1) { + if (message.getStr(CoreAdminParams.NAME) != null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'name' parameter is specified"); + } + if (message.getStr(CoreAdminParams.CORE_NODE_NAME) != null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'coreNodeName' parameter is specified"); + } + } + + List replicaPositions = AddReplicaCmd.buildReplicaPositions(cloudManager, clusterState, coll.getName(), message, replicaTypesVsCount, sessionWrapper); + for (ReplicaPosition replicaPosition : replicaPositions) { + AddReplicaCmd.CreateReplica createReplica = AddReplicaCmd.assignReplicaDetails(cloudManager, clusterState, message, replicaPosition); + if (message.getStr(CoreAdminParams.CORE_NODE_NAME) == null) { + createReplica.coreNodeName = Assign.assignCoreNodeName(stateManager, coll); + } + ReplicaInfo ri = new ReplicaInfo( + createReplica.coreNodeName, + createReplica.coreName, + createReplica.collectionName, + createReplica.sliceName, + createReplica.replicaType, + createReplica.node, + message.getProperties() + ); + simAddReplica(ri.getNode(), ri, true); + } if (sessionWrapper.get() != null) { sessionWrapper.get().release(); } - if (message.getStr(CoreAdminParams.CORE_NODE_NAME) == null) { - message = message.plus(CoreAdminParams.CORE_NODE_NAME, Assign.assignCoreNodeName(stateManager, coll)); - } - ReplicaInfo ri = new ReplicaInfo( - message.getStr(CoreAdminParams.CORE_NODE_NAME), - message.getStr(CoreAdminParams.NAME), - message.getStr(ZkStateReader.COLLECTION_PROP), - message.getStr(ZkStateReader.SHARD_ID_PROP), - Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT)), - message.getStr(CoreAdminParams.NODE), - message.getProperties() - ); - simAddReplica(message.getStr(CoreAdminParams.NODE), ri, true); results.add("success", ""); } @@ -1015,31 +1039,30 @@ public class SimClusterStateProvider implements ClusterStateProvider { .filter(e -> !e.getKey().equals("replicas")) .forEach(e -> props.put(e.getKey(), e.getValue())); // 2. create new replicas - AtomicReference sessionWrapper = new AtomicReference<>(); - List positions = CreateShardCmd.buildReplicaPositions(cloudManager, clusterState, collectionName, - message, sessionWrapper); - if (sessionWrapper.get() != null) { - sessionWrapper.get().release(); - } - AtomicInteger replicaNum = new AtomicInteger(1); - positions.forEach(pos -> { - Map replicaProps = new HashMap<>(); - replicaProps.put(ZkStateReader.SHARD_ID_PROP, pos.shard); - replicaProps.put(ZkStateReader.NODE_NAME_PROP, pos.node); - replicaProps.put(ZkStateReader.REPLICA_TYPE, pos.type.toString()); - replicaProps.put(ZkStateReader.BASE_URL_PROP, Utils.getBaseUrlForNodeName(pos.node, "http")); - String coreName = String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, pos.shard, pos.type.name().substring(0,1).toLowerCase(Locale.ROOT), - replicaNum.getAndIncrement()); + EnumMap replicaTypesVsCount = new EnumMap<>(Replica.Type.class); + int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1)))); + int numTlogReplicas = message.getInt(TLOG_REPLICAS, message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0))); + int numPullReplicas = message.getInt(PULL_REPLICAS, message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0))); + replicaTypesVsCount.put(Replica.Type.NRT, numNrtReplicas); + replicaTypesVsCount.put(Replica.Type.TLOG, numTlogReplicas); + replicaTypesVsCount.put(Replica.Type.PULL, numPullReplicas); + + ZkNodeProps addReplicasProps = new ZkNodeProps( + COLLECTION_PROP, collectionName, + SHARD_ID_PROP, sliceName, + ZkStateReader.NRT_REPLICAS, String.valueOf(replicaTypesVsCount.get(Replica.Type.NRT)), + ZkStateReader.TLOG_REPLICAS, String.valueOf(replicaTypesVsCount.get(Replica.Type.TLOG)), + ZkStateReader.PULL_REPLICAS, String.valueOf(replicaTypesVsCount.get(Replica.Type.PULL)), + OverseerCollectionMessageHandler.CREATE_NODE_SET, message.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET) + ); + try { - replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName); - ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0), - coreName, collectionName, pos.shard, pos.type, pos.node, replicaProps); - simAddReplica(pos.node, ri, false); + simAddReplica(addReplicasProps, results); } catch (Exception e) { throw new RuntimeException(e); } - }); - Map colProps = collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()); + + collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()); simRunLeaderElection(Collections.singleton(collectionName), true); results.add("success", ""); diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimPolicyCloud.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimPolicyCloud.java index c964e44ce3a..3637428302d 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimPolicyCloud.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimPolicyCloud.java @@ -236,7 +236,7 @@ public class TestSimPolicyCloud extends SimSolrCloudTestCase { String pullNodeName = nodes.get(1); int pullPort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(pullNodeName, ImplicitSnitch.PORT); - String tlogNodeName = nodes.get(1); + String tlogNodeName = nodes.get(2); int tlogPort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(tlogNodeName, ImplicitSnitch.PORT); log.info("NRT {} PULL {} , TLOG {} ", nrtNodeName, pullNodeName, tlogNodeName); diff --git a/solr/solr-ref-guide/src/collections-api.adoc b/solr/solr-ref-guide/src/collections-api.adoc index e4230dd9b3e..d601069bb00 100644 --- a/solr/solr-ref-guide/src/collections-api.adoc +++ b/solr/solr-ref-guide/src/collections-api.adoc @@ -396,6 +396,10 @@ Use SPLITSHARD for collections created with the 'compositeId' router (`router.ke `/admin/collections?action=CREATESHARD&shard=_shardName_&collection=_name_` +The default values for `replicationFactor` or `nrtReplicas`, `tlogReplicas`, `pullReplicas` from the collection is used to determine the number of replicas to be created for the new shard. This can be customized by explicitly passing the corresponding parameters to the request. + +The API uses the Autoscaling framework to find the best possible nodes in the cluster when an Autoscaling preferences or policy is configured. Refer to <> section for more details. + === CREATESHARD Parameters `collection`:: @@ -409,6 +413,15 @@ Allows defining the nodes to spread the new collection across. If not provided, + The format is a comma-separated list of node_names, such as `localhost:8983_solr,localhost:8984_solr,localhost:8985_solr`. +`nrtReplicas`:: +The number of `nrt` replicas that should be created for the new shard (optional, the defaults for the collection is used if omitted) + +`tlogReplicas`:: +The number of `tlog` replicas that should be created for the new shard (optional, the defaults for the collection is used if omitted) + +`pullReplicas`:: +The number of `pull` replicas that should be created for the new shard (optional, the defaults for the collection is used if omitted) + `property._name_=_value_`:: Set core property _name_ to _value_. See the section <> for details on supported properties and values. @@ -1016,9 +1029,9 @@ http://localhost:8983/solr/admin/collections?action=DELETEREPLICA&collection=tes [[addreplica]] == ADDREPLICA: Add Replica -Add a replica to a shard in a collection. The node name can be specified if the replica is to be created in a specific node. +Add one or more replicas to a shard in a collection. The node name can be specified if the replica is to be created in a specific node. Otherwise, a set of nodes can be specified and the most suitable ones among them will be chosen to create the replica(s). -The API uses the Autoscaling framework to find nodes that can satisfy the disk requirements for the new replica but only when an Autoscaling policy is configured. Refer to <> section for more details. +The API uses the Autoscaling framework to find nodes that can satisfy the disk requirements for the new replica(s) but only when an Autoscaling preferences or policy is configured. Refer to <> section for more details. `/admin/collections?action=ADDREPLICA&collection=_collection_&shard=_shard_&node=_nodeName_` @@ -1038,7 +1051,14 @@ If the exact shard name is not known, users may pass the `\_route_` value and th Ignored if the `shard` parameter is also specified. `node`:: -The name of the node where the replica should be created. +The name of the node where the replica should be created (optional) + +`createNodeSet`:: +A comma-separated list of nodes among which the best ones will be chosen to place the replicas (optional) ++ +The format is a comma-separated list of node_names, such as `localhost:8983_solr,localhost:8984_solr,localhost:8985_solr`. + +If neither `node`, nor `createNodeSet` is specified then the best node(s) from among all the live nodes in the cluster are chosen. `instanceDir`:: The instanceDir for the core that will be created. @@ -1057,6 +1077,15 @@ The type of replica to create. These possible values are allowed: + See the section <> for more information about replica type options. +`nrtReplicas`:: +The number of `nrt` replicas that should be created (optional, defaults to 1 if `type` is `nrt` otherwise 0). + +`tlogReplicas`:: +The number of `tlog` replicas that should be created (optional, defaults to 1 if `type` is `tlog` otherwise 0). + +`pullReplicas`:: +The number of `pull` replicas that should be created (optional, defaults to 1 if `type` is `pull` otherwise 0). + `property._name_=_value_`:: Set core property _name_ to _value_. See <> for details about supported properties and values. @@ -1096,6 +1125,39 @@ http://localhost:8983/solr/admin/collections?action=ADDREPLICA&collection=test2& ---- +[source,text] +---- +http://localhost:8983/solr/admin/collections?action=addreplica&collection=gettingstarted&shard=shard1&tlogReplicas=1&pullReplicas=1 +---- + +*Output* + +[source,json] +---- +{ + "responseHeader": { + "status": 0, + "QTime": 784 + }, + "success": { + "127.0.1.1:7574_solr": { + "responseHeader": { + "status": 0, + "QTime": 257 + }, + "core": "gettingstarted_shard1_replica_p11" + }, + "127.0.1.1:8983_solr": { + "responseHeader": { + "status": 0, + "QTime": 295 + }, + "core": "gettingstarted_shard1_replica_t10" + } + } +} +---- + [[clusterprop]] == CLUSTERPROP: Cluster Properties diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java index 4cf68d49eee..0f6de19f70e 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java @@ -57,7 +57,10 @@ import static org.apache.solr.common.cloud.DocCollection.RULE; import static org.apache.solr.common.cloud.DocCollection.SNITCH; import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS; import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE; +import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS; +import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS; import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR; +import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS; import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF; import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH; import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP; @@ -1648,6 +1651,8 @@ public abstract class CollectionAdminRequest protected String ulogDir; protected Properties properties; protected Replica.Type type; + protected Integer nrtReplicas, tlogReplicas, pullReplicas; + protected String createNodeSet; private AddReplica(String collection, String shard, String routeKey, Replica.Type type) { super(CollectionAction.ADDREPLICA); @@ -1727,6 +1732,42 @@ public abstract class CollectionAdminRequest return shard; } + public Integer getNrtReplicas() { + return nrtReplicas; + } + + public AddReplica setNrtReplicas(Integer nrtReplicas) { + this.nrtReplicas = nrtReplicas; + return this; + } + + public Integer getTlogReplicas() { + return tlogReplicas; + } + + public AddReplica setTlogReplicas(Integer tlogReplicas) { + this.tlogReplicas = tlogReplicas; + return this; + } + + public Integer getPullReplicas() { + return pullReplicas; + } + + public AddReplica setPullReplicas(Integer pullReplicas) { + this.pullReplicas = pullReplicas; + return this; + } + + public String getCreateNodeSet() { + return createNodeSet; + } + + public AddReplica setCreateNodeSet(String createNodeSet) { + this.createNodeSet = createNodeSet; + return this; + } + @Override public SolrParams getParams() { ModifiableSolrParams params = new ModifiableSolrParams(super.getParams()); @@ -1759,6 +1800,18 @@ public abstract class CollectionAdminRequest if (properties != null) { addProperties(params, properties); } + if (nrtReplicas != null) { + params.add(NRT_REPLICAS, String.valueOf(nrtReplicas)); + } + if (tlogReplicas != null) { + params.add(TLOG_REPLICAS, String.valueOf(tlogReplicas)); + } + if (pullReplicas != null) { + params.add(PULL_REPLICAS, String.valueOf(pullReplicas)); + } + if (createNodeSet != null) { + params.add(CREATE_NODE_SET_PARAM, createNodeSet); + } return params; } From 60569fbe4e8e9f9f9d57da9bf2570f847a26965c Mon Sep 17 00:00:00 2001 From: Varun Thacker Date: Fri, 21 Sep 2018 17:01:37 -0700 Subject: [PATCH 2/3] SOLR-11836: Move CHANGES entry to the 7.6 section --- solr/CHANGES.txt | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index ee1d7b72b2d..0f97dcf5387 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -101,6 +101,12 @@ Other Changes * SOLR-12762: Fix javadoc for SolrCloudTestCase.clusterShape() method and add a method that validates only against Active slices (Anshum Gupta) +Bug Fixes +---------------------- + +* SOLR-11836: FacetStream works with bucketSizeLimit of -1 which will fetch all the buckets. + (Alfonso Muñoz-Pomer Fuentes, Amrit Sarkar via Varun Thacker) + ================== 7.5.0 ================== @@ -357,9 +363,6 @@ Bug Fixes * SOLR-12733: SolrMetricReporterTest failure (Erick Erickson, David Smiley) -* SOLR-11836: FacetStream works with bucketSizeLimit of -1 which will fetch all the buckets. - (Alfonso Muñoz-Pomer Fuentes, Amrit Sarkar via Varun Thacker) - * SOLR-12765: Incorrect format of JMX cache stats. (Bojan Smid, ab) Optimizations From 4ccf0fb8f6ce269de8b4501fca201f5b4763cfe7 Mon Sep 17 00:00:00 2001 From: David Smiley Date: Fri, 21 Sep 2018 23:47:10 -0400 Subject: [PATCH 3/3] LUCENE-8511: MultiFields.getIndexedFields optimize to not call getMergedFieldInfos --- lucene/CHANGES.txt | 3 +++ .../org/apache/lucene/index/MultiFields.java | 27 +++++++------------ 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 70badd8e56e..d30575931cb 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -170,6 +170,9 @@ Optimizations * LUCENE-8448: Boolean queries now propagates the mininum score to their sub-scorers. (Jim Ferenczi, Adrien Grand) +* LUCENE-8511: MultiFields.getIndexedFields is now optimized; does not call getMergedFieldInfos + (David Smiley) + ======================= Lucene 7.6.0 ======================= Build diff --git a/lucene/core/src/java/org/apache/lucene/index/MultiFields.java b/lucene/core/src/java/org/apache/lucene/index/MultiFields.java index 19078a83c15..32ce2fabbe6 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MultiFields.java +++ b/lucene/core/src/java/org/apache/lucene/index/MultiFields.java @@ -21,12 +21,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; @@ -266,7 +267,8 @@ public final class MultiFields extends Fields { public static FieldInfos getMergedFieldInfos(IndexReader reader) { final String softDeletesField = reader.leaves().stream() .map(l -> l.reader().getFieldInfos().getSoftDeletesField()) - .filter(Objects::nonNull).findAny().orElse(null); + .filter(Objects::nonNull) + .findAny().orElse(null); final FieldInfos.Builder builder = new FieldInfos.Builder(new FieldInfos.FieldNumbers(softDeletesField)); for(final LeafReaderContext ctx : reader.leaves()) { builder.add(ctx.reader().getFieldInfos()); @@ -274,22 +276,13 @@ public final class MultiFields extends Fields { return builder.finish(); } - /** Call this to get the (merged) FieldInfos representing the - * set of indexed fields only for a composite reader. - *

- * NOTE: the returned field numbers will likely not - * correspond to the actual field numbers in the underlying - * readers, and codec metadata ({@link FieldInfo#getAttribute(String)} - * will be unavailable. - */ + /** Returns a set of names of fields that have a terms index. The order is undefined. */ public static Collection getIndexedFields(IndexReader reader) { - final Collection fields = new HashSet<>(); - for(final FieldInfo fieldInfo : getMergedFieldInfos(reader)) { - if (fieldInfo.getIndexOptions() != IndexOptions.NONE) { - fields.add(fieldInfo.name); - } - } - return fields; + return reader.leaves().stream() + .flatMap(l -> StreamSupport.stream(l.reader().getFieldInfos().spliterator(), false) + .filter(fi -> fi.getIndexOptions() != IndexOptions.NONE)) + .map(fi -> fi.name) + .collect(Collectors.toSet()); } private static class LeafReaderFields extends Fields {