SOLR-8487: Adds CommitStream to support sending commits to a collection being updated

This commit is contained in:
Dennis Gove 2016-09-13 10:50:52 -04:00
parent f391d57075
commit 6365920a0e
6 changed files with 641 additions and 2 deletions

View File

@ -90,6 +90,8 @@ New Features
* SOLR-8186: Reduce logging to logs/solr-<port>-console.log when not running in foreground mode * SOLR-8186: Reduce logging to logs/solr-<port>-console.log when not running in foreground mode
Show timestamp also in foreground log. Also removes some logging noise. (janhoy) Show timestamp also in foreground log. Also removes some logging noise. (janhoy)
* SOLR-8487: Adds CommitStream to support sending commits to a collection being updated. (Dennis Gove)
Bug Fixes Bug Fixes
---------------------- ----------------------

View File

@ -105,6 +105,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
.withFunctionName("update", UpdateStream.class) .withFunctionName("update", UpdateStream.class)
.withFunctionName("jdbc", JDBCStream.class) .withFunctionName("jdbc", JDBCStream.class)
.withFunctionName("topic", TopicStream.class) .withFunctionName("topic", TopicStream.class)
.withFunctionName("commit", CommitStream.class)
// decorator streams // decorator streams
.withFunctionName("merge", MergeStream.class) .withFunctionName("merge", MergeStream.class)
@ -128,7 +129,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
.withFunctionName("shortestPath", ShortestPathStream.class) .withFunctionName("shortestPath", ShortestPathStream.class)
.withFunctionName("gatherNodes", GatherNodesStream.class) .withFunctionName("gatherNodes", GatherNodesStream.class)
.withFunctionName("select", SelectStream.class) .withFunctionName("select", SelectStream.class)
.withFunctionName("scoreNodes", ScoreNodesStream.class) .withFunctionName("scoreNodes", ScoreNodesStream.class)
// metrics // metrics
.withFunctionName("min", MinMetric.class) .withFunctionName("min", MinMetric.class)

View File

