From edde433594c104668137350d9db640180b04f648 Mon Sep 17 00:00:00 2001 From: Dennis Gove Date: Fri, 14 Oct 2016 11:43:13 -0400 Subject: [PATCH] SOLR-8487: Adds CommitStream to support sending commits to a collection being updated --- solr/CHANGES.txt | 2 + .../apache/solr/handler/StreamHandler.java | 2 + .../client/solrj/io/stream/CommitStream.java | 260 ++++++++++++++ .../client/solrj/io/stream/UpdateStream.java | 3 +- .../solrj/io/stream/expr/StreamFactory.java | 35 ++ .../solrj/io/stream/StreamExpressionTest.java | 340 ++++++++++++++++++ 6 files changed, 641 insertions(+), 1 deletion(-) create mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 90205a66cd8..d500d15cbf7 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -88,6 +88,8 @@ New Features * SOLR-9103: Restore ability for users to add custom Streaming Expressions (Cao Manh Dat) +* SOLR-8487: Adds CommitStream to support sending commits to a collection being updated (Dennis Gove) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index 9fc8adee323..ee28598ce8b 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -35,6 +35,7 @@ import org.apache.solr.client.solrj.io.ops.DistinctOperation; import org.apache.solr.client.solrj.io.ops.GroupOperation; import org.apache.solr.client.solrj.io.ops.ReplaceOperation; import org.apache.solr.client.solrj.io.stream.CloudSolrStream; +import org.apache.solr.client.solrj.io.stream.CommitStream; import org.apache.solr.client.solrj.io.stream.ComplementStream; import org.apache.solr.client.solrj.io.stream.DaemonStream; import org.apache.solr.client.solrj.io.stream.ExceptionStream; @@ -137,6 +138,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, .withFunctionName("update", UpdateStream.class) .withFunctionName("jdbc", JDBCStream.class) .withFunctionName("topic", TopicStream.class) + .withFunctionName("commit", CommitStream.class) // decorator streams .withFunctionName("merge", MergeStream.class) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java new file mode 100644 index 00000000000..c075978a817 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; + +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sends a commit message to a SolrCloud collection + */ +public class CommitStream extends TupleStream implements Expressible { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + // Part of expression / passed in + private String collection; + private String zkHost; + private boolean waitFlush; + private boolean waitSearcher; + private boolean softCommit; + private int commitBatchSize; + private TupleStream tupleSource; + + private transient SolrClientCache clientCache; + private long docsSinceCommit; + + public CommitStream(StreamExpression expression, StreamFactory factory) throws IOException { + + String collectionName = factory.getValueOperand(expression, 0); + String zkHost = findZkHost(factory, collectionName, expression); + int batchSize = factory.getIntOperand(expression, "batchSize", 0); + boolean waitFlush = factory.getBooleanOperand(expression, "waitFlush", false); + boolean waitSearcher = factory.getBooleanOperand(expression, "waitSearcher", false); + boolean softCommit = factory.getBooleanOperand(expression, "softCommit", false); + + if(null == collectionName){ + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression)); + } + if(null == zkHost){ + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName)); + } + if(batchSize < 0){ + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - batchSize cannot be less than 0 but is '%d'",expression,batchSize)); + } + + //Extract underlying TupleStream. + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); + if (1 != streamExpressions.size()) { + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size())); + } + StreamExpression sourceStreamExpression = streamExpressions.get(0); + + init(collectionName, factory.constructStream(sourceStreamExpression), zkHost, batchSize, waitFlush, waitSearcher, softCommit); + } + + public CommitStream(String collectionName, TupleStream tupleSource, String zkHost, int batchSize, boolean waitFlush, boolean waitSearcher, boolean softCommit) throws IOException { + if (batchSize < 0) { + throw new IOException(String.format(Locale.ROOT,"batchSize '%d' cannot be less than 0.", batchSize)); + } + init(collectionName, tupleSource, zkHost, batchSize, waitFlush, waitSearcher, softCommit); + } + + private void init(String collectionName, TupleStream tupleSource, String zkHost, int batchSize, boolean waitFlush, boolean waitSearcher, boolean softCommit) { + this.collection = collectionName; + this.zkHost = zkHost; + this.commitBatchSize = batchSize; + this.waitFlush = waitFlush; + this.waitSearcher = waitSearcher; + this.softCommit = softCommit; + this.tupleSource = tupleSource; + } + + @Override + public void open() throws IOException { + tupleSource.open(); + clientCache = new SolrClientCache(); + docsSinceCommit = 0; + } + + @Override + public Tuple read() throws IOException { + + Tuple tuple = tupleSource.read(); + if(tuple.EOF){ + if(docsSinceCommit > 0){ + sendCommit(); + } + } + else{ + // if the read document contains field 'batchIndexed' then it's a summary + // document and we can update our count based on it's value. If not then + // just increment by 1 + if(tuple.fields.containsKey(UpdateStream.BATCH_INDEXED_FIELD_NAME) && isInteger(tuple.getString(UpdateStream.BATCH_INDEXED_FIELD_NAME))){ + docsSinceCommit += Integer.parseInt(tuple.getString(UpdateStream.BATCH_INDEXED_FIELD_NAME)); + } + else{ + docsSinceCommit += 1; + } + + if(commitBatchSize > 0 && docsSinceCommit >= commitBatchSize){ + // if commitBatchSize == 0 then the tuple.EOF above will end up calling sendCommit() + sendCommit(); + } + } + + return tuple; + } + + private boolean isInteger(String string){ + try{ + Integer.parseInt(string); + return true; + } + catch(NumberFormatException e){ + return false; + } + } + + @Override + public void close() throws IOException { + clientCache.close(); + tupleSource.close(); + } + + @Override + public StreamComparator getStreamSort() { + return tupleSource.getStreamSort(); + } + + @Override + public List children() { + ArrayList sourceList = new ArrayList(1); + sourceList.add(tupleSource); + return sourceList; + } + + @Override + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + expression.addParameter(collection); + expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost)); + expression.addParameter(new StreamExpressionNamedParameter("batchSize", Integer.toString(commitBatchSize))); + expression.addParameter(new StreamExpressionNamedParameter("waitFlush", Boolean.toString(waitFlush))); + expression.addParameter(new StreamExpressionNamedParameter("waitSearcher", Boolean.toString(waitSearcher))); + expression.addParameter(new StreamExpressionNamedParameter("softCommit", Boolean.toString(softCommit))); + + if(includeStreams){ + if(tupleSource instanceof Expressible){ + expression.addParameter(((Expressible)tupleSource).toExpression(factory)); + } else { + throw new IOException("This CommitStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + } + else{ + expression.addParameter(""); + } + + return expression; + } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + // A commit stream is backward wrt the order in the explanation. This stream is the "child" + // while the collection we're committing to is the parent. + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId() + "-datastore"); + + explanation.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection)); + explanation.setImplementingClass("Solr/Lucene"); + explanation.setExpressionType(ExpressionType.DATASTORE); + explanation.setExpression("Commit into " + collection); + + // child is a stream so add it at this point + StreamExplanation child = new StreamExplanation(getStreamNodeId().toString()); + child.setFunctionName(String.format(Locale.ROOT, factory.getFunctionName(getClass()))); + child.setImplementingClass(getClass().getName()); + child.setExpressionType(ExpressionType.STREAM_DECORATOR); + child.setExpression(toExpression(factory, false).toString()); + child.addChild(tupleSource.toExplanation(factory)); + + explanation.addChild(child); + + return explanation; + } + + @Override + public void setStreamContext(StreamContext context) { + if(null != context.getSolrClientCache()){ + this.clientCache = context.getSolrClientCache(); + // this overrides the one created in open + } + + this.tupleSource.setStreamContext(context); + } + + private String findZkHost(StreamFactory factory, String collectionName, StreamExpression expression) { + StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost"); + if(null == zkHostExpression){ + String zkHost = factory.getCollectionZkHost(collectionName); + if(zkHost == null) { + return factory.getDefaultZkHost(); + } else { + return zkHost; + } + } else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){ + return ((StreamExpressionValue)zkHostExpression.getParameter()).getValue(); + } + + return null; + } + + private void sendCommit() throws IOException { + + try { + clientCache.getCloudSolrClient(zkHost).commit(collection, waitFlush, waitSearcher, softCommit); + } catch (SolrServerException | IOException e) { + LOG.warn(String.format(Locale.ROOT, "Unable to commit documents to collection '%s' due to unexpected error.", collection), e); + String className = e.getClass().getName(); + String message = e.getMessage(); + throw new IOException(String.format(Locale.ROOT,"Unexpected error when committing documents to collection %s- %s:%s", collection, className, message)); + } + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java index 5b1aae76b90..55291bf9ed9 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java @@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory; public class UpdateStream extends TupleStream implements Expressible { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static String BATCH_INDEXED_FIELD_NAME = "batchIndexed"; // field name in summary tuple for #docs updated in batch private String collection; private String zkHost; private int updateBatchSize; @@ -307,7 +308,7 @@ public class UpdateStream extends TupleStream implements Expressible { Map m = new HashMap(); this.totalDocsIndex += batchSize; ++batchNumber; - m.put("batchIndexed", batchSize); + m.put(BATCH_INDEXED_FIELD_NAME, batchSize); m.put("totalIndexed", this.totalDocsIndex); m.put("batchNumber", batchNumber); if(coreName != null) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java index f127d36f6f0..d2e72dfc9ea 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java @@ -174,6 +174,41 @@ public class StreamFactory implements Serializable { return matchingStreamExpressions; } + public int getIntOperand(StreamExpression expression, String paramName, Integer defaultValue) throws IOException{ + StreamExpressionNamedParameter param = getNamedOperand(expression, paramName); + + if(null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)){ + if(null != defaultValue){ + return defaultValue; + } + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single '%s' parameter of type integer but didn't find one",expression, paramName)); + } + String nStr = ((StreamExpressionValue)param.getParameter()).getValue(); + try{ + return Integer.parseInt(nStr); + } + catch(NumberFormatException e){ + if(null != defaultValue){ + return defaultValue; + } + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - %s '%s' is not a valid integer.",expression, paramName, nStr)); + } + } + + public boolean getBooleanOperand(StreamExpression expression, String paramName, Boolean defaultValue) throws IOException{ + StreamExpressionNamedParameter param = getNamedOperand(expression, paramName); + + if(null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)){ + if(null != defaultValue){ + return defaultValue; + } + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single '%s' parameter of type boolean but didn't find one",expression, paramName)); + } + String nStr = ((StreamExpressionValue)param.getParameter()).getValue(); + return Boolean.parseBoolean(nStr); + } + + public TupleStream constructStream(String expressionClause) throws IOException { return constructStream(StreamExpressionParser.parse(expressionClause)); } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index 4a3db7788c0..842f6a66338 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -3202,6 +3202,346 @@ public class StreamExpressionTest extends SolrCloudTestCase { CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient()); } + //////////////////////////////////////////// + @Test + public void testCommitStream() throws Exception { + + CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).process(cluster.getSolrClient()); + AbstractDistribZkTestBase.waitForRecoveriesToFinish("destinationCollection", cluster.getSolrClient().getZkStateReader(), + false, true, TIMEOUT); + + new UpdateRequest() + .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7") + .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77") + .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777") + .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777") + .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777") + .commit(cluster.getSolrClient(), "collection1"); + + StreamExpression expression; + TupleStream stream; + Tuple t; + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) + .withCollectionZkHost("destinationCollection", cluster.getZkServer().getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("update", UpdateStream.class) + .withFunctionName("commit", CommitStream.class); + + //Copy all docs to destinationCollection + expression = StreamExpressionParser.parse("commit(destinationCollection, batchSize=2, update(destinationCollection, batchSize=5, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\")))"); + stream = factory.constructStream(expression); + List tuples = getTuples(stream); + + //Ensure that all CommitStream tuples indicate the correct number of copied/indexed docs + assert(tuples.size() == 1); + t = tuples.get(0); + assert(t.EOF == false); + assertEquals(5, t.get("batchIndexed")); + + //Ensure that destinationCollection actually has the new docs. + expression = StreamExpressionParser.parse("search(destinationCollection, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")"); + stream = new CloudSolrStream(expression, factory); + tuples = getTuples(stream); + assertEquals(5, tuples.size()); + + Tuple tuple = tuples.get(0); + assert(tuple.getLong("id") == 0); + assert(tuple.get("a_s").equals("hello0")); + assert(tuple.getLong("a_i") == 0); + assert(tuple.getDouble("a_f") == 0.0); + assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7")); + + tuple = tuples.get(1); + assert(tuple.getLong("id") == 1); + assert(tuple.get("a_s").equals("hello1")); + assert(tuple.getLong("a_i") == 1); + assert(tuple.getDouble("a_f") == 1.0); + assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777")); + + tuple = tuples.get(2); + assert(tuple.getLong("id") == 2); + assert(tuple.get("a_s").equals("hello2")); + assert(tuple.getLong("a_i") == 2); + assert(tuple.getDouble("a_f") == 0.0); + assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77")); + + tuple = tuples.get(3); + assert(tuple.getLong("id") == 3); + assert(tuple.get("a_s").equals("hello3")); + assert(tuple.getLong("a_i") == 3); + assert(tuple.getDouble("a_f") == 3.0); + assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777")); + + tuple = tuples.get(4); + assert(tuple.getLong("id") == 4); + assert(tuple.get("a_s").equals("hello4")); + assert(tuple.getLong("a_i") == 4); + assert(tuple.getDouble("a_f") == 4.0); + assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777")); + + CollectionAdminRequest.deleteCollection("destinationCollection").process(cluster.getSolrClient()); + } + + @Test + public void testParallelCommitStream() throws Exception { + + CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).process(cluster.getSolrClient()); + AbstractDistribZkTestBase.waitForRecoveriesToFinish("parallelDestinationCollection", cluster.getSolrClient().getZkStateReader(), + false, true, TIMEOUT); + + new UpdateRequest() + .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7") + .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77") + .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777") + .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777") + .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777") + .commit(cluster.getSolrClient(), "collection1"); + + StreamExpression expression; + TupleStream stream; + Tuple t; + + String zkHost = cluster.getZkServer().getZkAddress(); + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) + .withCollectionZkHost("parallelDestinationCollection", cluster.getZkServer().getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("update", UpdateStream.class) + .withFunctionName("commit", CommitStream.class) + .withFunctionName("parallel", ParallelStream.class); + + //Copy all docs to destinationCollection + String updateExpression = "commit(parallelDestinationCollection, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\")))"; + TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\""+zkHost+"\", sort=\"batchNumber asc\")"); + List tuples = getTuples(parallelUpdateStream); + + //Ensure that all UpdateStream tuples indicate the correct number of copied/indexed docs + long count = 0; + + for(Tuple tuple : tuples) { + count+=tuple.getLong("batchIndexed"); + } + + assert(count == 5); + + //Ensure that destinationCollection actually has the new docs. + expression = StreamExpressionParser.parse("search(parallelDestinationCollection, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")"); + stream = new CloudSolrStream(expression, factory); + tuples = getTuples(stream); + assertEquals(5, tuples.size()); + + Tuple tuple = tuples.get(0); + assert(tuple.getLong("id") == 0); + assert(tuple.get("a_s").equals("hello0")); + assert(tuple.getLong("a_i") == 0); + assert(tuple.getDouble("a_f") == 0.0); + assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7")); + + tuple = tuples.get(1); + assert(tuple.getLong("id") == 1); + assert(tuple.get("a_s").equals("hello1")); + assert(tuple.getLong("a_i") == 1); + assert(tuple.getDouble("a_f") == 1.0); + assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777")); + + tuple = tuples.get(2); + assert(tuple.getLong("id") == 2); + assert(tuple.get("a_s").equals("hello2")); + assert(tuple.getLong("a_i") == 2); + assert(tuple.getDouble("a_f") == 0.0); + assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77")); + + tuple = tuples.get(3); + assert(tuple.getLong("id") == 3); + assert(tuple.get("a_s").equals("hello3")); + assert(tuple.getLong("a_i") == 3); + assert(tuple.getDouble("a_f") == 3.0); + assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777")); + + tuple = tuples.get(4); + assert(tuple.getLong("id") == 4); + assert(tuple.get("a_s").equals("hello4")); + assert(tuple.getLong("a_i") == 4); + assert(tuple.getDouble("a_f") == 4.0); + assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777")); + + CollectionAdminRequest.deleteCollection("parallelDestinationCollection").process(cluster.getSolrClient()); + } + + @Test + public void testParallelDaemonCommitStream() throws Exception { + + CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient()); + AbstractDistribZkTestBase.waitForRecoveriesToFinish("parallelDestinationCollection1", cluster.getSolrClient().getZkStateReader(), + false, true, TIMEOUT); + + new UpdateRequest() + .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7") + .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77") + .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777") + .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777") + .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777") + .commit(cluster.getSolrClient(), "collection1"); + + StreamExpression expression; + TupleStream stream; + Tuple t; + + String zkHost = cluster.getZkServer().getZkAddress(); + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) + .withCollectionZkHost("parallelDestinationCollection1", cluster.getZkServer().getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("update", UpdateStream.class) + .withFunctionName("commit", CommitStream.class) + .withFunctionName("parallel", ParallelStream.class) + .withFunctionName("daemon", DaemonStream.class); + + //Copy all docs to destinationCollection + String updateExpression = "daemon(commit(parallelDestinationCollection1, batchSize=0, zkHost=\"" + cluster.getZkServer().getZkAddress() + "\", update(parallelDestinationCollection1, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"))), runInterval=\"1000\", id=\"test\")"; + TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\""+zkHost+"\", sort=\"batchNumber asc\")"); + List tuples = getTuples(parallelUpdateStream); + assert(tuples.size() == 2); + + //Lets sleep long enough for daemon updates to run. + //Lets stop the daemons + ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream", "action", "list")); + + int workersComplete = 0; + for(JettySolrRunner jetty : cluster.getJettySolrRunners()) { + int iterations = 0; + INNER: + while(iterations == 0) { + SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams); + solrStream.open(); + Tuple tupleResponse = solrStream.read(); + if (tupleResponse.EOF) { + solrStream.close(); + break INNER; + } else { + long l = tupleResponse.getLong("iterations"); + if(l > 0) { + ++workersComplete; + } else { + try { + Thread.sleep(1000); + } catch(Exception e) { + } + } + iterations = (int) l; + solrStream.close(); + } + } + } + + assertEquals(cluster.getJettySolrRunners().size(), workersComplete); + + //Lets stop the daemons + sParams = new ModifiableSolrParams(); + sParams.set(CommonParams.QT, "/stream"); + sParams.set("action", "stop"); + sParams.set("id", "test"); + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { + SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", sParams); + solrStream.open(); + Tuple tupleResponse = solrStream.read(); + solrStream.close(); + } + + sParams = new ModifiableSolrParams(); + sParams.set(CommonParams.QT, "/stream"); + sParams.set("action", "list"); + + workersComplete = 0; + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { + long stopTime = 0; + INNER: + while(stopTime == 0) { + SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", sParams); + solrStream.open(); + Tuple tupleResponse = solrStream.read(); + if (tupleResponse.EOF) { + solrStream.close(); + break INNER; + } else { + stopTime = tupleResponse.getLong("stopTime"); + if (stopTime > 0) { + ++workersComplete; + } else { + try { + Thread.sleep(1000); + } catch(Exception e) { + + } + } + solrStream.close(); + } + } + } + + assertEquals(cluster.getJettySolrRunners().size(), workersComplete); + //Ensure that destinationCollection actually has the new docs. + expression = StreamExpressionParser.parse("search(parallelDestinationCollection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")"); + stream = new CloudSolrStream(expression, factory); + tuples = getTuples(stream); + assertEquals(5, tuples.size()); + + Tuple tuple = tuples.get(0); + assert(tuple.getLong("id") == 0); + assert(tuple.get("a_s").equals("hello0")); + assert(tuple.getLong("a_i") == 0); + assert(tuple.getDouble("a_f") == 0.0); + assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7")); + + tuple = tuples.get(1); + assert(tuple.getLong("id") == 1); + assert(tuple.get("a_s").equals("hello1")); + assert(tuple.getLong("a_i") == 1); + assert(tuple.getDouble("a_f") == 1.0); + assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777")); + + tuple = tuples.get(2); + assert(tuple.getLong("id") == 2); + assert(tuple.get("a_s").equals("hello2")); + assert(tuple.getLong("a_i") == 2); + assert(tuple.getDouble("a_f") == 0.0); + assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77")); + + tuple = tuples.get(3); + assert(tuple.getLong("id") == 3); + assert(tuple.get("a_s").equals("hello3")); + assert(tuple.getLong("a_i") == 3); + assert(tuple.getDouble("a_f") == 3.0); + assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777")); + + tuple = tuples.get(4); + assert(tuple.getLong("id") == 4); + assert(tuple.get("a_s").equals("hello4")); + assert(tuple.getLong("a_i") == 4); + assert(tuple.getDouble("a_f") == 4.0); + assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777")); + + CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient()); + } + //////////////////////////////////////////// + @Test public void testIntersectStream() throws Exception {