mirror of https://github.com/apache/lucene.git
SOLR-4140: Allow access to the collections API through CloudSolrServer without referencing an existing collection.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1421071 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
930354533e
commit
38d16dc7b7
|
@ -143,6 +143,9 @@ New Features
|
||||||
* SOLR-4166: LBHttpSolrServer ignores ResponseParser passed in constructor.
|
* SOLR-4166: LBHttpSolrServer ignores ResponseParser passed in constructor.
|
||||||
(Steve Molloy via Mark Miller)
|
(Steve Molloy via Mark Miller)
|
||||||
|
|
||||||
|
* SOLR-4140: Allow access to the collections API through CloudSolrServer
|
||||||
|
without referencing an existing collection. (Per Steffensen via Mark Miller)
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -748,8 +748,37 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
|
||||||
for (int i = 0; i < cnt; i++) {
|
for (int i = 0; i < cnt; i++) {
|
||||||
int numShards = _TestUtil.nextInt(random(), 0, shardCount) + 1;
|
int numShards = _TestUtil.nextInt(random(), 0, shardCount) + 1;
|
||||||
int replicationFactor = _TestUtil.nextInt(random(), 0, 3) + 2;
|
int replicationFactor = _TestUtil.nextInt(random(), 0, 3) + 2;
|
||||||
int maxShardsPerNode = (((numShards * replicationFactor) / getCommonCloudSolrServer().getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
|
int maxShardsPerNode = (((numShards * replicationFactor) / getCommonCloudSolrServer()
|
||||||
createCollection(collectionInfos, i, numShards, replicationFactor, maxShardsPerNode);
|
.getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
|
||||||
|
|
||||||
|
|
||||||
|
CloudSolrServer client = null;
|
||||||
|
try {
|
||||||
|
if (i == 0) {
|
||||||
|
// Test if we can create a collection through CloudSolrServer where
|
||||||
|
// you havnt set default-collection
|
||||||
|
// This is nice because you want to be able to create you first
|
||||||
|
// collection using CloudSolrServer, and in such case there is
|
||||||
|
// nothing reasonable to set as default-collection
|
||||||
|
client = createCloudClient(null);
|
||||||
|
} else if (i == 1) {
|
||||||
|
// Test if we can create a collection through CloudSolrServer where
|
||||||
|
// you have set default-collection to a non-existing collection
|
||||||
|
// This is nice because you want to be able to create you first
|
||||||
|
// collection using CloudSolrServer, and in such case there is
|
||||||
|
// nothing reasonable to set as default-collection, but you might want
|
||||||
|
// to use the same CloudSolrServer throughout the entire
|
||||||
|
// lifetime of your client-application, so it is nice to be able to
|
||||||
|
// set a default-collection on this CloudSolrServer once and for all
|
||||||
|
// and use this CloudSolrServer to create the collection
|
||||||
|
client = createCloudClient("awholynewcollection_" + i);
|
||||||
|
}
|
||||||
|
|
||||||
|
createCollection(collectionInfos, "awholynewcollection_" + i,
|
||||||
|
numShards, replicationFactor, maxShardsPerNode, client);
|
||||||
|
} finally {
|
||||||
|
if (client != null) client.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Set<Entry<String,List<Integer>>> collectionInfosEntrySet = collectionInfos.entrySet();
|
Set<Entry<String,List<Integer>>> collectionInfosEntrySet = collectionInfos.entrySet();
|
||||||
|
@ -882,7 +911,12 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
|
||||||
int replicationFactor = 2;
|
int replicationFactor = 2;
|
||||||
int maxShardsPerNode = 1;
|
int maxShardsPerNode = 1;
|
||||||
collectionInfos = new HashMap<String,List<Integer>>();
|
collectionInfos = new HashMap<String,List<Integer>>();
|
||||||
createCollection(collectionInfos, cnt, numShards, replicationFactor, maxShardsPerNode);
|
CloudSolrServer client = createCloudClient("awholynewcollection_" + cnt);
|
||||||
|
try {
|
||||||
|
createCollection(collectionInfos, "awholynewcollection_" + cnt, numShards, replicationFactor, maxShardsPerNode, client);
|
||||||
|
} finally {
|
||||||
|
client.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: REMOVE THE SLEEP IN THE METHOD CALL WHEN WE HAVE COLLECTION API
|
// TODO: REMOVE THE SLEEP IN THE METHOD CALL WHEN WE HAVE COLLECTION API
|
||||||
// RESPONSES
|
// RESPONSES
|
||||||
|
@ -893,30 +927,34 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
|
||||||
|
|
||||||
|
|
||||||
protected void createCollection(Map<String,List<Integer>> collectionInfos,
|
protected void createCollection(Map<String,List<Integer>> collectionInfos,
|
||||||
int i, int numShards, int numReplica, int maxShardsPerNode) throws SolrServerException, IOException {
|
String collectionName, int numShards, int numReplicas, int maxShardsPerNode, SolrServer client) throws SolrServerException, IOException {
|
||||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||||
params.set("action", CollectionAction.CREATE.toString());
|
params.set("action", CollectionAction.CREATE.toString());
|
||||||
|
|
||||||
params.set(OverseerCollectionProcessor.NUM_SLICES, numShards);
|
params.set(OverseerCollectionProcessor.NUM_SLICES, numShards);
|
||||||
params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, numReplica);
|
params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, numReplicas);
|
||||||
params.set(OverseerCollectionProcessor.MAX_SHARDS_PER_NODE, maxShardsPerNode);
|
params.set(OverseerCollectionProcessor.MAX_SHARDS_PER_NODE, maxShardsPerNode);
|
||||||
String collectionName = "awholynewcollection_" + i;
|
|
||||||
int clientIndex = random().nextInt(2);
|
int clientIndex = random().nextInt(2);
|
||||||
List<Integer> list = new ArrayList<Integer>();
|
List<Integer> list = new ArrayList<Integer>();
|
||||||
list.add(numShards);
|
list.add(numShards);
|
||||||
list.add(numReplica);
|
list.add(numReplicas);
|
||||||
list.add(maxShardsPerNode);
|
list.add(maxShardsPerNode);
|
||||||
collectionInfos.put(collectionName, list);
|
collectionInfos.put(collectionName, list);
|
||||||
params.set("name", collectionName);
|
params.set("name", collectionName);
|
||||||
SolrRequest request = new QueryRequest(params);
|
SolrRequest request = new QueryRequest(params);
|
||||||
request.setPath("/admin/collections");
|
request.setPath("/admin/collections");
|
||||||
|
|
||||||
final String baseUrl = ((HttpSolrServer) clients.get(clientIndex)).getBaseURL().substring(
|
if (client == null) {
|
||||||
0,
|
final String baseUrl = ((HttpSolrServer) clients.get(clientIndex)).getBaseURL().substring(
|
||||||
((HttpSolrServer) clients.get(clientIndex)).getBaseURL().length()
|
0,
|
||||||
- DEFAULT_COLLECTION.length() - 1);
|
((HttpSolrServer) clients.get(clientIndex)).getBaseURL().length()
|
||||||
|
- DEFAULT_COLLECTION.length() - 1);
|
||||||
createNewSolrServer("", baseUrl).request(request);
|
|
||||||
|
createNewSolrServer("", baseUrl).request(request);
|
||||||
|
} else {
|
||||||
|
client.request(request);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean waitForReloads(String collectionName, Map<String,Long> urlToTimeBefore) throws SolrServerException, IOException {
|
private boolean waitForReloads(String collectionName, Map<String,Long> urlToTimeBefore) throws SolrServerException, IOException {
|
||||||
|
|
|
@ -163,9 +163,10 @@ public class CloudSolrServer extends SolrServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
|
public NamedList<Object> request(SolrRequest request)
|
||||||
|
throws SolrServerException, IOException {
|
||||||
connect();
|
connect();
|
||||||
|
|
||||||
// TODO: if you can hash here, you could favor the shard leader
|
// TODO: if you can hash here, you could favor the shard leader
|
||||||
|
|
||||||
ClusterState clusterState = zkStateReader.getClusterState();
|
ClusterState clusterState = zkStateReader.getClusterState();
|
||||||
|
@ -176,95 +177,111 @@ public class CloudSolrServer extends SolrServer {
|
||||||
sendToLeaders = true;
|
sendToLeaders = true;
|
||||||
replicas = new ArrayList<String>();
|
replicas = new ArrayList<String>();
|
||||||
}
|
}
|
||||||
|
|
||||||
SolrParams reqParams = request.getParams();
|
SolrParams reqParams = request.getParams();
|
||||||
if (reqParams == null) {
|
if (reqParams == null) {
|
||||||
reqParams = new ModifiableSolrParams();
|
reqParams = new ModifiableSolrParams();
|
||||||
}
|
}
|
||||||
String collection = reqParams.get("collection", defaultCollection);
|
List<String> theUrlList = new ArrayList<String>();
|
||||||
|
if (request.getPath().equals("/admin/collections")) {
|
||||||
if (collection == null) {
|
Set<String> liveNodes = clusterState.getLiveNodes();
|
||||||
throw new SolrServerException("No collection param specified on request and no default collection has been set.");
|
for (String liveNode : liveNodes) {
|
||||||
}
|
int splitPointBetweenHostPortAndContext = liveNode.indexOf("_");
|
||||||
|
theUrlList.add("http://"
|
||||||
// Extract each comma separated collection name and store in a List.
|
+ liveNode.substring(0, splitPointBetweenHostPortAndContext) + "/"
|
||||||
List<String> collectionList = StrUtils.splitSmart(collection, ",", true);
|
+ liveNode.substring(splitPointBetweenHostPortAndContext + 1));
|
||||||
|
}
|
||||||
// TODO: not a big deal because of the caching, but we could avoid looking at every shard
|
} else {
|
||||||
// when getting leaders if we tweaked some things
|
String collection = reqParams.get("collection", defaultCollection);
|
||||||
|
|
||||||
// Retrieve slices from the cloud state and, for each collection specified,
|
if (collection == null) {
|
||||||
// add it to the Map of slices.
|
throw new SolrServerException(
|
||||||
Map<String,Slice> slices = new HashMap<String,Slice>();
|
"No collection param specified on request and no default collection has been set.");
|
||||||
for (String collectionName : collectionList) {
|
}
|
||||||
ClientUtils.addSlices(slices, collectionName, clusterState.getSlices(collectionName), true);
|
|
||||||
}
|
// Extract each comma separated collection name and store in a List.
|
||||||
|
List<String> collectionList = StrUtils.splitSmart(collection, ",", true);
|
||||||
Set<String> liveNodes = clusterState.getLiveNodes();
|
|
||||||
|
// TODO: not a big deal because of the caching, but we could avoid looking
|
||||||
List<String> theUrlList;
|
// at every shard
|
||||||
synchronized (cachLock) {
|
// when getting leaders if we tweaked some things
|
||||||
List<String> leaderUrlList = leaderUrlLists.get(collection);
|
|
||||||
List<String> urlList = urlLists.get(collection);
|
// Retrieve slices from the cloud state and, for each collection
|
||||||
List<String> replicasList = replicasLists.get(collection);
|
// specified,
|
||||||
|
// add it to the Map of slices.
|
||||||
if ((sendToLeaders && leaderUrlList == null) || (!sendToLeaders
|
Map<String,Slice> slices = new HashMap<String,Slice>();
|
||||||
&& urlList == null)
|
for (String collectionName : collectionList) {
|
||||||
|| clusterState.hashCode() != this.lastClusterStateHashCode) {
|
ClientUtils.addSlices(slices, collectionName,
|
||||||
// build a map of unique nodes
|
clusterState.getSlices(collectionName), true);
|
||||||
// TODO: allow filtering by group, role, etc
|
}
|
||||||
Map<String,ZkNodeProps> nodes = new HashMap<String,ZkNodeProps>();
|
Set<String> liveNodes = clusterState.getLiveNodes();
|
||||||
List<String> urlList2 = new ArrayList<String>();
|
|
||||||
for (Slice slice : slices.values()) {
|
synchronized (cachLock) {
|
||||||
for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
|
List<String> leaderUrlList = leaderUrlLists.get(collection);
|
||||||
ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
|
List<String> urlList = urlLists.get(collection);
|
||||||
String node = coreNodeProps.getNodeName();
|
List<String> replicasList = replicasLists.get(collection);
|
||||||
if (!liveNodes.contains(coreNodeProps.getNodeName())
|
|
||||||
|| !coreNodeProps.getState().equals(ZkStateReader.ACTIVE)) continue;
|
if ((sendToLeaders && leaderUrlList == null)
|
||||||
if (nodes.put(node, nodeProps) == null) {
|
|| (!sendToLeaders && urlList == null)
|
||||||
if (!sendToLeaders || (sendToLeaders && coreNodeProps.isLeader())) {
|
|| clusterState.hashCode() != this.lastClusterStateHashCode) {
|
||||||
String url = coreNodeProps.getCoreUrl();
|
// build a map of unique nodes
|
||||||
urlList2.add(url);
|
// TODO: allow filtering by group, role, etc
|
||||||
} else if (sendToLeaders) {
|
Map<String,ZkNodeProps> nodes = new HashMap<String,ZkNodeProps>();
|
||||||
String url = coreNodeProps.getCoreUrl();
|
List<String> urlList2 = new ArrayList<String>();
|
||||||
replicas.add(url);
|
for (Slice slice : slices.values()) {
|
||||||
|
for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
|
||||||
|
ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
|
||||||
|
String node = coreNodeProps.getNodeName();
|
||||||
|
if (!liveNodes.contains(coreNodeProps.getNodeName())
|
||||||
|
|| !coreNodeProps.getState().equals(ZkStateReader.ACTIVE)) continue;
|
||||||
|
if (nodes.put(node, nodeProps) == null) {
|
||||||
|
if (!sendToLeaders
|
||||||
|
|| (sendToLeaders && coreNodeProps.isLeader())) {
|
||||||
|
String url = coreNodeProps.getCoreUrl();
|
||||||
|
urlList2.add(url);
|
||||||
|
} else if (sendToLeaders) {
|
||||||
|
String url = coreNodeProps.getCoreUrl();
|
||||||
|
replicas.add(url);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (sendToLeaders) {
|
||||||
|
this.leaderUrlLists.put(collection, urlList2);
|
||||||
|
leaderUrlList = urlList2;
|
||||||
|
this.replicasLists.put(collection, replicas);
|
||||||
|
replicasList = replicas;
|
||||||
|
} else {
|
||||||
|
this.urlLists.put(collection, urlList2);
|
||||||
|
urlList = urlList2;
|
||||||
|
}
|
||||||
|
this.lastClusterStateHashCode = clusterState.hashCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sendToLeaders) {
|
if (sendToLeaders) {
|
||||||
this.leaderUrlLists.put(collection, urlList2);
|
theUrlList = new ArrayList<String>(leaderUrlList.size());
|
||||||
leaderUrlList = urlList2;
|
theUrlList.addAll(leaderUrlList);
|
||||||
this.replicasLists.put(collection, replicas);
|
|
||||||
replicasList = replicas;
|
|
||||||
} else {
|
} else {
|
||||||
this.urlLists.put(collection, urlList2);
|
theUrlList = new ArrayList<String>(urlList.size());
|
||||||
urlList = urlList2;
|
theUrlList.addAll(urlList);
|
||||||
|
}
|
||||||
|
Collections.shuffle(theUrlList, rand);
|
||||||
|
if (sendToLeaders) {
|
||||||
|
ArrayList<String> theReplicas = new ArrayList<String>(
|
||||||
|
replicasList.size());
|
||||||
|
theReplicas.addAll(replicasList);
|
||||||
|
Collections.shuffle(theReplicas, rand);
|
||||||
|
// System.out.println("leaders:" + theUrlList);
|
||||||
|
// System.out.println("replicas:" + theReplicas);
|
||||||
|
theUrlList.addAll(theReplicas);
|
||||||
}
|
}
|
||||||
this.lastClusterStateHashCode = clusterState.hashCode();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sendToLeaders) {
|
|
||||||
theUrlList = new ArrayList<String>(leaderUrlList.size());
|
|
||||||
theUrlList.addAll(leaderUrlList);
|
|
||||||
} else {
|
|
||||||
theUrlList = new ArrayList<String>(urlList.size());
|
|
||||||
theUrlList.addAll(urlList);
|
|
||||||
}
|
|
||||||
Collections.shuffle(theUrlList, rand);
|
|
||||||
if (sendToLeaders) {
|
|
||||||
ArrayList<String> theReplicas = new ArrayList<String>(
|
|
||||||
replicasList.size());
|
|
||||||
theReplicas.addAll(replicasList);
|
|
||||||
Collections.shuffle(theReplicas, rand);
|
|
||||||
// System.out.println("leaders:" + theUrlList);
|
|
||||||
// System.out.println("replicas:" + theReplicas);
|
|
||||||
theUrlList.addAll(theReplicas);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// System.out.println("########################## MAKING REQUEST TO " + theUrlList);
|
// System.out.println("########################## MAKING REQUEST TO " +
|
||||||
|
// theUrlList);
|
||||||
|
|
||||||
LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(request, theUrlList);
|
LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(request, theUrlList);
|
||||||
LBHttpSolrServer.Rsp rsp = lbServer.request(req);
|
LBHttpSolrServer.Rsp rsp = lbServer.request(req);
|
||||||
return rsp.getResponse();
|
return rsp.getResponse();
|
||||||
|
|
|
@ -199,13 +199,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
||||||
assert(cloudInit == false);
|
assert(cloudInit == false);
|
||||||
cloudInit = true;
|
cloudInit = true;
|
||||||
try {
|
try {
|
||||||
CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
|
cloudClient = createCloudClient(DEFAULT_COLLECTION);
|
||||||
server.setDefaultCollection(DEFAULT_COLLECTION);
|
|
||||||
server.getLbServer().getHttpClient().getParams()
|
|
||||||
.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 5000);
|
|
||||||
server.getLbServer().getHttpClient().getParams()
|
|
||||||
.setParameter(CoreConnectionPNames.SO_TIMEOUT, 20000);
|
|
||||||
cloudClient = server;
|
|
||||||
|
|
||||||
cloudClient.connect();
|
cloudClient.connect();
|
||||||
} catch (MalformedURLException e) {
|
} catch (MalformedURLException e) {
|
||||||
|
@ -217,7 +211,17 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
||||||
chaosMonkey = new ChaosMonkey(zkServer, zkStateReader, DEFAULT_COLLECTION,
|
chaosMonkey = new ChaosMonkey(zkServer, zkStateReader, DEFAULT_COLLECTION,
|
||||||
shardToJetty, shardToLeaderJetty);
|
shardToJetty, shardToLeaderJetty);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected CloudSolrServer createCloudClient(String defaultCollection)
|
||||||
|
throws MalformedURLException {
|
||||||
|
CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
|
||||||
|
if (defaultCollection != null) server.setDefaultCollection(defaultCollection);
|
||||||
|
server.getLbServer().getHttpClient().getParams()
|
||||||
|
.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 5000);
|
||||||
|
server.getLbServer().getHttpClient().getParams()
|
||||||
|
.setParameter(CoreConnectionPNames.SO_TIMEOUT, 20000);
|
||||||
|
return server;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void createServers(int numServers) throws Exception {
|
protected void createServers(int numServers) throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue