SOLR-12684: Document speed gotchas and partitionKeys usage for ParallelStream. Standardize search streams to specify the export handler in the examples for expressions that operate on all the tuples

Varun Thacker 2018-08-24 01:20:06 -07:00
parent cee309a6f7
commit fc9aac11f7
1 changed file with 93 additions and 60 deletions
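As background for the standardization: by default a `search` stream hits the `/select` handler, which returns at most `rows` documents, so an expression that needs to operate on all the tuples can silently see only partial data. The `/export` handler streams the complete sorted result set, with the caveat that the `fl` and `sort` fields must have docValues. A minimal contrast (hypothetical fields):

[source,text]
----
// default /select handler: returns only the first page of results
search(collection1, q="*:*", fl="id,a_s", sort="a_s asc")

// /export handler: streams every matching tuple; fl and sort fields need docValues
search(collection1, q="*:*", qt="/export", fl="id,a_s", sort="a_s asc")
----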


@@ -83,7 +83,7 @@ The following examples show different outputs for this source tuple
 [source,text]
 ----
 cartesianProduct(
-  search(collection1, q='*:*', fl='fieldA, fieldB, fieldC', sort='fieldA ASC'),
+  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
   fieldB
 )
@@ -104,7 +104,7 @@ cartesianProduct(
 [source,text]
 ----
 cartesianProduct(
-  search(collection1, q='*:*', fl='fieldA, fieldB, fieldC', sort='fieldA ASC'),
+  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
   sequence(3,4,5) as fieldE
 )
@@ -133,9 +133,9 @@ cartesianProduct(
 [source,text]
 ----
 cartesianProduct(
-  search(collection1, q='*:*', fl='fieldA, fieldB, fieldC', sort='fieldA ASC'),
+  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
   fieldB,
-  productSort="fieldB DESC"
+  productSort="fieldB desc"
 )
 {
@@ -155,9 +155,9 @@ cartesianProduct(
 [source,text]
 ----
 cartesianProduct(
-  search(collection1, q='*:*', fl='fieldA, fieldB, fieldC', sort='fieldA ASC'),
+  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
   sequence(3,4,5) as fieldE,
-  productSort='newFieldE DESC'
+  productSort="newFieldE desc"
 )
 {
@@ -185,9 +185,9 @@ cartesianProduct(
 [source,text]
 ----
 cartesianProduct(
-  search(collection1, q='*:*', fl='fieldA, fieldB, fieldC', sort='fieldA ASC'),
+  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
   fieldB as newFieldB,
-  productSort="fieldB DESC"
+  productSort="fieldB desc"
 )
 {
@@ -209,7 +209,7 @@ cartesianProduct(
 [source,text]
 ----
 cartesianProduct(
-  search(collection1, q='*:*', fl='fieldA, fieldB, fieldC', sort='fieldA ASC'),
+  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
   fieldB,
   fieldC
 )
@@ -251,10 +251,10 @@ cartesianProduct(
 [source,text]
 ----
 cartesianProduct(
-  search(collection1, q='*:*', fl='fieldA, fieldB, fieldC', sort='fieldA ASC'),
+  search(collection1, qt="/export", q="*:*", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
   fieldB,
   fieldC,
-  productSort="fieldC ASC"
+  productSort="fieldC asc"
 )
 {
@@ -294,10 +294,10 @@ cartesianProduct(
 [source,text]
 ----
 cartesianProduct(
-  search(collection1, q='*:*', fl='fieldA, fieldB, fieldC', sort='fieldA ASC'),
+  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
   fieldB,
   fieldC,
-  productSort="fieldC ASC, fieldB DESC"
+  productSort="fieldC asc, fieldB desc"
 )
 {
@@ -337,7 +337,7 @@ cartesianProduct(
 [source,text]
 ----
 cartesianProduct(
-  search(collection1, q='*:*', fl='fieldA, fieldB, fieldC', sort='fieldA ASC'),
+  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
   sequence(3,4,5) as fieldE,
   fieldB
 )
@@ -407,6 +407,7 @@ classify(model(modelCollection,
                cacheMillis=5000),
          search(contentCollection,
                 q="id:(a b c)",
+                qt="/export",
                 fl="text_t, id",
                 sort="id asc"),
          field="text_t")
@@ -437,7 +438,7 @@ commit(
   update(
     destinationCollection,
     batchSize=5,
-    search(collection1, q=*:*, fl="id,a_s,a_i,a_f,s_multi,i_multi", sort="a_f asc, a_i asc")
+    search(collection1, q="*:*", qt="/export", fl="id,a_s,a_i,a_f,s_multi,i_multi", sort="a_f asc, a_i asc")
   )
 )
 ----
@@ -457,14 +458,14 @@ The `complement` function wraps two streams (A and B) and emits tuples from A which do not exist in B.
 [source,text]
 ----
 complement(
-  search(collection1, q=a_s:(setA || setAB), fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
-  search(collection1, q=a_s:(setB || setAB), fl="id,a_s,a_i", sort="a_i asc"),
+  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
+  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
   on="a_i"
 )

 complement(
-  search(collection1, q=a_s:(setA || setAB), fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
-  search(collection1, q=a_s:(setB || setAB), fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
+  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
+  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
   on="a_i,a_s"
 )
 ----
@@ -660,7 +661,7 @@ The `fetch` function iterates a stream and fetches additional fields and adds them to the tuples.
 [source,text]
 ----
 fetch(addresses,
-      search(people, q="*:*", fl="username, firstName, lastName", sort="username asc"),
+      search(people, q="*:*", qt="/export", fl="username, firstName, lastName", sort="username asc"),
       fl="streetAddress, city, state, country, zip",
       on="username=userId")
 ----
@@ -685,7 +686,8 @@ The comparison evaluators compare the value in a specific field with a value, which can be a literal or the value in another field.
 having(rollup(over=a_s,
               sum(a_i),
               search(collection1,
-                     q=*:*,
+                     q="*:*",
+                     qt="/export",
                      fl="id,a_s,a_i,a_f",
                      sort="a_s asc")),
        and(gt(sum(a_i), 100), lt(sum(a_i), 110)))
@@ -711,21 +713,21 @@ You can wrap the incoming streams with a `select` function to be specific about which fields are included in the resulting tuples.
 [source,text]
 ----
 leftOuterJoin(
-  search(people, q=*:*, fl="personId,name", sort="personId asc"),
-  search(pets, q=type:cat, fl="personId,petName", sort="personId asc"),
+  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
+  search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
   on="personId"
 )

 leftOuterJoin(
-  search(people, q=*:*, fl="personId,name", sort="personId asc"),
-  search(pets, q=type:cat, fl="ownerId,petName", sort="ownerId asc"),
+  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
+  search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
   on="personId=ownerId"
 )

 leftOuterJoin(
-  search(people, q=*:*, fl="personId,name", sort="personId asc"),
+  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
   select(
-    search(pets, q=type:cat, fl="ownerId,name", sort="ownerId asc"),
+    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
     ownerId,
     name as petName
   ),
@@ -752,21 +754,21 @@ The hashJoin function can be used when the tuples of Left and Right cannot be put in the same order.
 [source,text]
 ----
 hashJoin(
-  search(people, q=*:*, fl="personId,name", sort="personId asc"),
-  hashed=search(pets, q=type:cat, fl="personId,petName", sort="personId asc"),
+  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
+  hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
   on="personId"
 )

 hashJoin(
-  search(people, q=*:*, fl="personId,name", sort="personId asc"),
-  hashed=search(pets, q=type:cat, fl="ownerId,petName", sort="ownerId asc"),
+  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
+  hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
   on="personId=ownerId"
 )

 hashJoin(
-  search(people, q=*:*, fl="personId,name", sort="personId asc"),
+  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
   hashed=select(
-    search(pets, q=type:cat, fl="ownerId,name", sort="ownerId asc"),
+    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
     ownerId,
     name as petName
   ),
@@ -789,21 +791,21 @@ Wraps two streams, Left and Right. For every tuple in Left which exists in Right, a tuple containing the fields of both is emitted.
 [source,text]
 ----
 innerJoin(
-  search(people, q=*:*, fl="personId,name", sort="personId asc"),
-  search(pets, q=type:cat, fl="personId,petName", sort="personId asc"),
+  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
+  search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
   on="personId"
 )

 innerJoin(
-  search(people, q=*:*, fl="personId,name", sort="personId asc"),
-  search(pets, q=type:cat, fl="ownerId,petName", sort="ownerId asc"),
+  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
+  search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
   on="personId=ownerId"
 )

 innerJoin(
-  search(people, q=*:*, fl="personId,name", sort="personId asc"),
+  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
   select(
-    search(pets, q=type:cat, fl="ownerId,name", sort="ownerId asc"),
+    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
     ownerId,
     name as petName
   ),
@@ -826,14 +828,14 @@ The `intersect` function wraps two streams, A and B, and emits tuples from A which also exist in B.
 [source,text]
 ----
 intersect(
-  search(collection1, q=a_s:(setA || setAB), fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
-  search(collection1, q=a_s:(setB || setAB), fl="id,a_s,a_i", sort="a_i asc"),
+  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
+  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
   on="a_i"
 )

 intersect(
-  search(collection1, q=a_s:(setA || setAB), fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
-  search(collection1, q=a_s:(setB || setAB), fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
+  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
+  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
   on="a_i,a_s"
 )
 ----
@@ -857,10 +859,12 @@ The `merge` function merges two or more streaming expressions and maintains the ordering of the underlying streams.
 merge(
   search(collection1,
          q="id:(0 3 4)",
+         qt="/export",
          fl="id,a_s,a_i,a_f",
          sort="a_f asc"),
   search(collection1,
          q="id:(1)",
+         qt="/export",
          fl="id,a_s,a_i,a_f",
          sort="a_f asc"),
   on="a_f asc")
@@ -873,18 +877,22 @@ merge(
 merge(
   search(collection1,
          q="id:(0 3 4)",
+         qt="/export",
          fl="id,fieldA,fieldB,fieldC",
          sort="fieldA asc, fieldB desc"),
   search(collection1,
          q="id:(1)",
+         qt="/export",
          fl="id,fieldA",
          sort="fieldA asc"),
   search(collection2,
          q="id:(10 11 13)",
+         qt="/export",
          fl="id,fieldA,fieldC",
          sort="fieldA asc"),
   search(collection3,
          q="id:(987)",
+         qt="/export",
          fl="id,fieldA,fieldC",
          sort="fieldA asc"),
   on="fieldA asc")
@@ -909,7 +917,7 @@ The null expression can be wrapped by the parallel function and sent to worker nodes.
 [source,text]
 ----
 parallel(workerCollection,
-         null(search(collection1, q=*:*, fl="id,a_s,a_i,a_f", sort="a_s desc", qt="/export", partitionKeys="a_s")),
+         null(search(collection1, q="*:*", fl="id,a_s,a_i,a_f", sort="a_s desc", qt="/export", partitionKeys="a_s")),
          workers="20",
         zkHost="localhost:9983",
         sort="a_s desc")
@@ -936,21 +944,21 @@ The outerHashJoin stream can be used when the tuples of Left and Right cannot be put in the same order.
 [source,text]
 ----
 outerHashJoin(
-  search(people, q=*:*, fl="personId,name", sort="personId asc"),
-  hashed=search(pets, q=type:cat, fl="personId,petName", sort="personId asc"),
+  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
+  hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
   on="personId"
 )

 outerHashJoin(
-  search(people, q=*:*, fl="personId,name", sort="personId asc"),
-  hashed=search(pets, q=type:cat, fl="ownerId,petName", sort="ownerId asc"),
+  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
+  hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
   on="personId=ownerId"
 )

 outerHashJoin(
-  search(people, q=*:*, fl="personId,name", sort="personId asc"),
+  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
   hashed=select(
-    search(pets, q=type:cat, fl="ownerId,name", sort="ownerId asc"),
+    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
     ownerId,
     name as petName
   ),
@@ -964,7 +972,11 @@ The `parallel` function wraps a streaming expression and sends it to N worker nodes to be processed in parallel.
 The parallel function requires that the `partitionKeys` parameter be provided to the underlying searches. The `partitionKeys` parameter will partition the search results (tuples) across the worker nodes. Tuples with the same values in the partitionKeys field will be shuffled to the same worker nodes.

-The parallel function maintains the sort order of the tuples returned by the worker nodes, so the sort criteria of the parallel function must match up with the sort order of the tuples returned by the workers.
+The parallel function maintains the sort order of the tuples returned by the worker nodes, so the sort criteria of the parallel function must incorporate the sort order of the tuples returned by the workers.
+For example, if you sort on year, month and day, you could partition on year alone, as long as there are enough distinct years to spread the tuples across the worker nodes.
+
+Solr allows sorting on more than four fields, but no more than four partitionKeys can be specified, for performance reasons. It is also overkill to specify many partitionKeys when one or two keys would be enough to spread the tuples.
+The parallel stream was designed for cases where the underlying search stream emits a large number of tuples from the collection. If the search stream emits only a small subset of the data in the collection, using parallel can actually be slower (see the sketch below).

 .Worker Collections
 [TIP]
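To make the speed note above concrete, here is a sketch (hypothetical query and data) of a case where `parallel` is likely the wrong tool: the query matches only a handful of documents, so a plain single-stream aggregation avoids the overhead of hashing and shuffling tuples to 20 workers:

[source,text]
----
// matches only three documents, so run the aggregation as a single stream
// rather than wrapping it in parallel
rollup(
  search(collection1, q="id:(a b c)", qt="/export", fl="year_i", sort="year_i asc"),
  over="year_i",
  count(*))
----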
@@ -985,16 +997,36 @@ The worker nodes can be from the same collection as the data, or they can be a different collection entirely.
 [source,text]
 ----
 parallel(workerCollection,
-         reduce(search(collection1, q=*:*, fl="id,a_s,a_i,a_f", sort="a_s desc", partitionKeys="a_s"),
-                by="a_s",
-                group(sort="a_f desc", n="4")),
+         rollup(search(collection1, q="*:*", fl="id,year_i,month_i,day_i", qt="/export", sort="year_i desc,month_i desc,day_i asc", partitionKeys="year_i"),
+                over="year_i", count(*)),
          workers="20",
          zkHost="localhost:9983",
-         sort="a_s desc")
+         sort="year_i desc")
 ----

-The expression above shows a `parallel` function wrapping a `reduce` function. This will cause the `reduce` function to be run in parallel across 20 worker nodes.
+The expression above shows a `parallel` function wrapping a `rollup` function. This will cause the `rollup` function to be run in parallel across 20 worker nodes.
+
+.Warmup
+[TIP]
+====
+The parallel stream uses the hash query parser to split the data among the workers. The hash query executes against all the documents, and the resulting bitset is cached in the filterCache.
+For a parallel stream with the same number of workers and partitionKeys, the first query will therefore be slower than subsequent queries.
+A trick to avoid paying the penalty of that first slow query is to configure a warmup query for every new searcher.
+The following is a solrconfig.xml snippet for 2 workers and "year_i" as the partitionKeys.
+
+[source,text]
+----
+<listener event="newSearcher" class="solr.QuerySenderListener">
+  <arr name="queries">
+    <lst><str name="q">*:*</str><str name="fq">{!hash workers=2 worker=0}</str><str name="partitionKeys">year_i</str></lst>
+    <lst><str name="q">*:*</str><str name="fq">{!hash workers=2 worker=1}</str><str name="partitionKeys">year_i</str></lst>
+  </arr>
+</listener>
+----
+====

 == priority

 The `priority` function is a simple priority scheduler for the <<executor>> function. The `executor` function doesn't directly have a concept of task prioritization; instead it simply executes tasks in the order that they are read from its underlying stream. The `priority` function provides the ability to schedule a higher priority task ahead of lower priority tasks that were submitted earlier.
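Returning to the warmup tip above: the same hash-partitioning filter can also be sent directly to a collection, which is one way to observe the filterCache effect before wiring up the `newSearcher` listener (host, port, and collection name here are hypothetical):

[source,text]
----
# The first request computes and caches the worker-0 bitset; repeating it should be
# noticeably faster because the {!hash} filter is then served from the filterCache.
curl "http://localhost:8983/solr/collection1/select?q=*:*&fq=%7B!hash%20workers=2%20worker=0%7D&partitionKeys=year_i&rows=0"
----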
@@ -1043,7 +1075,7 @@ The reduce function relies on the sort order of the underlying stream. Accordingly, the sort order of the underlying stream must be related to the `by` parameter.
 [source,text]
 ----
-reduce(search(collection1, q=*:*, fl="id,a_s,a_i,a_f", sort="a_s asc, a_f asc"),
+reduce(search(collection1, q="*:*", qt="/export", fl="id,a_s,a_i,a_f", sort="a_s asc, a_f asc"),
        by="a_s",
        group(sort="a_f desc", n="4")
 )
@@ -1066,7 +1098,7 @@ The rollup function also needs to process entire result sets in order to perform its aggregations.
 [source,text]
 ----
 rollup(
-   search(collection1, q=*:*, fl="a_s,a_i,a_f", qt="/export", sort="a_s asc"),
+   search(collection1, q="*:*", qt="/export", fl="a_s,a_i,a_f", sort="a_s asc"),
   over="a_s",
   sum(a_i),
   sum(a_f),
@@ -1104,7 +1136,7 @@ The `select` function wraps a streaming expression and outputs tuples containing a subset or modified set of fields from the incoming tuples.
 ----
 // output tuples with fields teamName, wins, losses, and winPercentages where a null value for wins or losses is translated to the value of 0
 select(
-  search(collection1, fl="id,teamName_s,wins,losses", q="*:*", sort="id asc"),
+  search(collection1, fl="id,teamName_s,wins,losses", q="*:*", qt="/export", sort="id asc"),
   teamName_s as teamName,
   wins,
   losses,
@@ -1131,8 +1163,8 @@ The expression below finds dog owners and orders the results by owner and pet name.
 ----
 sort(
   innerJoin(
-    search(people, q=*:*, fl="id,name", sort="id asc"),
-    search(pets, q=type:dog, fl="owner,petName", sort="owner asc"),
+    search(people, q="*:*", qt="/export", fl="id,name", sort="id asc"),
+    search(pets, q="type:dog", qt="/export", fl="owner,petName", sort="owner asc"),
     on="id=owner"
   ),
   by="name asc, petName asc"
@@ -1206,6 +1238,7 @@ The `update` function wraps another function and sends the tuples to a SolrCloud collection for indexing.
        batchSize=500,
        search(collection1,
               q=*:*,
+              qt="/export",
               fl="id,a_s,a_i,a_f,s_multi,i_multi",
               sort="a_f asc, a_i asc"))