SOLR-9417: Allow daemons to terminate when they finish iterating a topic

This commit is contained in:
Joel Bernstein 2016-10-19 13:16:01 -04:00
parent 2b647cc945
commit 6828dbc9ff
3 changed files with 213 additions and 28 deletions

View File

@ -19,6 +19,7 @@ package org.apache.solr.handler;
import java.io.IOException; import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -97,7 +98,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
private StreamFactory streamFactory = new StreamFactory(); private StreamFactory streamFactory = new StreamFactory();
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String coreName; private String coreName;
private Map<String, DaemonStream> daemons = new HashMap<>(); private Map<String, DaemonStream> daemons = Collections.synchronizedMap(new HashMap());
@Override @Override
public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) { public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
@ -246,6 +247,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
if(daemons.containsKey(daemonStream.getId())) { if(daemons.containsKey(daemonStream.getId())) {
daemons.remove(daemonStream.getId()).close(); daemons.remove(daemonStream.getId()).close();
} }
daemonStream.setDaemons(daemons);
daemonStream.open(); //This will start the deamonStream daemonStream.open(); //This will start the deamonStream
daemons.put(daemonStream.getId(), daemonStream); daemons.put(daemonStream.getId(), daemonStream);
rsp.add("result-set", new DaemonResponseStream("Deamon:"+daemonStream.getId()+" started on "+coreName)); rsp.add("result-set", new DaemonResponseStream("Deamon:"+daemonStream.getId()+" started on "+coreName));

View File

@ -52,6 +52,8 @@ public class DaemonStream extends TupleStream implements Expressible {
private Exception exception; private Exception exception;
private long runInterval; private long runInterval;
private String id; private String id;
private Map<String, DaemonStream> daemons;
private boolean terminate;
private boolean closed = false; private boolean closed = false;
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -64,10 +66,13 @@ public class DaemonStream extends TupleStream implements Expressible {
StreamExpressionNamedParameter idExpression = factory.getNamedOperand(expression, "id"); StreamExpressionNamedParameter idExpression = factory.getNamedOperand(expression, "id");
StreamExpressionNamedParameter runExpression = factory.getNamedOperand(expression, "runInterval"); StreamExpressionNamedParameter runExpression = factory.getNamedOperand(expression, "runInterval");
StreamExpressionNamedParameter queueExpression = factory.getNamedOperand(expression, "queueSize"); StreamExpressionNamedParameter queueExpression = factory.getNamedOperand(expression, "queueSize");
StreamExpressionNamedParameter terminateExpression = factory.getNamedOperand(expression, "terminate");
String id = null; String id = null;
long runInterval = 0L; long runInterval = 0L;
int queueSize = 0; int queueSize = 0;
boolean terminate = false;
if(idExpression == null) { if(idExpression == null) {
throw new IOException("Invalid expression id parameter expected"); throw new IOException("Invalid expression id parameter expected");
@ -85,21 +90,23 @@ public class DaemonStream extends TupleStream implements Expressible {
queueSize= Integer.parseInt(((StreamExpressionValue) queueExpression.getParameter()).getValue()); queueSize= Integer.parseInt(((StreamExpressionValue) queueExpression.getParameter()).getValue());
} }
// validate expression contains only what we want. if(terminateExpression != null) {
if(expression.getParameters().size() != streamExpressions.size() + 2 && terminate = Boolean.parseBoolean(((StreamExpressionValue) terminateExpression.getParameter()).getValue());
expression.getParameters().size() != streamExpressions.size() + 3) {
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
} }
if(1 != streamExpressions.size()){ if(1 != streamExpressions.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size())); throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
} }
init(tupleStream, id, runInterval, queueSize); init(tupleStream, id, runInterval, queueSize, terminate);
}
public DaemonStream(TupleStream tupleStream, String id, long runInterval, int queueSize, boolean terminate) {
init(tupleStream, id, runInterval, queueSize, terminate);
} }
public DaemonStream(TupleStream tupleStream, String id, long runInterval, int queueSize) { public DaemonStream(TupleStream tupleStream, String id, long runInterval, int queueSize) {
init(tupleStream, id, runInterval, queueSize); this(tupleStream, id, runInterval, queueSize, false);
} }
@Override @Override
@ -126,6 +133,7 @@ public class DaemonStream extends TupleStream implements Expressible {
expression.addParameter(new StreamExpressionNamedParameter("id", id)); expression.addParameter(new StreamExpressionNamedParameter("id", id));
expression.addParameter(new StreamExpressionNamedParameter("runInterval", Long.toString(runInterval))); expression.addParameter(new StreamExpressionNamedParameter("runInterval", Long.toString(runInterval)));
expression.addParameter(new StreamExpressionNamedParameter("queueSize", Integer.toString(queueSize))); expression.addParameter(new StreamExpressionNamedParameter("queueSize", Integer.toString(queueSize)));
expression.addParameter(new StreamExpressionNamedParameter("terminate", Boolean.toString(terminate)));
return expression; return expression;
} }
@ -148,10 +156,16 @@ public class DaemonStream extends TupleStream implements Expressible {
} }
public void init(TupleStream tupleStream, String id, long runInterval, int queueSize) { public void init(TupleStream tupleStream, String id, long runInterval, int queueSize) {
init(tupleStream, id, runInterval, queueSize, false);
}
public void init(TupleStream tupleStream, String id, long runInterval, int queueSize, boolean terminate) {
this.tupleStream = tupleStream; this.tupleStream = tupleStream;
this.id = id; this.id = id;
this.runInterval = runInterval; this.runInterval = runInterval;
this.queueSize = queueSize; this.queueSize = queueSize;
this.terminate = terminate;
if(queueSize > 0) { if(queueSize > 0) {
queue = new ArrayBlockingQueue(queueSize); queue = new ArrayBlockingQueue(queueSize);
eatTuples = false; eatTuples = false;
@ -228,6 +242,10 @@ public class DaemonStream extends TupleStream implements Expressible {
return tuple; return tuple;
} }
public void setDaemons(Map<String, DaemonStream> daemons) {
this.daemons = daemons;
}
private synchronized void incrementIterations() { private synchronized void incrementIterations() {
++iterations; ++iterations;
} }
@ -279,6 +297,18 @@ public class DaemonStream extends TupleStream implements Expressible {
errors = 0; // Reset errors on successful run. errors = 0; // Reset errors on successful run.
if (tuple.fields.containsKey("sleepMillis")) { if (tuple.fields.containsKey("sleepMillis")) {
this.sleepMillis = tuple.getLong("sleepMillis"); this.sleepMillis = tuple.getLong("sleepMillis");
if(terminate && sleepMillis > 0) {
//TopicStream provides sleepMillis > 0 if the last run had no Tuples.
//This means the topic queue is empty. Time to terminate.
//Remove ourselves from the daemons map.
if(daemons != null) {
daemons.remove(id);
}
//Break out of the thread loop and end the run.
break OUTER;
}
this.runInterval = -1; this.runInterval = -1;
} }
break INNER; break INNER;

View File

@ -1003,6 +1003,45 @@ public class StreamExpressionTest extends SolrCloudTestCase {
} }
@Test
public void testTerminatingDaemonStream() throws Exception {
new UpdateRequest()
.add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1")
.add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2")
.add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5")
.add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6")
.add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7")
.add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
.withFunctionName("topic", TopicStream.class)
.withFunctionName("daemon", DaemonStream.class);
StreamExpression expression;
DaemonStream daemonStream;
SolrClientCache cache = new SolrClientCache();
StreamContext context = new StreamContext();
context.setSolrClientCache(cache);
expression = StreamExpressionParser.parse("daemon(topic("+COLLECTION+","+COLLECTION+", q=\"a_s:hello\", initialCheckpoint=0, id=\"topic1\", rows=2, fl=\"id\""
+ "), id=test, runInterval=1000, terminate=true, queueSize=50)");
daemonStream = (DaemonStream)factory.constructStream(expression);
daemonStream.setStreamContext(context);
List<Tuple> tuples = getTuples(daemonStream);
assertTrue(tuples.size() == 10);
cache.close();
}
@Test @Test
public void testRollupStream() throws Exception { public void testRollupStream() throws Exception {
@ -3202,6 +3241,120 @@ public class StreamExpressionTest extends SolrCloudTestCase {
CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient()); CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient());
} }
@Test
public void testParallelTerminatingDaemonUpdateStream() throws Exception {
CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish("parallelDestinationCollection1", cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
new UpdateRequest()
.add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7")
.add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77")
.add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777")
.add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777")
.add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777")
.commit(cluster.getSolrClient(), "collection1");
StreamExpression expression;
TupleStream stream;
Tuple t;
String zkHost = cluster.getZkServer().getZkAddress();
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
.withCollectionZkHost("parallelDestinationCollection1", cluster.getZkServer().getZkAddress())
.withFunctionName("topic", TopicStream.class)
.withFunctionName("update", UpdateStream.class)
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("daemon", DaemonStream.class);
//Copy all docs to destinationCollection
String updateExpression = "daemon(update(parallelDestinationCollection1, batchSize=2, topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", partitionKeys=\"a_f\", initialCheckpoint=0, id=\"topic1\")), terminate=true, runInterval=\"1000\", id=\"test\")";
TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\""+zkHost+"\", sort=\"batchNumber asc\")");
List<Tuple> tuples = getTuples(parallelUpdateStream);
assert(tuples.size() == 2);
ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream", "action", "list"));
int workersComplete = 0;
//Daemons should terminate after the topic is completed
//Loop through all shards and wait for the daemons to be gone from the listing.
for(JettySolrRunner jetty : cluster.getJettySolrRunners()) {
INNER:
while(true) {
SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams);
solrStream.open();
Tuple tupleResponse = solrStream.read();
if (tupleResponse.EOF) {
solrStream.close();
++workersComplete;
break INNER;
} else {
solrStream.close();
Thread.sleep(1000);
}
}
}
assertEquals(cluster.getJettySolrRunners().size(), workersComplete);
cluster.getSolrClient().commit("parallelDestinationCollection1");
//Ensure that destinationCollection actually has the new docs.
expression = StreamExpressionParser.parse("search(parallelDestinationCollection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
assertEquals(5, tuples.size());
Tuple tuple = tuples.get(0);
assert(tuple.getLong("id") == 0);
assert(tuple.get("a_s").equals("hello"));
assert(tuple.getLong("a_i") == 0);
assert(tuple.getDouble("a_f") == 0.0);
assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb");
assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7"));
tuple = tuples.get(1);
assert(tuple.getLong("id") == 1);
assert(tuple.get("a_s").equals("hello"));
assert(tuple.getLong("a_i") == 1);
assert(tuple.getDouble("a_f") == 1.0);
assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4");
assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777"));
tuple = tuples.get(2);
assert(tuple.getLong("id") == 2);
assert(tuple.get("a_s").equals("hello"));
assert(tuple.getLong("a_i") == 2);
assert(tuple.getDouble("a_f") == 0.0);
assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1");
assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77"));
tuple = tuples.get(3);
assert(tuple.getLong("id") == 3);
assert(tuple.get("a_s").equals("hello"));
assert(tuple.getLong("a_i") == 3);
assert(tuple.getDouble("a_f") == 3.0);
assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2");
assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777"));
tuple = tuples.get(4);
assert(tuple.getLong("id") == 4);
assert(tuple.get("a_s").equals("hello"));
assert(tuple.getLong("a_i") == 4);
assert(tuple.getDouble("a_f") == 4.0);
assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3");
assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777"));
CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient());
}
//////////////////////////////////////////// ////////////////////////////////////////////
@Test @Test
public void testCommitStream() throws Exception { public void testCommitStream() throws Exception {