@ -0,0 +1,260 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Sends a commit message to a SolrCloud collection
*/
public class CommitStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Part of expression / passed in
private String collection;
private String zkHost;
private boolean waitFlush;
private boolean waitSearcher;
private boolean softCommit;
private int commitBatchSize;
private TupleStream tupleSource;
private transient SolrClientCache clientCache;
private long docsSinceCommit;
public CommitStream(StreamExpression expression, StreamFactory factory) throws IOException {
String collectionName = factory.getValueOperand(expression, 0);
String zkHost = findZkHost(factory, collectionName, expression);
int batchSize = factory.getIntOperand(expression, "batchSize", 0);
boolean waitFlush = factory.getBooleanOperand(expression, "waitFlush", false);
boolean waitSearcher = factory.getBooleanOperand(expression, "waitSearcher", false);
boolean softCommit = factory.getBooleanOperand(expression, "softCommit", false);
if(null == collectionName){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
}
if(null == zkHost){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
}
if(batchSize < 0){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - batchSize cannot be less than 0 but is '%d'",expression,batchSize));
}
//Extract underlying TupleStream.
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
if (1 != streamExpressions.size()) {
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
}
StreamExpression sourceStreamExpression = streamExpressions.get(0);
init(collectionName, factory.constructStream(sourceStreamExpression), zkHost, batchSize, waitFlush, waitSearcher, softCommit);
}
public CommitStream(String collectionName, TupleStream tupleSource, String zkHost, int batchSize, boolean waitFlush, boolean waitSearcher, boolean softCommit) throws IOException {
if (batchSize < 0) {
throw new IOException(String.format(Locale.ROOT,"batchSize '%d' cannot be less than 0.", batchSize));
}
init(collectionName, tupleSource, zkHost, batchSize, waitFlush, waitSearcher, softCommit);
}
private void init(String collectionName, TupleStream tupleSource, String zkHost, int batchSize, boolean waitFlush, boolean waitSearcher, boolean softCommit) {
this.collection = collectionName;
this.zkHost = zkHost;
this.commitBatchSize = batchSize;
this.waitFlush = waitFlush;
this.waitSearcher = waitSearcher;
this.softCommit = softCommit;
this.tupleSource = tupleSource;
}
@Override
public void open() throws IOException {
tupleSource.open();
clientCache = new SolrClientCache();
docsSinceCommit = 0;
}
@Override
public Tuple read() throws IOException {
Tuple tuple = tupleSource.read();
if(tuple.EOF){
if(docsSinceCommit > 0){
sendCommit();
}
}
else{
// if the read document contains field 'batchIndexed' then it's a summary
// document and we can update our count based on it's value. If not then
// just increment by 1
if(tuple.fields.containsKey(UpdateStream.BATCH_INDEXED_FIELD_NAME) && isInteger(tuple.getString(UpdateStream.BATCH_INDEXED_FIELD_NAME))){
docsSinceCommit += Integer.parseInt(tuple.getString(UpdateStream.BATCH_INDEXED_FIELD_NAME));
}
else{
docsSinceCommit += 1;
}
if(commitBatchSize > 0 && docsSinceCommit >= commitBatchSize){
// if commitBatchSize == 0 then the tuple.EOF above will end up calling sendCommit()
sendCommit();
}
}
return tuple;
}
private boolean isInteger(String string){
try{
Integer.parseInt(string);
return true;
}
catch(NumberFormatException e){
return false;
}
}
@Override
public void close() throws IOException {
clientCache.close();
tupleSource.close();
}
@Override
public StreamComparator getStreamSort() {
return tupleSource.getStreamSort();
}
@Override
public List<TupleStream> children() {
ArrayList<TupleStream> sourceList = new ArrayList<TupleStream>(1);
sourceList.add(tupleSource);
return sourceList;
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
expression.addParameter(collection);
expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
expression.addParameter(new StreamExpressionNamedParameter("batchSize", Integer.toString(commitBatchSize)));
expression.addParameter(new StreamExpressionNamedParameter("waitFlush", Boolean.toString(waitFlush)));
expression.addParameter(new StreamExpressionNamedParameter("waitSearcher", Boolean.toString(waitSearcher)));
expression.addParameter(new StreamExpressionNamedParameter("softCommit", Boolean.toString(softCommit)));
if(includeStreams){
if(tupleSource instanceof Expressible){
expression.addParameter(((Expressible)tupleSource).toExpression(factory));
} else {
throw new IOException("This CommitStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
expression.addParameter("<stream>");
}
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
// A commit stream is backward wrt the order in the explanation. This stream is the "child"
// while the collection we're committing to is the parent.
StreamExplanation explanation = new StreamExplanation(getStreamNodeId() + "-datastore");
explanation.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
explanation.setImplementingClass("Solr/Lucene");
explanation.setExpressionType(ExpressionType.DATASTORE);
explanation.setExpression("Commit into " + collection);
// child is a stream so add it at this point
StreamExplanation child = new StreamExplanation(getStreamNodeId().toString());
child.setFunctionName(String.format(Locale.ROOT, factory.getFunctionName(getClass())));
child.setImplementingClass(getClass().getName());
child.setExpressionType(ExpressionType.STREAM_DECORATOR);
child.setExpression(toExpression(factory, false).toString());
child.addChild(tupleSource.toExplanation(factory));
explanation.addChild(child);
return explanation;
}
@Override
public void setStreamContext(StreamContext context) {
if(null != context.getSolrClientCache()){
this.clientCache = context.getSolrClientCache();
// this overrides the one created in open
}
this.tupleSource.setStreamContext(context);
}
private String findZkHost(StreamFactory factory, String collectionName, StreamExpression expression) {
StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
if(null == zkHostExpression){
String zkHost = factory.getCollectionZkHost(collectionName);
if(zkHost == null) {
return factory.getDefaultZkHost();
} else {
return zkHost;
}
} else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
return ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
}
return null;
}
private void sendCommit() throws IOException {
try {
clientCache.getCloudSolrClient(zkHost).commit(collection, waitFlush, waitSearcher, softCommit);
} catch (SolrServerException | IOException e) {
LOG.warn(String.format(Locale.ROOT, "Unable to commit documents to collection '%s' due to unexpected error.", collection), e);
String className = e.getClass().getName();
String message = e.getMessage();
throw new IOException(String.format(Locale.ROOT,"Unexpected error when committing documents to collection %s- %s:%s", collection, className, message));
}
}
}

