mirror of https://github.com/apache/lucene.git
SOLR-9523: Refactor CoreAdminOperation into smaller classes
This commit is contained in:
parent
61955efc50
commit
286b35b020
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.handler.admin;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.core.backup.repository.BackupRepository;
|
||||
import org.apache.solr.handler.SnapShooter;
|
||||
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
|
||||
|
||||
class BackupCoreOp implements CoreAdminHandler.CoreAdminOp {
|
||||
@Override
|
||||
public void execute(CoreAdminHandler.CallInfo it) throws Exception {
|
||||
ZkController zkController = it.handler.coreContainer.getZkController();
|
||||
if (zkController == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Internal SolrCloud API");
|
||||
}
|
||||
|
||||
final SolrParams params = it.req.getParams();
|
||||
String cname = params.get(CoreAdminParams.CORE);
|
||||
if (cname == null) {
|
||||
throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
|
||||
}
|
||||
|
||||
String name = params.get(NAME);
|
||||
if (name == null) {
|
||||
throw new IllegalArgumentException(CoreAdminParams.NAME + " is required");
|
||||
}
|
||||
|
||||
String repoName = params.get(CoreAdminParams.BACKUP_REPOSITORY);
|
||||
BackupRepository repository = it.handler.coreContainer.newBackupRepository(Optional.ofNullable(repoName));
|
||||
|
||||
String location = repository.getBackupLocation(params.get(CoreAdminParams.BACKUP_LOCATION));
|
||||
if (location == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'location' is not specified as a query"
|
||||
+ " parameter or as a default repository property");
|
||||
}
|
||||
|
||||
// An optional parameter to describe the snapshot to be backed-up. If this
|
||||
// parameter is not supplied, the latest index commit is backed-up.
|
||||
String commitName = params.get(CoreAdminParams.COMMIT_NAME);
|
||||
|
||||
URI locationUri = repository.createURI(location);
|
||||
try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
|
||||
SnapShooter snapShooter = new SnapShooter(repository, core, locationUri, name, commitName);
|
||||
// validateCreateSnapshot will create parent dirs instead of throw; that choice is dubious.
|
||||
// But we want to throw. One reason is that
|
||||
// this dir really should, in fact must, already exist here if triggered via a collection backup on a shared
|
||||
// file system. Otherwise, perhaps the FS location isn't shared -- we want an error.
|
||||
if (!snapShooter.getBackupRepository().exists(snapShooter.getLocation())) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Directory to contain snapshots doesn't exist: " + snapShooter.getLocation());
|
||||
}
|
||||
snapShooter.validateCreateSnapshot();
|
||||
snapShooter.createSnapshot();
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Failed to backup core=" + cname + " because " + e, e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,68 +18,30 @@ package org.apache.solr.handler.admin;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.solr.cloud.CloudDescriptor;
|
||||
import org.apache.solr.cloud.SyncStrategy;
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.params.UpdateParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.SimpleOrderedMap;
|
||||
import org.apache.solr.core.CachingDirectoryFactory;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.CoreDescriptor;
|
||||
import org.apache.solr.core.DirectoryFactory;
|
||||
import org.apache.solr.core.DirectoryFactory.DirContext;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
import org.apache.solr.core.backup.repository.BackupRepository;
|
||||
import org.apache.solr.core.snapshots.SolrSnapshotManager;
|
||||
import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
|
||||
import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager.SnapshotMetaData;
|
||||
import org.apache.solr.handler.RestoreCore;
|
||||
import org.apache.solr.handler.SnapShooter;
|
||||
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminOp;
|
||||
import org.apache.solr.request.LocalSolrQueryRequest;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.apache.solr.search.SolrIndexSearcher;
|
||||
import org.apache.solr.update.CommitUpdateCommand;
|
||||
import org.apache.solr.update.MergeIndexesCommand;
|
||||
import org.apache.solr.update.SplitIndexCommand;
|
||||
import org.apache.solr.update.UpdateLog;
|
||||
import org.apache.solr.update.processor.UpdateRequestProcessor;
|
||||
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
|
||||
import org.apache.solr.util.NumberUtils;
|
||||
import org.apache.solr.util.PropertiesUtil;
|
||||
import org.apache.solr.util.RefCounted;
|
||||
|
@ -87,14 +49,11 @@ import org.apache.solr.util.TestInjection;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
import static org.apache.solr.common.params.CommonParams.PATH;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.*;
|
||||
import static org.apache.solr.handler.admin.CoreAdminHandler.COMPLETED;
|
||||
import static org.apache.solr.handler.admin.CoreAdminHandler.CallInfo;
|
||||
import static org.apache.solr.handler.admin.CoreAdminHandler.FAILED;
|
||||
import static org.apache.solr.handler.admin.CoreAdminHandler.Invocable;
|
||||
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE;
|
||||
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_MESSAGE;
|
||||
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
|
||||
|
@ -152,38 +111,7 @@ enum CoreAdminOperation implements CoreAdminOp {
|
|||
throw new SolrException(ErrorCode.SERVER_ERROR, "Error handling 'reload' action", ex);
|
||||
}
|
||||
}),
|
||||
|
||||
STATUS_OP(STATUS, it -> {
|
||||
SolrParams params = it.req.getParams();
|
||||
|
||||
String cname = params.get(CoreAdminParams.CORE);
|
||||
String indexInfo = params.get(CoreAdminParams.INDEX_INFO);
|
||||
boolean isIndexInfoNeeded = Boolean.parseBoolean(null == indexInfo ? "true" : indexInfo);
|
||||
NamedList<Object> status = new SimpleOrderedMap<>();
|
||||
Map<String, Exception> failures = new HashMap<>();
|
||||
for (Entry<String, CoreContainer.CoreLoadFailure> failure : it.handler.coreContainer.getCoreInitFailures().entrySet()) {
|
||||
failures.put(failure.getKey(), failure.getValue().exception);
|
||||
}
|
||||
try {
|
||||
if (cname == null) {
|
||||
for (String name : it.handler.coreContainer.getAllCoreNames()) {
|
||||
status.add(name, getCoreStatus(it.handler.coreContainer, name, isIndexInfoNeeded));
|
||||
}
|
||||
it.rsp.add("initFailures", failures);
|
||||
} else {
|
||||
failures = failures.containsKey(cname)
|
||||
? Collections.singletonMap(cname, failures.get(cname))
|
||||
: Collections.<String, Exception>emptyMap();
|
||||
it.rsp.add("initFailures", failures);
|
||||
status.add(cname, getCoreStatus(it.handler.coreContainer, cname, isIndexInfoNeeded));
|
||||
}
|
||||
it.rsp.add("status", status);
|
||||
} catch (Exception ex) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR,
|
||||
"Error handling 'status' action ", ex);
|
||||
}
|
||||
}),
|
||||
|
||||
STATUS_OP(STATUS, new StatusOp()),
|
||||
SWAP_OP(SWAP, it -> {
|
||||
final SolrParams params = it.req.getParams();
|
||||
final String cname = params.get(CoreAdminParams.CORE);
|
||||
|
@ -201,360 +129,11 @@ enum CoreAdminOperation implements CoreAdminOp {
|
|||
it.handler.coreContainer.rename(cname, name);
|
||||
}),
|
||||
|
||||
MERGEINDEXES_OP(MERGEINDEXES, it -> {
|
||||
SolrParams params = it.req.getParams();
|
||||
String cname = params.required().get(CoreAdminParams.CORE);
|
||||
SolrCore core = it.handler.coreContainer.getCore(cname);
|
||||
SolrQueryRequest wrappedReq = null;
|
||||
MERGEINDEXES_OP(MERGEINDEXES, new MergeIndexesOp()),
|
||||
|
||||
List<SolrCore> sourceCores = Lists.newArrayList();
|
||||
List<RefCounted<SolrIndexSearcher>> searchers = Lists.newArrayList();
|
||||
// stores readers created from indexDir param values
|
||||
List<DirectoryReader> readersToBeClosed = Lists.newArrayList();
|
||||
Map<Directory, Boolean> dirsToBeReleased = new HashMap<>();
|
||||
if (core != null) {
|
||||
try {
|
||||
String[] dirNames = params.getParams(CoreAdminParams.INDEX_DIR);
|
||||
if (dirNames == null || dirNames.length == 0) {
|
||||
String[] sources = params.getParams("srcCore");
|
||||
if (sources == null || sources.length == 0)
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST,
|
||||
"At least one indexDir or srcCore must be specified");
|
||||
SPLIT_OP(SPLIT, new SplitOp()),
|
||||
|
||||
for (int i = 0; i < sources.length; i++) {
|
||||
String source = sources[i];
|
||||
SolrCore srcCore = it.handler.coreContainer.getCore(source);
|
||||
if (srcCore == null)
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST,
|
||||
"Core: " + source + " does not exist");
|
||||
sourceCores.add(srcCore);
|
||||
}
|
||||
} else {
|
||||
DirectoryFactory dirFactory = core.getDirectoryFactory();
|
||||
for (int i = 0; i < dirNames.length; i++) {
|
||||
boolean markAsDone = false;
|
||||
if (dirFactory instanceof CachingDirectoryFactory) {
|
||||
if (!((CachingDirectoryFactory) dirFactory).getLivePaths().contains(dirNames[i])) {
|
||||
markAsDone = true;
|
||||
}
|
||||
}
|
||||
Directory dir = dirFactory.get(dirNames[i], DirectoryFactory.DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
|
||||
dirsToBeReleased.put(dir, markAsDone);
|
||||
// TODO: why doesn't this use the IR factory? what is going on here?
|
||||
readersToBeClosed.add(DirectoryReader.open(dir));
|
||||
}
|
||||
}
|
||||
|
||||
List<DirectoryReader> readers = null;
|
||||
if (readersToBeClosed.size() > 0) {
|
||||
readers = readersToBeClosed;
|
||||
} else {
|
||||
readers = Lists.newArrayList();
|
||||
for (SolrCore solrCore : sourceCores) {
|
||||
// record the searchers so that we can decref
|
||||
RefCounted<SolrIndexSearcher> searcher = solrCore.getSearcher();
|
||||
searchers.add(searcher);
|
||||
readers.add(searcher.get().getIndexReader());
|
||||
}
|
||||
}
|
||||
|
||||
UpdateRequestProcessorChain processorChain =
|
||||
core.getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
|
||||
wrappedReq = new LocalSolrQueryRequest(core, it.req.getParams());
|
||||
UpdateRequestProcessor processor =
|
||||
processorChain.createProcessor(wrappedReq, it.rsp);
|
||||
processor.processMergeIndexes(new MergeIndexesCommand(readers, it.req));
|
||||
} catch (Exception e) {
|
||||
// log and rethrow so that if the finally fails we don't lose the original problem
|
||||
log().error("ERROR executing merge:", e);
|
||||
throw e;
|
||||
} finally {
|
||||
for (RefCounted<SolrIndexSearcher> searcher : searchers) {
|
||||
if (searcher != null) searcher.decref();
|
||||
}
|
||||
for (SolrCore solrCore : sourceCores) {
|
||||
if (solrCore != null) solrCore.close();
|
||||
}
|
||||
IOUtils.closeWhileHandlingException(readersToBeClosed);
|
||||
Set<Entry<Directory, Boolean>> entries = dirsToBeReleased.entrySet();
|
||||
for (Entry<Directory, Boolean> entry : entries) {
|
||||
DirectoryFactory dirFactory = core.getDirectoryFactory();
|
||||
Directory dir = entry.getKey();
|
||||
boolean markAsDone = entry.getValue();
|
||||
if (markAsDone) {
|
||||
dirFactory.doneWithDirectory(dir);
|
||||
}
|
||||
dirFactory.release(dir);
|
||||
}
|
||||
if (wrappedReq != null) wrappedReq.close();
|
||||
core.close();
|
||||
}
|
||||
}
|
||||
}),
|
||||
|
||||
SPLIT_OP(SPLIT, it -> {
|
||||
SolrParams params = it.req.getParams();
|
||||
List<DocRouter.Range> ranges = null;
|
||||
|
||||
String[] pathsArr = params.getParams(PATH);
|
||||
String rangesStr = params.get(CoreAdminParams.RANGES); // ranges=a-b,c-d,e-f
|
||||
if (rangesStr != null) {
|
||||
String[] rangesArr = rangesStr.split(",");
|
||||
if (rangesArr.length == 0) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "There must be at least one range specified to split an index");
|
||||
} else {
|
||||
ranges = new ArrayList<>(rangesArr.length);
|
||||
for (String r : rangesArr) {
|
||||
try {
|
||||
ranges.add(DocRouter.DEFAULT.fromString(r));
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Exception parsing hexadecimal hash range: " + r, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
String splitKey = params.get("split.key");
|
||||
String[] newCoreNames = params.getParams("targetCore");
|
||||
String cname = params.get(CoreAdminParams.CORE, "");
|
||||
|
||||
if ((pathsArr == null || pathsArr.length == 0) && (newCoreNames == null || newCoreNames.length == 0)) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Either path or targetCore param must be specified");
|
||||
}
|
||||
|
||||
log().info("Invoked split action for core: " + cname);
|
||||
SolrCore core = it.handler.coreContainer.getCore(cname);
|
||||
SolrQueryRequest req = new LocalSolrQueryRequest(core, params);
|
||||
List<SolrCore> newCores = null;
|
||||
|
||||
try {
|
||||
// TODO: allow use of rangesStr in the future
|
||||
List<String> paths = null;
|
||||
int partitions = pathsArr != null ? pathsArr.length : newCoreNames.length;
|
||||
|
||||
DocRouter router = null;
|
||||
String routeFieldName = null;
|
||||
if (it.handler.coreContainer.isZooKeeperAware()) {
|
||||
ClusterState clusterState = it.handler.coreContainer.getZkController().getClusterState();
|
||||
String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
|
||||
DocCollection collection = clusterState.getCollection(collectionName);
|
||||
String sliceName = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
|
||||
Slice slice = collection.getSlice(sliceName);
|
||||
router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
|
||||
if (ranges == null) {
|
||||
DocRouter.Range currentRange = slice.getRange();
|
||||
ranges = currentRange != null ? router.partitionRange(partitions, currentRange) : null;
|
||||
}
|
||||
Object routerObj = collection.get(DOC_ROUTER); // for back-compat with Solr 4.4
|
||||
if (routerObj != null && routerObj instanceof Map) {
|
||||
Map routerProps = (Map) routerObj;
|
||||
routeFieldName = (String) routerProps.get("field");
|
||||
}
|
||||
}
|
||||
|
||||
if (pathsArr == null) {
|
||||
newCores = new ArrayList<>(partitions);
|
||||
for (String newCoreName : newCoreNames) {
|
||||
SolrCore newcore = it.handler.coreContainer.getCore(newCoreName);
|
||||
if (newcore != null) {
|
||||
newCores.add(newcore);
|
||||
} else {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Core with core name " + newCoreName + " expected but doesn't exist.");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
paths = Arrays.asList(pathsArr);
|
||||
}
|
||||
|
||||
|
||||
SplitIndexCommand cmd = new SplitIndexCommand(req, paths, newCores, ranges, router, routeFieldName, splitKey);
|
||||
core.getUpdateHandler().split(cmd);
|
||||
|
||||
// After the split has completed, someone (here?) should start the process of replaying the buffered updates.
|
||||
|
||||
} catch (Exception e) {
|
||||
log().error("ERROR executing split:", e);
|
||||
throw new RuntimeException(e);
|
||||
|
||||
} finally {
|
||||
if (req != null) req.close();
|
||||
if (core != null) core.close();
|
||||
if (newCores != null) {
|
||||
for (SolrCore newCore : newCores) {
|
||||
newCore.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
|
||||
PREPRECOVERY_OP(PREPRECOVERY, it -> {
|
||||
final SolrParams params = it.req.getParams();
|
||||
|
||||
String cname = params.get(CoreAdminParams.CORE);
|
||||
if (cname == null) {
|
||||
cname = "";
|
||||
}
|
||||
|
||||
String nodeName = params.get("nodeName");
|
||||
String coreNodeName = params.get("coreNodeName");
|
||||
Replica.State waitForState = Replica.State.getState(params.get(ZkStateReader.STATE_PROP));
|
||||
Boolean checkLive = params.getBool("checkLive");
|
||||
Boolean onlyIfLeader = params.getBool("onlyIfLeader");
|
||||
Boolean onlyIfLeaderActive = params.getBool("onlyIfLeaderActive");
|
||||
|
||||
log().info("Going to wait for coreNodeName: " + coreNodeName + ", state: " + waitForState
|
||||
+ ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader
|
||||
+ ", onlyIfLeaderActive: " + onlyIfLeaderActive);
|
||||
|
||||
int maxTries = 0;
|
||||
Replica.State state = null;
|
||||
boolean live = false;
|
||||
int retry = 0;
|
||||
while (true) {
|
||||
CoreContainer coreContainer = it.handler.coreContainer;
|
||||
try (SolrCore core = coreContainer.getCore(cname)) {
|
||||
if (core == null && retry == 30) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "core not found:"
|
||||
+ cname);
|
||||
}
|
||||
if (core != null) {
|
||||
if (onlyIfLeader != null && onlyIfLeader) {
|
||||
if (!core.getCoreDescriptor().getCloudDescriptor().isLeader()) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "We are not the leader");
|
||||
}
|
||||
}
|
||||
|
||||
// wait until we are sure the recovering node is ready
|
||||
// to accept updates
|
||||
CloudDescriptor cloudDescriptor = core.getCoreDescriptor()
|
||||
.getCloudDescriptor();
|
||||
String collectionName = cloudDescriptor.getCollectionName();
|
||||
|
||||
if (retry % 15 == 0) {
|
||||
if (retry > 0 && log().isInfoEnabled())
|
||||
log().info("After " + retry + " seconds, core " + cname + " (" +
|
||||
cloudDescriptor.getShardId() + " of " +
|
||||
cloudDescriptor.getCollectionName() + ") still does not have state: " +
|
||||
waitForState + "; forcing ClusterState update from ZooKeeper");
|
||||
|
||||
// force a cluster state update
|
||||
coreContainer.getZkController().getZkStateReader().forceUpdateCollection(collectionName);
|
||||
}
|
||||
|
||||
if (maxTries == 0) {
|
||||
// wait long enough for the leader conflict to work itself out plus a little extra
|
||||
int conflictWaitMs = coreContainer.getZkController().getLeaderConflictResolveWait();
|
||||
maxTries = (int) Math.round(conflictWaitMs / 1000) + 3;
|
||||
log().info("Will wait a max of " + maxTries + " seconds to see " + cname + " (" +
|
||||
cloudDescriptor.getShardId() + " of " +
|
||||
cloudDescriptor.getCollectionName() + ") have state: " + waitForState);
|
||||
}
|
||||
|
||||
ClusterState clusterState = coreContainer.getZkController().getClusterState();
|
||||
DocCollection collection = clusterState.getCollection(collectionName);
|
||||
Slice slice = collection.getSlice(cloudDescriptor.getShardId());
|
||||
if (slice != null) {
|
||||
final Replica replica = slice.getReplicasMap().get(coreNodeName);
|
||||
if (replica != null) {
|
||||
state = replica.getState();
|
||||
live = clusterState.liveNodesContain(nodeName);
|
||||
|
||||
final Replica.State localState = cloudDescriptor.getLastPublished();
|
||||
|
||||
// TODO: This is funky but I've seen this in testing where the replica asks the
|
||||
// leader to be in recovery? Need to track down how that happens ... in the meantime,
|
||||
// this is a safeguard
|
||||
boolean leaderDoesNotNeedRecovery = (onlyIfLeader != null &&
|
||||
onlyIfLeader &&
|
||||
core.getName().equals(replica.getStr("core")) &&
|
||||
waitForState == Replica.State.RECOVERING &&
|
||||
localState == Replica.State.ACTIVE &&
|
||||
state == Replica.State.ACTIVE);
|
||||
|
||||
if (leaderDoesNotNeedRecovery) {
|
||||
log().warn("Leader " + core.getName() + " ignoring request to be in the recovering state because it is live and active.");
|
||||
}
|
||||
|
||||
boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive && localState != Replica.State.ACTIVE;
|
||||
log().info("In WaitForState(" + waitForState + "): collection=" + collectionName + ", shard=" + slice.getName() +
|
||||
", thisCore=" + core.getName() + ", leaderDoesNotNeedRecovery=" + leaderDoesNotNeedRecovery +
|
||||
", isLeader? " + core.getCoreDescriptor().getCloudDescriptor().isLeader() +
|
||||
", live=" + live + ", checkLive=" + checkLive + ", currentState=" + state.toString() + ", localState=" + localState + ", nodeName=" + nodeName +
|
||||
", coreNodeName=" + coreNodeName + ", onlyIfActiveCheckResult=" + onlyIfActiveCheckResult + ", nodeProps: " + replica);
|
||||
|
||||
if (!onlyIfActiveCheckResult && replica != null && (state == waitForState || leaderDoesNotNeedRecovery)) {
|
||||
if (checkLive == null) {
|
||||
break;
|
||||
} else if (checkLive && live) {
|
||||
break;
|
||||
} else if (!checkLive && !live) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (retry++ == maxTries) {
|
||||
String collection = null;
|
||||
String leaderInfo = null;
|
||||
String shardId = null;
|
||||
try {
|
||||
CloudDescriptor cloudDescriptor =
|
||||
core.getCoreDescriptor().getCloudDescriptor();
|
||||
collection = cloudDescriptor.getCollectionName();
|
||||
shardId = cloudDescriptor.getShardId();
|
||||
leaderInfo = coreContainer.getZkController().
|
||||
getZkStateReader().getLeaderUrl(collection, shardId, 5000);
|
||||
} catch (Exception exc) {
|
||||
leaderInfo = "Not available due to: " + exc;
|
||||
}
|
||||
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST,
|
||||
"I was asked to wait on state " + waitForState + " for "
|
||||
+ shardId + " in " + collection + " on " + nodeName
|
||||
+ " but I still do not see the requested state. I see state: "
|
||||
+ state.toString() + " live:" + live + " leader from ZK: " + leaderInfo
|
||||
);
|
||||
}
|
||||
|
||||
if (coreContainer.isShutDown()) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST,
|
||||
"Solr is shutting down");
|
||||
}
|
||||
|
||||
// solrcloud_debug
|
||||
if (log().isDebugEnabled()) {
|
||||
try {
|
||||
LocalSolrQueryRequest r = new LocalSolrQueryRequest(core,
|
||||
new ModifiableSolrParams());
|
||||
CommitUpdateCommand commitCmd = new CommitUpdateCommand(r, false);
|
||||
commitCmd.softCommit = true;
|
||||
core.getUpdateHandler().commit(commitCmd);
|
||||
RefCounted<SolrIndexSearcher> searchHolder = core
|
||||
.getNewestSearcher(false);
|
||||
SolrIndexSearcher searcher = searchHolder.get();
|
||||
try {
|
||||
log().debug(core.getCoreDescriptor().getCoreContainer()
|
||||
.getZkController().getNodeName()
|
||||
+ " to replicate "
|
||||
+ searcher.search(new MatchAllDocsQuery(), 1).totalHits
|
||||
+ " gen:"
|
||||
+ core.getDeletionPolicy().getLatestCommit().getGeneration()
|
||||
+ " data:" + core.getDataDir());
|
||||
} finally {
|
||||
searchHolder.decref();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log().debug("Error in solrcloud_debug block", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
log().info("Waited coreNodeName: " + coreNodeName + ", state: " + waitForState
|
||||
+ ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader + " for: " + retry + " seconds.");
|
||||
}),
|
||||
PREPRECOVERY_OP(PREPRECOVERY, new PrepRecoveryOp()),
|
||||
|
||||
REQUESTRECOVERY_OP(REQUESTRECOVERY, it -> {
|
||||
final SolrParams params = it.req.getParams();
|
||||
|
@ -574,63 +153,7 @@ enum CoreAdminOperation implements CoreAdminOp {
|
|||
}).start();
|
||||
|
||||
}),
|
||||
REQUESTSYNCSHARD_OP(REQUESTSYNCSHARD, it -> {
|
||||
final SolrParams params = it.req.getParams();
|
||||
|
||||
log().info("I have been requested to sync up my shard");
|
||||
ZkController zkController = it.handler.coreContainer.getZkController();
|
||||
if (zkController == null) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Only valid for SolrCloud");
|
||||
}
|
||||
|
||||
String cname = params.get(CoreAdminParams.CORE);
|
||||
if (cname == null) {
|
||||
throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
|
||||
}
|
||||
|
||||
SyncStrategy syncStrategy = null;
|
||||
try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
|
||||
|
||||
if (core != null) {
|
||||
syncStrategy = new SyncStrategy(core.getCoreDescriptor().getCoreContainer());
|
||||
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ZkStateReader.BASE_URL_PROP, zkController.getBaseUrl());
|
||||
props.put(ZkStateReader.CORE_NAME_PROP, cname);
|
||||
props.put(ZkStateReader.NODE_NAME_PROP, zkController.getNodeName());
|
||||
|
||||
boolean success = syncStrategy.sync(zkController, core, new ZkNodeProps(props), true);
|
||||
// solrcloud_debug
|
||||
if (log().isDebugEnabled()) {
|
||||
try {
|
||||
RefCounted<SolrIndexSearcher> searchHolder = core
|
||||
.getNewestSearcher(false);
|
||||
SolrIndexSearcher searcher = searchHolder.get();
|
||||
try {
|
||||
log().debug(core.getCoreDescriptor().getCoreContainer()
|
||||
.getZkController().getNodeName()
|
||||
+ " synched "
|
||||
+ searcher.search(new MatchAllDocsQuery(), 1).totalHits);
|
||||
} finally {
|
||||
searchHolder.decref();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log().debug("Error in solrcloud_debug block", e);
|
||||
}
|
||||
}
|
||||
if (!success) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Sync Failed");
|
||||
}
|
||||
} else {
|
||||
SolrException.log(log(), "Could not find core to call sync:" + cname);
|
||||
}
|
||||
} finally {
|
||||
// no recoveryStrat close for now
|
||||
if (syncStrategy != null) {
|
||||
syncStrategy.close();
|
||||
}
|
||||
}
|
||||
}),
|
||||
REQUESTSYNCSHARD_OP(REQUESTSYNCSHARD, new RequestSyncShardOp()),
|
||||
|
||||
REQUESTBUFFERUPDATES_OP(REQUESTBUFFERUPDATES, it -> {
|
||||
SolrParams params = it.req.getParams();
|
||||
|
@ -656,45 +179,7 @@ enum CoreAdminOperation implements CoreAdminOp {
|
|||
if (it.req != null) it.req.close();
|
||||
}
|
||||
}),
|
||||
REQUESTAPPLYUPDATES_OP(REQUESTAPPLYUPDATES, it -> {
|
||||
SolrParams params = it.req.getParams();
|
||||
String cname = params.get(CoreAdminParams.NAME, "");
|
||||
log().info("Applying buffered updates on core: " + cname);
|
||||
CoreContainer coreContainer = it.handler.coreContainer;
|
||||
try (SolrCore core = coreContainer.getCore(cname)) {
|
||||
if (core == null)
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Core [" + cname + "] not found");
|
||||
UpdateLog updateLog = core.getUpdateHandler().getUpdateLog();
|
||||
if (updateLog.getState() != UpdateLog.State.BUFFERING) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Core " + cname + " not in buffering state");
|
||||
}
|
||||
Future<UpdateLog.RecoveryInfo> future = updateLog.applyBufferedUpdates();
|
||||
if (future == null) {
|
||||
log().info("No buffered updates available. core=" + cname);
|
||||
it.rsp.add("core", cname);
|
||||
it.rsp.add("status", "EMPTY_BUFFER");
|
||||
return;
|
||||
}
|
||||
UpdateLog.RecoveryInfo report = future.get();
|
||||
if (report.failed) {
|
||||
SolrException.log(log(), "Replay failed");
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
|
||||
}
|
||||
coreContainer.getZkController().publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
|
||||
it.rsp.add("core", cname);
|
||||
it.rsp.add("status", "BUFFER_APPLIED");
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log().warn("Recovery was interrupted", e);
|
||||
} catch (Exception e) {
|
||||
if (e instanceof SolrException)
|
||||
throw (SolrException) e;
|
||||
else
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not apply buffered updates", e);
|
||||
} finally {
|
||||
if (it.req != null) it.req.close();
|
||||
}
|
||||
}),
|
||||
REQUESTAPPLYUPDATES_OP(REQUESTAPPLYUPDATES, new RequestApplyUpdatesOp()),
|
||||
|
||||
REQUESTSTATUS_OP(REQUESTSTATUS, it -> {
|
||||
SolrParams params = it.req.getParams();
|
||||
|
@ -737,18 +222,7 @@ enum CoreAdminOperation implements CoreAdminOp {
|
|||
log().warn("zkController is null in CoreAdminHandler.handleRequestInternal:REJOINLEADERELECTION. No action taken.");
|
||||
}
|
||||
}),
|
||||
|
||||
INVOKE_OP(INVOKE, it -> {
|
||||
String[] klas = it.req.getParams().getParams("class");
|
||||
if (klas == null || klas.length == 0) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "class is a required param");
|
||||
}
|
||||
for (String c : klas) {
|
||||
Map<String, Object> result = invokeAClass(it.req, c);
|
||||
it.rsp.add(c, result);
|
||||
}
|
||||
}),
|
||||
|
||||
INVOKE_OP(INVOKE, new InvokeOp()),
|
||||
FORCEPREPAREFORLEADERSHIP_OP(FORCEPREPAREFORLEADERSHIP, it -> {
|
||||
final SolrParams params = it.req.getParams();
|
||||
|
||||
|
@ -774,152 +248,10 @@ enum CoreAdminOperation implements CoreAdminOp {
|
|||
}
|
||||
}),
|
||||
|
||||
BACKUPCORE_OP(BACKUPCORE, it -> {
|
||||
ZkController zkController = it.handler.coreContainer.getZkController();
|
||||
if (zkController == null) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Internal SolrCloud API");
|
||||
}
|
||||
|
||||
final SolrParams params = it.req.getParams();
|
||||
String cname = params.get(CoreAdminParams.CORE);
|
||||
if (cname == null) {
|
||||
throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
|
||||
}
|
||||
|
||||
String name = params.get(NAME);
|
||||
if (name == null) {
|
||||
throw new IllegalArgumentException(CoreAdminParams.NAME + " is required");
|
||||
}
|
||||
|
||||
String repoName = params.get(CoreAdminParams.BACKUP_REPOSITORY);
|
||||
BackupRepository repository = it.handler.coreContainer.newBackupRepository(Optional.ofNullable(repoName));
|
||||
|
||||
String location = repository.getBackupLocation(params.get(CoreAdminParams.BACKUP_LOCATION));
|
||||
if (location == null) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "'location' is not specified as a query"
|
||||
+ " parameter or as a default repository property");
|
||||
}
|
||||
|
||||
// An optional parameter to describe the snapshot to be backed-up. If this
|
||||
// parameter is not supplied, the latest index commit is backed-up.
|
||||
String commitName = params.get(CoreAdminParams.COMMIT_NAME);
|
||||
|
||||
URI locationUri = repository.createURI(location);
|
||||
try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
|
||||
SnapShooter snapShooter = new SnapShooter(repository, core, locationUri, name, commitName);
|
||||
// validateCreateSnapshot will create parent dirs instead of throw; that choice is dubious.
|
||||
// But we want to throw. One reason is that
|
||||
// this dir really should, in fact must, already exist here if triggered via a collection backup on a shared
|
||||
// file system. Otherwise, perhaps the FS location isn't shared -- we want an error.
|
||||
if (!snapShooter.getBackupRepository().exists(snapShooter.getLocation())) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Directory to contain snapshots doesn't exist: " + snapShooter.getLocation());
|
||||
}
|
||||
snapShooter.validateCreateSnapshot();
|
||||
snapShooter.createSnapshot();
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Failed to backup core=" + cname + " because " + e, e);
|
||||
}
|
||||
}),
|
||||
|
||||
RESTORECORE_OP(RESTORECORE, it -> {
|
||||
ZkController zkController = it.handler.coreContainer.getZkController();
|
||||
if (zkController == null) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Only valid for SolrCloud");
|
||||
}
|
||||
|
||||
final SolrParams params = it.req.getParams();
|
||||
String cname = params.get(CoreAdminParams.CORE);
|
||||
if (cname == null) {
|
||||
throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
|
||||
}
|
||||
|
||||
String name = params.get(NAME);
|
||||
if (name == null) {
|
||||
throw new IllegalArgumentException(CoreAdminParams.NAME + " is required");
|
||||
}
|
||||
|
||||
String repoName = params.get(CoreAdminParams.BACKUP_REPOSITORY);
|
||||
BackupRepository repository = it.handler.coreContainer.newBackupRepository(Optional.ofNullable(repoName));
|
||||
|
||||
String location = repository.getBackupLocation(params.get(CoreAdminParams.BACKUP_LOCATION));
|
||||
if (location == null) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "'location' is not specified as a query"
|
||||
+ " parameter or as a default repository property");
|
||||
}
|
||||
|
||||
URI locationUri = repository.createURI(location);
|
||||
try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
|
||||
RestoreCore restoreCore = new RestoreCore(repository, core, locationUri, name);
|
||||
boolean success = restoreCore.doRestore();
|
||||
if (!success) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to restore core=" + core.getName());
|
||||
}
|
||||
}
|
||||
}),
|
||||
CREATESNAPSHOT_OP(CREATESNAPSHOT, it -> {
|
||||
CoreContainer cc = it.handler.getCoreContainer();
|
||||
final SolrParams params = it.req.getParams();
|
||||
|
||||
String commitName = params.required().get(CoreAdminParams.COMMIT_NAME);
|
||||
String cname = params.required().get(CoreAdminParams.CORE);
|
||||
try (SolrCore core = cc.getCore(cname)) {
|
||||
if (core == null) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to locate core " + cname);
|
||||
}
|
||||
|
||||
String indexDirPath = core.getIndexDir();
|
||||
IndexCommit ic = core.getDeletionPolicy().getLatestCommit();
|
||||
if (ic == null) {
|
||||
RefCounted<SolrIndexSearcher> searcher = core.getSearcher();
|
||||
try {
|
||||
ic = searcher.get().getIndexReader().getIndexCommit();
|
||||
} finally {
|
||||
searcher.decref();
|
||||
}
|
||||
}
|
||||
SolrSnapshotMetaDataManager mgr = core.getSnapshotMetaDataManager();
|
||||
mgr.snapshot(commitName, indexDirPath, ic.getGeneration());
|
||||
|
||||
it.rsp.add("core", core.getName());
|
||||
it.rsp.add("commitName", commitName);
|
||||
it.rsp.add("indexDirPath", indexDirPath);
|
||||
it.rsp.add("generation", ic.getGeneration());
|
||||
}
|
||||
}),
|
||||
DELETESNAPSHOT_OP(DELETESNAPSHOT, it -> {
|
||||
CoreContainer cc = it.handler.getCoreContainer();
|
||||
final SolrParams params = it.req.getParams();
|
||||
|
||||
String commitName = params.required().get(CoreAdminParams.COMMIT_NAME);
|
||||
String cname = params.required().get(CoreAdminParams.CORE);
|
||||
try (SolrCore core = cc.getCore(cname)) {
|
||||
if (core == null) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to locate core " + cname);
|
||||
}
|
||||
|
||||
SolrSnapshotMetaDataManager mgr = core.getSnapshotMetaDataManager();
|
||||
Optional<SnapshotMetaData> metadata = mgr.release(commitName);
|
||||
if (metadata.isPresent()) {
|
||||
long gen = metadata.get().getGenerationNumber();
|
||||
String indexDirPath = metadata.get().getIndexDirPath();
|
||||
|
||||
// If the directory storing the snapshot is not the same as the *current* core
|
||||
// index directory, then delete the files corresponding to this snapshot.
|
||||
// Otherwise we leave the index files related to snapshot as is (assuming the
|
||||
// underlying Solr IndexDeletionPolicy will clean them up appropriately).
|
||||
if (!indexDirPath.equals(core.getIndexDir())) {
|
||||
Directory d = core.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, DirectoryFactory.LOCK_TYPE_NONE);
|
||||
try {
|
||||
SolrSnapshotManager.deleteIndexFiles(d, mgr.listSnapshotsInIndexDir(indexDirPath), gen);
|
||||
} finally {
|
||||
core.getDirectoryFactory().release(d);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
BACKUPCORE_OP(BACKUPCORE, new BackupCoreOp()),
|
||||
RESTORECORE_OP(RESTORECORE, new RestoreCoreOp()),
|
||||
CREATESNAPSHOT_OP(CREATESNAPSHOT, new CreateSnapshotOp()),
|
||||
DELETESNAPSHOT_OP(DELETESNAPSHOT, new DeleteSnapshotOp()),
|
||||
LISTSNAPSHOTS_OP(LISTSNAPSHOTS, it -> {
|
||||
CoreContainer cc = it.handler.getCoreContainer();
|
||||
final SolrParams params = it.req.getParams();
|
||||
|
@ -1037,20 +369,6 @@ enum CoreAdminOperation implements CoreAdminOp {
|
|||
return size;
|
||||
}
|
||||
|
||||
static Map<String, Object> invokeAClass(SolrQueryRequest req, String c) {
|
||||
SolrResourceLoader loader = null;
|
||||
if (req.getCore() != null) loader = req.getCore().getResourceLoader();
|
||||
else if (req.getContext().get(CoreContainer.class.getName()) != null) {
|
||||
CoreContainer cc = (CoreContainer) req.getContext().get(CoreContainer.class.getName());
|
||||
loader = cc.getResourceLoader();
|
||||
}
|
||||
|
||||
Invocable invokable = loader.newInstance(c, Invocable.class);
|
||||
Map<String, Object> result = invokable.invoke(req);
|
||||
log().info("Invocable_invoked {}", result);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(CallInfo it) throws Exception {
|
||||
fun.execute(it);
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.handler.admin;
|
||||
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
|
||||
import org.apache.solr.search.SolrIndexSearcher;
|
||||
import org.apache.solr.util.RefCounted;
|
||||
|
||||
class CreateSnapshotOp implements CoreAdminHandler.CoreAdminOp {
|
||||
@Override
|
||||
public void execute(CoreAdminHandler.CallInfo it) throws Exception {
|
||||
CoreContainer cc = it.handler.getCoreContainer();
|
||||
final SolrParams params = it.req.getParams();
|
||||
|
||||
String commitName = params.required().get(CoreAdminParams.COMMIT_NAME);
|
||||
String cname = params.required().get(CoreAdminParams.CORE);
|
||||
try (SolrCore core = cc.getCore(cname)) {
|
||||
if (core == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unable to locate core " + cname);
|
||||
}
|
||||
|
||||
String indexDirPath = core.getIndexDir();
|
||||
IndexCommit ic = core.getDeletionPolicy().getLatestCommit();
|
||||
if (ic == null) {
|
||||
RefCounted<SolrIndexSearcher> searcher = core.getSearcher();
|
||||
try {
|
||||
ic = searcher.get().getIndexReader().getIndexCommit();
|
||||
} finally {
|
||||
searcher.decref();
|
||||
}
|
||||
}
|
||||
SolrSnapshotMetaDataManager mgr = core.getSnapshotMetaDataManager();
|
||||
mgr.snapshot(commitName, indexDirPath, ic.getGeneration());
|
||||
|
||||
it.rsp.add("core", core.getName());
|
||||
it.rsp.add("commitName", commitName);
|
||||
it.rsp.add("indexDirPath", indexDirPath);
|
||||
it.rsp.add("generation", ic.getGeneration());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.handler.admin;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.DirectoryFactory;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.core.snapshots.SolrSnapshotManager;
|
||||
import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
|
||||
|
||||
|
||||
class DeleteSnapshotOp implements CoreAdminHandler.CoreAdminOp {
|
||||
|
||||
@Override
|
||||
public void execute(CoreAdminHandler.CallInfo it) throws Exception {
|
||||
CoreContainer cc = it.handler.getCoreContainer();
|
||||
final SolrParams params = it.req.getParams();
|
||||
|
||||
String commitName = params.required().get(CoreAdminParams.COMMIT_NAME);
|
||||
String cname = params.required().get(CoreAdminParams.CORE);
|
||||
try (SolrCore core = cc.getCore(cname)) {
|
||||
if (core == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unable to locate core " + cname);
|
||||
}
|
||||
|
||||
SolrSnapshotMetaDataManager mgr = core.getSnapshotMetaDataManager();
|
||||
Optional<SolrSnapshotMetaDataManager.SnapshotMetaData> metadata = mgr.release(commitName);
|
||||
if (metadata.isPresent()) {
|
||||
long gen = metadata.get().getGenerationNumber();
|
||||
String indexDirPath = metadata.get().getIndexDirPath();
|
||||
|
||||
// If the directory storing the snapshot is not the same as the *current* core
|
||||
// index directory, then delete the files corresponding to this snapshot.
|
||||
// Otherwise we leave the index files related to snapshot as is (assuming the
|
||||
// underlying Solr IndexDeletionPolicy will clean them up appropriately).
|
||||
if (!indexDirPath.equals(core.getIndexDir())) {
|
||||
Directory d = core.getDirectoryFactory().get(indexDirPath, DirectoryFactory.DirContext.DEFAULT, DirectoryFactory.LOCK_TYPE_NONE);
|
||||
try {
|
||||
SolrSnapshotManager.deleteIndexFiles(d, mgr.listSnapshotsInIndexDir(indexDirPath), gen);
|
||||
} finally {
|
||||
core.getDirectoryFactory().release(d);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.handler.admin;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
class InvokeOp implements CoreAdminHandler.CoreAdminOp {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
static Map<String, Object> invokeAClass(SolrQueryRequest req, String c) {
|
||||
SolrResourceLoader loader = null;
|
||||
if (req.getCore() != null) loader = req.getCore().getResourceLoader();
|
||||
else if (req.getContext().get(CoreContainer.class.getName()) != null) {
|
||||
CoreContainer cc = (CoreContainer) req.getContext().get(CoreContainer.class.getName());
|
||||
loader = cc.getResourceLoader();
|
||||
}
|
||||
|
||||
CoreAdminHandler.Invocable invokable = loader.newInstance(c, CoreAdminHandler.Invocable.class);
|
||||
Map<String, Object> result = invokable.invoke(req);
|
||||
log.info("Invocable_invoked {}", result);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(CoreAdminHandler.CallInfo it) throws Exception {
|
||||
String[] klas = it.req.getParams().getParams("class");
|
||||
if (klas == null || klas.length == 0) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "class is a required param");
|
||||
}
|
||||
for (String c : klas) {
|
||||
Map<String, Object> result = invokeAClass(it.req, c);
|
||||
it.rsp.add(c, result);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,142 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.handler.admin;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.params.UpdateParams;
|
||||
import org.apache.solr.core.CachingDirectoryFactory;
|
||||
import org.apache.solr.core.DirectoryFactory;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.request.LocalSolrQueryRequest;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.apache.solr.search.SolrIndexSearcher;
|
||||
import org.apache.solr.update.MergeIndexesCommand;
|
||||
import org.apache.solr.update.processor.UpdateRequestProcessor;
|
||||
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
|
||||
import org.apache.solr.util.RefCounted;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
class MergeIndexesOp implements CoreAdminHandler.CoreAdminOp {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
@Override
|
||||
public void execute(CoreAdminHandler.CallInfo it) throws Exception {
|
||||
SolrParams params = it.req.getParams();
|
||||
String cname = params.required().get(CoreAdminParams.CORE);
|
||||
SolrCore core = it.handler.coreContainer.getCore(cname);
|
||||
SolrQueryRequest wrappedReq = null;
|
||||
|
||||
List<SolrCore> sourceCores = Lists.newArrayList();
|
||||
List<RefCounted<SolrIndexSearcher>> searchers = Lists.newArrayList();
|
||||
// stores readers created from indexDir param values
|
||||
List<DirectoryReader> readersToBeClosed = Lists.newArrayList();
|
||||
Map<Directory, Boolean> dirsToBeReleased = new HashMap<>();
|
||||
if (core != null) {
|
||||
try {
|
||||
String[] dirNames = params.getParams(CoreAdminParams.INDEX_DIR);
|
||||
if (dirNames == null || dirNames.length == 0) {
|
||||
String[] sources = params.getParams("srcCore");
|
||||
if (sources == null || sources.length == 0)
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"At least one indexDir or srcCore must be specified");
|
||||
|
||||
for (int i = 0; i < sources.length; i++) {
|
||||
String source = sources[i];
|
||||
SolrCore srcCore = it.handler.coreContainer.getCore(source);
|
||||
if (srcCore == null)
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Core: " + source + " does not exist");
|
||||
sourceCores.add(srcCore);
|
||||
}
|
||||
} else {
|
||||
DirectoryFactory dirFactory = core.getDirectoryFactory();
|
||||
for (int i = 0; i < dirNames.length; i++) {
|
||||
boolean markAsDone = false;
|
||||
if (dirFactory instanceof CachingDirectoryFactory) {
|
||||
if (!((CachingDirectoryFactory) dirFactory).getLivePaths().contains(dirNames[i])) {
|
||||
markAsDone = true;
|
||||
}
|
||||
}
|
||||
Directory dir = dirFactory.get(dirNames[i], DirectoryFactory.DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
|
||||
dirsToBeReleased.put(dir, markAsDone);
|
||||
// TODO: why doesn't this use the IR factory? what is going on here?
|
||||
readersToBeClosed.add(DirectoryReader.open(dir));
|
||||
}
|
||||
}
|
||||
|
||||
List<DirectoryReader> readers = null;
|
||||
if (readersToBeClosed.size() > 0) {
|
||||
readers = readersToBeClosed;
|
||||
} else {
|
||||
readers = Lists.newArrayList();
|
||||
for (SolrCore solrCore : sourceCores) {
|
||||
// record the searchers so that we can decref
|
||||
RefCounted<SolrIndexSearcher> searcher = solrCore.getSearcher();
|
||||
searchers.add(searcher);
|
||||
readers.add(searcher.get().getIndexReader());
|
||||
}
|
||||
}
|
||||
|
||||
UpdateRequestProcessorChain processorChain =
|
||||
core.getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
|
||||
wrappedReq = new LocalSolrQueryRequest(core, it.req.getParams());
|
||||
UpdateRequestProcessor processor =
|
||||
processorChain.createProcessor(wrappedReq, it.rsp);
|
||||
processor.processMergeIndexes(new MergeIndexesCommand(readers, it.req));
|
||||
} catch (Exception e) {
|
||||
// log and rethrow so that if the finally fails we don't lose the original problem
|
||||
log.error("ERROR executing merge:", e);
|
||||
throw e;
|
||||
} finally {
|
||||
for (RefCounted<SolrIndexSearcher> searcher : searchers) {
|
||||
if (searcher != null) searcher.decref();
|
||||
}
|
||||
for (SolrCore solrCore : sourceCores) {
|
||||
if (solrCore != null) solrCore.close();
|
||||
}
|
||||
IOUtils.closeWhileHandlingException(readersToBeClosed);
|
||||
Set<Map.Entry<Directory, Boolean>> entries = dirsToBeReleased.entrySet();
|
||||
for (Map.Entry<Directory, Boolean> entry : entries) {
|
||||
DirectoryFactory dirFactory = core.getDirectoryFactory();
|
||||
Directory dir = entry.getKey();
|
||||
boolean markAsDone = entry.getValue();
|
||||
if (markAsDone) {
|
||||
dirFactory.doneWithDirectory(dir);
|
||||
}
|
||||
dirFactory.release(dir);
|
||||
}
|
||||
if (wrappedReq != null) wrappedReq.close();
|
||||
core.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,217 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.handler.admin;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.solr.cloud.CloudDescriptor;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.handler.admin.CoreAdminHandler.CallInfo;
|
||||
import org.apache.solr.request.LocalSolrQueryRequest;
|
||||
import org.apache.solr.search.SolrIndexSearcher;
|
||||
import org.apache.solr.update.CommitUpdateCommand;
|
||||
import org.apache.solr.util.RefCounted;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
@Override
|
||||
public void execute(CallInfo it) throws Exception {
|
||||
final SolrParams params = it.req.getParams();
|
||||
|
||||
String cname = params.get(CoreAdminParams.CORE);
|
||||
if (cname == null) {
|
||||
cname = "";
|
||||
}
|
||||
|
||||
String nodeName = params.get("nodeName");
|
||||
String coreNodeName = params.get("coreNodeName");
|
||||
Replica.State waitForState = Replica.State.getState(params.get(ZkStateReader.STATE_PROP));
|
||||
Boolean checkLive = params.getBool("checkLive");
|
||||
Boolean onlyIfLeader = params.getBool("onlyIfLeader");
|
||||
Boolean onlyIfLeaderActive = params.getBool("onlyIfLeaderActive");
|
||||
|
||||
log.info("Going to wait for coreNodeName: " + coreNodeName + ", state: " + waitForState
|
||||
+ ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader
|
||||
+ ", onlyIfLeaderActive: " + onlyIfLeaderActive);
|
||||
|
||||
int maxTries = 0;
|
||||
Replica.State state = null;
|
||||
boolean live = false;
|
||||
int retry = 0;
|
||||
while (true) {
|
||||
CoreContainer coreContainer = it.handler.coreContainer;
|
||||
try (SolrCore core = coreContainer.getCore(cname)) {
|
||||
if (core == null && retry == 30) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "core not found:"
|
||||
+ cname);
|
||||
}
|
||||
if (core != null) {
|
||||
if (onlyIfLeader != null && onlyIfLeader) {
|
||||
if (!core.getCoreDescriptor().getCloudDescriptor().isLeader()) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "We are not the leader");
|
||||
}
|
||||
}
|
||||
|
||||
// wait until we are sure the recovering node is ready
|
||||
// to accept updates
|
||||
CloudDescriptor cloudDescriptor = core.getCoreDescriptor()
|
||||
.getCloudDescriptor();
|
||||
String collectionName = cloudDescriptor.getCollectionName();
|
||||
|
||||
if (retry % 15 == 0) {
|
||||
if (retry > 0 && log.isInfoEnabled())
|
||||
log.info("After " + retry + " seconds, core " + cname + " (" +
|
||||
cloudDescriptor.getShardId() + " of " +
|
||||
cloudDescriptor.getCollectionName() + ") still does not have state: " +
|
||||
waitForState + "; forcing ClusterState update from ZooKeeper");
|
||||
|
||||
// force a cluster state update
|
||||
coreContainer.getZkController().getZkStateReader().forceUpdateCollection(collectionName);
|
||||
}
|
||||
|
||||
if (maxTries == 0) {
|
||||
// wait long enough for the leader conflict to work itself out plus a little extra
|
||||
int conflictWaitMs = coreContainer.getZkController().getLeaderConflictResolveWait();
|
||||
maxTries = (int) Math.round(conflictWaitMs / 1000) + 3;
|
||||
log.info("Will wait a max of " + maxTries + " seconds to see " + cname + " (" +
|
||||
cloudDescriptor.getShardId() + " of " +
|
||||
cloudDescriptor.getCollectionName() + ") have state: " + waitForState);
|
||||
}
|
||||
|
||||
ClusterState clusterState = coreContainer.getZkController().getClusterState();
|
||||
DocCollection collection = clusterState.getCollection(collectionName);
|
||||
Slice slice = collection.getSlice(cloudDescriptor.getShardId());
|
||||
if (slice != null) {
|
||||
final Replica replica = slice.getReplicasMap().get(coreNodeName);
|
||||
if (replica != null) {
|
||||
state = replica.getState();
|
||||
live = clusterState.liveNodesContain(nodeName);
|
||||
|
||||
final Replica.State localState = cloudDescriptor.getLastPublished();
|
||||
|
||||
// TODO: This is funky but I've seen this in testing where the replica asks the
|
||||
// leader to be in recovery? Need to track down how that happens ... in the meantime,
|
||||
// this is a safeguard
|
||||
boolean leaderDoesNotNeedRecovery = (onlyIfLeader != null &&
|
||||
onlyIfLeader &&
|
||||
core.getName().equals(replica.getStr("core")) &&
|
||||
waitForState == Replica.State.RECOVERING &&
|
||||
localState == Replica.State.ACTIVE &&
|
||||
state == Replica.State.ACTIVE);
|
||||
|
||||
if (leaderDoesNotNeedRecovery) {
|
||||
log.warn("Leader " + core.getName() + " ignoring request to be in the recovering state because it is live and active.");
|
||||
}
|
||||
|
||||
boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive && localState != Replica.State.ACTIVE;
|
||||
log.info("In WaitForState(" + waitForState + "): collection=" + collectionName + ", shard=" + slice.getName() +
|
||||
", thisCore=" + core.getName() + ", leaderDoesNotNeedRecovery=" + leaderDoesNotNeedRecovery +
|
||||
", isLeader? " + core.getCoreDescriptor().getCloudDescriptor().isLeader() +
|
||||
", live=" + live + ", checkLive=" + checkLive + ", currentState=" + state.toString() + ", localState=" + localState + ", nodeName=" + nodeName +
|
||||
", coreNodeName=" + coreNodeName + ", onlyIfActiveCheckResult=" + onlyIfActiveCheckResult + ", nodeProps: " + replica);
|
||||
|
||||
if (!onlyIfActiveCheckResult && replica != null && (state == waitForState || leaderDoesNotNeedRecovery)) {
|
||||
if (checkLive == null) {
|
||||
break;
|
||||
} else if (checkLive && live) {
|
||||
break;
|
||||
} else if (!checkLive && !live) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (retry++ == maxTries) {
|
||||
String collection = null;
|
||||
String leaderInfo = null;
|
||||
String shardId = null;
|
||||
try {
|
||||
CloudDescriptor cloudDescriptor =
|
||||
core.getCoreDescriptor().getCloudDescriptor();
|
||||
collection = cloudDescriptor.getCollectionName();
|
||||
shardId = cloudDescriptor.getShardId();
|
||||
leaderInfo = coreContainer.getZkController().
|
||||
getZkStateReader().getLeaderUrl(collection, shardId, 5000);
|
||||
} catch (Exception exc) {
|
||||
leaderInfo = "Not available due to: " + exc;
|
||||
}
|
||||
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"I was asked to wait on state " + waitForState + " for "
|
||||
+ shardId + " in " + collection + " on " + nodeName
|
||||
+ " but I still do not see the requested state. I see state: "
|
||||
+ state.toString() + " live:" + live + " leader from ZK: " + leaderInfo
|
||||
);
|
||||
}
|
||||
|
||||
if (coreContainer.isShutDown()) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Solr is shutting down");
|
||||
}
|
||||
|
||||
// solrcloud_debug
|
||||
if (log.isDebugEnabled()) {
|
||||
try {
|
||||
LocalSolrQueryRequest r = new LocalSolrQueryRequest(core,
|
||||
new ModifiableSolrParams());
|
||||
CommitUpdateCommand commitCmd = new CommitUpdateCommand(r, false);
|
||||
commitCmd.softCommit = true;
|
||||
core.getUpdateHandler().commit(commitCmd);
|
||||
RefCounted<SolrIndexSearcher> searchHolder = core
|
||||
.getNewestSearcher(false);
|
||||
SolrIndexSearcher searcher = searchHolder.get();
|
||||
try {
|
||||
log.debug(core.getCoreDescriptor().getCoreContainer()
|
||||
.getZkController().getNodeName()
|
||||
+ " to replicate "
|
||||
+ searcher.search(new MatchAllDocsQuery(), 1).totalHits
|
||||
+ " gen:"
|
||||
+ core.getDeletionPolicy().getLatestCommit().getGeneration()
|
||||
+ " data:" + core.getDataDir());
|
||||
} finally {
|
||||
searchHolder.decref();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.debug("Error in solrcloud_debug block", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
log.info("Waited coreNodeName: " + coreNodeName + ", state: " + waitForState
|
||||
+ ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader + " for: " + retry + " seconds.");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.handler.admin;
|
||||
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.update.UpdateLog;
|
||||
|
||||
class RequestApplyUpdatesOp implements CoreAdminHandler.CoreAdminOp {
|
||||
@Override
|
||||
public void execute(CoreAdminHandler.CallInfo it) throws Exception {
|
||||
SolrParams params = it.req.getParams();
|
||||
String cname = params.get(CoreAdminParams.NAME, "");
|
||||
CoreAdminOperation.log().info("Applying buffered updates on core: " + cname);
|
||||
CoreContainer coreContainer = it.handler.coreContainer;
|
||||
try (SolrCore core = coreContainer.getCore(cname)) {
|
||||
if (core == null)
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Core [" + cname + "] not found");
|
||||
UpdateLog updateLog = core.getUpdateHandler().getUpdateLog();
|
||||
if (updateLog.getState() != UpdateLog.State.BUFFERING) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Core " + cname + " not in buffering state");
|
||||
}
|
||||
Future<UpdateLog.RecoveryInfo> future = updateLog.applyBufferedUpdates();
|
||||
if (future == null) {
|
||||
CoreAdminOperation.log().info("No buffered updates available. core=" + cname);
|
||||
it.rsp.add("core", cname);
|
||||
it.rsp.add("status", "EMPTY_BUFFER");
|
||||
return;
|
||||
}
|
||||
UpdateLog.RecoveryInfo report = future.get();
|
||||
if (report.failed) {
|
||||
SolrException.log(CoreAdminOperation.log(), "Replay failed");
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Replay failed");
|
||||
}
|
||||
coreContainer.getZkController().publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
|
||||
it.rsp.add("core", cname);
|
||||
it.rsp.add("status", "BUFFER_APPLIED");
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
CoreAdminOperation.log().warn("Recovery was interrupted", e);
|
||||
} catch (Exception e) {
|
||||
if (e instanceof SolrException)
|
||||
throw (SolrException) e;
|
||||
else
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not apply buffered updates", e);
|
||||
} finally {
|
||||
if (it.req != null) it.req.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,100 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.handler.admin;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.solr.cloud.SyncStrategy;
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.handler.admin.CoreAdminHandler.CallInfo;
|
||||
import org.apache.solr.search.SolrIndexSearcher;
|
||||
import org.apache.solr.util.RefCounted;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
class RequestSyncShardOp implements CoreAdminHandler.CoreAdminOp {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
@Override
|
||||
public void execute(CallInfo it) throws Exception {
|
||||
final SolrParams params = it.req.getParams();
|
||||
|
||||
log.info("I have been requested to sync up my shard");
|
||||
ZkController zkController = it.handler.coreContainer.getZkController();
|
||||
if (zkController == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Only valid for SolrCloud");
|
||||
}
|
||||
|
||||
String cname = params.get(CoreAdminParams.CORE);
|
||||
if (cname == null) {
|
||||
throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
|
||||
}
|
||||
|
||||
SyncStrategy syncStrategy = null;
|
||||
try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
|
||||
|
||||
if (core != null) {
|
||||
syncStrategy = new SyncStrategy(core.getCoreDescriptor().getCoreContainer());
|
||||
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ZkStateReader.BASE_URL_PROP, zkController.getBaseUrl());
|
||||
props.put(ZkStateReader.CORE_NAME_PROP, cname);
|
||||
props.put(ZkStateReader.NODE_NAME_PROP, zkController.getNodeName());
|
||||
|
||||
boolean success = syncStrategy.sync(zkController, core, new ZkNodeProps(props), true);
|
||||
// solrcloud_debug
|
||||
if (log.isDebugEnabled()) {
|
||||
try {
|
||||
RefCounted<SolrIndexSearcher> searchHolder = core
|
||||
.getNewestSearcher(false);
|
||||
SolrIndexSearcher searcher = searchHolder.get();
|
||||
try {
|
||||
log.debug(core.getCoreDescriptor().getCoreContainer()
|
||||
.getZkController().getNodeName()
|
||||
+ " synched "
|
||||
+ searcher.search(new MatchAllDocsQuery(), 1).totalHits);
|
||||
} finally {
|
||||
searchHolder.decref();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.debug("Error in solrcloud_debug block", e);
|
||||
}
|
||||
}
|
||||
if (!success) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Sync Failed");
|
||||
}
|
||||
} else {
|
||||
SolrException.log(log, "Could not find core to call sync:" + cname);
|
||||
}
|
||||
} finally {
|
||||
// no recoveryStrat close for now
|
||||
if (syncStrategy != null) {
|
||||
syncStrategy.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.handler.admin;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.core.backup.repository.BackupRepository;
|
||||
import org.apache.solr.handler.RestoreCore;
|
||||
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
|
||||
|
||||
class RestoreCoreOp implements CoreAdminHandler.CoreAdminOp {
|
||||
@Override
|
||||
public void execute(CoreAdminHandler.CallInfo it) throws Exception {
|
||||
ZkController zkController = it.handler.coreContainer.getZkController();
|
||||
if (zkController == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Only valid for SolrCloud");
|
||||
}
|
||||
|
||||
final SolrParams params = it.req.getParams();
|
||||
String cname = params.get(CoreAdminParams.CORE);
|
||||
if (cname == null) {
|
||||
throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
|
||||
}
|
||||
|
||||
String name = params.get(NAME);
|
||||
if (name == null) {
|
||||
throw new IllegalArgumentException(CoreAdminParams.NAME + " is required");
|
||||
}
|
||||
|
||||
String repoName = params.get(CoreAdminParams.BACKUP_REPOSITORY);
|
||||
BackupRepository repository = it.handler.coreContainer.newBackupRepository(Optional.ofNullable(repoName));
|
||||
|
||||
String location = repository.getBackupLocation(params.get(CoreAdminParams.BACKUP_LOCATION));
|
||||
if (location == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'location' is not specified as a query"
|
||||
+ " parameter or as a default repository property");
|
||||
}
|
||||
|
||||
URI locationUri = repository.createURI(location);
|
||||
try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
|
||||
RestoreCore restoreCore = new RestoreCore(repository, core, locationUri, name);
|
||||
boolean success = restoreCore.doRestore();
|
||||
if (!success) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed to restore core=" + core.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,142 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.handler.admin;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.request.LocalSolrQueryRequest;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.apache.solr.update.SplitIndexCommand;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
|
||||
import static org.apache.solr.common.params.CommonParams.PATH;
|
||||
|
||||
|
||||
class SplitOp implements CoreAdminHandler.CoreAdminOp {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
@Override
|
||||
public void execute(CoreAdminHandler.CallInfo it) throws Exception {
|
||||
SolrParams params = it.req.getParams();
|
||||
List<DocRouter.Range> ranges = null;
|
||||
|
||||
String[] pathsArr = params.getParams(PATH);
|
||||
String rangesStr = params.get(CoreAdminParams.RANGES); // ranges=a-b,c-d,e-f
|
||||
if (rangesStr != null) {
|
||||
String[] rangesArr = rangesStr.split(",");
|
||||
if (rangesArr.length == 0) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There must be at least one range specified to split an index");
|
||||
} else {
|
||||
ranges = new ArrayList<>(rangesArr.length);
|
||||
for (String r : rangesArr) {
|
||||
try {
|
||||
ranges.add(DocRouter.DEFAULT.fromString(r));
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Exception parsing hexadecimal hash range: " + r, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
String splitKey = params.get("split.key");
|
||||
String[] newCoreNames = params.getParams("targetCore");
|
||||
String cname = params.get(CoreAdminParams.CORE, "");
|
||||
|
||||
if ((pathsArr == null || pathsArr.length == 0) && (newCoreNames == null || newCoreNames.length == 0)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Either path or targetCore param must be specified");
|
||||
}
|
||||
|
||||
log.info("Invoked split action for core: " + cname);
|
||||
SolrCore core = it.handler.coreContainer.getCore(cname);
|
||||
SolrQueryRequest req = new LocalSolrQueryRequest(core, params);
|
||||
List<SolrCore> newCores = null;
|
||||
|
||||
try {
|
||||
// TODO: allow use of rangesStr in the future
|
||||
List<String> paths = null;
|
||||
int partitions = pathsArr != null ? pathsArr.length : newCoreNames.length;
|
||||
|
||||
DocRouter router = null;
|
||||
String routeFieldName = null;
|
||||
if (it.handler.coreContainer.isZooKeeperAware()) {
|
||||
ClusterState clusterState = it.handler.coreContainer.getZkController().getClusterState();
|
||||
String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
|
||||
DocCollection collection = clusterState.getCollection(collectionName);
|
||||
String sliceName = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
|
||||
Slice slice = collection.getSlice(sliceName);
|
||||
router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
|
||||
if (ranges == null) {
|
||||
DocRouter.Range currentRange = slice.getRange();
|
||||
ranges = currentRange != null ? router.partitionRange(partitions, currentRange) : null;
|
||||
}
|
||||
Object routerObj = collection.get(DOC_ROUTER); // for back-compat with Solr 4.4
|
||||
if (routerObj != null && routerObj instanceof Map) {
|
||||
Map routerProps = (Map) routerObj;
|
||||
routeFieldName = (String) routerProps.get("field");
|
||||
}
|
||||
}
|
||||
|
||||
if (pathsArr == null) {
|
||||
newCores = new ArrayList<>(partitions);
|
||||
for (String newCoreName : newCoreNames) {
|
||||
SolrCore newcore = it.handler.coreContainer.getCore(newCoreName);
|
||||
if (newcore != null) {
|
||||
newCores.add(newcore);
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Core with core name " + newCoreName + " expected but doesn't exist.");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
paths = Arrays.asList(pathsArr);
|
||||
}
|
||||
|
||||
|
||||
SplitIndexCommand cmd = new SplitIndexCommand(req, paths, newCores, ranges, router, routeFieldName, splitKey);
|
||||
core.getUpdateHandler().split(cmd);
|
||||
|
||||
// After the split has completed, someone (here?) should start the process of replaying the buffered updates.
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("ERROR executing split:", e);
|
||||
throw new RuntimeException(e);
|
||||
|
||||
} finally {
|
||||
if (req != null) req.close();
|
||||
if (core != null) core.close();
|
||||
if (newCores != null) {
|
||||
for (SolrCore newCore : newCores) {
|
||||
newCore.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.handler.admin;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.SimpleOrderedMap;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
|
||||
|
||||
class StatusOp implements CoreAdminHandler.CoreAdminOp {
|
||||
@Override
|
||||
public void execute(CoreAdminHandler.CallInfo it) throws Exception {
|
||||
SolrParams params = it.req.getParams();
|
||||
|
||||
String cname = params.get(CoreAdminParams.CORE);
|
||||
String indexInfo = params.get(CoreAdminParams.INDEX_INFO);
|
||||
boolean isIndexInfoNeeded = Boolean.parseBoolean(null == indexInfo ? "true" : indexInfo);
|
||||
NamedList<Object> status = new SimpleOrderedMap<>();
|
||||
Map<String, Exception> failures = new HashMap<>();
|
||||
for (Map.Entry<String, CoreContainer.CoreLoadFailure> failure : it.handler.coreContainer.getCoreInitFailures().entrySet()) {
|
||||
failures.put(failure.getKey(), failure.getValue().exception);
|
||||
}
|
||||
try {
|
||||
if (cname == null) {
|
||||
for (String name : it.handler.coreContainer.getAllCoreNames()) {
|
||||
status.add(name, CoreAdminOperation.getCoreStatus(it.handler.coreContainer, name, isIndexInfoNeeded));
|
||||
}
|
||||
it.rsp.add("initFailures", failures);
|
||||
} else {
|
||||
failures = failures.containsKey(cname)
|
||||
? Collections.singletonMap(cname, failures.get(cname))
|
||||
: Collections.<String, Exception>emptyMap();
|
||||
it.rsp.add("initFailures", failures);
|
||||
status.add(cname, CoreAdminOperation.getCoreStatus(it.handler.coreContainer, cname, isIndexInfoNeeded));
|
||||
}
|
||||
it.rsp.add("status", status);
|
||||
} catch (Exception ex) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Error handling 'status' action ", ex);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue