mirror of https://github.com/apache/lucene.git
SOLR-9684: Rename schedule function to priority
This commit is contained in:
parent
93562da610
commit
0999f6779a
|
@ -208,7 +208,7 @@ New Features
|
|||
|
||||
* SOLR-9668,SOLR-7197: introduce cursorMark='true' in SolrEntityProcessor (Yegor Kozlov, Raveendra Yerraguntl via Mikhail Khludnev)
|
||||
|
||||
* SOLR-9684: Add schedule Streaming Expression (Joel Bernstein)
|
||||
* SOLR-9684: Add priority Streaming Expression (Joel Bernstein, David Smiley)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
|
|
@ -140,7 +140,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
|
|||
.withFunctionName("fetch", FetchStream.class)
|
||||
.withFunctionName("executor", ExecutorStream.class)
|
||||
.withFunctionName("null", NullStream.class)
|
||||
.withFunctionName("schedule", SchedulerStream.class)
|
||||
.withFunctionName("priority", PriorityStream.class)
|
||||
// metrics
|
||||
.withFunctionName("min", MinMetric.class)
|
||||
.withFunctionName("max", MaxMetric.class)
|
||||
|
|
|
@ -35,8 +35,8 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* The scheduler wraps two topics that represent high priority and low priority task queues.
|
||||
* Each time the scheduler is called it will check to see if there are any high priority tasks in the queue. If there
|
||||
* The priority function wraps two topics that represent high priority and low priority task queues.
|
||||
* Each time the priority function is called it will check to see if there are any high priority tasks in the queue. If there
|
||||
* are high priority tasks, then the high priority queue will be read until it returns the EOF Tuple.
|
||||
*
|
||||
* If there are no tasks in the high priority queue, then the lower priority task queue will be opened and read until the EOF Tuple is
|
||||
|
@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
|
|||
* The scheduler is designed to be wrapped by the executor function and a daemon function can be used to call the executor iteratively.
|
||||
**/
|
||||
|
||||
public class SchedulerStream extends TupleStream implements Expressible {
|
||||
public class PriorityStream extends TupleStream implements Expressible {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
|
@ -53,7 +53,7 @@ public class SchedulerStream extends TupleStream implements Expressible {
|
|||
private PushBackStream tasks;
|
||||
private TupleStream currentStream;
|
||||
|
||||
public SchedulerStream(StreamExpression expression, StreamFactory factory) throws IOException {
|
||||
public PriorityStream(StreamExpression expression, StreamFactory factory) throws IOException {
|
||||
// grab all parameters out
|
||||
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
|
||||
|
|
@ -2826,7 +2826,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSchedulerStream() throws Exception {
|
||||
public void testPriorityStream() throws Exception {
|
||||
Assume.assumeTrue(!useAlias);
|
||||
|
||||
new UpdateRequest()
|
||||
|
@ -2845,7 +2845,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
|||
StreamFactory factory = new StreamFactory()
|
||||
.withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
|
||||
.withFunctionName("topic", TopicStream.class)
|
||||
.withFunctionName("schedule", SchedulerStream.class);
|
||||
.withFunctionName("priority", PriorityStream.class);
|
||||
|
||||
StreamExpression expression;
|
||||
TupleStream stream;
|
||||
|
@ -2856,7 +2856,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
|||
try {
|
||||
FieldComparator comp = new FieldComparator("a_i", ComparatorOrder.ASCENDING);
|
||||
|
||||
expression = StreamExpressionParser.parse("schedule(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0)," +
|
||||
expression = StreamExpressionParser.parse("priority(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0)," +
|
||||
"topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0))");
|
||||
stream = factory.constructStream(expression);
|
||||
StreamContext context = new StreamContext();
|
||||
|
@ -2870,7 +2870,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
|||
assertEquals(tuples.size(), 4);
|
||||
assertOrder(tuples, 5, 6, 7, 8);
|
||||
|
||||
expression = StreamExpressionParser.parse("schedule(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0)," +
|
||||
expression = StreamExpressionParser.parse("priority(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0)," +
|
||||
"topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0))");
|
||||
stream = factory.constructStream(expression);
|
||||
context = new StreamContext();
|
||||
|
@ -2883,7 +2883,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
|||
assertEquals(tuples.size(), 6);
|
||||
assertOrder(tuples, 0, 1, 2, 3, 4, 9);
|
||||
|
||||
expression = StreamExpressionParser.parse("schedule(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0)," +
|
||||
expression = StreamExpressionParser.parse("priority(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0)," +
|
||||
"topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0))");
|
||||
stream = factory.constructStream(expression);
|
||||
context = new StreamContext();
|
||||
|
@ -2900,7 +2900,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testParallelSchedulerStream() throws Exception {
|
||||
public void testParallelPriorityStream() throws Exception {
|
||||
Assume.assumeTrue(!useAlias);
|
||||
|
||||
new UpdateRequest()
|
||||
|
@ -2920,7 +2920,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
|||
.withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
|
||||
.withFunctionName("topic", TopicStream.class)
|
||||
.withFunctionName("parallel", ParallelStream.class)
|
||||
.withFunctionName("schedule", SchedulerStream.class);
|
||||
.withFunctionName("priority", PriorityStream.class);
|
||||
|
||||
StreamExpression expression;
|
||||
TupleStream stream;
|
||||
|
@ -2931,7 +2931,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
|||
try {
|
||||
FieldComparator comp = new FieldComparator("a_i", ComparatorOrder.ASCENDING);
|
||||
|
||||
expression = StreamExpressionParser.parse("parallel(collection1, workers=2, sort=\"_version_ asc\", schedule(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0, partitionKeys=id)," +
|
||||
expression = StreamExpressionParser.parse("parallel(collection1, workers=2, sort=\"_version_ asc\", priority(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0, partitionKeys=id)," +
|
||||
"topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0, partitionKeys=id)))");
|
||||
stream = factory.constructStream(expression);
|
||||
StreamContext context = new StreamContext();
|
||||
|
@ -2945,7 +2945,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
|||
assertEquals(tuples.size(), 4);
|
||||
assertOrder(tuples, 5, 6, 7, 8);
|
||||
|
||||
expression = StreamExpressionParser.parse("parallel(collection1, workers=2, sort=\"_version_ asc\", schedule(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0, partitionKeys=id)," +
|
||||
expression = StreamExpressionParser.parse("parallel(collection1, workers=2, sort=\"_version_ asc\", priority(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0, partitionKeys=id)," +
|
||||
"topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0, partitionKeys=id)))");
|
||||
stream = factory.constructStream(expression);
|
||||
context = new StreamContext();
|
||||
|
@ -2958,7 +2958,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
|||
assertEquals(tuples.size(), 6);
|
||||
assertOrder(tuples, 0, 1, 2, 3, 4, 9);
|
||||
|
||||
expression = StreamExpressionParser.parse("parallel(collection1, workers=2, sort=\"_version_ asc\", schedule(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0, partitionKeys=id)," +
|
||||
expression = StreamExpressionParser.parse("parallel(collection1, workers=2, sort=\"_version_ asc\", priority(topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_i\", id=1000000, initialCheckpoint=0, partitionKeys=id)," +
|
||||
"topic(collection1, collection1, q=\"a_s:hello1\", fl=\"id,a_i\", id=2000000, initialCheckpoint=0, partitionKeys=id)))");
|
||||
stream = factory.constructStream(expression);
|
||||
context = new StreamContext();
|
||||
|
|
Loading…
Reference in New Issue