mirror of https://github.com/apache/lucene.git
SOLR-11392: Change collection names in test case
This commit is contained in:
parent 1782dd9ca9
commit 070d6d3748
@@ -7556,14 +7556,14 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
   @Test
   public void testParallelExecutorStream() throws Exception {
-    CollectionAdminRequest.createCollection("workQueue", "conf", 2, 1).processAndWait(cluster.getSolrClient(),DEFAULT_TIMEOUT);
-    AbstractDistribZkTestBase.waitForRecoveriesToFinish("workQueue", cluster.getSolrClient().getZkStateReader(),
+    CollectionAdminRequest.createCollection("workQueue1", "conf", 2, 1).processAndWait(cluster.getSolrClient(),DEFAULT_TIMEOUT);
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("workQueue1", cluster.getSolrClient().getZkStateReader(),
         false, true, TIMEOUT);
-    CollectionAdminRequest.createCollection("mainCorpus", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
-    AbstractDistribZkTestBase.waitForRecoveriesToFinish("mainCorpus", cluster.getSolrClient().getZkStateReader(),
+    CollectionAdminRequest.createCollection("mainCorpus1", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("mainCorpus1", cluster.getSolrClient().getZkStateReader(),
         false, true, TIMEOUT);
-    CollectionAdminRequest.createCollection("destination", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
-    AbstractDistribZkTestBase.waitForRecoveriesToFinish("destination", cluster.getSolrClient().getZkStateReader(),
+    CollectionAdminRequest.createCollection("destination1", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("destination1", cluster.getSolrClient().getZkStateReader(),
         false, true, TIMEOUT);
 
     UpdateRequest workRequest = new UpdateRequest();
@@ -7571,27 +7571,27 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
 
     for (int i = 0; i < 500; i++) {
-      workRequest.add(id, String.valueOf(i), "expr_s", "update(destination, batchSize=50, search(mainCorpus, q=id:"+i+", rows=1, sort=\"id asc\", fl=\"id, body_t, field_i\"))");
+      workRequest.add(id, String.valueOf(i), "expr_s", "update(destination1, batchSize=50, search(mainCorpus1, q=id:"+i+", rows=1, sort=\"id asc\", fl=\"id, body_t, field_i\"))");
       dataRequest.add(id, String.valueOf(i), "body_t", "hello world "+i, "field_i", Integer.toString(i));
     }
 
-    workRequest.commit(cluster.getSolrClient(), "workQueue");
-    dataRequest.commit(cluster.getSolrClient(), "mainCorpus");
+    workRequest.commit(cluster.getSolrClient(), "workQueue1");
+    dataRequest.commit(cluster.getSolrClient(), "mainCorpus1");
 
-    String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/destination";
+    String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/destination1";
     TupleStream executorStream;
     ModifiableSolrParams paramsLoc;
 
     StreamFactory factory = new StreamFactory()
-        .withCollectionZkHost("workQueue", cluster.getZkServer().getZkAddress())
-        .withCollectionZkHost("mainCorpus", cluster.getZkServer().getZkAddress())
-        .withCollectionZkHost("destination", cluster.getZkServer().getZkAddress())
+        .withCollectionZkHost("workQueue1", cluster.getZkServer().getZkAddress())
+        .withCollectionZkHost("mainCorpus1", cluster.getZkServer().getZkAddress())
+        .withCollectionZkHost("destination1", cluster.getZkServer().getZkAddress())
         .withFunctionName("search", CloudSolrStream.class)
         .withFunctionName("executor", ExecutorStream.class)
         .withFunctionName("parallel", ParallelStream.class)
         .withFunctionName("update", UpdateStream.class);
 
-    String executorExpression = "parallel(workQueue, workers=2, sort=\"EOF asc\", executor(threads=3, queueSize=100, search(workQueue, q=\"*:*\", fl=\"id, expr_s\", rows=1000, partitionKeys=id, sort=\"id desc\")))";
+    String executorExpression = "parallel(workQueue1, workers=2, sort=\"EOF asc\", executor(threads=3, queueSize=100, search(workQueue1, q=\"*:*\", fl=\"id, expr_s\", rows=1000, partitionKeys=id, sort=\"id desc\")))";
     executorStream = factory.constructStream(executorExpression);
 
     StreamContext context = new StreamContext();
@@ -7600,9 +7600,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     executorStream.setStreamContext(context);
     getTuples(executorStream);
     //Destination collection should now contain all the records in the main corpus.
-    cluster.getSolrClient().commit("destination");
+    cluster.getSolrClient().commit("destination1");
     paramsLoc = new ModifiableSolrParams();
-    paramsLoc.set("expr", "search(destination, q=\"*:*\", fl=\"id, body_t, field_i\", rows=1000, sort=\"field_i asc\")");
+    paramsLoc.set("expr", "search(destination1, q=\"*:*\", fl=\"id, body_t, field_i\", rows=1000, sort=\"field_i asc\")");
     paramsLoc.set("qt", "/stream");
 
     SolrStream solrStream = new SolrStream(url, paramsLoc);
@@ -7618,9 +7618,9 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
     solrStream.close();
     clientCache.close();
-    CollectionAdminRequest.deleteCollection("workQueue").process(cluster.getSolrClient());
-    CollectionAdminRequest.deleteCollection("mainCorpus").process(cluster.getSolrClient());
-    CollectionAdminRequest.deleteCollection("destination").process(cluster.getSolrClient());
+    CollectionAdminRequest.deleteCollection("workQueue1").process(cluster.getSolrClient());
+    CollectionAdminRequest.deleteCollection("mainCorpus1").process(cluster.getSolrClient());
+    CollectionAdminRequest.deleteCollection("destination1").process(cluster.getSolrClient());
   }
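
For reference, a minimal sketch of the collection lifecycle the renamed test follows, using only SolrJ and test-framework calls that appear verbatim in the diff above. It assumes the StreamExpressionTest fields `cluster`, `DEFAULT_TIMEOUT`, and `TIMEOUT` are in scope, and "workQueue1" stands in for any of the renamed collections; this is illustrative only, not part of the commit:

    // Create a 2-shard, 1-replica collection on the test cluster and block
    // until the Collections API call completes.
    CollectionAdminRequest.createCollection("workQueue1", "conf", 2, 1)
        .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);

    // Wait until every replica reports it has finished recovery, so the test
    // never indexes against a collection that is still coming up.
    AbstractDistribZkTestBase.waitForRecoveriesToFinish("workQueue1",
        cluster.getSolrClient().getZkStateReader(), false, true, TIMEOUT);

    // ... index documents and run the parallel executor stream assertions ...

    // Delete the collection so repeated test runs start from a clean cluster.
    CollectionAdminRequest.deleteCollection("workQueue1").process(cluster.getSolrClient());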