SOLR-11127: REINDEXCOLLECTION command for re-indexing of existing collections.

Andrzej Bialecki 2019-03-19 13:41:11 +01:00
parent 83c30303ed
commit b778417054
21 changed files with 1865 additions and 18 deletions

View File

@@ -64,6 +64,10 @@ New Features
* SOLR-13292: Provide extended per-segment status of a collection. (ab)
* SOLR-11127: REINDEXCOLLECTION command for re-indexing of existing collections. This issue also adds
a back-compat check of the .system collection to notify users of potential compatibility issues after
upgrades or schema changes. (ab)
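As a quick illustration, a minimal SolrJ sketch of the new command, mirroring the request builder exercised by ReindexCollectionTest in this commit (the collection names and the cloudSolrClient variable are hypothetical):

CollectionAdminRequest.ReindexCollection req =
    CollectionAdminRequest.reindexCollection("mySourceCollection")  // hypothetical source collection
        .setTarget("myTargetCollection");                           // optional; defaults to the source name
CollectionAdminResponse rsp = req.process(cloudSolrClient);         // an existing CloudSolrClient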
Bug Fixes
----------------------

View File

@@ -21,15 +21,23 @@ import static org.apache.solr.common.params.CommonParams.ID;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import org.apache.lucene.util.Version;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.cloud.autoscaling.OverseerTriggerThread;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
@@ -45,11 +53,15 @@ import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ConnectionManager;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
@@ -524,6 +536,8 @@ public class Overseer implements SolrCloseable {
private Stats stats;
private String id;
private volatile boolean closed;
private volatile boolean systemCollCompatCheck = true;
private CloudConfig config;
// overseer not responsible for closing reader
@@ -571,9 +585,129 @@
ccThread.start();
triggerThread.start();
systemCollectionCompatCheck(new BiConsumer<String, Object>() {
boolean firstPair = true;
@Override
public void accept(String s, Object o) {
if (firstPair) {
log.warn("WARNING: Collection '.system' may need re-indexing due to compatibility issues listed below. See REINDEXCOLLECTION documentation for more details.");
firstPair = false;
}
log.warn("WARNING: *\t{}:\t{}", s, o);
}
});
assert ObjectReleaseTracker.track(this);
}
public void systemCollectionCompatCheck(final BiConsumer<String, Object> consumer) {
ClusterState clusterState = zkController.getClusterState();
if (clusterState == null) {
log.warn("Unable to check back-compat of .system collection - can't obtain ClusterState.");
return;
}
DocCollection coll = clusterState.getCollectionOrNull(CollectionAdminParams.SYSTEM_COLL);
if (coll == null) {
return;
}
// check that all shard leaders are active
boolean allActive = true;
for (Slice s : coll.getActiveSlices()) {
if (s.getLeader() == null || !s.getLeader().isActive(clusterState.getLiveNodes())) {
allActive = false;
break;
}
}
if (allActive) {
doCompatCheck(consumer);
} else {
// wait for all leaders to become active and then check
zkController.zkStateReader.registerCollectionStateWatcher(CollectionAdminParams.SYSTEM_COLL, (liveNodes, state) -> {
boolean active = true;
if (state == null || liveNodes.isEmpty()) {
return true;
}
for (Slice s : state.getActiveSlices()) {
if (s.getLeader() == null || !s.getLeader().isActive(liveNodes)) {
active = false;
break;
}
}
if (active) {
doCompatCheck(consumer);
}
return active;
});
}
}
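// Performs the actual compatibility check, at most once per Overseer session: it requests COLSTATUS
// for the .system collection (with segment and field info) and reports through the consumer any of
// "indexFieldsNotMatchingSchema", "differentSegmentVersions" / "currentLuceneVersion" and
// "differentMajorSegmentVersions" / "currentLuceneMajorVersion" that apply.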
private void doCompatCheck(BiConsumer<String, Object> consumer) {
if (systemCollCompatCheck) {
systemCollCompatCheck = false;
} else {
return;
}
try (CloudSolrClient client = new CloudSolrClient.Builder(Collections.singletonList(getZkController().getZkServerAddress()), Optional.empty())
.withSocketTimeout(30000).withConnectionTimeout(15000)
.withHttpClient(updateShardHandler.getDefaultHttpClient()).build()) {
CollectionAdminRequest.ColStatus req = CollectionAdminRequest.collectionStatus(CollectionAdminParams.SYSTEM_COLL)
.setWithSegments(true)
.setWithFieldInfo(true);
CollectionAdminResponse rsp = req.process(client);
NamedList<Object> status = (NamedList<Object>)rsp.getResponse().get(CollectionAdminParams.SYSTEM_COLL);
Collection<String> nonCompliant = (Collection<String>)status.get("schemaNonCompliant");
if (!nonCompliant.contains("(NONE)")) {
consumer.accept("indexFieldsNotMatchingSchema", nonCompliant);
}
Set<Integer> segmentCreatedMajorVersions = new HashSet<>();
Set<String> segmentVersions = new HashSet<>();
int currentMajorVersion = Version.LATEST.major;
String currentVersion = Version.LATEST.toString();
segmentVersions.add(currentVersion);
segmentCreatedMajorVersions.add(currentMajorVersion);
NamedList<Object> shards = (NamedList<Object>)status.get("shards");
for (Map.Entry<String, Object> entry : shards) {
NamedList<Object> leader = (NamedList<Object>)((NamedList<Object>)entry.getValue()).get("leader");
if (leader == null) {
continue;
}
NamedList<Object> segInfos = (NamedList<Object>)leader.get("segInfos");
if (segInfos == null) {
continue;
}
NamedList<Object> infos = (NamedList<Object>)segInfos.get("info");
if (((Number)infos.get("numSegments")).intValue() > 0) {
segmentVersions.add(infos.get("minSegmentLuceneVersion").toString());
}
if (infos.get("commitLuceneVersion") != null) {
segmentVersions.add(infos.get("commitLuceneVersion").toString());
}
NamedList<Object> segmentInfos = (NamedList<Object>)segInfos.get("segments");
segmentInfos.forEach((k, v) -> {
NamedList<Object> segment = (NamedList<Object>)v;
segmentVersions.add(segment.get("version").toString());
if (segment.get("minVersion") != null) {
segmentVersions.add(segment.get("version").toString());
}
if (segment.get("createdVersionMajor") != null) {
segmentCreatedMajorVersions.add(((Number)segment.get("createdVersionMajor")).intValue());
}
});
}
if (segmentVersions.size() > 1) {
consumer.accept("differentSegmentVersions", segmentVersions);
consumer.accept("currentLuceneVersion", currentVersion);
}
if (segmentCreatedMajorVersions.size() > 1) {
consumer.accept("differentMajorSegmentVersions", segmentCreatedMajorVersions);
consumer.accept("currentLuceneMajorVersion", currentMajorVersion);
}
} catch (SolrServerException | IOException e) {
log.warn("Unable to perform back-compat check of .system collection", e);
}
}
public Stats getStats() {
return stats;
}

View File

@@ -181,7 +181,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
}
private String referencedByAlias(String collection, Aliases aliases) {
public static String referencedByAlias(String collection, Aliases aliases) {
Objects.requireNonNull(aliases);
return aliases.getCollectionAliasListMap().entrySet().stream()
.filter(e -> e.getValue().contains(collection))

View File

@@ -241,6 +241,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
.put(DELETEREPLICA, new DeleteReplicaCmd(this))
.put(ADDREPLICA, new AddReplicaCmd(this))
.put(MOVEREPLICA, new MoveReplicaCmd(this))
.put(REINDEXCOLLECTION, new ReindexCollectionCmd(this))
.put(UTILIZENODE, new UtilizeNodeCmd(this))
.build()
;

View File

@@ -0,0 +1,824 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.api.collections;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Reindex a collection, usually in order to change the index schema.
* <p>WARNING: Reindexing is potentially a lossy operation - some indexed data that is not available as
* stored fields may be irretrievably lost, so users should use this command with caution, evaluating
* the potential impact by using different source and target collection names first, and preserving
* the source collection until the evaluation is complete.</p>
* <p>Reindexing follows these steps:</p>
* <ol>
* <li>creates a temporary collection using the most recent schema of the source collection
* (or the one specified in the parameters, which must already exist), and the shape of the original
* collection, unless overridden by parameters.</li>
* <li>copy the source documents to the temporary collection, using their stored fields and
* reindexing them using the specified schema. NOTE: some data
* loss may occur if the original stored field data is not available!</li>
* <li>create the target collection from scratch with the specified name (or the same as source if not
* specified) and the specified parameters. NOTE: if the target name was not specified or is the same
* as the source collection then a unique sequential collection name will be used.</li>
* <li>copy the documents from the source collection to the target collection.</li>
* <li>if the source and target collection name was the same then set up an alias pointing from the source collection name to the actual
* (sequentially named) target collection</li>
* <li>optionally delete the source collection.</li>
* </ol>
*/
public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
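// For illustration, the command as invoked through the Collections API; the action name and the
// name / target / cmd / removeSource parameters are wired up in CollectionsHandler in this commit,
// and the collection names below are hypothetical:
//
//   /admin/collections?action=REINDEXCOLLECTION&name=mySourceCollection&target=myTargetCollection
//
// A running reindex can be inspected or aborted with:
//
//   /admin/collections?action=REINDEXCOLLECTION&name=mySourceCollection&cmd=status
//   /admin/collections?action=REINDEXCOLLECTION&name=mySourceCollection&cmd=abort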
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String COMMAND = "cmd";
public static final String REINDEX_STATUS = "reindexStatus";
public static final String REMOVE_SOURCE = "removeSource";
public static final String TARGET = "target";
public static final String TARGET_COL_PREFIX = ".rx_";
public static final String CHK_COL_PREFIX = ".rx_ck_";
public static final String REINDEXING_STATE = CollectionAdminRequest.PROPERTY_PREFIX + "rx";
public static final String STATE = "state";
public static final String PHASE = "phase";
private static final List<String> COLLECTION_PARAMS = Arrays.asList(
ZkStateReader.CONFIGNAME_PROP,
ZkStateReader.NUM_SHARDS_PROP,
ZkStateReader.NRT_REPLICAS,
ZkStateReader.PULL_REPLICAS,
ZkStateReader.TLOG_REPLICAS,
ZkStateReader.REPLICATION_FACTOR,
ZkStateReader.MAX_SHARDS_PER_NODE,
"shards",
Policy.POLICY,
CollectionAdminParams.CREATE_NODE_SET_PARAM,
CollectionAdminParams.CREATE_NODE_SET_SHUFFLE_PARAM,
ZkStateReader.AUTO_ADD_REPLICAS
);
private final OverseerCollectionMessageHandler ocmh;
private static AtomicInteger tmpCollectionSeq = new AtomicInteger();
public enum State {
IDLE,
RUNNING,
ABORTED,
FINISHED;
public String toLower() {
return toString().toLowerCase(Locale.ROOT);
}
public static State get(Object p) {
if (p == null) {
return null;
}
p = String.valueOf(p).toLowerCase(Locale.ROOT);
return states.get(p);
}
static Map<String, State> states = Collections.unmodifiableMap(
Stream.of(State.values()).collect(Collectors.toMap(State::toLower, Function.identity())));
}
public enum Cmd {
START,
ABORT,
STATUS;
public String toLower() {
return toString().toLowerCase(Locale.ROOT);
}
public static Cmd get(String p) {
if (p == null) {
return null;
}
p = p.toLowerCase(Locale.ROOT);
return cmds.get(p);
}
static Map<String, Cmd> cmds = Collections.unmodifiableMap(
Stream.of(Cmd.values()).collect(Collectors.toMap(Cmd::toLower, Function.identity())));
}
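// Typical lifecycle: a collection with no /.reindexing znode is treated as IDLE; START switches it to
// RUNNING, ABORT switches it to ABORTED (the running command notices this, cleans up and exits), and a
// successful run removes the znode again and marks the target collection FINISHED via the
// REINDEXING_STATE collection property.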
private SolrClientCache solrClientCache;
private String zkHost;
public ReindexCollectionCmd(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
}
@Override
public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
log.debug("*** called: {}", message);
String collection = message.getStr(CommonParams.NAME);
// before resolving aliases
String originalCollection = collection;
Aliases aliases = ocmh.zkStateReader.getAliases();
if (collection != null) {
// resolve aliases - the source may be an alias
List<String> aliasList = aliases.resolveAliases(collection);
if (aliasList != null && !aliasList.isEmpty()) {
collection = aliasList.get(0);
}
}
if (collection == null || !clusterState.hasCollection(collection)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source collection name must be specified and must exist");
}
String target = message.getStr(TARGET);
if (target == null) {
target = collection;
} else {
// resolve aliases
List<String> aliasList = aliases.resolveAliases(target);
if (aliasList != null && !aliasList.isEmpty()) {
target = aliasList.get(0);
}
}
boolean sameTarget = target.equals(collection) || target.equals(originalCollection);
boolean removeSource = message.getBool(REMOVE_SOURCE, false);
Cmd command = Cmd.get(message.getStr(COMMAND, Cmd.START.toLower()));
if (command == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown command: " + message.getStr(COMMAND));
}
Map<String, Object> reindexingState = getReindexingState(ocmh.cloudManager.getDistribStateManager(), collection);
if (!reindexingState.containsKey(STATE)) {
reindexingState.put(STATE, State.IDLE.toLower());
}
State state = State.get(reindexingState.get(STATE));
if (command == Cmd.ABORT) {
log.info("Abort requested for collection {}, setting the state to ABORTED.", collection);
// check that it's running
if (state != State.RUNNING) {
log.debug("Abort requested for collection {} but command is not running: {}", collection, state);
return;
}
setReindexingState(collection, State.ABORTED, null);
reindexingState.put(STATE, "aborting");
results.add(REINDEX_STATUS, reindexingState);
// if needed the cleanup will be performed by the running instance of the command
return;
} else if (command == Cmd.STATUS) {
results.add(REINDEX_STATUS, reindexingState);
return;
}
// command == Cmd.START
// check it's not already running
if (state == State.RUNNING) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Reindex is already running for collection " + collection +
". If you are sure this is not the case you can issue &cmd=abort to clean up this state.");
}
DocCollection coll = clusterState.getCollection(collection);
boolean aborted = false;
int batchSize = message.getInt(CommonParams.ROWS, 100);
String query = message.getStr(CommonParams.Q, "*:*");
String fl = message.getStr(CommonParams.FL, "*");
Integer rf = message.getInt(ZkStateReader.REPLICATION_FACTOR, coll.getReplicationFactor());
Integer numNrt = message.getInt(ZkStateReader.NRT_REPLICAS, coll.getNumNrtReplicas());
Integer numTlog = message.getInt(ZkStateReader.TLOG_REPLICAS, coll.getNumTlogReplicas());
Integer numPull = message.getInt(ZkStateReader.PULL_REPLICAS, coll.getNumPullReplicas());
int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, coll.getActiveSlices().size());
int maxShardsPerNode = message.getInt(ZkStateReader.MAX_SHARDS_PER_NODE, coll.getMaxShardsPerNode());
DocRouter router = coll.getRouter();
if (router == null) {
router = DocRouter.DEFAULT;
}
String configName = message.getStr(ZkStateReader.CONFIGNAME_PROP, ocmh.zkStateReader.readConfigName(collection));
String targetCollection;
int seq = tmpCollectionSeq.getAndIncrement();
if (sameTarget) {
do {
targetCollection = TARGET_COL_PREFIX + originalCollection + "_" + seq;
if (!clusterState.hasCollection(targetCollection)) {
break;
}
seq = tmpCollectionSeq.getAndIncrement();
} while (clusterState.hasCollection(targetCollection));
} else {
targetCollection = target;
}
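// the checkpoint collection (created below with a single shard and replicationFactor=1) is where the
// topic() stream used for copying persists its checkpoints, so copying progress is tracked across batches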
String chkCollection = CHK_COL_PREFIX + originalCollection;
String daemonUrl = null;
Exception exc = null;
boolean createdTarget = false;
try {
solrClientCache = new SolrClientCache(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
zkHost = ocmh.zkStateReader.getZkClient().getZkServerAddress();
// set the running flag
reindexingState.clear();
reindexingState.put("actualSourceCollection", collection);
reindexingState.put("actualTargetCollection", targetCollection);
reindexingState.put("checkpointCollection", chkCollection);
reindexingState.put("inputDocs", getNumberOfDocs(collection));
reindexingState.put(PHASE, "creating target and checkpoint collections");
setReindexingState(collection, State.RUNNING, reindexingState);
// 0. set up target and checkpoint collections
NamedList<Object> cmdResults = new NamedList<>();
ZkNodeProps cmd;
if (clusterState.hasCollection(targetCollection)) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Target collection " + targetCollection + " already exists! Delete it first.");
}
if (clusterState.hasCollection(chkCollection)) {
// delete the checkpoint collection
cmd = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
CommonParams.NAME, chkCollection,
CoreAdminParams.DELETE_METRICS_HISTORY, "true"
);
ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
checkResults("deleting old checkpoint collection " + chkCollection, cmdResults, true);
}
if (maybeAbort(collection)) {
aborted = true;
return;
}
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower());
propMap.put(CommonParams.NAME, targetCollection);
propMap.put(ZkStateReader.NUM_SHARDS_PROP, numShards);
propMap.put(CollectionAdminParams.COLL_CONF, configName);
// init first from the same router
propMap.put("router.name", router.getName());
for (String key : coll.keySet()) {
if (key.startsWith("router.")) {
propMap.put(key, coll.get(key));
}
}
// then apply overrides if present
for (String key : message.keySet()) {
if (key.startsWith("router.")) {
propMap.put(key, message.getStr(key));
} else if (COLLECTION_PARAMS.contains(key)) {
propMap.put(key, message.get(key));
}
}
propMap.put(ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode);
propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, true);
propMap.put(DocCollection.STATE_FORMAT, message.getInt(DocCollection.STATE_FORMAT, coll.getStateFormat()));
if (rf != null) {
propMap.put(ZkStateReader.REPLICATION_FACTOR, rf);
}
if (numNrt != null) {
propMap.put(ZkStateReader.NRT_REPLICAS, numNrt);
}
if (numTlog != null) {
propMap.put(ZkStateReader.TLOG_REPLICAS, numTlog);
}
if (numPull != null) {
propMap.put(ZkStateReader.PULL_REPLICAS, numPull);
}
// create the target collection
cmd = new ZkNodeProps(propMap);
cmdResults = new NamedList<>();
ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, cmd, cmdResults);
createdTarget = true;
checkResults("creating target collection " + targetCollection, cmdResults, true);
// create the checkpoint collection - use RF=1 and 1 shard
cmd = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
CommonParams.NAME, chkCollection,
ZkStateReader.NUM_SHARDS_PROP, "1",
ZkStateReader.REPLICATION_FACTOR, "1",
DocCollection.STATE_FORMAT, "2",
CollectionAdminParams.COLL_CONF, "_default",
CommonAdminParams.WAIT_FOR_FINAL_STATE, "true"
);
cmdResults = new NamedList<>();
ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, cmd, cmdResults);
checkResults("creating checkpoint collection " + chkCollection, cmdResults, true);
// wait for a while until we see both collections
TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, ocmh.timeSource);
boolean created = false;
while (!waitUntil.hasTimedOut()) {
waitUntil.sleep(100);
// this also refreshes our local var clusterState
clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
created = clusterState.hasCollection(targetCollection) && clusterState.hasCollection(chkCollection);
if (created) break;
}
if (!created) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create temporary collection(s)");
}
if (maybeAbort(collection)) {
aborted = true;
return;
}
// 1. put the source collection in read-only mode
cmd = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.READ_ONLY, "true");
ocmh.overseer.offerStateUpdate(Utils.toJSON(cmd));
TestInjection.injectReindexLatch();
if (maybeAbort(collection)) {
aborted = true;
return;
}
// 2. copy the documents to target
// Recipe taken from: http://joelsolr.blogspot.com/2016/10/solr-63-batch-jobs-parallel-etl-and.html
ModifiableSolrParams q = new ModifiableSolrParams();
q.set(CommonParams.QT, "/stream");
q.set("collection", collection);
q.set("expr",
"daemon(id=\"" + targetCollection + "\"," +
"terminate=\"true\"," +
"commit(" + targetCollection + "," +
"update(" + targetCollection + "," +
"batchSize=" + batchSize + "," +
"topic(" + chkCollection + "," +
collection + "," +
"q=\"" + query + "\"," +
"fl=\"" + fl + "\"," +
"id=\"topic_" + targetCollection + "\"," +
// some of the documents, e.g. in .system, contain large blobs
"rows=\"" + batchSize + "\"," +
"initialCheckpoint=\"0\"))))");
log.debug("- starting copying documents from " + collection + " to " + targetCollection);
SolrResponse rsp = null;
try {
rsp = ocmh.cloudManager.request(new QueryRequest(q));
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to copy documents from " +
collection + " to " + targetCollection, e);
}
daemonUrl = getDaemonUrl(rsp, coll);
if (daemonUrl == null) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to copy documents from " +
collection + " to " + targetCollection + ": " + Utils.toJSONString(rsp));
}
reindexingState.put("daemonUrl", daemonUrl);
reindexingState.put("daemonName", targetCollection);
reindexingState.put(PHASE, "copying documents");
setReindexingState(collection, State.RUNNING, reindexingState);
// wait for the daemon to finish
waitForDaemon(targetCollection, daemonUrl, collection, targetCollection, reindexingState);
if (maybeAbort(collection)) {
aborted = true;
return;
}
log.debug("- finished copying from " + collection + " to " + targetCollection);
// fail here or earlier during daemon run
TestInjection.injectReindexFailure();
// 5. if (sameTarget) set up an alias to use targetCollection as the source name
if (sameTarget) {
log.debug("- setting up alias from " + originalCollection + " to " + targetCollection);
cmd = new ZkNodeProps(
CommonParams.NAME, originalCollection,
"collections", targetCollection);
cmdResults = new NamedList<>();
ocmh.commandMap.get(CollectionParams.CollectionAction.CREATEALIAS).call(clusterState, cmd, cmdResults);
checkResults("setting up alias " + originalCollection + " -> " + targetCollection, cmdResults, true);
reindexingState.put("alias", originalCollection + " -> " + targetCollection);
}
reindexingState.remove("daemonUrl");
reindexingState.remove("daemonName");
reindexingState.put("processedDocs", getNumberOfDocs(targetCollection));
reindexingState.put(PHASE, "copying done, finalizing");
setReindexingState(collection, State.RUNNING, reindexingState);
if (maybeAbort(collection)) {
aborted = true;
return;
}
// 6. delete the checkpoint collection
log.debug("- deleting " + chkCollection);
cmd = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
CommonParams.NAME, chkCollection,
CoreAdminParams.DELETE_METRICS_HISTORY, "true"
);
cmdResults = new NamedList<>();
ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
checkResults("deleting checkpoint collection " + chkCollection, cmdResults, true);
// 7. optionally delete the source collection
if (removeSource) {
log.debug("- deleting source collection");
cmd = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
CommonParams.NAME, collection,
CoreAdminParams.DELETE_METRICS_HISTORY, "true"
);
cmdResults = new NamedList<>();
ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
checkResults("deleting source collection " + collection, cmdResults, true);
} else {
// 8. clear readOnly on source
ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.READ_ONLY, null);
ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
}
// 9. set FINISHED state on the target and clear the state on the source
ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP, targetCollection,
REINDEXING_STATE, State.FINISHED.toLower());
ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
reindexingState.put(STATE, State.FINISHED.toLower());
reindexingState.put(PHASE, "done");
removeReindexingState(collection);
} catch (Exception e) {
log.warn("Error during reindexing of " + originalCollection, e);
exc = e;
aborted = true;
} finally {
solrClientCache.close();
if (aborted) {
cleanup(collection, targetCollection, chkCollection, daemonUrl, targetCollection, createdTarget);
if (exc != null) {
results.add("error", exc.toString());
}
reindexingState.put(STATE, State.ABORTED.toLower());
}
results.add(REINDEX_STATUS, reindexingState);
}
}
private static final String REINDEXING_STATE_PATH = "/.reindexing";
private Map<String, Object> setReindexingState(String collection, State state, Map<String, Object> props) throws Exception {
String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
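// e.g. /collections/mySourceCollection/.reindexing for a hypothetical collection "mySourceCollection";
// the znode holds a small JSON map such as {"state":"running", "phase":"copying documents", ...}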
DistribStateManager stateManager = ocmh.cloudManager.getDistribStateManager();
Map<String, Object> copyProps = new HashMap<>();
if (props == null) { // retrieve existing props, if any
props = Utils.getJson(stateManager, path);
}
copyProps.putAll(props);
copyProps.put("state", state.toLower());
if (stateManager.hasData(path)) {
stateManager.setData(path, Utils.toJSON(copyProps), -1);
} else {
stateManager.makePath(path, Utils.toJSON(copyProps), CreateMode.PERSISTENT, false);
}
return copyProps;
}
private void removeReindexingState(String collection) throws Exception {
String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
DistribStateManager stateManager = ocmh.cloudManager.getDistribStateManager();
if (stateManager.hasData(path)) {
stateManager.removeData(path, -1);
}
}
@VisibleForTesting
public static Map<String, Object> getReindexingState(DistribStateManager stateManager, String collection) throws Exception {
String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
// make it modifiable
return new TreeMap<>(Utils.getJson(stateManager, path));
}
private long getNumberOfDocs(String collection) {
CloudSolrClient solrClient = solrClientCache.getCloudSolrClient(zkHost);
try {
ModifiableSolrParams params = new ModifiableSolrParams();
params.add(CommonParams.Q, "*:*");
params.add(CommonParams.ROWS, "0");
QueryResponse rsp = solrClient.query(collection, params);
return rsp.getResults().getNumFound();
} catch (Exception e) {
return 0L;
}
}
private void checkResults(String label, NamedList<Object> results, boolean failureIsFatal) throws Exception {
Object failure = results.get("failure");
if (failure == null) {
failure = results.get("error");
}
if (failure != null) {
String msg = "Error: " + label + ": " + Utils.toJSONString(results);
if (failureIsFatal) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, msg);
} else {
log.error(msg);
}
}
}
private boolean maybeAbort(String collection) throws Exception {
DocCollection coll = ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollectionOrNull(collection);
if (coll == null) {
// collection no longer present - abort
log.info("## Aborting - collection {} no longer present.", collection);
return true;
}
Map<String, Object> reindexingState = getReindexingState(ocmh.cloudManager.getDistribStateManager(), collection);
State state = State.get(reindexingState.getOrDefault(STATE, State.RUNNING.toLower()));
if (state != State.ABORTED) {
return false;
}
log.info("## Aborting - collection {} state is {}", collection, state);
return true;
}
// XXX see #waitForDaemon() for why we need this
private String getDaemonUrl(SolrResponse rsp, DocCollection coll) {
Map<String, Object> rs = (Map<String, Object>)rsp.getResponse().get("result-set");
if (rs == null || rs.isEmpty()) {
log.debug(" -- Missing daemon information in response: " + Utils.toJSONString(rsp));
return null;
}
List<Object> list = (List<Object>)rs.get("docs");
if (list == null) {
log.debug(" -- Missing daemon information in response: " + Utils.toJSONString(rsp));
return null;
}
String replicaName = null;
for (Object o : list) {
Map<String, Object> map = (Map<String, Object>)o;
String op = (String)map.get("DaemonOp");
if (op == null) {
continue;
}
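// DaemonOp is a status string produced by the /stream handler (see the DaemonResponseStream messages
// in StreamHandler), of the form "Deamon:<id> <op> on <coreName>". Only the 4th token, the core name,
// matters here; for SolrCloud replicas it contains both "shard" and "replica".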
String[] parts = op.split("\\s+");
if (parts.length != 4) {
log.debug(" -- Invalid daemon location info, expected 4 tokens: " + op);
return null;
}
// check if it's plausible
if (parts[3].contains("shard") && parts[3].contains("replica")) {
replicaName = parts[3];
break;
} else {
log.debug(" -- daemon location info likely invalid: " + op);
return null;
}
}
if (replicaName == null) {
return null;
}
// build a baseUrl of the replica
for (Replica r : coll.getReplicas()) {
if (replicaName.equals(r.getCoreName())) {
return r.getBaseUrl() + "/" + r.getCoreName();
}
}
return null;
}
// XXX currently this is complicated due to a bug in the way the daemon 'list'
// XXX operation is implemented - see SOLR-13245. We need to query the actual
// XXX SolrCore where the daemon is running
private void waitForDaemon(String daemonName, String daemonUrl, String sourceCollection, String targetCollection, Map<String, Object> reindexingState) throws Exception {
HttpClient client = ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
try (HttpSolrClient solrClient = new HttpSolrClient.Builder()
.withHttpClient(client)
.withBaseSolrUrl(daemonUrl).build()) {
ModifiableSolrParams q = new ModifiableSolrParams();
q.set(CommonParams.QT, "/stream");
q.set("action", "list");
q.set(CommonParams.DISTRIB, false);
QueryRequest req = new QueryRequest(q);
boolean isRunning;
int statusCheck = 0;
do {
isRunning = false;
statusCheck++;
try {
NamedList<Object> rsp = solrClient.request(req);
Map<String, Object> rs = (Map<String, Object>)rsp.get("result-set");
if (rs == null || rs.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't find daemon list: missing result-set: " + Utils.toJSONString(rsp));
}
List<Object> list = (List<Object>)rs.get("docs");
if (list == null) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't find daemon list: missing result-set: " + Utils.toJSONString(rsp));
}
if (list.isEmpty()) { // finished?
break;
}
for (Object o : list) {
Map<String, Object> map = (Map<String, Object>)o;
String id = (String)map.get("id");
if (daemonName.equals(id)) {
isRunning = true;
// fail here
TestInjection.injectReindexFailure();
break;
}
}
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception waiting for daemon " +
daemonName + " at " + daemonUrl, e);
}
if (statusCheck % 5 == 0) {
reindexingState.put("processedDocs", getNumberOfDocs(targetCollection));
setReindexingState(sourceCollection, State.RUNNING, reindexingState);
}
ocmh.cloudManager.getTimeSource().sleep(2000);
} while (isRunning && !maybeAbort(sourceCollection));
}
}
private void killDaemon(String daemonName, String daemonUrl) throws Exception {
log.debug("-- killing daemon " + daemonName + " at " + daemonUrl);
HttpClient client = ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
try (HttpSolrClient solrClient = new HttpSolrClient.Builder()
.withHttpClient(client)
.withBaseSolrUrl(daemonUrl).build()) {
ModifiableSolrParams q = new ModifiableSolrParams();
q.set(CommonParams.QT, "/stream");
// we should really use 'kill' here, but then we will never
// know when the daemon actually finishes running - 'kill' only
// sets a flag that may be noticed much later
q.set("action", "stop");
q.set(CommonParams.ID, daemonName);
q.set(CommonParams.DISTRIB, false);
QueryRequest req = new QueryRequest(q);
NamedList<Object> rsp = solrClient.request(req);
// /result-set/docs/[0]/DaemonOp : Deamon:id killed on coreName
log.debug(" -- stop daemon response: " + Utils.toJSONString(rsp));
Map<String, Object> rs = (Map<String, Object>) rsp.get("result-set");
if (rs == null || rs.isEmpty()) {
log.warn("Problem killing daemon " + daemonName + ": missing result-set: " + Utils.toJSONString(rsp));
return;
}
List<Object> list = (List<Object>) rs.get("docs");
if (list == null) {
log.warn("Problem killing daemon " + daemonName + ": missing result-set: " + Utils.toJSONString(rsp));
return;
}
if (list.isEmpty()) { // already finished?
return;
}
for (Object o : list) {
Map<String, Object> map = (Map<String, Object>) o;
String op = (String) map.get("DaemonOp");
if (op == null) {
continue;
}
if (op.contains(daemonName) && op.contains("stopped")) {
// now wait for the daemon to really stop
q.set("action", "list");
req = new QueryRequest(q);
TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, ocmh.timeSource);
while (!timeOut.hasTimedOut()) {
rsp = solrClient.request(req);
rs = (Map<String, Object>) rsp.get("result-set");
if (rs == null || rs.isEmpty()) {
log.warn("Problem killing daemon " + daemonName + ": missing result-set: " + Utils.toJSONString(rsp));
break;
}
List<Object> list2 = (List<Object>) rs.get("docs");
if (list2 == null) {
log.warn("Problem killing daemon " + daemonName + ": missing result-set: " + Utils.toJSONString(rsp));
break;
}
if (list2.isEmpty()) { // already finished?
break;
}
Map<String, Object> status2 = null;
for (Object o2 : list2) {
Map<String, Object> map2 = (Map<String, Object>)o2;
if (daemonName.equals(map2.get("id"))) {
status2 = map2;
break;
}
}
if (status2 == null) { // finished?
break;
}
Number stopTime = (Number)status2.get("stopTime");
if (stopTime.longValue() > 0) {
break;
}
}
if (timeOut.hasTimedOut()) {
log.warn("Problem killing daemon " + daemonName + ": timed out waiting for daemon to stop.");
// proceed anyway
}
}
}
// now kill it - it's already stopped, this simply removes its status
q.set("action", "kill");
req = new QueryRequest(q);
solrClient.request(req);
}
}
private void cleanup(String collection, String targetCollection, String chkCollection,
String daemonUrl, String daemonName, boolean createdTarget) throws Exception {
log.info("## Cleaning up after abort or error");
// 1. kill the daemon
// 2. cleanup target / chk collections IFF the source collection still exists and is not empty
// 3. cleanup collection state
if (daemonUrl != null) {
killDaemon(daemonName, daemonUrl);
}
ClusterState clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
NamedList<Object> cmdResults = new NamedList<>();
if (createdTarget && !collection.equals(targetCollection) && clusterState.hasCollection(targetCollection)) {
log.debug(" -- removing " + targetCollection);
ZkNodeProps cmd = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
CommonParams.NAME, targetCollection,
CoreAdminParams.DELETE_METRICS_HISTORY, "true"
);
ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
checkResults("CLEANUP: deleting target collection " + targetCollection, cmdResults, false);
}
// remove chk collection
if (clusterState.hasCollection(chkCollection)) {
log.debug(" -- removing " + chkCollection);
ZkNodeProps cmd = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
CommonParams.NAME, chkCollection,
CoreAdminParams.DELETE_METRICS_HISTORY, "true"
);
cmdResults = new NamedList<>();
ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
checkResults("CLEANUP: deleting checkpoint collection " + chkCollection, cmdResults, false);
}
log.debug(" -- turning readOnly mode off for " + collection);
ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.READ_ONLY, null);
ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
removeReindexingState(collection);
}
}

View File

@@ -216,8 +216,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
DaemonStream d = daemons.remove(id);
if (d != null) {
d.close();
rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " killed on " + coreName));
} else {
rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " not found on " + coreName));
}
rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " killed on " + coreName));
}
}
}

View File

@@ -149,6 +149,9 @@ public class ColStatus {
sliceMap.add("leader", leaderMap);
leaderMap.add("coreNode", leader.getName());
leaderMap.addAll(leader.getProperties());
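// skip requesting per-segment details from a leader that is not currently active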
if (!leader.isActive(clusterState.getLiveNodes())) {
continue;
}
String url = ZkCoreNodeProps.getCoreUrl(leader);
try (SolrClient client = solrClientCache.getHttpSolrClient(url)) {
ModifiableSolrParams params = new ModifiableSolrParams();

View File

@@ -53,6 +53,7 @@ import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.ZkController.NotInClusterStateException;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
import org.apache.solr.cloud.api.collections.RoutedAlias;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.rule.ReplicaAssigner;
@@ -77,6 +78,7 @@ import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -540,6 +542,35 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
RELOAD_OP(RELOAD, (req, rsp, h) -> copy(req.getParams().required(), null, NAME)),
REINDEXCOLLECTION_OP(REINDEXCOLLECTION, (req, rsp, h) -> {
Map<String, Object> m = copy(req.getParams().required(), null, NAME);
copy(req.getParams(), m,
ReindexCollectionCmd.COMMAND,
ReindexCollectionCmd.REMOVE_SOURCE,
ReindexCollectionCmd.TARGET,
ZkStateReader.CONFIGNAME_PROP,
NUM_SLICES,
NRT_REPLICAS,
PULL_REPLICAS,
TLOG_REPLICAS,
REPLICATION_FACTOR,
MAX_SHARDS_PER_NODE,
POLICY,
CREATE_NODE_SET,
CREATE_NODE_SET_SHUFFLE,
AUTO_ADD_REPLICAS,
"shards",
STATE_FORMAT,
CommonParams.ROWS,
CommonParams.Q,
CommonParams.FL);
if (req.getParams().get("collection." + ZkStateReader.CONFIGNAME_PROP) != null) {
m.put(ZkStateReader.CONFIGNAME_PROP, req.getParams().get("collection." + ZkStateReader.CONFIGNAME_PROP));
}
copyPropertiesWithPrefix(req.getParams(), m, "router.");
return m;
}),
SYNCSHARD_OP(SYNCSHARD, (req, rsp, h) -> {
String collection = req.getParams().required().get("collection");
String shard = req.getParams().required().get("shard");

View File

@@ -346,7 +346,13 @@ public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements Sol
zkCmdExecutor.ensureExists(upgradedSchemaPath, zkController.getZkClient());
zkController.getZkClient().setData(upgradedSchemaPath, bytes, true);
// Then delete the non-managed schema znode
zkController.getZkClient().delete(nonManagedSchemaPath, -1, true);
if (zkController.getZkClient().exists(nonManagedSchemaPath, true)) {
try {
zkController.getZkClient().delete(nonManagedSchemaPath, -1, true);
} catch (KeeperException.NoNodeException ex) {
// ignore - someone beat us to it
}
}
// Set the resource name to the managed schema so that the CoreAdminHandler returns a findable filename
schema.setResourceName(managedSchemaResourceName);

View File

@@ -126,6 +126,10 @@ public class TestInjection {
public volatile static CountDownLatch splitLatch = null;
public volatile static CountDownLatch reindexLatch = null;
public volatile static String reindexFailure = null;
public volatile static String failIndexFingerprintRequests = null;
public volatile static String wrongIndexFingerprint = null;
@@ -156,6 +160,8 @@
splitFailureBeforeReplicaCreation = null;
splitFailureAfterReplicaCreation = null;
splitLatch = null;
reindexLatch = null;
reindexFailure = null;
prepRecoveryOpPauseForever = null;
countPrepRecoveryOpPauseForever = new AtomicInteger(0);
failIndexFingerprintRequests = null;
@@ -423,6 +429,35 @@
return true;
}
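// Example usage of the reindexFailure injection point (see ReindexCollectionTest in this commit):
//   TestInjection.reindexFailure = "true:100";
// makes injectReindexFailure() throw with a 100% chance, and TestInjection.reset() clears it again.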
public static boolean injectReindexFailure() {
if (reindexFailure != null) {
Random rand = random();
if (null == rand) return true;
Pair<Boolean,Integer> pair = parseValue(reindexFailure);
boolean enabled = pair.first();
int chanceIn100 = pair.second();
if (enabled && rand.nextInt(100) >= (100 - chanceIn100)) {
log.info("Test injection failure");
throw new SolrException(ErrorCode.SERVER_ERROR, "Test injection failure");
}
}
return true;
}
public static boolean injectReindexLatch() {
if (reindexLatch != null) {
try {
log.info("Waiting in ReindexCollectionCmd for up to 60s");
return reindexLatch.await(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
return true;
}
private static Pair<Boolean,Integer> parseValue(final String raw) {
if (raw == null) return new Pair<>(false, 0);
Matcher m = ENABLED_PERCENT.matcher(raw);

View File

@@ -0,0 +1,379 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
*
*/
@LogLevel("org.apache.solr.cloud.api.collections.ReindexCollectionCmd=DEBUG")
public class ReindexCollectionTest extends SolrCloudTestCase {
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(2)
// only *_s
.addConfig("conf1", configset("cloud-minimal"))
// every combination of field flags
.addConfig("conf2", configset("cloud-dynamic"))
// catch-all * field, indexed+stored
.addConfig("conf3", configset("cloud-minimal-inplace-updates"))
.configure();
}
private CloudSolrClient solrClient;
private SolrCloudManager cloudManager;
private DistribStateManager stateManager;
@Before
public void doBefore() throws Exception {
ZkController zkController = cluster.getJettySolrRunner(0).getCoreContainer().getZkController();
cloudManager = zkController.getSolrCloudManager();
stateManager = cloudManager.getDistribStateManager();
solrClient = new CloudSolrClientBuilder(Collections.singletonList(zkController.getZkServerAddress()),
Optional.empty()).build();
}
private ReindexCollectionCmd.State getState(String collection) {
try {
return ReindexCollectionCmd.State.get(ReindexCollectionCmd
.getReindexingState(stateManager, collection)
.get(ReindexCollectionCmd.STATE));
} catch (Exception e) {
fail("Unexpected exception checking state of " + collection + ": " + e);
return null;
}
}
private void waitForState(String collection, ReindexCollectionCmd.State expected) throws Exception {
TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
ReindexCollectionCmd.State current = null;
while (!timeOut.hasTimedOut()) {
current = getState(collection);
if (expected == current) {
return;
}
timeOut.sleep(500);
}
throw new Exception("timeout waiting for state, current=" + current + ", expected=" + expected);
}
@After
public void doAfter() throws Exception {
cluster.deleteAllCollections(); // deletes aliases too
solrClient.close();
TestInjection.reset();
}
private static final int NUM_DOCS = 200; // at least two batches, default batchSize=100
@Test
public void testBasicReindexing() throws Exception {
final String sourceCollection = "basicReindexing";
createCollection(sourceCollection, "conf1", 2, 2);
indexDocs(sourceCollection, NUM_DOCS,
i -> new SolrInputDocument("id", String.valueOf(i), "string_s", String.valueOf(i)));
final String targetCollection = "basicReindexingTarget";
CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
.setTarget(targetCollection);
CollectionAdminResponse rsp = req.process(solrClient);
assertNotNull(rsp.toString(), rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS));
Map<String, Object> status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
assertEquals(status.toString(), (long)NUM_DOCS, ((Number)status.get("inputDocs")).longValue());
assertEquals(status.toString(), (long)NUM_DOCS, ((Number)status.get("processedDocs")).longValue());
CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
return ReindexCollectionCmd.State.FINISHED == state;
});
// verify the target docs exist
QueryResponse queryResponse = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
assertEquals("copied num docs", NUM_DOCS, queryResponse.getResults().getNumFound());
}
@Test
public void testSameTargetReindexing() throws Exception {
final String sourceCollection = "sameTargetReindexing";
final String targetCollection = sourceCollection;
createCollection(sourceCollection, "conf1", 2, 2);
indexDocs(sourceCollection, NUM_DOCS,
i -> new SolrInputDocument("id", String.valueOf(i), "string_s", String.valueOf(i)));
CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
.setTarget(targetCollection);
req.process(solrClient);
String realTargetCollection = null;
TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
String prefix = ReindexCollectionCmd.TARGET_COL_PREFIX + targetCollection;
while (!timeOut.hasTimedOut()) {
timeOut.sleep(500);
for (String name : cloudManager.getClusterStateProvider().getClusterState().getCollectionsMap().keySet()) {
if (name.startsWith(prefix)) {
realTargetCollection = name;
break;
}
}
if (realTargetCollection != null) {
break;
}
}
assertNotNull("target collection not present after 30s", realTargetCollection);
CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", realTargetCollection, (liveNodes, coll) -> {
ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
return ReindexCollectionCmd.State.FINISHED == state;
});
// verify the target docs exist
QueryResponse rsp = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
assertEquals("copied num docs", NUM_DOCS, rsp.getResults().getNumFound());
}
@Test
public void testLossySchema() throws Exception {
final String sourceCollection = "sourceLossyReindexing";
final String targetCollection = "targetLossyReindexing";
createCollection(sourceCollection, "conf2", 2, 2);
indexDocs(sourceCollection, NUM_DOCS, i ->
new SolrInputDocument(
"id", String.valueOf(i),
"string_s", String.valueOf(i),
"sind", "this is a test " + i)); // "sind": indexed=true, stored=false, will be lost...
CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
.setTarget(targetCollection)
.setConfigName("conf3");
req.process(solrClient);
CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
return ReindexCollectionCmd.State.FINISHED == state;
});
// verify the target docs exist
QueryResponse rsp = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
assertEquals("copied num docs", NUM_DOCS, rsp.getResults().getNumFound());
for (SolrDocument doc : rsp.getResults()) {
String id = (String)doc.getFieldValue("id");
assertEquals(id, doc.getFieldValue("string_s"));
assertFalse(doc.containsKey("sind")); // lost in translation ...
}
}
@Test
public void testReshapeReindexing() throws Exception {
final String sourceCollection = "reshapeReindexing";
final String targetCollection = "reshapeReindexingTarget";
createCollection(sourceCollection, "conf1", 2, 2);
indexDocs(sourceCollection, NUM_DOCS,
i -> new SolrInputDocument(
"id", String.valueOf(i),
"string_s", String.valueOf(i),
"remove_s", String.valueOf(i)));
CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
.setTarget(targetCollection)
.setCollectionParam(ZkStateReader.NUM_SHARDS_PROP, 3)
.setCollectionParam(ZkStateReader.REPLICATION_FACTOR, 1)
.setCollectionParam("router.name", ImplicitDocRouter.NAME)
.setCollectionParam("shards", "foo,bar,baz")
.setCollectionParam("fl", "id,string_s")
.setCollectionParam("q", "id:10*");
req.process(solrClient);
CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
return ReindexCollectionCmd.State.FINISHED == state;
});
// verify the target docs exist
QueryResponse rsp = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
// 10 and 100-109
assertEquals("copied num docs", 11, rsp.getResults().getNumFound());
// verify the correct fields exist
for (SolrDocument doc : rsp.getResults()) {
assertNotNull(doc.getFieldValue("id"));
assertNotNull(doc.getFieldValue("string_s"));
assertNull(doc.getFieldValue("remove_s"));
}
// check the shape of the new collection
ClusterState clusterState = solrClient.getClusterStateProvider().getClusterState();
List<String> aliases = solrClient.getZkStateReader().getAliases().resolveAliases(targetCollection);
assertFalse(aliases.isEmpty());
String realTargetCollection = aliases.get(0);
DocCollection coll = clusterState.getCollection(realTargetCollection);
assertNotNull(coll);
assertEquals(3, coll.getSlices().size());
assertNotNull("foo", coll.getSlice("foo"));
assertNotNull("bar", coll.getSlice("bar"));
assertNotNull("baz", coll.getSlice("baz"));
assertEquals(Integer.valueOf(1), coll.getReplicationFactor());
assertEquals(ImplicitDocRouter.NAME, coll.getRouter().getName());
}
@Test
public void testFailure() throws Exception {
final String sourceCollection = "failReindexing";
final String targetCollection = "failReindexingTarget";
final String aliasTarget = "failAlias";
createCollection(sourceCollection, "conf1", 2, 2);
createCollection(targetCollection, "conf1", 1, 1);
CollectionAdminRequest.createAlias(aliasTarget, targetCollection).process(solrClient);
indexDocs(sourceCollection, NUM_DOCS,
i -> new SolrInputDocument(
"id", String.valueOf(i),
"string_s", String.valueOf(i)));
CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
.setTarget(targetCollection);
CollectionAdminResponse rsp = req.process(solrClient);
assertNotNull(rsp.getResponse().get("error"));
assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("already exists"));
req = CollectionAdminRequest.reindexCollection(sourceCollection)
.setTarget(aliasTarget);
rsp = req.process(solrClient);
assertNotNull(rsp.getResponse().get("error"));
assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("already exists"));
CollectionAdminRequest.deleteAlias(aliasTarget).process(solrClient);
CollectionAdminRequest.deleteCollection(targetCollection).process(solrClient);
req = CollectionAdminRequest.reindexCollection(sourceCollection)
.setTarget(targetCollection);
TestInjection.reindexFailure = "true:100";
rsp = req.process(solrClient);
assertNotNull(rsp.getResponse().get("error"));
assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("waiting for daemon"));
// verify that the target and checkpoint collections don't exist
cloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> {
assertFalse(coll.getName() + " still exists", coll.getName().startsWith(ReindexCollectionCmd.TARGET_COL_PREFIX));
assertFalse(coll.getName() + " still exists", coll.getName().startsWith(ReindexCollectionCmd.CHK_COL_PREFIX));
});
// verify that the source collection is read-write and has no reindexing flags
CloudTestUtils.waitForState(cloudManager, "collection state is incorrect", sourceCollection,
((liveNodes, collectionState) ->
!collectionState.isReadOnly() &&
collectionState.getStr(ReindexCollectionCmd.REINDEXING_STATE) == null &&
getState(sourceCollection) == null));
}
@Test
public void testAbort() throws Exception {
final String sourceCollection = "abortReindexing";
final String targetCollection = "abortReindexingTarget";
createCollection(sourceCollection, "conf1", 2, 1);
TestInjection.reindexLatch = new CountDownLatch(1);
CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
.setTarget(targetCollection);
String asyncId = req.processAsync(solrClient);
// wait for the source collection to be put in readOnly mode
CloudTestUtils.waitForState(cloudManager, "source collection didn't become readOnly",
sourceCollection, (liveNodes, coll) -> coll.isReadOnly());
req = CollectionAdminRequest.reindexCollection(sourceCollection);
req.setCommand("abort");
CollectionAdminResponse rsp = req.process(solrClient);
Map<String, Object> status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
assertNotNull(rsp.toString(), status);
assertEquals(status.toString(), "aborting", status.get("state"));
CloudTestUtils.waitForState(cloudManager, "incorrect collection state", sourceCollection,
((liveNodes, collectionState) ->
collectionState.isReadOnly() &&
getState(sourceCollection) == ReindexCollectionCmd.State.ABORTED));
// verify status
req.setCommand("status");
rsp = req.process(solrClient);
status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
assertNotNull(rsp.toString(), status);
assertEquals(status.toString(), "aborted", status.get("state"));
// let the process continue
TestInjection.reindexLatch.countDown();
CloudTestUtils.waitForState(cloudManager, "source collection is in wrong state",
sourceCollection, (liveNodes, docCollection) -> !docCollection.isReadOnly() && getState(sourceCollection) == null);
// verify the response
rsp = CollectionAdminRequest.requestStatus(asyncId).process(solrClient);
status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
assertNotNull(rsp.toString(), status);
assertEquals(status.toString(), "aborted", status.get("state"));
}
private void createCollection(String name, String config, int numShards, int numReplicas) throws Exception {
CollectionAdminRequest.createCollection(name, config, numShards, numReplicas)
.setMaxShardsPerNode(-1)
.process(solrClient);
cluster.waitForActiveCollection(name, numShards, numShards * numReplicas);
}
private void indexDocs(String collection, int numDocs, Function<Integer, SolrInputDocument> generator) throws Exception {
List<SolrInputDocument> docs = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
docs.add(generator.apply(i));
}
solrClient.add(collection, docs);
solrClient.commit(collection);
// verify the docs exist
QueryResponse rsp = solrClient.query(collection, params(CommonParams.Q, "*:*"));
assertEquals("num docs", NUM_DOCS, rsp.getResults().getNumFound());
}
}

View File

@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.schema.SchemaRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.schema.SchemaResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.RetryUtil;
import org.apache.solr.logging.LogWatcher;
import org.apache.solr.logging.LogWatcherConfig;
import org.apache.solr.util.IdUtils;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Verifies that the Overseer runs a back-compat check of the .system collection on startup
 * and logs warnings about potential compatibility issues.
 */
public class SystemCollectionCompatTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
public static void setupCluster() throws Exception {
System.setProperty("managed.schema.mutable", "true");
configureCluster(2)
.addConfig("conf1", configset("cloud-managed"))
.configure();
if (! log.isWarnEnabled()) {
fail("Test requires that log-level is at-least WARN, but WARN is disabled");
}
}
private SolrCloudManager cloudManager;
private CloudSolrClient solrClient;
@Before
public void setupSystemCollection() throws Exception {
CollectionAdminRequest.createCollection(CollectionAdminParams.SYSTEM_COLL, null, 1, 2)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(CollectionAdminParams.SYSTEM_COLL, 1, 2);
ZkController zkController = cluster.getJettySolrRunner(0).getCoreContainer().getZkController();
cloudManager = zkController.getSolrCloudManager();
solrClient = new CloudSolrClientBuilder(Collections.singletonList(zkController.getZkServerAddress()),
Optional.empty()).build();
// send a dummy doc to the .system collection
SolrInputDocument doc = new SolrInputDocument(
"id", IdUtils.timeRandomId(),
CommonParams.TYPE, "dummy");
doc.addField("time_l", cloudManager.getTimeSource().getEpochTimeNs());
doc.addField("timestamp", new Date());
solrClient.add(CollectionAdminParams.SYSTEM_COLL, doc);
solrClient.commit(CollectionAdminParams.SYSTEM_COLL);
Replica leader
= solrClient.getZkStateReader().getLeaderRetry(CollectionAdminParams.SYSTEM_COLL, "shard1", DEFAULT_TIMEOUT);
final AtomicReference<Long> coreStartTime = new AtomicReference<>(getCoreStatus(leader).getCoreStartTime().getTime());
// trigger compat report by changing the schema
SchemaRequest req = new SchemaRequest();
SchemaResponse rsp = req.process(solrClient, CollectionAdminParams.SYSTEM_COLL);
Map<String, Object> field = getSchemaField("timestamp", rsp);
// make some obviously incompatible changes
field.put("type", "string");
field.put("docValues", false);
SchemaRequest.ReplaceField replaceFieldRequest = new SchemaRequest.ReplaceField(field);
SchemaResponse.UpdateResponse replaceFieldResponse = replaceFieldRequest.process(solrClient, CollectionAdminParams.SYSTEM_COLL);
assertEquals(replaceFieldResponse.toString(), 0, replaceFieldResponse.getStatus());
CollectionAdminRequest.Reload reloadRequest = CollectionAdminRequest.reloadCollection(CollectionAdminParams.SYSTEM_COLL);
CollectionAdminResponse response = reloadRequest.process(solrClient);
assertEquals(0, response.getStatus());
assertTrue(response.isSuccess());
// wait for the reload to complete
RetryUtil.retryUntil("Timed out waiting for core to reload", 30, 1000, TimeUnit.MILLISECONDS, () -> {
long restartTime = 0;
try {
restartTime = getCoreStatus(leader).getCoreStartTime().getTime();
} catch (Exception e) {
log.warn("Exception getting core start time: {}", e.getMessage());
return false;
}
return restartTime > coreStartTime.get();
});
cluster.waitForActiveCollection(CollectionAdminParams.SYSTEM_COLL, 1, 2);
}
@After
public void doAfter() throws Exception {
cluster.deleteAllCollections();
solrClient.close();
}
private Map<String, Object> getSchemaField(String name, SchemaResponse schemaResponse) {
List<Map<String, Object>> fields = schemaResponse.getSchemaRepresentation().getFields();
for (Map<String, Object> field : fields) {
if (name.equals(field.get("name"))) {
return field;
}
}
return null;
}
@Test
public void testBackCompat() throws Exception {
CollectionAdminRequest.OverseerStatus status = new CollectionAdminRequest.OverseerStatus();
CloudSolrClient solrClient = cluster.getSolrClient();
CollectionAdminResponse adminResponse = status.process(solrClient);
NamedList<Object> response = adminResponse.getResponse();
String leader = (String) response.get("leader");
JettySolrRunner overseerNode = null;
int index = -1;
List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
for (int i = 0; i < jettySolrRunners.size(); i++) {
JettySolrRunner runner = jettySolrRunners.get(i);
if (runner.getNodeName().equals(leader)) {
overseerNode = runner;
index = i;
break;
}
}
assertNotNull(overseerNode);
LogWatcherConfig watcherCfg = new LogWatcherConfig(true, null, "WARN", 100);
LogWatcher watcher = LogWatcher.newRegisteredLogWatcher(watcherCfg, null);
watcher.reset();
// restart Overseer to trigger the back-compat check
cluster.stopJettySolrRunner(index);
TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
while (!timeOut.hasTimedOut()) {
adminResponse = status.process(solrClient);
response = adminResponse.getResponse();
String newLeader = (String) response.get("leader");
if (newLeader != null && !leader.equals(newLeader)) {
break;
}
timeOut.sleep(200);
}
if (timeOut.hasTimedOut()) {
fail("time out waiting for new Overseer leader");
}
TimeOut timeOut1 = new TimeOut(60, TimeUnit.SECONDS, cloudManager.getTimeSource());
boolean foundWarning = false;
boolean foundSchemaWarning = false;
while (!timeOut1.hasTimedOut()) {
timeOut1.sleep(1000);
SolrDocumentList history = watcher.getHistory(-1, null);
for (SolrDocument doc : history) {
if (!Overseer.class.getName().equals(doc.getFieldValue("logger"))) {
continue;
}
if (doc.getFieldValue("message").toString().contains("re-indexing")) {
foundWarning = true;
}
if (doc.getFieldValue("message").toString().contains("timestamp")) {
foundSchemaWarning = true;
}
}
if (foundWarning && foundSchemaWarning) {
break;
}
}
assertTrue("re-indexing warning not found", foundWarning);
assertTrue("timestamp field incompatibility warning not found", foundSchemaWarning);
}
}

View File

@ -198,6 +198,7 @@ The attributes that can be modified are:
See the <<create,CREATE action>> section above for details on these attributes.
[[readonlymode]]
==== Read-only mode
Setting the `readOnly` attribute to `true` puts the collection in read-only mode,
in which any index update requests are rejected. Other collection-level actions (eg. adding /
@ -218,6 +219,125 @@ NOTE: This may potentially take a long time if there are still major segment mer
Removing the `readOnly` property or setting it to false enables the
processing of updates and reloads the collection.
[[reindexcollection]]
== REINDEXCOLLECTION: Re-index a Collection
`/admin/collections?action=REINDEXCOLLECTION&name=_name_`
The REINDEXCOLLECTION command re-indexes a collection using existing data from the
source collection.
NOTE: Re-indexing is potentially a lossy operation: any existing indexed data that is not
available as stored fields may be lost. Use this command with caution, evaluate the
potential impact first by re-indexing with different source and target collection names,
and preserve the source collection until that evaluation is complete.
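For instance, an evaluation run could re-index into an explicitly named target and compare document counts before removing or replacing anything. A minimal SolrJ sketch, assuming the usual SolrJ imports (`CollectionAdminRequest`, `SolrQuery`) and an already-connected `CloudSolrClient` named `client`; the collection names are illustrative:
[source,java]
----
// Sketch only: re-index into an explicitly named, separate target collection for evaluation.
CollectionAdminRequest.reindexCollection("books")
    .setTarget("books_reindexed")   // hypothetical target name; it must not already exist
    .process(client);
// Compare document counts before deciding to switch over or delete the source.
long sourceDocs = client.query("books", new SolrQuery("*:*")).getResults().getNumFound();
long targetDocs = client.query("books_reindexed", new SolrQuery("*:*")).getResults().getNumFound();
----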
The target collection must not exist (and may not be an alias). If the target
collection name is the same as the source collection name, a unique sequential name
is first generated for the target collection; then, once re-indexing is done, an alias
is created that points from the source name to the actual, sequentially-named target collection.
When re-indexing is started the source collection is put in <<readonlymode,read-only mode>> to ensure that
all source documents are properly processed.
Using optional parameters, a different index schema, collection shape (number of shards and replicas)
or routing parameters can be requested for the target collection.
Re-indexing is executed as a streaming expression daemon, which runs on one of the
source collection's replicas. It is usually a time-consuming operation, so it's recommended to execute
it as an asynchronous request in order to avoid request timeouts. Only one re-indexing operation may
execute concurrently for a given source collection. Long-running, erroneous or crashed re-indexing
operations may be terminated by using the `abort` option, which also removes partial results.
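A hedged SolrJ sketch of that flow, assuming the usual SolrJ imports (`CollectionAdminRequest`, `CollectionAdminResponse`) and an already-connected `CloudSolrClient` named `client`; the collection name is illustrative:
[source,java]
----
// Start re-indexing asynchronously to avoid request timeouts.
CollectionAdminRequest.ReindexCollection start = CollectionAdminRequest.reindexCollection("newCollection");
String requestId = start.processAsync(client);
// Check the detailed status of the running re-indexing operation.
CollectionAdminRequest.ReindexCollection status = CollectionAdminRequest.reindexCollection("newCollection");
status.setCommand("status");
CollectionAdminResponse statusRsp = status.process(client);
// Abort a long-running or crashed re-indexing operation; this also removes partial results.
CollectionAdminRequest.ReindexCollection abort = CollectionAdminRequest.reindexCollection("newCollection");
abort.setCommand("abort");
abort.process(client);
----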
=== REINDEXCOLLECTION Parameters
`name`::
Source collection name, may be an alias. This parameter is required.
`cmd`::
Optional command. Default command is `start`. Currently supported commands are:
* `start` - default, starts processing if not already running,
* `abort` - aborts an already running re-indexing (or clears a left-over status after a crash),
and deletes partial results,
* `status` - returns detailed status of a running re-indexing command.
`target`::
Target collection name, optional. If not specified, a unique name will be generated and,
after all documents have been copied, an alias will be created that points from the source
collection name to the unique sequentially-named collection, effectively "hiding"
the original source collection from regular update and search operations.
`q`::
Optional query to select documents for re-indexing. Default value is `\*:*`.
`fl`::
Optional list of fields to re-index. Default value is `*`.
`rows`::
Documents are transferred in batches. Depending on the average document size, large
batch sizes may cause memory issues. Default value is 100.
`configName`::
`collection.configName`::
Optional name of the configset for the target collection. Default is the same as the
source collection.
There are a number of optional parameters that determine the target collection layout. If they
are not specified in the request, their values are copied from the source collection.
The following parameters are currently supported (described in detail in the <<create,CREATE collection>> section):
`numShards`, `replicationFactor`, `nrtReplicas`, `tlogReplicas`, `pullReplicas`, `maxShardsPerNode`,
`autoAddReplicas`, `shards`, `policy`, `createNodeSet`, `createNodeSet.shuffle`, `router.*`.
`removeSource`::
Optional boolean. If true, the source collection will be deleted after the processing has
successfully finished.
`async`::
Optional request ID to track this action which will be <<Asynchronous Calls,processed asynchronously>>.
When the re-indexing process has completed, the target collection is marked using
`property.rx: "finished"`, and the source collection state is updated to become read-write.
On any error the command deletes any temporary and target collections and also resets the
source collection's read-only flag.
=== Examples using REINDEXCOLLECTION
*Input*
[source,text]
----
http://localhost:8983/solr/admin/collections?action=REINDEXCOLLECTION&name=newCollection&numShards=3&configName=conf2&q=id:aa*&fl=id,string_s
----
This request specifies a different schema for the target collection, copies only some of the fields, selects only the documents
matching a query, and also potentially re-shapes the collection by explicitly specifying 3 shards. Since the target collection
hasn't been specified in the parameters, a collection with a unique name, e.g. `.rx_newCollection_2`, will be created, and on success
an alias pointing from `newCollection` to `.rx_newCollection_2` will be created, effectively replacing the source collection
for the purposes of indexing and searching. The source collection is assumed to be small, so a synchronous request was made.
*Output*
[source,json]
----
{
"responseHeader":{
"status":0,
"QTime":10757},
"reindexStatus":{
"phase":"done",
"inputDocs":13416,
"processedDocs":376,
"actualSourceCollection":".rx_newCollection_1",
"state":"finished",
"actualTargetCollection":".rx_newCollection_2",
"checkpointCollection":".rx_ck_newCollection"
}
}
----
As a result, a new collection `.rx_newCollection_2` has been created, with the selected documents re-indexed to 3 shards, and
with an alias pointing from `newCollection` to this collection. The status also shows that the source collection
was already an alias to `.rx_newCollection_1`, likely the result of a previous re-indexing.
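The same kind of request can also be built with SolrJ. A sketch using the names and parameters from the example above (an already-connected `CloudSolrClient` named `client` is assumed):
[source,java]
----
CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection("newCollection")
    .setConfigName("conf2")               // different configset (and schema) for the target
    .setQuery("id:aa*")                   // only re-index the matching documents
    .setFields("id,string_s")             // copy only these fields
    .setCollectionParam("numShards", 3);  // re-shape the target collection
CollectionAdminResponse rsp = req.process(client);  // synchronous, as in the example above
----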
[[reload]]
== RELOAD: Reload a Collection

View File

@ -25,6 +25,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
@ -51,7 +52,7 @@ public class DaemonStream extends TupleStream implements Expressible {
private ArrayBlockingQueue<Tuple> queue;
private int queueSize;
private boolean eatTuples;
private long iterations;
private AtomicLong iterations = new AtomicLong();
private long startTime;
private long stopTime;
private Exception exception;
@ -240,7 +241,7 @@ public class DaemonStream extends TupleStream implements Expressible {
tuple.put(ID, id);
tuple.put("startTime", startTime);
tuple.put("stopTime", stopTime);
tuple.put("iterations", iterations);
tuple.put("iterations", iterations.get());
tuple.put("state", streamRunner.getState().toString());
if(exception != null) {
tuple.put("exception", exception.getMessage());
@ -253,10 +254,6 @@ public class DaemonStream extends TupleStream implements Expressible {
this.daemons = daemons;
}
private synchronized void incrementIterations() {
++iterations;
}
private synchronized void setStartTime(long startTime) {
this.startTime = startTime;
}
@ -332,7 +329,7 @@ public class DaemonStream extends TupleStream implements Expressible {
log.error("Error in DaemonStream:" + id, e);
++errors;
if (errors > 100) {
log.error("Too many consectutive errors. Stopping DaemonStream:" + id);
log.error("Too many consecutive errors. Stopping DaemonStream:" + id);
break OUTER;
}
} catch (Throwable t) {
@ -351,7 +348,7 @@ public class DaemonStream extends TupleStream implements Expressible {
}
}
}
incrementIterations();
iterations.incrementAndGet();
if (sleepMillis > 0) {
try {

View File

@ -783,6 +783,90 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
}
/**
* Returns a SolrRequest to reindex a collection
*/
public static ReindexCollection reindexCollection(String collection) {
return new ReindexCollection(collection);
}
public static class ReindexCollection extends AsyncCollectionSpecificAdminRequest {
String target;
String query;
String fields;
String configName;
Boolean removeSource;
String cmd;
Integer batchSize;
Map<String, Object> collectionParams = new HashMap<>();
private ReindexCollection(String collection) {
super(CollectionAction.REINDEXCOLLECTION, collection);
}
/** Target collection name (null if the same). */
public ReindexCollection setTarget(String target) {
this.target = target;
return this;
}
/** Set optional command (e.g. abort, status). */
public ReindexCollection setCommand(String command) {
this.cmd = command;
return this;
}
/** Query matching the documents to reindex (default is '*:*'). */
public ReindexCollection setQuery(String query) {
this.query = query;
return this;
}
/** Fields to reindex (the same syntax as {@link CommonParams#FL}), default is '*'. */
public ReindexCollection setFields(String fields) {
this.fields = fields;
return this;
}
/** Remove source collection after success. Default is false. */
public ReindexCollection setRemoveSource(boolean removeSource) {
this.removeSource = removeSource;
return this;
}
/** Copy documents in batches of this size. Default is 100. */
public ReindexCollection setBatchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}
/** Config name for the target collection. Default is the same as source. */
public ReindexCollection setConfigName(String configName) {
this.configName = configName;
return this;
}
/** Set other supported collection CREATE parameters. */
public ReindexCollection setCollectionParam(String key, Object value) {
this.collectionParams.put(key, value);
return this;
}
@Override
public SolrParams getParams() {
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
params.setNonNull("target", target);
params.setNonNull("cmd", cmd);
params.setNonNull(ZkStateReader.CONFIGNAME_PROP, configName);
params.setNonNull(CommonParams.Q, query);
params.setNonNull(CommonParams.FL, fields);
params.setNonNull("removeSource", removeSource);
params.setNonNull(CommonParams.ROWS, batchSize);
collectionParams.forEach((k, v) -> params.setNonNull(k, v));
return params;
}
}
/**
* Return a SolrRequest for low-level detailed status of the collection.
*/
@ -823,10 +907,10 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
@Override
public SolrParams getParams() {
ModifiableSolrParams params = (ModifiableSolrParams)super.getParams();
params.setNonNull("segments", withSegments.toString());
params.setNonNull("fieldInfo", withFieldInfo.toString());
params.setNonNull("coreInfo", withCoreInfo.toString());
params.setNonNull("sizeInfo", withSizeInfo.toString());
params.setNonNull("segments", withSegments);
params.setNonNull("fieldInfo", withFieldInfo);
params.setNonNull("coreInfo", withCoreInfo);
params.setNonNull("sizeInfo", withSizeInfo);
return params;
}
}

View File

@ -100,6 +100,11 @@ public class CompositeIdRouter extends HashBasedRouter {
return targetSlices;
}
@Override
public String getName() {
return NAME;
}
public List<Range> partitionRangeByKey(String key, Range range) {
List<Range> result = new ArrayList<>(3);
Range keyRange = keyHashRange(key);

View File

@ -223,6 +223,7 @@ public abstract class DocRouter {
public abstract boolean isTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, String shardId, DocCollection collection);
public abstract String getName();
/** This method is consulted to determine what slices should be queried for a request when
* an explicit shards parameter was not used.

View File

@ -75,6 +75,11 @@ public class ImplicitDocRouter extends DocRouter {
return false;
}
@Override
public String getName() {
return NAME;
}
@Override
public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {

View File

@ -19,4 +19,9 @@ package org.apache.solr.common.cloud;
public class PlainIdRouter extends HashBasedRouter {
public static final String NAME = "plain";
@Override
public String getName() {
return NAME;
}
}

View File

@ -123,7 +123,9 @@ public interface CollectionParams {
NONE(false, LockLevel.NONE),
// TODO: not implemented yet
MERGESHARDS(true, LockLevel.SHARD),
COLSTATUS(true, LockLevel.NONE)
COLSTATUS(true, LockLevel.NONE),
// this command implements its own locking
REINDEXCOLLECTION(true, LockLevel.NONE)
;
public final boolean isWrite;

View File

@ -40,6 +40,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@ -572,7 +573,7 @@ public class Utils {
VersionedData data = null;
try {
data = distribStateManager.getData(path);
} catch (KeeperException.NoNodeException e) {
} catch (KeeperException.NoNodeException | NoSuchElementException e) {
return Collections.emptyMap();
}
if (data == null || data.getData() == null || data.getData().length == 0) return Collections.emptyMap();