SOLR-3488: Added a Collection management API for SolrCloud.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1356313 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2012-07-02 15:57:12 +00:00
parent 6cffa393bd
commit 3bbbd0bea3
23 changed files with 961 additions and 173 deletions

View File

@ -31,6 +31,9 @@ New Features
* SOLR-1856: In Solr Cell, literals should override Tika-parsed values.
Patch adds a param "literalsOverride" which defaults to true, but can be set
to "false" to let Tika-parsed values be appended to literal values (Chris Harris, janhoy)
* SOLR-3488: Added a Collection management API for SolrCloud.
(Tommaso Teofili, Sami Siren, yonik, Mark Miller)
Bug Fixes

View File

@ -25,7 +25,7 @@ public class DistributedClusteringComponentTest extends
@Override
public String getSolrHome() {
return "clustering/solr";
return getFile("clustering/solr/collection1").getParent();
}
@Override

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@ -265,6 +265,45 @@ public class DistributedQueue {
}
}
/**
* Returns the data at the first element of the queue, or null if the queue is
* empty.
*
* @return data at the first element of the queue, or null.
* @throws KeeperException
* @throws InterruptedException
*/
public byte[] peek(boolean block) throws KeeperException, InterruptedException {
if (!block) {
return peek();
}
TreeMap<Long,String> orderedChildren;
while (true) {
LatchChildWatcher childWatcher = new LatchChildWatcher();
try {
orderedChildren = orderedChildren(childWatcher);
} catch (KeeperException.NoNodeException e) {
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
continue;
}
if (orderedChildren.size() == 0) {
childWatcher.await();
continue;
}
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
byte[] data = zookeeper.getData(path, false, null);
return data;
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first.
}
}
}
}
/**
* Attempts to remove the head of the queue and return it. Returns null if the
* queue is empty.

View File

@ -13,6 +13,7 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
@ -96,6 +97,8 @@ class ShardLeaderElectionContextBase extends ElectionContext {
CreateMode.EPHEMERAL, true);
}
// TODO: above we make it looks like leaderProps could be true, but here
// you would get an NPE if it was.
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
"leader", ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.BASE_URL_PROP,
@ -245,10 +248,14 @@ final class OverseerElectionContext extends ElectionContext {
private final SolrZkClient zkClient;
private final ZkStateReader stateReader;
private ShardHandler shardHandler;
private String adminPath;
public OverseerElectionContext(final String zkNodeName, ZkStateReader stateReader) {
public OverseerElectionContext(ShardHandler shardHandler, String adminPath, final String zkNodeName, ZkStateReader stateReader) {
super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, stateReader.getZkClient());
this.stateReader = stateReader;
this.shardHandler = shardHandler;
this.adminPath = adminPath;
this.zkClient = stateReader.getZkClient();
}
@ -271,7 +278,7 @@ final class OverseerElectionContext extends ElectionContext {
CreateMode.EPHEMERAL, true);
}
new Overseer(stateReader, id);
new Overseer(shardHandler, adminPath, stateReader, id);
}
}

View File

@ -31,6 +31,7 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@ -48,6 +49,7 @@ public class Overseer {
private static class CloudStateUpdater implements Runnable {
private static final String DELETECORE = "deletecore";
private final ZkStateReader reader;
private final SolrZkClient zkClient;
private final String myId;
@ -159,7 +161,7 @@ public class Overseer {
final ZkNodeProps message, final String operation) {
if ("state".equals(operation)) {
cloudState = updateState(cloudState, message);
} else if ("deletecore".equals(operation)) {
} else if (DELETECORE.equals(operation)) {
cloudState = removeCore(cloudState, message);
} else if (ZkStateReader.LEADER_PROP.equals(operation)) {
StringBuilder sb = new StringBuilder();
@ -360,13 +362,36 @@ public class Overseer {
LinkedHashMap<String, ZkNodeProps> newShards = new LinkedHashMap<String, ZkNodeProps>();
newShards.putAll(slice.getShards());
newShards.remove(coreNodeName);
Slice newSlice = new Slice(slice.getName(), newShards);
newSlices.put(slice.getName(), newSlice);
} else {
newSlices.put(slice.getName(), slice);
}
}
newStates.put(collectionName, newSlices);
int cnt = 0;
for (Slice slice : newSlices.values()) {
cnt+=slice.getShards().size();
}
// TODO: if no nodes are left after this unload
// remove from zk - do we have a race where Overseer
// see's registered nodes and publishes though?
if (cnt > 0) {
newStates.put(collectionName, newSlices);
} else {
// TODO: it might be better logically to have this in ZkController
// but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
// ZkController out of the Overseer.
try {
zkClient.clean("/collections/" + collectionName);
} catch (InterruptedException e) {
SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collectionName, e);
Thread.currentThread().interrupt();
} catch (KeeperException e) {
SolrException.log(log, "Problem cleaning up collection in zk:" + collectionName, e);
}
}
} else {
newStates.put(collectionName, cloudState.getSlices(collectionName));
}
@ -374,9 +399,10 @@ public class Overseer {
CloudState newState = new CloudState(cloudState.getLiveNodes(), newStates);
return newState;
}
}
public Overseer(final ZkStateReader reader, final String id) {
public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader, final String id) throws KeeperException, InterruptedException {
log.info("Overseer (id=" + id + ") starting");
createOverseerNode(reader.getZkClient());
//launch cluster state updater thread
@ -384,6 +410,11 @@ public class Overseer {
Thread updaterThread = new Thread(tg, new CloudStateUpdater(reader, id));
updaterThread.setDaemon(true);
updaterThread.start();
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
Thread ccThread = new Thread(ccTg, new OverseerCollectionProcessor(reader, id, shardHandler, adminPath));
ccThread.setDaemon(true);
ccThread.start();
}
/**
@ -400,6 +431,12 @@ public class Overseer {
return new DistributedQueue(zkClient.getSolrZooKeeper(), "/overseer/queue-work", null);
}
/* Collection creation queue */
static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {
createOverseerNode(zkClient);
return new DistributedQueue(zkClient.getSolrZooKeeper(), "/overseer/collection-queue-work", null);
}
private static void createOverseerNode(final SolrZkClient zkClient) {
try {
zkClient.create("/overseer", new byte[0], CreateMode.PERSISTENT, true);

View File

@ -0,0 +1,282 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OverseerCollectionProcessor implements Runnable {
public static final String DELETECOLLECTION = "deletecollection";
public static final String CREATECOLLECTION = "createcollection";
// TODO: use from Overseer?
private static final String QUEUE_OPERATION = "operation";
private static Logger log = LoggerFactory
.getLogger(OverseerCollectionProcessor.class);
private DistributedQueue workQueue;
private String myId;
private ShardHandler shardHandler;
private String adminPath;
private ZkStateReader zkStateReader;
public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) {
this.zkStateReader = zkStateReader;
this.myId = myId;
this.shardHandler = shardHandler;
this.adminPath = adminPath;
workQueue = Overseer.getCollectionQueue(zkStateReader.getZkClient());
}
@Override
public void run() {
log.info("Process current queue of collection creations");
while (amILeader()) {
try {
byte[] head = workQueue.peek(true);
//if (head != null) { // should not happen since we block above
final ZkNodeProps message = ZkNodeProps.load(head);
final String operation = message.get(QUEUE_OPERATION);
boolean success = processMessage(message, operation);
if (!success) {
// TODO: what to do on failure / partial failure
// if we fail, do we clean up then ?
SolrException.log(log, "Collection creation of " + message.get("name") + " failed");
}
//}
workQueue.remove();
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
|| e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("Overseer cannot talk to ZK");
return;
}
SolrException.log(log, "", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
private boolean amILeader() {
try {
ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData(
"/overseer_elect/leader", null, null, true));
if (myId.equals(props.get("id"))) {
return true;
}
} catch (KeeperException e) {
log.warn("", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
return false;
}
private boolean processMessage(ZkNodeProps message, String operation) {
if (CREATECOLLECTION.equals(operation)) {
return createCollection(zkStateReader.getCloudState(), message);
} else if (DELETECOLLECTION.equals(operation)) {
return deleteCollection(zkStateReader.getCloudState(), message);
}
// unknown command, toss it from our queue
return true;
}
private boolean deleteCollection(CloudState cloudState, ZkNodeProps message) {
String name = message.get("name");
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
Map<String,Slice> slices = cloudState.getCollectionStates().get(name);
if (slices == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Could not find collection:" + name);
}
for (Map.Entry<String,Slice> entry : slices.entrySet()) {
Slice slice = entry.getValue();
Map<String,ZkNodeProps> shards = slice.getShards();
Set<Map.Entry<String,ZkNodeProps>> shardEntries = shards.entrySet();
for (Map.Entry<String,ZkNodeProps> shardEntry : shardEntries) {
final ZkNodeProps node = shardEntry.getValue();
if (cloudState.liveNodesContain(node.get(ZkStateReader.NODE_NAME_PROP))) {
params.set(CoreAdminParams.CORE, name);
params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
String replica = node.get(ZkStateReader.BASE_URL_PROP);
ShardRequest sreq = new ShardRequest();
// yes, they must use same admin handler path everywhere...
params.set("qt", adminPath);
sreq.purpose = 1;
// TODO: this sucks
if (replica.startsWith("http://")) replica = replica.substring(7);
sreq.shards = new String[] {replica};
sreq.actualShards = sreq.shards;
sreq.params = params;
shardHandler.submit(sreq, replica, sreq.params);
}
}
}
int failed = 0;
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedOrError();
if (srsp != null) {
Throwable e = srsp.getException();
if (e != null) {
// should we retry?
// TODO: we should return errors to the client
// TODO: what if one fails and others succeed?
failed++;
log.error("Error talking to shard: " + srsp.getShard(), e);
}
}
} while (srsp != null);
// if all calls succeeded, return true
if (failed > 0) {
return false;
}
return true;
}
// TODO: bad name conflict with another method
private boolean createCollection(CloudState cloudState, ZkNodeProps message) {
// look at the replication factor and see if it matches reality
// if it does not, find best nodes to create more cores
String numReplicasString = message.get("numReplicas");
int numReplicas;
try {
numReplicas = numReplicasString == null ? 0 : Integer.parseInt(numReplicasString);
} catch (Exception ex) {
SolrException.log(log, "Could not parse numReplicas", ex);
return false;
}
String numShardsString = message.get("numShards");
int numShards;
try {
numShards = numShardsString == null ? 0 : Integer.parseInt(numShardsString);
} catch (Exception ex) {
SolrException.log(log, "Could not parse numShards", ex);
return false;
}
String name = message.get("name");
String configName = message.get("collection.configName");
// we need to look at every node and see how many cores it serves
// add our new cores to existing nodes serving the least number of cores
// but (for now) require that each core goes on a distinct node.
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
// TODO: add smarter options that look at the current number of cores per node?
// for now we just go random
Set<String> nodes = cloudState.getLiveNodes();
List<String> nodeList = new ArrayList<String>(nodes.size());
nodeList.addAll(nodes);
Collections.shuffle(nodeList);
int numNodes = numShards * (numReplicas + 1);
List<String> createOnNodes = nodeList.subList(0, Math.min(nodeList.size() -1, numNodes - 1));
for (String replica : createOnNodes) {
// TODO: this does not work if original url had _ in it
replica = replica.replaceAll("_", "/");
params.set(CoreAdminParams.NAME, name);
params.set("collection.configName", configName);
params.set("numShards", numShards);
ShardRequest sreq = new ShardRequest();
params.set("qt", adminPath);
sreq.purpose = 1;
// TODO: this sucks
if (replica.startsWith("http://")) replica = replica.substring(7);
sreq.shards = new String[] {replica};
sreq.actualShards = sreq.shards;
sreq.params = params;
shardHandler.submit(sreq, replica, sreq.params);
}
int failed = 0;
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedOrError();
if (srsp != null) {
Throwable e = srsp.getException();
if (e != null) {
// should we retry?
// TODO: we should return errors to the client
// TODO: what if one fails and others succeed?
failed++;
log.error("Error talking to shard: " + srsp.getShard(), e);
}
}
} while (srsp != null);
// if all calls succeeded, return true
if (failed > 0) {
return false;
}
return true;
}
}

View File

@ -51,6 +51,8 @@ import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.DOMUtil;
import org.apache.zookeeper.CreateMode;
@ -83,7 +85,9 @@ public final class ZkController {
private final static Pattern URL_PREFIX = Pattern.compile("(https?://).*");
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
private final DistributedQueue overseerStatusQueue;
private final DistributedQueue overseerJobQueue;
private final DistributedQueue overseerCollectionQueue;
// package private for tests
@ -113,7 +117,8 @@ public final class ZkController {
private LeaderElector overseerElector;
// this can be null in which case recovery will be inactive
// for now, this can be null in tests, in which case recovery will be inactive, and other features
// may accept defaults or use mocks rather than pulling things from a CoreContainer
private CoreContainer cc;
/**
@ -176,7 +181,7 @@ public final class ZkController {
* @throws TimeoutException
* @throws IOException
*/
public ZkController(CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
String localHostContext, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
TimeoutException, IOException {
this.cc = cc;
@ -203,8 +208,19 @@ public final class ZkController {
// seems we dont need to do this again...
//Overseer.createClientNodes(zkClient, getNodeName());
ElectionContext context = new OverseerElectionContext(getNodeName(), zkStateReader);
ShardHandler shardHandler;
String adminPath;
if (cc == null) {
shardHandler = new HttpShardHandlerFactory().getShardHandler();
adminPath = "/admin/cores";
} else {
shardHandler = cc.getShardHandlerFactory().getShardHandler();
adminPath = cc.getAdminPath();
}
ElectionContext context = new OverseerElectionContext(
shardHandler, adminPath,
getNodeName(), zkStateReader);
overseerElector.joinElection(context);
zkStateReader.createClusterStateWatchersAndUpdate();
@ -254,7 +270,8 @@ public final class ZkController {
});
this.overseerStatusQueue = Overseer.getInQueue(zkClient);
this.overseerJobQueue = Overseer.getInQueue(zkClient);
this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
cmdExecutor = new ZkCmdExecutor();
leaderElector = new LeaderElector(zkClient);
zkStateReader = new ZkStateReader(zkClient);
@ -377,8 +394,19 @@ public final class ZkController {
createEphemeralLiveNode();
cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
ShardHandler shardHandler;
String adminPath;
if (cc == null) {
shardHandler = new HttpShardHandlerFactory().getShardHandler();
adminPath = "/admin/cores";
} else {
shardHandler = cc.getShardHandlerFactory().getShardHandler();
adminPath = cc.getAdminPath();
}
overseerElector = new LeaderElector(zkClient);
ElectionContext context = new OverseerElectionContext(getNodeName(), zkStateReader);
ElectionContext context = new OverseerElectionContext(shardHandler,
adminPath, getNodeName(), zkStateReader);
overseerElector.setup(context);
overseerElector.joinElection(context);
zkStateReader.createClusterStateWatchersAndUpdate();
@ -742,7 +770,7 @@ public final class ZkController {
.getCollectionName(), ZkStateReader.STATE_PROP, state,
ZkStateReader.NUM_SHARDS_PROP, numShards != null ? numShards.toString()
: null);
overseerStatusQueue.offer(ZkStateReader.toJSON(m));
overseerJobQueue.offer(ZkStateReader.toJSON(m));
}
private boolean needsToBeAssignedShardId(final CoreDescriptor desc,
@ -771,7 +799,7 @@ public final class ZkController {
"deletecore", ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.NODE_NAME_PROP, getNodeName(),
ZkStateReader.COLLECTION_PROP, cloudDesc.getCollectionName());
overseerStatusQueue.offer(ZkStateReader.toJSON(m));
overseerJobQueue.offer(ZkStateReader.toJSON(m));
final String zkNodeName = getNodeName() + "_" + coreName;
ElectionContext context = electionContexts.remove(zkNodeName);
@ -779,6 +807,14 @@ public final class ZkController {
context.cancelElection();
}
}
public void createCollection(String collection) throws KeeperException,
InterruptedException {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
"createcollection", ZkStateReader.NODE_NAME_PROP, getNodeName(),
ZkStateReader.COLLECTION_PROP, collection);
overseerJobQueue.offer(ZkStateReader.toJSON(m));
}
/**
* @param dir
@ -1093,4 +1129,12 @@ public final class ZkController {
}
}
public DistributedQueue getOverseerJobQueue() {
return overseerJobQueue;
}
public DistributedQueue getOverseerCollectionQueue() {
return overseerCollectionQueue;
}
}

View File

@ -62,6 +62,7 @@ import org.apache.solr.util.FileUtils;
import org.apache.solr.util.SystemIdResolver;
import org.apache.solr.core.SolrXMLSerializer.SolrCoreXMLDef;
import org.apache.solr.core.SolrXMLSerializer.SolrXMLDef;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.handler.admin.CoreAdminHandler;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandlerFactory;
@ -103,6 +104,7 @@ public class CoreContainer
protected String hostContext;
protected String host;
protected CoreAdminHandler coreAdminHandler = null;
protected CollectionsHandler collectionsHandler = null;
protected File configFile = null;
protected String libDir = null;
protected ClassLoader libLoader = null;
@ -453,6 +455,8 @@ public class CoreContainer
}
}
collectionsHandler = new CollectionsHandler(this);
try {
containerProperties = readProperties(cfg, ((NodeList) cfg.evaluate(DEFAULT_HOST_CONTEXT, XPathConstants.NODESET)).item(0));
} catch (Throwable e) {
@ -1023,6 +1027,10 @@ public class CoreContainer
return coreAdminHandler;
}
public CollectionsHandler getCollectionsHandler() {
return collectionsHandler;
}
/**
* the default core name, or null if there is no default core name
*/

View File

@ -0,0 +1,163 @@
package org.apache.solr.handler.admin;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerCollectionProcessor;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CollectionsHandler extends RequestHandlerBase {
protected static Logger log = LoggerFactory.getLogger(CollectionsHandler.class);
protected final CoreContainer coreContainer;
public CollectionsHandler() {
super();
// Unlike most request handlers, CoreContainer initialization
// should happen in the constructor...
this.coreContainer = null;
}
/**
* Overloaded ctor to inject CoreContainer into the handler.
*
* @param coreContainer Core Container of the solr webapp installed.
*/
public CollectionsHandler(final CoreContainer coreContainer) {
this.coreContainer = coreContainer;
}
@Override
final public void init(NamedList args) {
}
/**
* The instance of CoreContainer this handler handles. This should be the CoreContainer instance that created this
* handler.
*
* @return a CoreContainer instance
*/
public CoreContainer getCoreContainer() {
return this.coreContainer;
}
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
// Make sure the cores is enabled
CoreContainer cores = getCoreContainer();
if (cores == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Core container instance missing");
}
// Pick the action
SolrParams params = req.getParams();
CollectionAction action = null;
String a = params.get(CoreAdminParams.ACTION);
if (a != null) {
action = CollectionAction.get(a);
}
if (action != null) {
switch (action) {
case CREATE: {
this.handleCreateAction(req, rsp);
break;
}
case DELETE: {
this.handleDeleteAction(req, rsp);
break;
}
default: {
throw new RuntimeException("Unknown action: " + action);
}
}
}
rsp.setHttpCaching(false);
}
private void handleDeleteAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
String name = req.getParams().required().get("name");
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.DELETECOLLECTION, "name", name);
// TODO: what if you want to block until the collection is available?
coreContainer.getZkController().getOverseerCollectionQueue().offer(ZkStateReader.toJSON(m));
}
// very simple currently, you can pass a template collection, and the new collection is created on
// every node the template collection is on
// there is a lot more to add - you should also be able to create with an explicit server list
// we might also want to think about error handling (add the request to a zk queue and involve overseer?)
// as well as specific replicas= options
private void handleCreateAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws InterruptedException, KeeperException {
Integer numReplicas = req.getParams().getInt("numReplicas", 0);
String name = req.getParams().required().get("name");
String configName = req.getParams().get("collection.configName");
String numShards = req.getParams().get("numShards");
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.CREATECOLLECTION, "numReplicas", numReplicas.toString(), "name", name,
"collection.configName", configName, "numShards", numShards);
// TODO: what if you want to block until the collection is available?
coreContainer.getZkController().getOverseerCollectionQueue().offer(ZkStateReader.toJSON(m));
}
public static ModifiableSolrParams params(String... params) {
ModifiableSolrParams msp = new ModifiableSolrParams();
for (int i=0; i<params.length; i+=2) {
msp.add(params[i], params[i+1]);
}
return msp;
}
//////////////////////// SolrInfoMBeans methods //////////////////////
@Override
public String getDescription() {
return "Manage SolrCloud Collections";
}
@Override
public String getSource() {
return "$URL: https://svn.apache.org/repos/asf/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionHandler.java $";
}
}

View File

@ -21,17 +21,13 @@ import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.CloudState;
@ -53,8 +49,6 @@ import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
@ -188,11 +182,6 @@ public class CoreAdminHandler extends RequestHandlerBase {
break;
}
case DISTRIBURL: {
this.handleDistribUrlAction(req, rsp);
break;
}
default: {
doPersist = this.handleCustomAction(req, rsp);
break;
@ -461,58 +450,102 @@ public class CoreAdminHandler extends RequestHandlerBase {
* @return true if a modification has resulted that requires persistance
* of the CoreContainer configuration.
*/
protected boolean handleUnloadAction(SolrQueryRequest req, SolrQueryResponse rsp) throws SolrException {
protected boolean handleUnloadAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws SolrException {
SolrParams params = req.getParams();
String cname = params.get(CoreAdminParams.CORE);
SolrCore core = coreContainer.remove(cname);
if(core == null){
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"No such core exists '" + cname + "'");
} else {
if (coreContainer.getZkController() != null) {
log.info("Unregistering core " + cname + " from cloudstate.");
try {
coreContainer.getZkController().unregister(cname, core.getCoreDescriptor().getCloudDescriptor());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Could not unregister core " + cname + " from cloudstate: "
+ e.getMessage(), e);
} catch (KeeperException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Could not unregister core " + cname + " from cloudstate: "
+ e.getMessage(), e);
}
}
}
if (params.getBool(CoreAdminParams.DELETE_INDEX, false)) {
core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {}
@Override
public void postClose(SolrCore core) {
File dataDir = new File(core.getIndexDir());
File[] files = dataDir.listFiles();
if (files != null) {
for (File file : files) {
if (!file.delete()) {
log.error(file.getAbsolutePath()
+ " could not be deleted on core unload");
}
}
if (!dataDir.delete()) log.error(dataDir.getAbsolutePath()
+ " could not be deleted on core unload");
} else {
log.error(dataDir.getAbsolutePath()
+ " could not be deleted on core unload");
try {
if (core == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"No such core exists '" + cname + "'");
} else {
if (coreContainer.getZkController() != null) {
log.info("Unregistering core " + core.getName() + " from cloudstate.");
try {
coreContainer.getZkController().unregister(cname,
core.getCoreDescriptor().getCloudDescriptor());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Could not unregister core " + cname + " from cloudstate: "
+ e.getMessage(), e);
} catch (KeeperException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Could not unregister core " + cname + " from cloudstate: "
+ e.getMessage(), e);
}
}
});
}
if (params.getBool(CoreAdminParams.DELETE_INDEX, false)) {
core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {}
@Override
public void postClose(SolrCore core) {
File dataDir = new File(core.getIndexDir());
File[] files = dataDir.listFiles();
if (files != null) {
for (File file : files) {
if (!file.delete()) {
log.error(file.getAbsolutePath()
+ " could not be deleted on core unload");
}
}
if (!dataDir.delete()) log.error(dataDir.getAbsolutePath()
+ " could not be deleted on core unload");
} else {
log.error(dataDir.getAbsolutePath()
+ " could not be deleted on core unload");
}
}
});
}
if (params.getBool(CoreAdminParams.DELETE_DATA_DIR, false)) {
core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {}
@Override
public void postClose(SolrCore core) {
File dataDir = new File(core.getDataDir());
try {
FileUtils.deleteDirectory(dataDir);
} catch (IOException e) {
SolrException.log(log, "Failed to delete data dir for core:"
+ core.getName() + " dir:" + dataDir.getAbsolutePath());
}
}
});
}
if (params.getBool(CoreAdminParams.DELETE_INSTANCE_DIR, false)) {
core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {}
@Override
public void postClose(SolrCore core) {
CoreDescriptor cd = core.getCoreDescriptor();
if (cd != null) {
File instanceDir = new File(cd.getInstanceDir());
try {
FileUtils.deleteDirectory(instanceDir);
} catch (IOException e) {
SolrException.log(log, "Failed to delete instance dir for core:"
+ core.getName() + " dir:" + instanceDir.getAbsolutePath());
}
}
}
});
}
} finally {
if (core != null) core.close();
}
core.close();
return coreContainer.isPersistent();
}
/**
@ -743,64 +776,6 @@ public class CoreAdminHandler extends RequestHandlerBase {
// }
}
protected void handleDistribUrlAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws IOException, InterruptedException, SolrServerException {
// TODO: finish this and tests
SolrParams params = req.getParams();
final ModifiableSolrParams newParams = new ModifiableSolrParams(params);
newParams.remove("action");
SolrParams required = params.required();
final String subAction = required.get("subAction");
String collection = required.get("collection");
newParams.set(CoreAdminParams.ACTION, subAction);
SolrCore core = req.getCore();
ZkController zkController = core.getCoreDescriptor().getCoreContainer()
.getZkController();
CloudState cloudState = zkController.getCloudState();
Map<String,Slice> slices = cloudState.getCollectionStates().get(collection);
for (Map.Entry<String,Slice> entry : slices.entrySet()) {
Slice slice = entry.getValue();
Map<String,ZkNodeProps> shards = slice.getShards();
Set<Map.Entry<String,ZkNodeProps>> shardEntries = shards.entrySet();
for (Map.Entry<String,ZkNodeProps> shardEntry : shardEntries) {
final ZkNodeProps node = shardEntry.getValue();
if (cloudState.liveNodesContain(node.get(ZkStateReader.NODE_NAME_PROP))) {
newParams.set(CoreAdminParams.CORE, node.get(ZkStateReader.CORE_NAME_PROP));
String replica = node.get(ZkStateReader.BASE_URL_PROP);
ShardRequest sreq = new ShardRequest();
newParams.set("qt", "/admin/cores");
sreq.purpose = 1;
// TODO: this sucks
if (replica.startsWith("http://"))
replica = replica.substring(7);
sreq.shards = new String[]{replica};
sreq.actualShards = sreq.shards;
sreq.params = newParams;
shardHandler.submit(sreq, replica, sreq.params);
}
}
}
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedOrError();
if (srsp != null) {
Throwable e = srsp.getException();
if (e != null) {
log.error("Error talking to shard: " + srsp.getShard(), e);
}
}
} while(srsp != null);
}
protected NamedList<Object> getCoreStatus(CoreContainer cores, String cname) throws IOException {
NamedList<Object> info = new SimpleOrderedMap<Object>();

View File

@ -17,23 +17,18 @@
package org.apache.solr.servlet;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.Writer;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.OutputStreamWriter;
import java.io.ByteArrayInputStream;
import java.io.Writer;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.WeakHashMap;
import org.apache.solr.handler.ContentStreamHandlerBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.InputSource;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
@ -44,23 +39,33 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.util.FastWriter;
import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.core.*;
import org.apache.solr.handler.component.SearchHandler;
import org.apache.solr.request.*;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.Config;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ContentStreamHandlerBase;
import org.apache.solr.request.ServletSolrParams;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryRequestBase;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.BinaryQueryResponseWriter;
import org.apache.solr.response.QueryResponseWriter;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.servlet.cache.HttpCacheHeaderUtil;
import org.apache.solr.servlet.cache.Method;
import org.apache.solr.util.FastWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.InputSource;
/**
* This filter looks at the incoming URL maps them to handlers defined in solrconfig.xml
@ -175,6 +180,13 @@ public class SolrDispatchFilter implements Filter
handleAdminRequest(req, response, handler, solrReq);
return;
}
// Check for the core admin collections url
if( path.equals( "/admin/collections" ) ) {
handler = cores.getCollectionsHandler();
solrReq = adminRequestParser.parse(null,path, req);
handleAdminRequest(req, response, handler, solrReq);
return;
}
else {
//otherwise, we should find a core from the path
idx = path.indexOf( "/", 1 );

View File

@ -19,7 +19,9 @@ package org.apache.solr.cloud;
import java.io.File;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.common.cloud.CloudState;
@ -37,6 +39,7 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
protected static final String DEFAULT_COLLECTION = "collection1";
private static final boolean DEBUG = false;
protected ZkTestServer zkServer;
private AtomicInteger homeCount = new AtomicInteger();
@Before
@Override
@ -64,15 +67,22 @@ public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearc
@Override
protected void createServers(int numShards) throws Exception {
// give everyone there own solrhome
File controlHome = new File(new File(getSolrHome()).getParentFile(), "control" + homeCount.incrementAndGet());
FileUtils.copyDirectory(new File(getSolrHome()), controlHome);
System.setProperty("collection", "control_collection");
controlJetty = createJetty(testDir, testDir + "/control/data", "control_shard");
controlJetty = createJetty(controlHome, null, "control_shard");
System.clearProperty("collection");
controlClient = createNewSolrServer(controlJetty.getLocalPort());
StringBuilder sb = new StringBuilder();
for (int i = 1; i <= numShards; i++) {
if (sb.length() > 0) sb.append(',');
JettySolrRunner j = createJetty(testDir, testDir + "/jetty" + i, "shard" + (i + 2));
// give everyone there own solrhome
File jettyHome = new File(new File(getSolrHome()).getParentFile(), "jetty" + homeCount.incrementAndGet());
FileUtils.copyDirectory(new File(getSolrHome()), jettyHome);
JettySolrRunner j = createJetty(jettyHome, null, "shard" + (i + 2));
jettys.add(j);
clients.add(createNewSolrServer(j.getLocalPort()));
sb.append("localhost:").append(j.getLocalPort()).append(context);

View File

@ -147,11 +147,7 @@ public abstract class AbstractZkTestCase extends SolrTestCaseJ4 {
static void tryCleanPath(String zkHost, String path) throws Exception {
SolrZkClient zkClient = new SolrZkClient(zkHost, TIMEOUT);
if (zkClient.exists(path, true)) {
List<String> children = zkClient.getChildren(path, null, true);
for (String string : children) {
tryCleanPath(zkHost, path+"/"+string);
}
zkClient.delete(path, -1, true);
zkClient.clean(path);
}
zkClient.close();
}

View File

@ -23,8 +23,10 @@ import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
@ -37,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
@ -47,11 +50,14 @@ import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.params.ModifiableSolrParams;
@ -283,7 +289,8 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
// on shards with matches.
// query("q","matchesnothing","fl","*,score", "debugQuery", "true");
// would be better if these where all separate tests - but much, much
// slower
doOptimisticLockingAndUpdating();
testMultipleCollections();
testANewCollectionInOneInstance();
@ -293,6 +300,8 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-explicit");
testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-implicit");
testCollectionsAPI();
// Thread.sleep(10000000000L);
if (DEBUG) {
@ -300,6 +309,136 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
}
}
private void testCollectionsAPI() throws Exception {
// TODO: fragile - because we dont pass collection.confName, it will only
// find a default if a conf set with a name matching the collection name is found, or
// if there is only one conf set. That and the fact that other tests run first in this
// env make this pretty fragile
// create 2 new collections rapid fire
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionAction.CREATE.toString());
params.set("numReplicas", 1);
params.set("numShards", 3);
String collectionName = "awholynewcollection";
params.set("name", collectionName);
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
clients.get(0).request(request);
String collectionName2 = "awholynewcollection2";
params.set("name", collectionName2);
params.set("numShards", 2);
clients.get(1).request(request);
checkForCollection(collectionName, 3);
checkForCollection(collectionName2, 2);
// lets try and use the solrj client to index and retrieve a couple documents
SolrInputDocument doc = getDoc(id, 6, i1, -600, tlong, 600, t1,
"humpty dumpy sat on a wall");
int which = (doc.getField(id).toString().hashCode() & 0x7fffffff)
% clients.size();
SolrServer client = clients.get(which);
client.add(doc);
doc = getDoc(id, 7, i1, 123, tlong, 123, t1,
"humpty dumpy had a great fall");
which = (doc.getField(id).toString().hashCode() & 0x7fffffff)
% clients.size();
client = clients.get(which);
client.add(doc);
doc = getDoc(id, 8, i1, 876, tlong, 876, t1,
"all the kings horses and all the kings men");
which = (doc.getField(id).toString().hashCode() & 0x7fffffff)
% clients.size();
client = clients.get(which);
client.add(doc);
commit();
// remove a collection
params = new ModifiableSolrParams();
params.set("action", CollectionAction.DELETE.toString());
params.set("name", collectionName2);
request = new QueryRequest(params);
request.setPath("/admin/collections");
clients.get(0).request(request);
// ensure its out of the state
checkForMissingCollection(collectionName2);
printLayout();
}
private void checkForCollection(String collectionName, int expectedSlices)
throws Exception {
// check for an expectedSlices new collection - we poll the state
long timeoutAt = System.currentTimeMillis() + 30000;
boolean found = false;
while (System.currentTimeMillis() < timeoutAt) {
solrj.getZkStateReader().updateCloudState(true);
CloudState cloudState = solrj.getZkStateReader().getCloudState();
Map<String,Map<String,Slice>> collections = cloudState
.getCollectionStates();
if (collections.containsKey(collectionName)) {
Map<String,Slice> slices = collections.get(collectionName);
// did we find expectedSlices slices/shards?
if (slices.size() == expectedSlices) {
found = true;
// also make sure each are active
Iterator<Entry<String,Slice>> it = slices.entrySet().iterator();
while (it.hasNext()) {
Entry<String,Slice> sliceEntry = it.next();
Map<String,ZkNodeProps> sliceShards = sliceEntry.getValue()
.getShards();
Iterator<Entry<String,ZkNodeProps>> shardIt = sliceShards
.entrySet().iterator();
while (shardIt.hasNext()) {
Entry<String,ZkNodeProps> shardEntry = shardIt.next();
if (!shardEntry.getValue().get(ZkStateReader.STATE_PROP)
.equals(ZkStateReader.ACTIVE)) {
found = false;
break;
}
}
}
if (found) break;
}
}
Thread.sleep(50);
}
if (!found) {
fail("Could not find new " + expectedSlices + " slice collection");
}
}
private void checkForMissingCollection(String collectionName)
throws Exception {
// check for a collection - we poll the state
long timeoutAt = System.currentTimeMillis() + 15000;
boolean found = true;
while (System.currentTimeMillis() < timeoutAt) {
solrj.getZkStateReader().updateCloudState(true);
CloudState cloudState = solrj.getZkStateReader().getCloudState();
Map<String,Map<String,Slice>> collections = cloudState
.getCollectionStates();
if (!collections.containsKey(collectionName)) {
found = false;
break;
}
Thread.sleep(50);
}
if (found) {
fail("Found collection that should be gone " + collectionName);
}
}
/**
* Expects a RegexReplaceProcessorFactories in the chain which will
* "double up" the values in two (stored) string fields.
@ -445,7 +584,7 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
assertAllActive(oneInstanceCollection2, solrj.getZkStateReader());
// TODO: enable when we don't falsly get slice1...
// TODO: enable when we don't falsely get slice1...
// solrj.getZkStateReader().getLeaderUrl(oneInstanceCollection2, "slice1", 30000);
// solrj.getZkStateReader().getLeaderUrl(oneInstanceCollection2, "slice2", 30000);
client2.add(getDoc(id, "1"));

View File

@ -17,6 +17,7 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
@ -226,7 +227,7 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
System.setProperty("collection", "control_collection");
String numShards = System.getProperty(ZkStateReader.NUM_SHARDS_PROP);
System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
controlJetty = createJetty(testDir, testDir + "/control/data",
controlJetty = createJetty(new File(getSolrHome()), testDir + "/control/data",
"control_shard");
System.clearProperty("collection");
if(numShards != null) {
@ -258,7 +259,7 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
StringBuilder sb = new StringBuilder();
for (int i = 1; i <= numJettys; i++) {
if (sb.length() > 0) sb.append(',');
JettySolrRunner j = createJetty(testDir, testDir + "/jetty"
JettySolrRunner j = createJetty(new File(getSolrHome()), testDir + "/jetty"
+ this.jettyIntCntr.incrementAndGet(), null, "solrconfig.xml", null);
jettys.add(j);
SolrServer client = createNewSolrServer(j.getLocalPort());

View File

@ -32,6 +32,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.CloudState;
@ -40,6 +42,7 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
@ -47,6 +50,7 @@ import org.apache.zookeeper.data.Stat;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.xml.sax.SAXException;
@Slow
public class OverseerTest extends SolrTestCaseJ4 {
@ -586,8 +590,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
mockController.publishState("core1", null,1);
while(version == getCloudStateVersion(controllerClient));
Thread.sleep(500);
assertEquals("Shard count does not match", 0, reader.getCloudState()
.getSlice("collection1", "shard1").getShards().size());
assertFalse("collection1 should be gone after publishing the null state", reader.getCloudState().getCollections().contains("collection1"));
} finally {
close(mockController);
@ -902,11 +905,11 @@ public class OverseerTest extends SolrTestCaseJ4 {
private SolrZkClient electNewOverseer(String address) throws InterruptedException,
TimeoutException, IOException, KeeperException {
TimeoutException, IOException, KeeperException, ParserConfigurationException, SAXException {
SolrZkClient zkClient = new SolrZkClient(address, TIMEOUT);
ZkStateReader reader = new ZkStateReader(zkClient);
LeaderElector overseerElector = new LeaderElector(zkClient);
ElectionContext ec = new OverseerElectionContext(address.replaceAll("/", "_"), reader);
ElectionContext ec = new OverseerElectionContext(new HttpShardHandlerFactory().getShardHandler(), "/admin/cores", address.replaceAll("/", "_"), reader);
overseerElector.setup(ec);
overseerElector.joinElection(ec);
return zkClient;

View File

@ -17,6 +17,7 @@ package org.apache.solr.update;
* limitations under the License.
*/
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -56,7 +57,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
// TODO: for now we redefine this method so that it pulls from the above
// we don't get helpful override behavior due to the method being static
protected void createServers(int numShards) throws Exception {
controlJetty = createJetty(testDir, testDir + "/control/data", null, getSolrConfigFile(), getSchemaFile());
controlJetty = createJetty(new File(getSolrHome()), testDir + "/control/data", null, getSolrConfigFile(), getSchemaFile());
controlClient = createNewSolrServer(controlJetty.getLocalPort());
@ -64,7 +65,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < numShards; i++) {
if (sb.length() > 0) sb.append(',');
JettySolrRunner j = createJetty(testDir,
JettySolrRunner j = createJetty(new File(getSolrHome()),
testDir + "/shard" + i + "/data", null, getSolrConfigFile(),
getSchemaFile());
jettys.add(j);

View File

@ -135,6 +135,9 @@ public class CloudState implements JSONWriter.Writable {
return Collections.unmodifiableSet(collectionStates.keySet());
}
/**
* @return Map<collectionName, Map<sliceName,Slice>>
*/
public Map<String, Map<String, Slice>> getCollectionStates() {
return Collections.unmodifiableMap(collectionStates);
}

View File

@ -680,4 +680,22 @@ public class SolrZkClient {
return keeper;
}
// yeah, it's recursive :(
public void clean(String path) throws InterruptedException, KeeperException {
List<String> children;
try {
children = getChildren(path, null, true);
} catch (NoNodeException r) {
return;
}
for (String string : children) {
clean(path + "/" + string);
}
try {
delete(path, -1, true);
} catch (NoNodeException r) {
return;
}
}
}

View File

@ -0,0 +1,44 @@
package org.apache.solr.common.params;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Locale;
public interface CollectionParams
{
/** What action **/
public final static String ACTION = "action";
public final static String NAME = "name";
public enum CollectionAction {
CREATE, DELETE;
public static CollectionAction get( String p )
{
if( p != null ) {
try {
return CollectionAction.valueOf( p.toUpperCase(Locale.ENGLISH) );
}
catch( Exception ex ) {}
}
return null;
}
}
}

View File

@ -77,6 +77,10 @@ public interface CoreAdminParams
/** If you unload a core, delete the index too */
public final static String DELETE_INDEX = "deleteIndex";
public static final String DELETE_DATA_DIR = "deleteDataDir";
public static final String DELETE_INSTANCE_DIR = "deleteInstanceDir";
public enum CoreAdminAction {
STATUS,
@ -89,8 +93,7 @@ public interface CoreAdminParams
RENAME,
MERGEINDEXES,
PREPRECOVERY,
REQUESTRECOVERY,
DISTRIBURL;
REQUESTRECOVERY;
public static CoreAdminAction get( String p )
{

View File

@ -31,7 +31,6 @@ import java.util.Random;
import java.util.Set;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.lucene.search.FieldCache;
import org.apache.solr.client.solrj.SolrServer;
@ -189,7 +188,7 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
}
protected void createServers(int numShards) throws Exception {
controlJetty = createJetty(testDir, testDir + "/control/data", null, getSolrConfigFile(), getSchemaFile());
controlJetty = createJetty(new File(getSolrHome()), testDir + "/control/data", null, getSolrConfigFile(), getSchemaFile());
controlClient = createNewSolrServer(controlJetty.getLocalPort());
@ -197,7 +196,7 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < numShards; i++) {
if (sb.length() > 0) sb.append(',');
JettySolrRunner j = createJetty(testDir,
JettySolrRunner j = createJetty(new File(getSolrHome()),
testDir + "/shard" + i + "/data", null, getSolrConfigFile(),
getSchemaFile());
jettys.add(j);
@ -247,17 +246,17 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
jettys.clear();
}
public JettySolrRunner createJetty(File baseDir, String dataDir) throws Exception {
return createJetty(baseDir, dataDir, null, null, null);
public JettySolrRunner createJetty(File solrHome, String dataDir) throws Exception {
return createJetty(solrHome, dataDir, null, null, null);
}
public JettySolrRunner createJetty(File baseDir, String dataDir, String shardId) throws Exception {
return createJetty(baseDir, dataDir, shardId, null, null);
public JettySolrRunner createJetty(File solrHome, String dataDir, String shardId) throws Exception {
return createJetty(solrHome, dataDir, shardId, null, null);
}
public JettySolrRunner createJetty(File baseDir, String dataDir, String shardList, String solrConfigOverride, String schemaOverride) throws Exception {
public JettySolrRunner createJetty(File solrHome, String dataDir, String shardList, String solrConfigOverride, String schemaOverride) throws Exception {
JettySolrRunner jetty = new JettySolrRunner(getSolrHome(), "/solr", 0, solrConfigOverride, schemaOverride);
JettySolrRunner jetty = new JettySolrRunner(solrHome.getAbsolutePath(), "/solr", 0, solrConfigOverride, schemaOverride);
jetty.setShards(shardList);
jetty.setDataDir(dataDir);
jetty.start();

View File

@ -1367,6 +1367,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
}
}
// TODO: use solr rather than solr/collection1
public static String TEST_HOME() {
return getFile("solr/collection1").getParent();
}