View File

@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
public class UpdateStream extends TupleStream implements Expressible { public class UpdateStream extends TupleStream implements Expressible {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static String BATCH_INDEXED_FIELD_NAME = "batchIndexed"; // field name in summary tuple for #docs updated in batch
private String collection; private String collection;
private String zkHost; private String zkHost;
private int updateBatchSize; private int updateBatchSize;
@ -307,7 +308,7 @@ public class UpdateStream extends TupleStream implements Expressible {
Map m = new HashMap(); Map m = new HashMap();
this.totalDocsIndex += batchSize; this.totalDocsIndex += batchSize;
++batchNumber; ++batchNumber;
m.put("batchIndexed", batchSize); m.put(BATCH_INDEXED_FIELD_NAME, batchSize);
m.put("totalIndexed", this.totalDocsIndex); m.put("totalIndexed", this.totalDocsIndex);
m.put("batchNumber", batchNumber); m.put("batchNumber", batchNumber);
if(coreName != null) { if(coreName != null) {

View File

@ -174,6 +174,41 @@ public class StreamFactory implements Serializable {
return matchingStreamExpressions; return matchingStreamExpressions;
} }
public int getIntOperand(StreamExpression expression, String paramName, Integer defaultValue) throws IOException{
StreamExpressionNamedParameter param = getNamedOperand(expression, paramName);
if(null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)){
if(null != defaultValue){
return defaultValue;
}
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single '%s' parameter of type integer but didn't find one",expression, paramName));
}
String nStr = ((StreamExpressionValue)param.getParameter()).getValue();
try{
return Integer.parseInt(nStr);
}
catch(NumberFormatException e){
if(null != defaultValue){
return defaultValue;
}
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - %s '%s' is not a valid integer.",expression, paramName, nStr));
}
}
public boolean getBooleanOperand(StreamExpression expression, String paramName, Boolean defaultValue) throws IOException{
StreamExpressionNamedParameter param = getNamedOperand(expression, paramName);
if(null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)){
if(null != defaultValue){
return defaultValue;
}
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single '%s' parameter of type boolean but didn't find one",expression, paramName));
}
String nStr = ((StreamExpressionValue)param.getParameter()).getValue();
return Boolean.parseBoolean(nStr);
}
public TupleStream constructStream(String expressionClause) throws IOException { public TupleStream constructStream(String expressionClause) throws IOException {
return constructStream(StreamExpressionParser.parse(expressionClause)); return constructStream(StreamExpressionParser.parse(expressionClause));
} }

View File

@ -3034,6 +3034,346 @@ public class StreamExpressionTest extends SolrCloudTestCase {
CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient()); CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient());
} }
////////////////////////////////////////////
@Test
public void testCommitStream() throws Exception {
CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish("destinationCollection", cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7")
.add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777")
.commit(cluster.getSolrClient(), "collection1");
StreamExpression expression;
TupleStream stream;
Tuple t;
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
.withCollectionZkHost("destinationCollection", cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("update", UpdateStream.class)
.withFunctionName("commit", CommitStream.class);
//Copy all docs to destinationCollection
expression = StreamExpressionParser.parse("commit(destinationCollection, batchSize=2, update(destinationCollection, batchSize=5, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\")))");
stream = factory.constructStream(expression);
List<Tuple> tuples = getTuples(stream);
//Ensure that all CommitStream tuples indicate the correct number of copied/indexed docs
assert(tuples.size() == 1);
t = tuples.get(0);
assert(t.EOF == false);
assertEquals(5, t.get("batchIndexed"));
//Ensure that destinationCollection actually has the new docs.
expression = StreamExpressionParser.parse("search(destinationCollection, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
assertEquals(5, tuples.size());
Tuple tuple = tuples.get(0);
assert(tuple.getLong("id") == 0);
assert(tuple.get("a_s").equals("hello0"));
assert(tuple.getLong("a_i") == 0);
assert(tuple.getDouble("a_f") == 0.0);
assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb");
assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7"));
tuple = tuples.get(1);
assert(tuple.getLong("id") == 1);
assert(tuple.get("a_s").equals("hello1"));
assert(tuple.getLong("a_i") == 1);
assert(tuple.getDouble("a_f") == 1.0);
assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4");
assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777"));
tuple = tuples.get(2);
assert(tuple.getLong("id") == 2);
assert(tuple.get("a_s").equals("hello2"));
assert(tuple.getLong("a_i") == 2);
assert(tuple.getDouble("a_f") == 0.0);
assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1");
assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77"));
tuple = tuples.get(3);
assert(tuple.getLong("id") == 3);
assert(tuple.get("a_s").equals("hello3"));
assert(tuple.getLong("a_i") == 3);
assert(tuple.getDouble("a_f") == 3.0);
assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2");
assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777"));
tuple = tuples.get(4);
assert(tuple.getLong("id") == 4);
assert(tuple.get("a_s").equals("hello4"));
assert(tuple.getLong("a_i") == 4);
assert(tuple.getDouble("a_f") == 4.0);
assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3");
assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777"));
CollectionAdminRequest.deleteCollection("destinationCollection").process(cluster.getSolrClient());
}
@Test
public void testParallelCommitStream() throws Exception {
CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish("parallelDestinationCollection", cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7")
.add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777")
.commit(cluster.getSolrClient(), "collection1");
StreamExpression expression;
TupleStream stream;
Tuple t;
String zkHost = cluster.getZkServer().getZkAddress();
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
.withCollectionZkHost("parallelDestinationCollection", cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("update", UpdateStream.class)
.withFunctionName("commit", CommitStream.class)
.withFunctionName("parallel", ParallelStream.class);
//Copy all docs to destinationCollection
String updateExpression = "commit(parallelDestinationCollection, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\")))";
TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\""+zkHost+"\", sort=\"batchNumber asc\")");
List<Tuple> tuples = getTuples(parallelUpdateStream);
//Ensure that all UpdateStream tuples indicate the correct number of copied/indexed docs
long count = 0;
for(Tuple tuple : tuples) {
count+=tuple.getLong("batchIndexed");
}
assert(count == 5);
//Ensure that destinationCollection actually has the new docs.
expression = StreamExpressionParser.parse("search(parallelDestinationCollection, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
assertEquals(5, tuples.size());
Tuple tuple = tuples.get(0);
assert(tuple.getLong("id") == 0);
assert(tuple.get("a_s").equals("hello0"));
assert(tuple.getLong("a_i") == 0);
assert(tuple.getDouble("a_f") == 0.0);
assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb");
assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7"));
tuple = tuples.get(1);
assert(tuple.getLong("id") == 1);
assert(tuple.get("a_s").equals("hello1"));
assert(tuple.getLong("a_i") == 1);
assert(tuple.getDouble("a_f") == 1.0);
assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4");
assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777"));
tuple = tuples.get(2);
assert(tuple.getLong("id") == 2);
assert(tuple.get("a_s").equals("hello2"));
assert(tuple.getLong("a_i") == 2);
assert(tuple.getDouble("a_f") == 0.0);
assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1");
assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77"));
tuple = tuples.get(3);
assert(tuple.getLong("id") == 3);
assert(tuple.get("a_s").equals("hello3"));
assert(tuple.getLong("a_i") == 3);
assert(tuple.getDouble("a_f") == 3.0);
assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2");
assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777"));
tuple = tuples.get(4);
assert(tuple.getLong("id") == 4);
assert(tuple.get("a_s").equals("hello4"));
assert(tuple.getLong("a_i") == 4);
assert(tuple.getDouble("a_f") == 4.0);
assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3");
assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777"));
CollectionAdminRequest.deleteCollection("parallelDestinationCollection").process(cluster.getSolrClient());
}
@Test
public void testParallelDaemonCommitStream() throws Exception {
CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish("parallelDestinationCollection1", cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7")
.add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777")
.commit(cluster.getSolrClient(), "collection1");
StreamExpression expression;
TupleStream stream;
Tuple t;
String zkHost = cluster.getZkServer().getZkAddress();
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
.withCollectionZkHost("parallelDestinationCollection1", cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("update", UpdateStream.class)
.withFunctionName("commit", CommitStream.class)
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("daemon", DaemonStream.class);
//Copy all docs to destinationCollection
String updateExpression = "daemon(commit(parallelDestinationCollection1, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection1, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"))), runInterval=\"1000\", id=\"test\")";
TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\""+zkHost+"\", sort=\"batchNumber asc\")");
List<Tuple> tuples = getTuples(parallelUpdateStream);
assert(tuples.size() == 2);
//Lets sleep long enough for daemon updates to run.
//Lets stop the daemons
ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream", "action", "list"));
int workersComplete = 0;
for(JettySolrRunner jetty : cluster.getJettySolrRunners()) {
int iterations = 0;
INNER:
while(iterations == 0) {
SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams);
solrStream.open();
Tuple tupleResponse = solrStream.read();
if (tupleResponse.EOF) {
solrStream.close();
break INNER;
} else {
long l = tupleResponse.getLong("iterations");
if(l > 0) {
++workersComplete;
} else {
try {
Thread.sleep(1000);
} catch(Exception e) {
}
}
iterations = (int) l;
solrStream.close();
}
}
}
assertEquals(cluster.getJettySolrRunners().size(), workersComplete);
//Lets stop the daemons
sParams = new ModifiableSolrParams();
sParams.set(CommonParams.QT, "/stream");
sParams.set("action", "stop");
sParams.set("id", "test");
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", sParams);
solrStream.open();
Tuple tupleResponse = solrStream.read();
solrStream.close();
}
sParams = new ModifiableSolrParams();
sParams.set(CommonParams.QT, "/stream");
sParams.set("action", "list");
workersComplete = 0;
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
long stopTime = 0;
INNER:
while(stopTime == 0) {
SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", sParams);
solrStream.open();
Tuple tupleResponse = solrStream.read();
if (tupleResponse.EOF) {
solrStream.close();
break INNER;
} else {
stopTime = tupleResponse.getLong("stopTime");
if (stopTime > 0) {
++workersComplete;
} else {
try {
Thread.sleep(1000);
} catch(Exception e) {
}
}
solrStream.close();
}
}
}
assertEquals(cluster.getJettySolrRunners().size(), workersComplete);
//Ensure that destinationCollection actually has the new docs.
expression = StreamExpressionParser.parse("search(parallelDestinationCollection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
assertEquals(5, tuples.size());
Tuple tuple = tuples.get(0);
assert(tuple.getLong("id") == 0);
assert(tuple.get("a_s").equals("hello0"));
assert(tuple.getLong("a_i") == 0);
assert(tuple.getDouble("a_f") == 0.0);
assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb");
assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7"));
tuple = tuples.get(1);
assert(tuple.getLong("id") == 1);
assert(tuple.get("a_s").equals("hello1"));
assert(tuple.getLong("a_i") == 1);
assert(tuple.getDouble("a_f") == 1.0);
assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4");
assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777"));
tuple = tuples.get(2);
assert(tuple.getLong("id") == 2);
assert(tuple.get("a_s").equals("hello2"));
assert(tuple.getLong("a_i") == 2);
assert(tuple.getDouble("a_f") == 0.0);
assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1");
assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77"));
tuple = tuples.get(3);
assert(tuple.getLong("id") == 3);
assert(tuple.get("a_s").equals("hello3"));
assert(tuple.getLong("a_i") == 3);
assert(tuple.getDouble("a_f") == 3.0);
assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2");
assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777"));
tuple = tuples.get(4);
assert(tuple.getLong("id") == 4);
assert(tuple.get("a_s").equals("hello4"));
assert(tuple.getLong("a_i") == 4);
assert(tuple.getDouble("a_f") == 4.0);
assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3");
assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777"));
CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient());
}
////////////////////////////////////////////
@Test @Test
public void testIntersectStream() throws Exception { public void testIntersectStream() throws Exception {