mirror of https://github.com/apache/lucene.git
SOLR-5750: Add /admin/collections?action=BACKUP and RESTORE
This commit is contained in:
parent
a48245a1bf
commit
70bcd562f9
|
@ -119,6 +119,9 @@ New Features
|
|||
|
||||
* SOLR-9027: Add GraphTermsQuery to limit traversal on high frequency nodes (Joel Bernstein, David Smiley)
|
||||
|
||||
* SOLR-5750: Add /admin/collections?action=BACKUP and RESTORE assuming access to a shared file system.
|
||||
(Varun Thacker, David Smiley)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -17,7 +17,14 @@
|
|||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Reader;
|
||||
import java.io.Writer;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -29,6 +36,7 @@ import java.util.LinkedHashMap;
|
|||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -95,9 +103,9 @@ import static org.apache.solr.common.cloud.DocCollection.SNITCH;
|
|||
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
|
||||
|
@ -273,6 +281,12 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
case MIGRATESTATEFORMAT:
|
||||
migrateStateFormat(message, results);
|
||||
break;
|
||||
case BACKUP:
|
||||
processBackupAction(message, results);
|
||||
break;
|
||||
case RESTORE:
|
||||
processRestoreAction(message, results);
|
||||
break;
|
||||
default:
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
|
||||
+ operation);
|
||||
|
@ -297,6 +311,10 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
return new OverseerSolrResponse(results);
|
||||
}
|
||||
|
||||
//
|
||||
// TODO DWS: this class has gone out of control (too big); refactor to break it up
|
||||
//
|
||||
|
||||
private void reloadCollection(ZkNodeProps message, NamedList results) {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
|
||||
|
@ -1758,6 +1776,15 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
}
|
||||
}
|
||||
|
||||
private void addPropertyParams(ZkNodeProps message, Map<String,Object> map) {
|
||||
// Now add the property.key=value pairs
|
||||
for (String key : message.keySet()) {
|
||||
if (key.startsWith(COLL_PROP_PREFIX)) {
|
||||
map.put(key, message.getStr(key));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static List<String> getLiveOrLiveAndCreateNodeSetList(final Set<String> liveNodes, final ZkNodeProps message, final Random random) {
|
||||
// TODO: add smarter options that look at the current number of cores per
|
||||
// node?
|
||||
|
@ -2149,6 +2176,248 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
waitForCoreNodeName(collection, node, coreName);
|
||||
}
|
||||
|
||||
private void processBackupAction(ZkNodeProps message, NamedList results) throws IOException, KeeperException, InterruptedException {
|
||||
String collectionName = message.getStr(COLLECTION_PROP);
|
||||
String backupName = message.getStr(NAME);
|
||||
String location = message.getStr("location");
|
||||
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||
String asyncId = message.getStr(ASYNC);
|
||||
Map<String, String> requestMap = new HashMap<>();
|
||||
Instant startTime = Instant.now();
|
||||
|
||||
// note: we assume a shared files system to backup a collection, since a collection is distributed
|
||||
Path backupPath = Paths.get(location).resolve(backupName).toAbsolutePath();
|
||||
|
||||
//Validating if the directory already exists.
|
||||
if (Files.exists(backupPath)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Backup directory already exists: " + backupPath);
|
||||
}
|
||||
Files.createDirectory(backupPath); // create now
|
||||
|
||||
log.info("Starting backup of collection={} with backupName={} at location={}", collectionName, backupName,
|
||||
backupPath);
|
||||
|
||||
for (Slice slice : zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices()) {
|
||||
Replica replica = slice.getLeader();
|
||||
|
||||
String coreName = replica.getStr(CORE_NAME_PROP);
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.BACKUPCORE.toString());
|
||||
params.set(NAME, slice.getName());
|
||||
params.set("location", backupPath.toString()); // note: index dir will be here then the "snapshot." + slice name
|
||||
params.set(CORE_NAME_PROP, coreName);
|
||||
|
||||
sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
|
||||
log.debug("Sent backup request to core={} for backupName={}", coreName, backupName);
|
||||
}
|
||||
log.debug("Sent backup requests to all shard leaders for backupName={}", backupName);
|
||||
|
||||
processResponses(results, shardHandler, true, "Could not backup all replicas", asyncId, requestMap);
|
||||
|
||||
log.info("Starting to backup ZK data for backupName={}", backupName);
|
||||
|
||||
//Download the configs
|
||||
String configName = zkStateReader.readConfigName(collectionName);
|
||||
Path zkBackup = backupPath.resolve("zk_backup");
|
||||
zkStateReader.getConfigManager().downloadConfigDir(configName, zkBackup.resolve("configs").resolve(configName));
|
||||
|
||||
//Save the collection's state. Can be part of the monolithic clusterstate.json or a individual state.json
|
||||
//Since we don't want to distinguish we extract the state and back it up as a separate json
|
||||
DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
|
||||
Files.write(zkBackup.resolve("collection_state.json"),
|
||||
Utils.toJSON(Collections.singletonMap(collectionName, collection)));
|
||||
|
||||
Path propertiesPath = backupPath.resolve("backup.properties");
|
||||
Properties properties = new Properties();
|
||||
|
||||
properties.put("backupName", backupName);
|
||||
properties.put("collection", collectionName);
|
||||
properties.put("collection.configName", configName);
|
||||
properties.put("startTime", startTime.toString());
|
||||
//TODO: Add MD5 of the configset. If during restore the same name configset exists then we can compare checksums to see if they are the same.
|
||||
//if they are not the same then we can throw an error or have an 'overwriteConfig' flag
|
||||
//TODO save numDocs for the shardLeader. We can use it to sanity check the restore.
|
||||
|
||||
try (Writer os = Files.newBufferedWriter(propertiesPath, StandardCharsets.UTF_8)) {
|
||||
properties.store(os, "Snapshot properties file");
|
||||
}
|
||||
|
||||
log.info("Completed backing up ZK data for backupName={}", backupName);
|
||||
}
|
||||
|
||||
private void processRestoreAction(ZkNodeProps message, NamedList results) throws IOException, KeeperException, InterruptedException {
|
||||
// TODO maybe we can inherit createCollection's options/code
|
||||
String restoreCollectionName = message.getStr(COLLECTION_PROP);
|
||||
String backupName = message.getStr(NAME); // of backup
|
||||
String location = message.getStr("location");
|
||||
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||
String asyncId = message.getStr(ASYNC);
|
||||
Map<String, String> requestMap = new HashMap<>();
|
||||
|
||||
Path backupPath = Paths.get(location).resolve(backupName).toAbsolutePath();
|
||||
if (!Files.exists(backupPath)) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Couldn't restore since doesn't exist: " + backupPath);
|
||||
}
|
||||
Path backupZkPath = backupPath.resolve("zk_backup");
|
||||
|
||||
Properties properties = new Properties();
|
||||
try (Reader in = Files.newBufferedReader(backupPath.resolve("backup.properties"), StandardCharsets.UTF_8)) {
|
||||
properties.load(in);
|
||||
}
|
||||
|
||||
String backupCollection = (String) properties.get("collection");
|
||||
byte[] data = Files.readAllBytes(backupZkPath.resolve("collection_state.json"));
|
||||
ClusterState backupClusterState = ClusterState.load(-1, data, Collections.emptySet());
|
||||
DocCollection backupCollectionState = backupClusterState.getCollection(backupCollection);
|
||||
|
||||
//Upload the configs
|
||||
String configName = (String) properties.get(COLL_CONF);
|
||||
String restoreConfigName = message.getStr(COLL_CONF, configName);
|
||||
if (zkStateReader.getConfigManager().configExists(restoreConfigName)) {
|
||||
log.info("Using existing config {}", restoreConfigName);
|
||||
//TODO add overwrite option?
|
||||
} else {
|
||||
log.info("Uploading config {}", restoreConfigName);
|
||||
zkStateReader.getConfigManager().uploadConfigDir(backupZkPath.resolve("configs").resolve(configName), restoreConfigName);
|
||||
}
|
||||
|
||||
log.info("Starting restore into collection={} with backup_name={} at location={}", restoreCollectionName, backupName,
|
||||
backupPath);
|
||||
|
||||
//Create core-less collection
|
||||
{
|
||||
Map<String, Object> propMap = new HashMap<>();
|
||||
propMap.put(Overseer.QUEUE_OPERATION, CREATE.toString());
|
||||
propMap.put("fromApi", "true"); // mostly true. Prevents autoCreated=true in the collection state.
|
||||
|
||||
// inherit settings from input API, defaulting to the backup's setting. Ex: replicationFactor
|
||||
for (String collProp : COLL_PROPS.keySet()) {
|
||||
Object val = message.getProperties().getOrDefault(collProp, backupCollectionState.get(collProp));
|
||||
if (val != null) {
|
||||
propMap.put(collProp, val);
|
||||
}
|
||||
}
|
||||
|
||||
propMap.put(NAME, restoreCollectionName);
|
||||
propMap.put(CREATE_NODE_SET, CREATE_NODE_SET_EMPTY); //no cores
|
||||
propMap.put(COLL_CONF, restoreConfigName);
|
||||
|
||||
// router.*
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> routerProps = (Map<String, Object>) backupCollectionState.getProperties().get(DocCollection.DOC_ROUTER);
|
||||
for (Map.Entry<String, Object> pair : routerProps.entrySet()) {
|
||||
propMap.put(DocCollection.DOC_ROUTER + "." + pair.getKey(), pair.getValue());
|
||||
}
|
||||
|
||||
Set<String> sliceNames = backupCollectionState.getActiveSlicesMap().keySet();
|
||||
if (backupCollectionState.getRouter() instanceof ImplicitDocRouter) {
|
||||
propMap.put(SHARDS_PROP, StrUtils.join(sliceNames, ','));
|
||||
} else {
|
||||
propMap.put(NUM_SLICES, sliceNames.size());
|
||||
// ClusterStateMutator.createCollection detects that "slices" is in fact a slice structure instead of a
|
||||
// list of names, and if so uses this instead of building it. We clear the replica list.
|
||||
Collection<Slice> backupSlices = backupCollectionState.getActiveSlices();
|
||||
Map<String,Slice> newSlices = new LinkedHashMap<>(backupSlices.size());
|
||||
for (Slice backupSlice : backupSlices) {
|
||||
newSlices.put(backupSlice.getName(),
|
||||
new Slice(backupSlice.getName(), Collections.emptyMap(), backupSlice.getProperties()));
|
||||
}
|
||||
propMap.put(SHARDS_PROP, newSlices);
|
||||
}
|
||||
|
||||
createCollection(zkStateReader.getClusterState(), new ZkNodeProps(propMap), new NamedList());
|
||||
// note: when createCollection() returns, the collection exists (no race)
|
||||
}
|
||||
|
||||
DocCollection restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
|
||||
|
||||
DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
|
||||
|
||||
//Mark all shards in CONSTRUCTION STATE while we restore the data
|
||||
{
|
||||
//TODO might instead createCollection accept an initial state? Is there a race?
|
||||
Map<String, Object> propMap = new HashMap<>();
|
||||
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
|
||||
for (Slice shard : restoreCollection.getSlices()) {
|
||||
propMap.put(shard.getName(), Slice.State.CONSTRUCTION.toString());
|
||||
}
|
||||
propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollectionName);
|
||||
inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
|
||||
}
|
||||
|
||||
// TODO how do we leverage the CREATE_NODE_SET / RULE / SNITCH logic in createCollection?
|
||||
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
//Create one replica per shard and copy backed up data to it
|
||||
for (Slice slice: restoreCollection.getSlices()) {
|
||||
log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
|
||||
HashMap<String, Object> propMap = new HashMap<>();
|
||||
propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD);
|
||||
propMap.put(COLLECTION_PROP, restoreCollectionName);
|
||||
propMap.put(SHARD_ID_PROP, slice.getName());
|
||||
// add async param
|
||||
if (asyncId != null) {
|
||||
propMap.put(ASYNC, asyncId);
|
||||
}
|
||||
addPropertyParams(message, propMap);
|
||||
|
||||
addReplica(clusterState, new ZkNodeProps(propMap), new NamedList());
|
||||
}
|
||||
|
||||
//refresh the location copy of collection state
|
||||
restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
|
||||
|
||||
//Copy data from backed up index to each replica
|
||||
for (Slice slice: restoreCollection.getSlices()) {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.RESTORECORE.toString());
|
||||
params.set(NAME, "snapshot." + slice.getName());
|
||||
params.set("location", backupPath.toString());
|
||||
sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap);
|
||||
}
|
||||
processResponses(new NamedList(), shardHandler, true, "Could not restore core", asyncId, requestMap);
|
||||
|
||||
//Mark all shards in ACTIVE STATE
|
||||
{
|
||||
HashMap<String, Object> propMap = new HashMap<>();
|
||||
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
|
||||
propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollectionName);
|
||||
for (Slice shard : restoreCollection.getSlices()) {
|
||||
propMap.put(shard.getName(), Slice.State.ACTIVE.toString());
|
||||
}
|
||||
inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
|
||||
}
|
||||
|
||||
//refresh the location copy of collection state
|
||||
restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
|
||||
|
||||
//Add the remaining replicas for each shard
|
||||
Integer numReplicas = restoreCollection.getReplicationFactor();
|
||||
if (numReplicas != null && numReplicas > 1) {
|
||||
log.info("Adding replicas to restored collection={}", restoreCollection);
|
||||
|
||||
for (Slice slice: restoreCollection.getSlices()) {
|
||||
for(int i=1; i<numReplicas; i++) {
|
||||
log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
|
||||
HashMap<String, Object> propMap = new HashMap<>();
|
||||
propMap.put(COLLECTION_PROP, restoreCollectionName);
|
||||
propMap.put(SHARD_ID_PROP, slice.getName());
|
||||
// add async param
|
||||
if (asyncId != null) {
|
||||
propMap.put(ASYNC, asyncId);
|
||||
}
|
||||
addPropertyParams(message, propMap);
|
||||
|
||||
addReplica(zkStateReader.getClusterState(), new ZkNodeProps(propMap), results);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Completed restoring collection={} backupName={}", restoreCollection, backupName);
|
||||
}
|
||||
|
||||
private void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
|
||||
String asyncId, Map<String, String> requestMap) {
|
||||
processResponses(results, shardHandler, abortOnError, msgOnError, asyncId, requestMap, Collections.emptySet());
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.solr.cloud.overseer;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
|
@ -57,33 +56,37 @@ public class ClusterStateMutator {
|
|||
return ZkStateWriter.NO_OP;
|
||||
}
|
||||
|
||||
ArrayList<String> shards = new ArrayList<>();
|
||||
|
||||
if (ImplicitDocRouter.NAME.equals(message.getStr("router.name", DocRouter.DEFAULT_NAME))) {
|
||||
getShardNames(shards, message.getStr("shards", DocRouter.DEFAULT_NAME));
|
||||
} else {
|
||||
int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, -1);
|
||||
if (numShards < 1)
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "numShards is a required parameter for 'compositeId' router");
|
||||
getShardNames(numShards, shards);
|
||||
}
|
||||
|
||||
Map<String, Object> routerSpec = DocRouter.getRouterSpec(message);
|
||||
String routerName = routerSpec.get(NAME) == null ? DocRouter.DEFAULT_NAME : (String) routerSpec.get(NAME);
|
||||
DocRouter router = DocRouter.getDocRouter(routerName);
|
||||
|
||||
List<DocRouter.Range> ranges = router.partitionRange(shards.size(), router.fullRange());
|
||||
Object messageShardsObj = message.get("shards");
|
||||
|
||||
Map<String, Slice> slices;
|
||||
if (messageShardsObj instanceof Map) { // we are being explicitly told the slice data (e.g. coll restore)
|
||||
slices = Slice.loadAllFromMap((Map<String, Object>)messageShardsObj);
|
||||
} else {
|
||||
List<String> shardNames = new ArrayList<>();
|
||||
|
||||
Map<String, Slice> newSlices = new LinkedHashMap<>();
|
||||
if (router instanceof ImplicitDocRouter) {
|
||||
getShardNames(shardNames, message.getStr("shards", DocRouter.DEFAULT_NAME));
|
||||
} else {
|
||||
int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, -1);
|
||||
if (numShards < 1)
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "numShards is a required parameter for 'compositeId' router");
|
||||
getShardNames(numShards, shardNames);
|
||||
}
|
||||
List<DocRouter.Range> ranges = router.partitionRange(shardNames.size(), router.fullRange());//maybe null
|
||||
|
||||
for (int i = 0; i < shards.size(); i++) {
|
||||
String sliceName = shards.get(i);
|
||||
slices = new LinkedHashMap<>();
|
||||
for (int i = 0; i < shardNames.size(); i++) {
|
||||
String sliceName = shardNames.get(i);
|
||||
|
||||
Map<String, Object> sliceProps = new LinkedHashMap<>(1);
|
||||
sliceProps.put(Slice.RANGE, ranges == null ? null : ranges.get(i));
|
||||
|
||||
newSlices.put(sliceName, new Slice(sliceName, null, sliceProps));
|
||||
slices.put(sliceName, new Slice(sliceName, null, sliceProps));
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, Object> collectionProps = new HashMap<>();
|
||||
|
@ -101,11 +104,12 @@ public class ClusterStateMutator {
|
|||
collectionProps.put("autoCreated", "true");
|
||||
}
|
||||
|
||||
//TODO default to 2; but need to debug why BasicDistributedZk2Test fails early on
|
||||
String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null
|
||||
: ZkStateReader.getCollectionPath(cName);
|
||||
|
||||
DocCollection newCollection = new DocCollection(cName,
|
||||
newSlices, collectionProps, router, -1, znode);
|
||||
slices, collectionProps, router, -1, znode);
|
||||
|
||||
return new ZkWriteCommand(cName, newCollection);
|
||||
}
|
||||
|
|
|
@ -101,26 +101,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import static org.apache.solr.common.params.CommonParams.JAVABIN;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
import static org.apache.solr.handler.ReplicationHandler.ALIAS;
|
||||
import static org.apache.solr.handler.ReplicationHandler.CHECKSUM;
|
||||
import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS;
|
||||
import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE;
|
||||
import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE_LIST;
|
||||
import static org.apache.solr.handler.ReplicationHandler.CMD_INDEX_VERSION;
|
||||
import static org.apache.solr.handler.ReplicationHandler.COMMAND;
|
||||
import static org.apache.solr.handler.ReplicationHandler.COMPRESSION;
|
||||
import static org.apache.solr.handler.ReplicationHandler.CONF_FILES;
|
||||
import static org.apache.solr.handler.ReplicationHandler.CONF_FILE_SHORT;
|
||||
import static org.apache.solr.handler.ReplicationHandler.EXTERNAL;
|
||||
import static org.apache.solr.handler.ReplicationHandler.FILE;
|
||||
import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
|
||||
import static org.apache.solr.handler.ReplicationHandler.GENERATION;
|
||||
import static org.apache.solr.handler.ReplicationHandler.INTERNAL;
|
||||
import static org.apache.solr.handler.ReplicationHandler.MASTER_URL;
|
||||
import static org.apache.solr.handler.ReplicationHandler.OFFSET;
|
||||
import static org.apache.solr.handler.ReplicationHandler.SIZE;
|
||||
import static org.apache.solr.handler.ReplicationHandler.TLOG_FILE;
|
||||
import static org.apache.solr.handler.ReplicationHandler.TLOG_FILES;
|
||||
import static org.apache.solr.handler.ReplicationHandler.*;
|
||||
|
||||
/**
|
||||
* <p> Provides functionality of downloading changed index files as well as config files and a timer for scheduling fetches from the
|
||||
|
@ -890,7 +871,7 @@ public class IndexFetcher {
|
|||
return bytesDownloaded;
|
||||
}
|
||||
|
||||
private boolean filesToAlwaysDownloadIfNoChecksums(String filename,
|
||||
static boolean filesToAlwaysDownloadIfNoChecksums(String filename,
|
||||
long size, CompareResult compareResult) {
|
||||
// without checksums to compare, we always download .si, .liv, segments_N,
|
||||
// and any very small files
|
||||
|
|
|
@ -87,7 +87,6 @@ import org.apache.solr.response.SolrQueryResponse;
|
|||
import org.apache.solr.search.SolrIndexSearcher;
|
||||
import org.apache.solr.update.CdcrUpdateLog;
|
||||
import org.apache.solr.update.SolrIndexWriter;
|
||||
import org.apache.solr.update.UpdateLog;
|
||||
import org.apache.solr.update.VersionInfo;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.apache.solr.util.NumberUtils;
|
||||
|
@ -195,7 +194,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
|
||||
volatile IndexCommit indexCommitPoint;
|
||||
|
||||
volatile NamedList<Object> snapShootDetails;
|
||||
volatile NamedList<?> snapShootDetails;
|
||||
|
||||
private AtomicBoolean replicationEnabled = new AtomicBoolean(true);
|
||||
|
||||
|
@ -509,7 +508,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
// small race here before the commit point is saved
|
||||
SnapShooter snapShooter = new SnapShooter(core, params.get("location"), params.get(NAME));
|
||||
snapShooter.validateCreateSnapshot();
|
||||
snapShooter.createSnapAsync(indexCommit, numberToKeep, this);
|
||||
snapShooter.createSnapAsync(indexCommit, numberToKeep, (nl) -> snapShootDetails = nl);
|
||||
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Exception during creating a snapshot", e);
|
||||
|
@ -1323,7 +1322,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
}
|
||||
SnapShooter snapShooter = new SnapShooter(core, null, null);
|
||||
snapShooter.validateCreateSnapshot();
|
||||
snapShooter.createSnapAsync(currentCommitPoint, numberToKeep, ReplicationHandler.this);
|
||||
snapShooter.createSnapAsync(currentCommitPoint, numberToKeep, (nl) -> snapShootDetails = nl);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception while snapshooting", e);
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ public class RestoreCore implements Callable<Boolean> {
|
|||
return doRestore();
|
||||
}
|
||||
|
||||
private boolean doRestore() throws Exception {
|
||||
public boolean doRestore() throws Exception {
|
||||
|
||||
Path backupPath = Paths.get(backupLocation).resolve(backupName);
|
||||
SimpleDateFormat dateFormat = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT);
|
||||
|
@ -86,8 +86,8 @@ public class RestoreCore implements Callable<Boolean> {
|
|||
}
|
||||
long length = indexInput.length();
|
||||
IndexFetcher.CompareResult compareResult = IndexFetcher.compareFile(indexDir, filename, length, checksum);
|
||||
if (!compareResult.equal || (!compareResult.checkSummed && (filename.endsWith(".si")
|
||||
|| filename.endsWith(".liv") || filename.startsWith("segments_")))) {
|
||||
if (!compareResult.equal ||
|
||||
(IndexFetcher.filesToAlwaysDownloadIfNoChecksums(filename, length, compareResult))) {
|
||||
restoreIndexDir.copyFrom(backupDir, filename, filename, IOContext.READONCE);
|
||||
} else {
|
||||
//prefer local copy
|
||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.solr.handler;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -26,6 +28,7 @@ import java.util.Collections;
|
|||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.store.Directory;
|
||||
|
@ -36,7 +39,10 @@ import org.apache.solr.common.SolrException;
|
|||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.core.DirectoryFactory;
|
||||
import org.apache.solr.core.DirectoryFactory.DirContext;
|
||||
import org.apache.solr.core.IndexDeletionPolicyWrapper;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.search.SolrIndexSearcher;
|
||||
import org.apache.solr.util.RefCounted;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -54,6 +60,7 @@ public class SnapShooter {
|
|||
private String snapshotName = null;
|
||||
private String directoryName = null;
|
||||
private File snapShotDir = null;
|
||||
//TODO update to NIO Path API
|
||||
|
||||
public SnapShooter(SolrCore core, String location, String snapshotName) {
|
||||
solrCore = core;
|
||||
|
@ -73,20 +80,10 @@ public class SnapShooter {
|
|||
}
|
||||
}
|
||||
|
||||
void createSnapAsync(final IndexCommit indexCommit, final int numberToKeep, final ReplicationHandler replicationHandler) {
|
||||
replicationHandler.core.getDeletionPolicy().saveCommitPoint(indexCommit.getGeneration());
|
||||
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
if(snapshotName != null) {
|
||||
createSnapshot(indexCommit, replicationHandler);
|
||||
} else {
|
||||
createSnapshot(indexCommit, replicationHandler);
|
||||
deleteOldBackups(numberToKeep);
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
/** Gets the parent directory of the snapshots. This is the {@code location} given in the constructor after
|
||||
* being resolved against the core instance dir. */
|
||||
public Path getLocation() {
|
||||
return Paths.get(snapDir);
|
||||
}
|
||||
|
||||
public void validateDeleteSnapshot() {
|
||||
|
@ -112,25 +109,67 @@ public class SnapShooter {
|
|||
}.start();
|
||||
}
|
||||
|
||||
void validateCreateSnapshot() throws IOException {
|
||||
public void validateCreateSnapshot() throws IOException {
|
||||
snapShotDir = new File(snapDir, directoryName);
|
||||
if (snapShotDir.exists()) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Snapshot directory already exists: " + snapShotDir.getAbsolutePath());
|
||||
}
|
||||
if (!snapShotDir.mkdirs()) {
|
||||
if (!snapShotDir.mkdirs()) { // note: TODO reconsider mkdirs vs mkdir
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Unable to create snapshot directory: " + snapShotDir.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
|
||||
void createSnapshot(final IndexCommit indexCommit, ReplicationHandler replicationHandler) {
|
||||
LOG.info("Creating backup snapshot " + (snapshotName == null ? "<not named>" : snapshotName) + " at " + snapDir);
|
||||
NamedList<Object> details = new NamedList<>();
|
||||
details.add("startTime", new Date().toString());
|
||||
public NamedList createSnapshot() throws Exception {
|
||||
IndexDeletionPolicyWrapper deletionPolicy = solrCore.getDeletionPolicy();
|
||||
RefCounted<SolrIndexSearcher> searcher = solrCore.getSearcher();
|
||||
try {
|
||||
Collection<String> files = indexCommit.getFileNames();
|
||||
//TODO should we try solrCore.getDeletionPolicy().getLatestCommit() first?
|
||||
IndexCommit indexCommit = searcher.get().getIndexReader().getIndexCommit();
|
||||
deletionPolicy.saveCommitPoint(indexCommit.getGeneration());
|
||||
try {
|
||||
return createSnapshot(indexCommit);
|
||||
} finally {
|
||||
deletionPolicy.releaseCommitPoint(indexCommit.getGeneration());
|
||||
}
|
||||
} finally {
|
||||
searcher.decref();
|
||||
}
|
||||
}
|
||||
|
||||
public void createSnapAsync(final IndexCommit indexCommit, final int numberToKeep, Consumer<NamedList> result) {
|
||||
solrCore.getDeletionPolicy().saveCommitPoint(indexCommit.getGeneration());
|
||||
|
||||
new Thread() { //TODO should use Solr's ExecutorUtil
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
result.accept(createSnapshot(indexCommit));
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception while creating snapshot", e);
|
||||
NamedList snapShootDetails = new NamedList<>();
|
||||
snapShootDetails.add("snapShootException", e.getMessage());
|
||||
result.accept(snapShootDetails);
|
||||
} finally {
|
||||
solrCore.getDeletionPolicy().releaseCommitPoint(indexCommit.getGeneration());
|
||||
}
|
||||
if (snapshotName == null) {
|
||||
deleteOldBackups(numberToKeep);
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
}
|
||||
|
||||
// note: remember to reserve the indexCommit first so it won't get deleted concurrently
|
||||
protected NamedList createSnapshot(final IndexCommit indexCommit) throws Exception {
|
||||
LOG.info("Creating backup snapshot " + (snapshotName == null ? "<not named>" : snapshotName) + " at " + snapDir);
|
||||
boolean success = false;
|
||||
try {
|
||||
NamedList<Object> details = new NamedList<>();
|
||||
details.add("startTime", new Date().toString());//bad; should be Instant.now().toString()
|
||||
|
||||
Collection<String> files = indexCommit.getFileNames();
|
||||
Directory dir = solrCore.getDirectoryFactory().get(solrCore.getIndexDir(), DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
|
||||
try {
|
||||
copyFiles(dir, files, snapShotDir);
|
||||
|
@ -140,17 +179,16 @@ public class SnapShooter {
|
|||
|
||||
details.add("fileCount", files.size());
|
||||
details.add("status", "success");
|
||||
details.add("snapshotCompletedAt", new Date().toString());
|
||||
details.add("snapshotCompletedAt", new Date().toString());//bad; should be Instant.now().toString()
|
||||
details.add("snapshotName", snapshotName);
|
||||
LOG.info("Done creating backup snapshot: " + (snapshotName == null ? "<not named>" : snapshotName) +
|
||||
" at " + snapDir);
|
||||
} catch (Exception e) {
|
||||
IndexFetcher.delTree(snapShotDir);
|
||||
LOG.error("Exception while creating snapshot", e);
|
||||
details.add("snapShootException", e.getMessage());
|
||||
success = true;
|
||||
return details;
|
||||
} finally {
|
||||
replicationHandler.core.getDeletionPolicy().releaseCommitPoint(indexCommit.getGeneration());
|
||||
replicationHandler.snapShootDetails = details;
|
||||
if (!success) {
|
||||
IndexFetcher.delTree(snapShotDir);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -797,6 +797,57 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
|
|||
throws Exception {
|
||||
return req.getParams().required().getAll(null, COLLECTION_PROP);
|
||||
}
|
||||
},
|
||||
BACKUP_OP(BACKUP) {
|
||||
@Override
|
||||
Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
|
||||
req.getParams().required().check(NAME, COLLECTION_PROP);
|
||||
|
||||
String collectionName = req.getParams().get(COLLECTION_PROP);
|
||||
ClusterState clusterState = h.coreContainer.getZkController().getClusterState();
|
||||
if (!clusterState.hasCollection(collectionName)) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
|
||||
}
|
||||
|
||||
String location = req.getParams().get("location");
|
||||
if (location == null) {
|
||||
location = (String) h.coreContainer.getZkController().getZkStateReader().getClusterProps().get("location");
|
||||
}
|
||||
if (location == null) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "'location' is not specified as a query parameter or set as a cluster property");
|
||||
}
|
||||
Map<String, Object> params = req.getParams().getAll(null, NAME, COLLECTION_PROP);
|
||||
params.put("location", location);
|
||||
return params;
|
||||
}
|
||||
},
|
||||
RESTORE_OP(RESTORE) {
|
||||
@Override
|
||||
Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
|
||||
req.getParams().required().check(NAME, COLLECTION_PROP);
|
||||
|
||||
String collectionName = SolrIdentifierValidator.validateCollectionName(req.getParams().get(COLLECTION_PROP));
|
||||
ClusterState clusterState = h.coreContainer.getZkController().getClusterState();
|
||||
//We always want to restore into an collection name which doesn't exist yet.
|
||||
if (clusterState.hasCollection(collectionName)) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' exists, no action taken.");
|
||||
}
|
||||
|
||||
String location = req.getParams().get("location");
|
||||
if (location == null) {
|
||||
location = (String) h.coreContainer.getZkController().getZkStateReader().getClusterProps().get("location");
|
||||
}
|
||||
if (location == null) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "'location' is not specified as a query parameter or set as a cluster property");
|
||||
}
|
||||
|
||||
Map<String, Object> params = req.getParams().getAll(null, NAME, COLLECTION_PROP);
|
||||
params.put("location", location);
|
||||
// from CREATE_OP:
|
||||
req.getParams().getAll(params, COLL_CONF, REPLICATION_FACTOR, MAX_SHARDS_PER_NODE, STATE_FORMAT, AUTO_ADD_REPLICAS);
|
||||
copyPropertiesWithPrefix(req.getParams(), params, COLL_PROP_PREFIX);
|
||||
return params;
|
||||
}
|
||||
};
|
||||
CollectionAction action;
|
||||
long timeOut;
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.solr.handler.admin;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -39,7 +40,6 @@ import org.apache.lucene.util.IOUtils;
|
|||
import org.apache.solr.cloud.CloudDescriptor;
|
||||
import org.apache.solr.cloud.SyncStrategy;
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.common.NonExistentCoreException;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
|
@ -60,6 +60,8 @@ import org.apache.solr.core.CoreDescriptor;
|
|||
import org.apache.solr.core.DirectoryFactory;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
import org.apache.solr.handler.RestoreCore;
|
||||
import org.apache.solr.handler.SnapShooter;
|
||||
import org.apache.solr.request.LocalSolrQueryRequest;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.apache.solr.search.SolrIndexSearcher;
|
||||
|
@ -80,24 +82,7 @@ import org.slf4j.LoggerFactory;
|
|||
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
import static org.apache.solr.common.params.CommonParams.PATH;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.CREATE;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.FORCEPREPAREFORLEADERSHIP;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.INVOKE;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.MERGEINDEXES;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.OVERSEEROP;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.PREPRECOVERY;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.REJOINLEADERELECTION;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.RELOAD;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.RENAME;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.REQUESTBUFFERUPDATES;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.REQUESTRECOVERY;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.REQUESTSTATUS;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.REQUESTSYNCSHARD;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.SPLIT;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.STATUS;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.SWAP;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.UNLOAD;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.*;
|
||||
import static org.apache.solr.handler.admin.CoreAdminHandler.COMPLETED;
|
||||
import static org.apache.solr.handler.admin.CoreAdminHandler.CallInfo;
|
||||
import static org.apache.solr.handler.admin.CoreAdminHandler.FAILED;
|
||||
|
@ -853,6 +838,81 @@ enum CoreAdminOperation {
|
|||
}
|
||||
|
||||
}
|
||||
},
|
||||
BACKUPCORE_OP(BACKUPCORE) {
|
||||
@Override
|
||||
public void call(CallInfo callInfo) throws IOException {
|
||||
ZkController zkController = callInfo.handler.coreContainer.getZkController();
|
||||
if (zkController == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Internal SolrCloud API");
|
||||
}
|
||||
|
||||
final SolrParams params = callInfo.req.getParams();
|
||||
String cname = params.get(CoreAdminParams.CORE);
|
||||
if (cname == null) {
|
||||
throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
|
||||
}
|
||||
|
||||
String name = params.get(NAME);
|
||||
if (name == null) {
|
||||
throw new IllegalArgumentException(CoreAdminParams.NAME + " is required");
|
||||
}
|
||||
|
||||
String location = params.get("location");
|
||||
if (location == null) {
|
||||
throw new IllegalArgumentException("location is required");
|
||||
}
|
||||
|
||||
try (SolrCore core = callInfo.handler.coreContainer.getCore(cname)) {
|
||||
SnapShooter snapShooter = new SnapShooter(core, location, name);
|
||||
// validateCreateSnapshot will create parent dirs instead of throw; that choice is dubious.
|
||||
// But we want to throw. One reason is that
|
||||
// this dir really should, in fact must, already exist here if triggered via a collection backup on a shared
|
||||
// file system. Otherwise, perhaps the FS location isn't shared -- we want an error.
|
||||
if (!Files.exists(snapShooter.getLocation())) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"Directory to contain snapshots doesn't exist: " + snapShooter.getLocation().toAbsolutePath());
|
||||
}
|
||||
snapShooter.validateCreateSnapshot();
|
||||
snapShooter.createSnapshot();
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Failed to backup core=" + cname + " because " + e, e);
|
||||
}
|
||||
}
|
||||
},
|
||||
RESTORECORE_OP(RESTORECORE) {
|
||||
@Override
|
||||
public void call(CallInfo callInfo) throws Exception {
|
||||
ZkController zkController = callInfo.handler.coreContainer.getZkController();
|
||||
if (zkController == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Only valid for SolrCloud");
|
||||
}
|
||||
|
||||
final SolrParams params = callInfo.req.getParams();
|
||||
String cname = params.get(CoreAdminParams.CORE);
|
||||
if (cname == null) {
|
||||
throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
|
||||
}
|
||||
|
||||
String name = params.get(NAME);
|
||||
if (name == null) {
|
||||
throw new IllegalArgumentException(CoreAdminParams.NAME + " is required");
|
||||
}
|
||||
|
||||
String location = params.get("location");
|
||||
if (location == null) {
|
||||
throw new IllegalArgumentException("location is required");
|
||||
}
|
||||
|
||||
try (SolrCore core = callInfo.handler.coreContainer.getCore(cname)) {
|
||||
RestoreCore restoreCore = new RestoreCore(core, location, name);
|
||||
boolean success = restoreCore.doRestore();
|
||||
if (!success) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed to restore core=" + core.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
|
|
|
@ -0,0 +1,218 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Random;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.lucene.util.TestUtil;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.response.RequestStatusState;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.ImplicitDocRouter;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.params.ShardParams._ROUTE_;
|
||||
|
||||
public class TestCloudBackupRestore extends SolrCloudTestCase {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static final int NUM_SHARDS = 2;//granted we sometimes shard split to get more
|
||||
|
||||
private static long docsSeed; // see indexDocs()
|
||||
|
||||
@BeforeClass
|
||||
public static void createCluster() throws Exception {
|
||||
configureCluster(2)// nodes
|
||||
.addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
|
||||
.configure();
|
||||
|
||||
docsSeed = random().nextLong();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
String collectionName = "backuprestore";
|
||||
boolean isImplicit = random().nextBoolean();
|
||||
int replFactor = TestUtil.nextInt(random(), 1, 2);
|
||||
CollectionAdminRequest.Create create =
|
||||
CollectionAdminRequest.createCollection(collectionName, "conf1", NUM_SHARDS, replFactor);
|
||||
if (NUM_SHARDS * replFactor > cluster.getJettySolrRunners().size() || random().nextBoolean()) {
|
||||
create.setMaxShardsPerNode(NUM_SHARDS);//just to assert it survives the restoration
|
||||
}
|
||||
if (random().nextBoolean()) {
|
||||
create.setAutoAddReplicas(true);//just to assert it survives the restoration
|
||||
}
|
||||
Properties coreProps = new Properties();
|
||||
coreProps.put("customKey", "customValue");//just to assert it survives the restoration
|
||||
create.setProperties(coreProps);
|
||||
if (isImplicit) { //implicit router
|
||||
create.setRouterName(ImplicitDocRouter.NAME);
|
||||
create.setNumShards(null);//erase it. TODO suggest a new createCollectionWithImplicitRouter method
|
||||
create.setShards("shard1,shard2"); // however still same number as NUM_SHARDS; we assume this later
|
||||
create.setRouterField("shard_s");
|
||||
} else {//composite id router
|
||||
if (random().nextBoolean()) {
|
||||
create.setRouterField("shard_s");
|
||||
}
|
||||
}
|
||||
|
||||
create.process(cluster.getSolrClient());
|
||||
indexDocs(collectionName);
|
||||
|
||||
if (!isImplicit && random().nextBoolean()) {
|
||||
// shard split the first shard
|
||||
int prevActiveSliceCount = getActiveSliceCount(collectionName);
|
||||
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName);
|
||||
splitShard.setShardName("shard1");
|
||||
splitShard.process(cluster.getSolrClient());
|
||||
// wait until we see one more active slice...
|
||||
for (int i = 0; getActiveSliceCount(collectionName) != prevActiveSliceCount + 1; i++) {
|
||||
assertTrue(i < 30);
|
||||
Thread.sleep(500);
|
||||
}
|
||||
// issue a hard commit. Split shard does a soft commit which isn't good enough for the backup/snapshooter to see
|
||||
cluster.getSolrClient().commit();
|
||||
}
|
||||
|
||||
testBackupAndRestore(collectionName);
|
||||
}
|
||||
|
||||
private int getActiveSliceCount(String collectionName) {
|
||||
return cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(collectionName).getActiveSlices().size();
|
||||
}
|
||||
|
||||
private void indexDocs(String collectionName) throws Exception {
|
||||
Random random = new Random(docsSeed);// use a constant seed for the whole test run so that we can easily re-index.
|
||||
int numDocs = random.nextInt(100);
|
||||
if (numDocs == 0) {
|
||||
log.info("Indexing ZERO test docs");
|
||||
return;
|
||||
}
|
||||
CloudSolrClient client = cluster.getSolrClient();
|
||||
client.setDefaultCollection(collectionName);
|
||||
List<SolrInputDocument> docs = new ArrayList<>(numDocs);
|
||||
for (int i=0; i<numDocs; i++) {
|
||||
SolrInputDocument doc = new SolrInputDocument();
|
||||
doc.addField("id", i);
|
||||
doc.addField("shard_s", "shard" + (1 + random.nextInt(NUM_SHARDS))); // for implicit router
|
||||
docs.add(doc);
|
||||
}
|
||||
client.add(docs);// batch
|
||||
client.commit();
|
||||
}
|
||||
|
||||
private void testBackupAndRestore(String collectionName) throws Exception {
|
||||
String backupName = "mytestbackup";
|
||||
|
||||
CloudSolrClient client = cluster.getSolrClient();
|
||||
DocCollection backupCollection = client.getZkStateReader().getClusterState().getCollection(collectionName);
|
||||
|
||||
Map<String, Integer> origShardToDocCount = getShardToDocCountMap(client, backupCollection);
|
||||
assert origShardToDocCount.isEmpty() == false;
|
||||
|
||||
String location = createTempDir().toFile().getAbsolutePath();
|
||||
|
||||
log.info("Triggering Backup command");
|
||||
|
||||
{
|
||||
CollectionAdminRequest.Backup backup = CollectionAdminRequest.backupCollection(collectionName, backupName)
|
||||
.setLocation(location);
|
||||
if (random().nextBoolean()) {
|
||||
assertEquals(0, backup.process(client).getStatus());
|
||||
} else {
|
||||
assertEquals(RequestStatusState.COMPLETED, backup.processAndWait(client, 30));//async
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Triggering Restore command");
|
||||
|
||||
String restoreCollectionName = collectionName + "_restored";
|
||||
boolean sameConfig = random().nextBoolean();
|
||||
|
||||
{
|
||||
CollectionAdminRequest.Restore restore = CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
|
||||
.setLocation(location);
|
||||
if (origShardToDocCount.size() > cluster.getJettySolrRunners().size()) {
|
||||
// may need to increase maxShardsPerNode (e.g. if it was shard split, then now we need more)
|
||||
restore.setMaxShardsPerNode(origShardToDocCount.size());
|
||||
}
|
||||
Properties props = new Properties();
|
||||
props.setProperty("customKey", "customVal");
|
||||
restore.setProperties(props);
|
||||
if (sameConfig==false) {
|
||||
restore.setConfigName("customConfigName");
|
||||
}
|
||||
if (random().nextBoolean()) {
|
||||
assertEquals(0, restore.process(client).getStatus());
|
||||
} else {
|
||||
assertEquals(RequestStatusState.COMPLETED, restore.processAndWait(client, 30));//async
|
||||
}
|
||||
AbstractDistribZkTestBase.waitForRecoveriesToFinish(
|
||||
restoreCollectionName, cluster.getSolrClient().getZkStateReader(), log.isDebugEnabled(), true, 30);
|
||||
}
|
||||
|
||||
//Check the number of results are the same
|
||||
DocCollection restoreCollection = client.getZkStateReader().getClusterState().getCollection(restoreCollectionName);
|
||||
assertEquals(origShardToDocCount, getShardToDocCountMap(client, restoreCollection));
|
||||
//Re-index same docs (should be identical docs given same random seed) and test we have the same result. Helps
|
||||
// test we reconstituted the hash ranges / doc router.
|
||||
if (!(restoreCollection.getRouter() instanceof ImplicitDocRouter) && random().nextBoolean()) {
|
||||
indexDocs(restoreCollectionName);
|
||||
assertEquals(origShardToDocCount, getShardToDocCountMap(client, restoreCollection));
|
||||
}
|
||||
|
||||
assertEquals(backupCollection.getReplicationFactor(), restoreCollection.getReplicationFactor());
|
||||
assertEquals(backupCollection.getAutoAddReplicas(), restoreCollection.getAutoAddReplicas());
|
||||
assertEquals(backupCollection.getActiveSlices().iterator().next().getReplicas().size(),
|
||||
restoreCollection.getActiveSlices().iterator().next().getReplicas().size());
|
||||
assertEquals(sameConfig ? "conf1" : "customConfigName",
|
||||
cluster.getSolrClient().getZkStateReader().readConfigName(restoreCollectionName));
|
||||
|
||||
// assert added core properties:
|
||||
// DWS: did via manual inspection.
|
||||
// TODO Find the applicable core.properties on the file system but how?
|
||||
}
|
||||
|
||||
private Map<String, Integer> getShardToDocCountMap(CloudSolrClient client, DocCollection docCollection) throws SolrServerException, IOException {
|
||||
Map<String,Integer> shardToDocCount = new TreeMap<>();
|
||||
for (Slice slice : docCollection.getActiveSlices()) {
|
||||
String shardName = slice.getName();
|
||||
long docsInShard = client.query(docCollection.getName(), new SolrQuery("*:*").setParam(_ROUTE_, shardName))
|
||||
.getResults().getNumFound();
|
||||
shardToDocCount.put(shardName, (int) docsInShard);
|
||||
}
|
||||
return shardToDocCount;
|
||||
}
|
||||
|
||||
}
|
|
@ -111,7 +111,7 @@ public class TestRestoreCore extends SolrJettyTestBase {
|
|||
@Test
|
||||
public void testSimpleRestore() throws Exception {
|
||||
|
||||
int nDocs = TestReplicationHandlerBackup.indexDocs(masterClient);
|
||||
int nDocs = usually() ? TestReplicationHandlerBackup.indexDocs(masterClient) : 0;
|
||||
|
||||
String snapshotName;
|
||||
String location;
|
||||
|
@ -139,11 +139,12 @@ public class TestRestoreCore extends SolrJettyTestBase {
|
|||
|
||||
|
||||
|
||||
int numRestoreTests = TestUtil.nextInt(random(), 1, 5);
|
||||
int numRestoreTests = nDocs > 0 ? TestUtil.nextInt(random(), 1, 5) : 1;
|
||||
|
||||
for (int attempts=0; attempts<numRestoreTests; attempts++) {
|
||||
//Modify existing index before we call restore.
|
||||
|
||||
if (nDocs > 0) {
|
||||
//Delete a few docs
|
||||
int numDeletes = TestUtil.nextInt(random(), 1, nDocs);
|
||||
for(int i=0; i<numDeletes; i++) {
|
||||
|
@ -163,6 +164,7 @@ public class TestRestoreCore extends SolrJettyTestBase {
|
|||
if (usually()) {
|
||||
masterClient.commit();
|
||||
}
|
||||
}
|
||||
|
||||
TestReplicationHandlerBackup.runBackupCommand(masterJetty, ReplicationHandler.CMD_RESTORE, params);
|
||||
|
||||
|
|
|
@ -588,6 +588,140 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
}
|
||||
}
|
||||
|
||||
public static Backup backupCollection(String collection, String backupName) {
|
||||
return new Backup(collection, backupName);
|
||||
}
|
||||
|
||||
// BACKUP request
|
||||
public static class Backup extends AsyncCollectionSpecificAdminRequest {
|
||||
protected final String name;
|
||||
protected String location;
|
||||
|
||||
public Backup(String collection, String name) {
|
||||
super(CollectionAction.BACKUP, collection);
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public Backup setAsyncId(String id) {
|
||||
this.asyncId = id;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public Backup setCollectionName(String collection) {
|
||||
this.collection = collection;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getLocation() {
|
||||
return location;
|
||||
}
|
||||
|
||||
public Backup setLocation(String location) {
|
||||
this.location = location;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||
params.set(CoreAdminParams.COLLECTION, collection);
|
||||
params.set(CoreAdminParams.NAME, name);
|
||||
params.set("location", location); //note: optional
|
||||
return params;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static Restore restoreCollection(String collection, String backupName) {
|
||||
return new Restore(collection, backupName);
|
||||
}
|
||||
|
||||
// RESTORE request
|
||||
public static class Restore extends AsyncCollectionSpecificAdminRequest {
|
||||
protected final String backupName;
|
||||
protected String location;
|
||||
|
||||
// in common with collection creation:
|
||||
protected String configName;
|
||||
protected Integer maxShardsPerNode;
|
||||
protected Integer replicationFactor;
|
||||
protected Boolean autoAddReplicas;
|
||||
protected Properties properties;
|
||||
|
||||
public Restore(String collection, String backupName) {
|
||||
super(CollectionAction.RESTORE, collection);
|
||||
this.backupName = backupName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Restore setAsyncId(String id) {
|
||||
this.asyncId = id;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Restore setCollectionName(String collection) {
|
||||
this.collection = collection;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getLocation() {
|
||||
return location;
|
||||
}
|
||||
|
||||
public Restore setLocation(String location) {
|
||||
this.location = location;
|
||||
return this;
|
||||
}
|
||||
|
||||
// Collection creation params in common:
|
||||
public Restore setConfigName(String config) { this.configName = config; return this; }
|
||||
public String getConfigName() { return configName; }
|
||||
|
||||
public Integer getMaxShardsPerNode() { return maxShardsPerNode; }
|
||||
public Restore setMaxShardsPerNode(int maxShardsPerNode) { this.maxShardsPerNode = maxShardsPerNode; return this; }
|
||||
|
||||
public Integer getReplicationFactor() { return replicationFactor; }
|
||||
public Restore setReplicationFactor(Integer repl) { this.replicationFactor = repl; return this; }
|
||||
|
||||
public Boolean getAutoAddReplicas() { return autoAddReplicas; }
|
||||
public Restore setAutoAddReplicas(boolean autoAddReplicas) { this.autoAddReplicas = autoAddReplicas; return this; }
|
||||
|
||||
public Properties getProperties() {
|
||||
return properties;
|
||||
}
|
||||
public Restore setProperties(Properties properties) { this.properties = properties; return this;}
|
||||
|
||||
// TODO support createNodeSet, rule, snitch
|
||||
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||
params.set(CoreAdminParams.COLLECTION, collection);
|
||||
params.set(CoreAdminParams.NAME, backupName);
|
||||
params.set("location", location); //note: optional
|
||||
params.set("collection.configName", configName); //note: optional
|
||||
if (maxShardsPerNode != null) {
|
||||
params.set( "maxShardsPerNode", maxShardsPerNode);
|
||||
}
|
||||
if (replicationFactor != null) {
|
||||
params.set("replicationFactor", replicationFactor);
|
||||
}
|
||||
if (autoAddReplicas != null) {
|
||||
params.set(ZkStateReader.AUTO_ADD_REPLICAS, autoAddReplicas);
|
||||
}
|
||||
if (properties != null) {
|
||||
addProperties(params, properties);
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a SolrRequest to create a new shard in a collection
|
||||
*/
|
||||
|
|
|
@ -316,6 +316,7 @@ public class ClusterState implements JSONWriter.Writable {
|
|||
return new Aliases(aliasMap);
|
||||
}
|
||||
|
||||
// TODO move to static DocCollection.loadFromMap
|
||||
private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version, String znode) {
|
||||
Map<String,Object> props;
|
||||
Map<String,Slice> slices;
|
||||
|
@ -323,10 +324,10 @@ public class ClusterState implements JSONWriter.Writable {
|
|||
Map<String,Object> sliceObjs = (Map<String,Object>)objs.get(DocCollection.SHARDS);
|
||||
if (sliceObjs == null) {
|
||||
// legacy format from 4.0... there was no separate "shards" level to contain the collection shards.
|
||||
slices = makeSlices(objs);
|
||||
slices = Slice.loadAllFromMap(objs);
|
||||
props = Collections.emptyMap();
|
||||
} else {
|
||||
slices = makeSlices(sliceObjs);
|
||||
slices = Slice.loadAllFromMap(sliceObjs);
|
||||
props = new HashMap<>(objs);
|
||||
objs.remove(DocCollection.SHARDS);
|
||||
}
|
||||
|
@ -346,21 +347,6 @@ public class ClusterState implements JSONWriter.Writable {
|
|||
return new DocCollection(name, slices, props, router, version, znode);
|
||||
}
|
||||
|
||||
private static Map<String,Slice> makeSlices(Map<String,Object> genericSlices) {
|
||||
if (genericSlices == null) return Collections.emptyMap();
|
||||
Map<String,Slice> result = new LinkedHashMap<>(genericSlices.size());
|
||||
for (Map.Entry<String,Object> entry : genericSlices.entrySet()) {
|
||||
String name = entry.getKey();
|
||||
Object val = entry.getValue();
|
||||
if (val instanceof Slice) {
|
||||
result.put(name, (Slice)val);
|
||||
} else if (val instanceof Map) {
|
||||
result.put(name, new Slice(name, null, (Map<String,Object>)val));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(JSONWriter jsonWriter) {
|
||||
LinkedHashMap<String , DocCollection> map = new LinkedHashMap<>();
|
||||
|
|
|
@ -16,28 +16,45 @@
|
|||
*/
|
||||
package org.apache.solr.common.cloud;
|
||||
|
||||
import org.noggit.JSONUtil;
|
||||
import org.noggit.JSONWriter;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
import org.noggit.JSONUtil;
|
||||
import org.noggit.JSONWriter;
|
||||
|
||||
/**
|
||||
* A Slice contains immutable information about a logical shard (all replicas that share the same shard id).
|
||||
*/
|
||||
public class Slice extends ZkNodeProps {
|
||||
|
||||
/** Loads multiple slices into a Map from a generic Map that probably came from deserialized JSON. */
|
||||
public static Map<String,Slice> loadAllFromMap(Map<String, Object> genericSlices) {
|
||||
if (genericSlices == null) return Collections.emptyMap();
|
||||
Map<String,Slice> result = new LinkedHashMap<>(genericSlices.size());
|
||||
for (Map.Entry<String,Object> entry : genericSlices.entrySet()) {
|
||||
String name = entry.getKey();
|
||||
Object val = entry.getValue();
|
||||
if (val instanceof Slice) {
|
||||
result.put(name, (Slice)val);
|
||||
} else if (val instanceof Map) {
|
||||
result.put(name, new Slice(name, null, (Map<String,Object>)val));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/** The slice's state. */
|
||||
public enum State {
|
||||
|
||||
/** The default state of a slice. */
|
||||
/** The normal/default state of a shard. */
|
||||
ACTIVE,
|
||||
|
||||
/**
|
||||
* A slice is put in that state after it has been successfully split. See
|
||||
* A shard is put in that state after it has been successfully split. See
|
||||
* <a href="https://cwiki.apache.org/confluence/display/solr/Collections+API#CollectionsAPI-api3">
|
||||
* the reference guide</a> for more details.
|
||||
*/
|
||||
|
@ -45,7 +62,8 @@ public class Slice extends ZkNodeProps {
|
|||
|
||||
/**
|
||||
* When a shard is split, the new sub-shards are put in that state while the
|
||||
* split operation is in progress. A shard in that state still receives
|
||||
* split operation is in progress. It's also used when the shard is undergoing data restoration.
|
||||
* A shard in this state still receives
|
||||
* update requests from the parent shard leader, however does not participate
|
||||
* in distributed search.
|
||||
*/
|
||||
|
|
|
@ -98,6 +98,8 @@ public class ZkStateReader implements Closeable {
|
|||
|
||||
public static final String URL_SCHEME = "urlScheme";
|
||||
|
||||
public static final String BACKUP_LOCATION = "location";
|
||||
|
||||
/** A view of the current state of all collections; combines all the different state sources into a single view. */
|
||||
protected volatile ClusterState clusterState;
|
||||
|
||||
|
@ -134,7 +136,8 @@ public class ZkStateReader implements Closeable {
|
|||
public static final Set<String> KNOWN_CLUSTER_PROPS = unmodifiableSet(new HashSet<>(asList(
|
||||
LEGACY_CLOUD,
|
||||
URL_SCHEME,
|
||||
AUTO_ADD_REPLICAS)));
|
||||
AUTO_ADD_REPLICAS,
|
||||
BACKUP_LOCATION)));
|
||||
|
||||
/**
|
||||
* Returns config set name for collection.
|
||||
|
|
|
@ -53,7 +53,9 @@ public interface CollectionParams
|
|||
BALANCESHARDUNIQUE(true),
|
||||
REBALANCELEADERS(true),
|
||||
MODIFYCOLLECTION(true),
|
||||
MIGRATESTATEFORMAT(true);
|
||||
MIGRATESTATEFORMAT(true),
|
||||
BACKUP(true),
|
||||
RESTORE(true);
|
||||
|
||||
public final boolean isWrite;
|
||||
|
||||
|
|
|
@ -128,7 +128,10 @@ public abstract class CoreAdminParams
|
|||
REJOINLEADERELECTION,
|
||||
//internal API used by force shard leader election
|
||||
FORCEPREPAREFORLEADERSHIP,
|
||||
INVOKE;
|
||||
INVOKE,
|
||||
//Internal APIs to backup and restore a core
|
||||
BACKUPCORE,
|
||||
RESTORECORE;
|
||||
|
||||
public final boolean isRead;
|
||||
|
||||
|
|
|
@ -16,12 +16,13 @@
|
|||
*/
|
||||
package org.apache.solr.common.util;
|
||||
|
||||
import java.text.MessageFormat;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Locale;
|
||||
import java.io.IOException;
|
||||
import java.text.MessageFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
|
||||
|
@ -147,7 +148,7 @@ public class StrUtils {
|
|||
* Creates a backslash escaped string, joining all the items.
|
||||
* @see #escapeTextWithSeparator
|
||||
*/
|
||||
public static String join(List<?> items, char separator) {
|
||||
public static String join(Collection<?> items, char separator) {
|
||||
StringBuilder sb = new StringBuilder(items.size() << 3);
|
||||
boolean first=true;
|
||||
for (Object o : items) {
|
||||
|
|
|
@ -149,7 +149,12 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
|
|||
Map<String,Slice> slices = clusterState.getSlicesMap(collection);
|
||||
assertNotNull("Could not find collection:" + collection, slices);
|
||||
for (Map.Entry<String,Slice> entry : slices.entrySet()) {
|
||||
Map<String,Replica> shards = entry.getValue().getReplicasMap();
|
||||
Slice slice = entry.getValue();
|
||||
if (slice.getState() == Slice.State.CONSTRUCTION) { // similar to replica recovering; pretend its the same thing
|
||||
if (verbose) System.out.println("Found a slice in construction state; will wait.");
|
||||
sawLiveRecovering = true;
|
||||
}
|
||||
Map<String,Replica> shards = slice.getReplicasMap();
|
||||
for (Map.Entry<String,Replica> shard : shards.entrySet()) {
|
||||
if (verbose) System.out.println("replica:" + shard.getValue().getName() + " rstate:"
|
||||
+ shard.getValue().getStr(ZkStateReader.STATE_PROP)
|
||||
|
@ -234,7 +239,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
|
|||
fail("Illegal state, was: " + coreState + " expected:" + expectedState + " clusterState:" + reader.getClusterState());
|
||||
}
|
||||
|
||||
protected void assertAllActive(String collection,ZkStateReader zkStateReader)
|
||||
protected static void assertAllActive(String collection, ZkStateReader zkStateReader)
|
||||
throws KeeperException, InterruptedException {
|
||||
|
||||
zkStateReader.forceUpdateCollection(collection);
|
||||
|
@ -244,12 +249,15 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
|
|||
throw new IllegalArgumentException("Cannot find collection:" + collection);
|
||||
}
|
||||
for (Map.Entry<String,Slice> entry : slices.entrySet()) {
|
||||
Map<String,Replica> shards = entry.getValue().getReplicasMap();
|
||||
Slice slice = entry.getValue();
|
||||
if (slice.getState() != Slice.State.ACTIVE) {
|
||||
fail("Not all shards are ACTIVE - found a shard " + slice.getName() + " that is: " + slice.getState());
|
||||
}
|
||||
Map<String,Replica> shards = slice.getReplicasMap();
|
||||
for (Map.Entry<String,Replica> shard : shards.entrySet()) {
|
||||
|
||||
final Replica.State state = shard.getValue().getState();
|
||||
if (state != Replica.State.ACTIVE) {
|
||||
fail("Not all shards are ACTIVE - found a shard that is: " + state.toString());
|
||||
Replica replica = shard.getValue();
|
||||
if (replica.getState() != Replica.State.ACTIVE) {
|
||||
fail("Not all replicas are ACTIVE - found a replica " + replica.getName() + " that is: " + replica.getState());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -338,7 +338,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
|||
|
||||
}
|
||||
|
||||
protected void waitForCollection(ZkStateReader reader, String collection, int slices) throws Exception {
|
||||
protected static void waitForCollection(ZkStateReader reader, String collection, int slices) throws Exception {
|
||||
// wait until shards have started registering...
|
||||
int cnt = 30;
|
||||
while (!reader.getClusterState().hasCollection(collection)) {
|
||||
|
|
Loading…
Reference in New Issue