diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index b9f30bc271b..3e841bdc4ee 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -19,6 +19,7 @@ package org.apache.solr.handler;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -97,7 +98,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
   private StreamFactory streamFactory = new StreamFactory();
   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private String coreName;
-  private Map daemons = new HashMap<>();
+  private Map daemons = Collections.synchronizedMap(new HashMap());

   @Override
   public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
@@ -245,6 +246,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
         if(daemons.containsKey(daemonStream.getId())) {
           daemons.remove(daemonStream.getId()).close();
         }
+        daemonStream.setDaemons(daemons);
         daemonStream.open(); //This will start the deamonStream
         daemons.put(daemonStream.getId(), daemonStream);
         rsp.add("result-set", new DaemonResponseStream("Deamon:"+daemonStream.getId()+" started on "+coreName));
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index 77648dfd738..8214f9a3805 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -52,6 +52,8 @@ public class DaemonStream extends TupleStream implements Expressible {
   private Exception exception;
   private long runInterval;
   private String id;
+  private Map daemons;
+  private boolean terminate;
   private boolean closed = false;

   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -64,10 +66,13 @@ public class DaemonStream extends TupleStream implements Expressible {
     StreamExpressionNamedParameter idExpression = factory.getNamedOperand(expression, "id");
     StreamExpressionNamedParameter runExpression = factory.getNamedOperand(expression, "runInterval");
     StreamExpressionNamedParameter queueExpression = factory.getNamedOperand(expression, "queueSize");
+    StreamExpressionNamedParameter terminateExpression = factory.getNamedOperand(expression, "terminate");
+
     String id = null;
     long runInterval = 0L;
     int queueSize = 0;
+    boolean terminate = false;

     if(idExpression == null) {
       throw new IOException("Invalid expression id parameter expected");
     }
@@ -82,24 +87,26 @@
     }

     if(queueExpression != null) {
-      queueSize= Integer.parseInt(((StreamExpressionValue)queueExpression.getParameter()).getValue());
+      queueSize= Integer.parseInt(((StreamExpressionValue) queueExpression.getParameter()).getValue());
     }

-    // validate expression contains only what we want.
-    if(expression.getParameters().size() != streamExpressions.size() + 2 &&
-       expression.getParameters().size() != streamExpressions.size() + 3) {
-      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+    if(terminateExpression != null) {
+      terminate = Boolean.parseBoolean(((StreamExpressionValue) terminateExpression.getParameter()).getValue());
     }

     if(1 != streamExpressions.size()){
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
     }

-    init(tupleStream, id, runInterval, queueSize);
+    init(tupleStream, id, runInterval, queueSize, terminate);
+  }
+
+  public DaemonStream(TupleStream tupleStream, String id, long runInterval, int queueSize, boolean terminate) {
+    init(tupleStream, id, runInterval, queueSize, terminate);
   }

   public DaemonStream(TupleStream tupleStream, String id, long runInterval, int queueSize) {
-    init(tupleStream, id, runInterval, queueSize);
+    this(tupleStream, id, runInterval, queueSize, false);
   }

   @Override
@@ -126,6 +133,7 @@ public class DaemonStream extends TupleStream implements Expressible {
     expression.addParameter(new StreamExpressionNamedParameter("id", id));
     expression.addParameter(new StreamExpressionNamedParameter("runInterval", Long.toString(runInterval)));
     expression.addParameter(new StreamExpressionNamedParameter("queueSize", Integer.toString(queueSize)));
+    expression.addParameter(new StreamExpressionNamedParameter("terminate", Boolean.toString(terminate)));

     return expression;
   }
@@ -148,10 +156,16 @@ public class DaemonStream extends TupleStream implements Expressible {
   }

   public void init(TupleStream tupleStream, String id, long runInterval, int queueSize) {
+    init(tupleStream, id, runInterval, queueSize, false);
+  }
+
+  public void init(TupleStream tupleStream, String id, long runInterval, int queueSize, boolean terminate) {
     this.tupleStream = tupleStream;
     this.id = id;
     this.runInterval = runInterval;
     this.queueSize = queueSize;
+    this.terminate = terminate;
+
     if(queueSize > 0) {
       queue = new ArrayBlockingQueue(queueSize);
       eatTuples = false;
@@ -228,6 +242,10 @@ public class DaemonStream extends TupleStream implements Expressible {
     return tuple;
   }

+  public void setDaemons(Map daemons) {
+    this.daemons = daemons;
+  }
+
   private synchronized void incrementIterations() {
     ++iterations;
   }
@@ -279,6 +297,18 @@ public class DaemonStream extends TupleStream implements Expressible {
           errors = 0; // Reset errors on successful run.
           if (tuple.fields.containsKey("sleepMillis")) {
             this.sleepMillis = tuple.getLong("sleepMillis");
+
+            if(terminate && sleepMillis > 0) {
+              //TopicStream provides sleepMillis > 0 if the last run had no Tuples.
+              //This means the topic queue is empty. Time to terminate.
+              //Remove ourselves from the daemons map.
+              if(daemons != null) {
+                daemons.remove(id);
+              }
+              //Break out of the thread loop and end the run.
+              break OUTER;
+            }
+
             this.runInterval = -1;
           }
           break INNER;
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 842f6a66338..7b5777d2314 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -533,24 +533,24 @@ public class StreamExpressionTest extends SolrCloudTestCase {

     // Basic test desc
     expression = StreamExpressionParser.parse("top("
-                                              + "n=2,"
-                                              + "unique("
-                                              + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
-                                              + "over=\"a_f\"),"
-                                              + "sort=\"a_f desc\")");
+        + "n=2,"
+        + "unique("
+        + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+        + "over=\"a_f\"),"
+        + "sort=\"a_f desc\")");
     stream = new RankStream(expression, factory);
     tuples = getTuples(stream);

     assert(tuples.size() == 2);
-    assertOrder(tuples, 4,3);
+    assertOrder(tuples, 4, 3);

     // full factory
     stream = factory.constructStream("top("
-                                     + "n=4,"
-                                     + "unique("
-                                     + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
-                                     + "over=\"a_f\"),"
-                                     + "sort=\"a_f asc\")");
+        + "n=4,"
+        + "unique("
+        + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+        + "over=\"a_f\"),"
+        + "sort=\"a_f asc\")");
     tuples = getTuples(stream);

     assert(tuples.size() == 4);
@@ -827,7 +827,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
       .withFunctionName("parallel", ParallelStream.class)
       .withFunctionName("fetch", FetchStream.class);

-    stream = factory.constructStream("parallel("+COLLECTION+", workers=2, sort=\"a_f asc\", fetch("+COLLECTION+", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
+    stream = factory.constructStream("parallel(" + COLLECTION + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTION + ", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
     tuples = getTuples(stream);

     assert(tuples.size() == 10);
@@ -853,7 +853,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {

     assertTrue("blah blah blah 9".equals(t.getString("subject")));

-    stream = factory.constructStream("parallel("+COLLECTION+", workers=2, sort=\"a_f asc\", fetch("+COLLECTION+", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
+    stream = factory.constructStream("parallel(" + COLLECTION + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTION + ", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
     tuples = getTuples(stream);

     assert(tuples.size() == 10);
@@ -1003,6 +1003,45 @@ public class StreamExpressionTest extends SolrCloudTestCase {

   }

+
+  @Test
+  public void testTerminatingDaemonStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6")
.add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7") + .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8") + .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9") + .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10") + .commit(cluster.getSolrClient(), COLLECTION); + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress()) + .withFunctionName("topic", TopicStream.class) + .withFunctionName("daemon", DaemonStream.class); + + StreamExpression expression; + DaemonStream daemonStream; + + SolrClientCache cache = new SolrClientCache(); + StreamContext context = new StreamContext(); + context.setSolrClientCache(cache); + expression = StreamExpressionParser.parse("daemon(topic("+COLLECTION+","+COLLECTION+", q=\"a_s:hello\", initialCheckpoint=0, id=\"topic1\", rows=2, fl=\"id\"" + + "), id=test, runInterval=1000, terminate=true, queueSize=50)"); + daemonStream = (DaemonStream)factory.constructStream(expression); + daemonStream.setStreamContext(context); + + List tuples = getTuples(daemonStream); + assertTrue(tuples.size() == 10); + cache.close(); + } + + @Test public void testRollupStream() throws Exception { @@ -1367,7 +1406,7 @@ public class StreamExpressionTest extends SolrCloudTestCase { assert(tuples.size() == 9); - assertOrder(tuples, 0,1,2,3,4,7,6,8,9); + assertOrder(tuples, 0, 1, 2, 3, 4, 7, 6, 8, 9); //Test descending @@ -1376,7 +1415,7 @@ public class StreamExpressionTest extends SolrCloudTestCase { tuples = getTuples(pstream); assert(tuples.size() == 8); - assertOrder(tuples, 9,8,6,4,3,2,1,0); + assertOrder(tuples, 9, 8, 6, 4, 3, 2, 1, 0); } @@ -1627,7 +1666,7 @@ public class StreamExpressionTest extends SolrCloudTestCase { stream = new LeftOuterJoinStream(expression, factory); tuples = getTuples(stream); assert(tuples.size() == 10); - assertOrder(tuples, 7,6,3,4,5,1,1,15,15,2); + assertOrder(tuples, 7, 6, 3, 4, 5, 1, 1, 15, 15, 2); // Results in both searches, no join matches expression = StreamExpressionParser.parse("leftOuterJoin(" @@ -1637,7 +1676,7 @@ public class StreamExpressionTest extends SolrCloudTestCase { stream = new LeftOuterJoinStream(expression, factory); tuples = getTuples(stream); assert(tuples.size() == 8); - assertOrder(tuples, 1,15,2,3,4,5,6,7); + assertOrder(tuples, 1, 15, 2, 3, 4, 5, 6, 7); // Differing field names expression = StreamExpressionParser.parse("leftOuterJoin(" @@ -1647,7 +1686,7 @@ public class StreamExpressionTest extends SolrCloudTestCase { stream = new LeftOuterJoinStream(expression, factory); tuples = getTuples(stream); assert(tuples.size() == 10); - assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7); + assertOrder(tuples, 1, 1, 15, 15, 2, 3, 4, 5, 6, 7); } @@ -1764,7 +1803,7 @@ public class StreamExpressionTest extends SolrCloudTestCase { stream = new OuterHashJoinStream(expression, factory); tuples = getTuples(stream); assert(tuples.size() == 10); - assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7); + assertOrder(tuples, 1, 1, 15, 15, 2, 3, 4, 5, 6, 7); // Basic desc expression = StreamExpressionParser.parse("outerHashJoin(" @@ -1794,7 +1833,7 @@ public class StreamExpressionTest extends SolrCloudTestCase { stream = new OuterHashJoinStream(expression, factory); tuples = getTuples(stream); assert(tuples.size() == 10); - assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7); + assertOrder(tuples, 1, 1, 15, 15, 2, 3, 4, 5, 6, 7); } @Test @@ -3202,6 +3241,120 @@ public class StreamExpressionTest extends SolrCloudTestCase { 
CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient()); } + + @Test + public void testParallelTerminatingDaemonUpdateStream() throws Exception { + + CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient()); + AbstractDistribZkTestBase.waitForRecoveriesToFinish("parallelDestinationCollection1", cluster.getSolrClient().getZkStateReader(), + false, true, TIMEOUT); + + new UpdateRequest() + .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7") + .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77") + .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777") + .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777") + .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777") + .commit(cluster.getSolrClient(), "collection1"); + + StreamExpression expression; + TupleStream stream; + Tuple t; + + String zkHost = cluster.getZkServer().getZkAddress(); + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()) + .withCollectionZkHost("parallelDestinationCollection1", cluster.getZkServer().getZkAddress()) + .withFunctionName("topic", TopicStream.class) + .withFunctionName("update", UpdateStream.class) + .withFunctionName("parallel", ParallelStream.class) + .withFunctionName("daemon", DaemonStream.class); + + //Copy all docs to destinationCollection + String updateExpression = "daemon(update(parallelDestinationCollection1, batchSize=2, topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", partitionKeys=\"a_f\", initialCheckpoint=0, id=\"topic1\")), terminate=true, runInterval=\"1000\", id=\"test\")"; + TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\""+zkHost+"\", sort=\"batchNumber asc\")"); + List tuples = getTuples(parallelUpdateStream); + assert(tuples.size() == 2); + + + ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream", "action", "list")); + + int workersComplete = 0; + + //Daemons should terminate after the topic is completed + //Loop through all shards and wait for the daemons to be gone from the listing. + for(JettySolrRunner jetty : cluster.getJettySolrRunners()) { + INNER: + while(true) { + SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams); + solrStream.open(); + Tuple tupleResponse = solrStream.read(); + if (tupleResponse.EOF) { + solrStream.close(); + ++workersComplete; + break INNER; + } else { + solrStream.close(); + Thread.sleep(1000); + } + } + } + + assertEquals(cluster.getJettySolrRunners().size(), workersComplete); + + cluster.getSolrClient().commit("parallelDestinationCollection1"); + + //Ensure that destinationCollection actually has the new docs. 
+ expression = StreamExpressionParser.parse("search(parallelDestinationCollection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")"); + stream = new CloudSolrStream(expression, factory); + tuples = getTuples(stream); + assertEquals(5, tuples.size()); + + Tuple tuple = tuples.get(0); + assert(tuple.getLong("id") == 0); + assert(tuple.get("a_s").equals("hello")); + assert(tuple.getLong("a_i") == 0); + assert(tuple.getDouble("a_f") == 0.0); + assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7")); + + tuple = tuples.get(1); + assert(tuple.getLong("id") == 1); + assert(tuple.get("a_s").equals("hello")); + assert(tuple.getLong("a_i") == 1); + assert(tuple.getDouble("a_f") == 1.0); + assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777")); + + tuple = tuples.get(2); + assert(tuple.getLong("id") == 2); + assert(tuple.get("a_s").equals("hello")); + assert(tuple.getLong("a_i") == 2); + assert(tuple.getDouble("a_f") == 0.0); + assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77")); + + tuple = tuples.get(3); + assert(tuple.getLong("id") == 3); + assert(tuple.get("a_s").equals("hello")); + assert(tuple.getLong("a_i") == 3); + assert(tuple.getDouble("a_f") == 3.0); + assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777")); + + tuple = tuples.get(4); + assert(tuple.getLong("id") == 4); + assert(tuple.get("a_s").equals("hello")); + assert(tuple.getLong("a_i") == 4); + assert(tuple.getDouble("a_f") == 4.0); + assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3"); + assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777")); + + CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient()); + } + + + //////////////////////////////////////////// @Test public void testCommitStream() throws Exception {