mirror of https://github.com/apache/lucene.git
SOLR-10274: The search Streaming Expression should work in non-SolrCloud mode
commit 06a55b73b9
parent 57c5837183
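At a glance, the change removes CloudSolrStream's and ParallelStream's direct ZooKeeper/CloudSolrClient handling: shard URLs are now resolved through getShards(zkHost, collection, streamContext), and the updated tests supply a StreamContext carrying a SolrClientCache and close that cache in a finally block. The sketch below only mirrors that calling pattern from the test changes in this commit; the collection name, zkHost, and query are placeholders, not values taken from the diff.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

public class SearchStreamSketch {

  // zkHost is a placeholder for the cluster's ZooKeeper address; "collection1" is illustrative.
  public static List<Tuple> runSearch(String zkHost) throws IOException {
    StreamFactory factory = new StreamFactory()
        .withCollectionZkHost("collection1", zkHost)
        .withFunctionName("search", CloudSolrStream.class);

    // The stream borrows its clients from the SolrClientCache carried by the StreamContext.
    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    TupleStream stream = factory.constructStream("search(collection1, q=*:*, fl=\"id\", sort=\"id asc\")");
    stream.setStreamContext(streamContext);

    List<Tuple> tuples = new ArrayList<>();
    try {
      stream.open();
      for (Tuple tuple = stream.read(); !tuple.EOF; tuple = stream.read()) {
        tuples.add(tuple);
      }
    } finally {
      stream.close();
      solrClientCache.close(); // mirrors the finally blocks added to the tests below
    }
    return tuples;
  }
}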
@@ -26,8 +26,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -35,7 +33,6 @@ import java.util.concurrent.Future;
 import java.util.stream.Collectors;

 import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -52,9 +49,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.MapSolrParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -178,9 +173,11 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
       zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
     }
+    /*
     if(null == zkHost){
       throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
     }
+    */

     // We've got all the required items
     init(collectionName, zkHost, mParams);
@@ -299,14 +296,6 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     this.tuples = new TreeSet();
     this.solrStreams = new ArrayList();
     this.eofTuples = Collections.synchronizedMap(new HashMap());
-    if (this.streamContext != null && this.streamContext.getSolrClientCache() != null) {
-      this.cloudSolrClient = this.streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
-    } else {
-      this.cloudSolrClient = new Builder()
-          .withZkHost(zkHost)
-          .build();
-      this.cloudSolrClient.connect();
-    }
     constructStreams();
     openStreams();
   }
@@ -400,29 +389,15 @@ public class CloudSolrStream extends TupleStream implements Expressible {

   protected void constructStreams() throws IOException {
     try {
-      ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
-      ClusterState clusterState = zkStateReader.getClusterState();

-      Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true);
+      List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);

       ModifiableSolrParams mParams = new ModifiableSolrParams(params);
       mParams = adjustParams(mParams);
       mParams.set(DISTRIB, "false"); // We are the aggregator.

-      Set<String> liveNodes = clusterState.getLiveNodes();
-      for(Slice slice : slices) {
-        Collection<Replica> replicas = slice.getReplicas();
-        List<Replica> shuffler = new ArrayList<>();
-        for(Replica replica : replicas) {
-          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
-            shuffler.add(replica);
-        }
-
-        Collections.shuffle(shuffler, new Random());
-        Replica rep = shuffler.get(0);
-        ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
-        String url = zkProps.getCoreUrl();
-        SolrStream solrStream = new SolrStream(url, mParams);
+      for(String shardUrl : shardUrls) {
+        SolrStream solrStream = new SolrStream(shardUrl, mParams);
         if(streamContext != null) {
           solrStream.setStreamContext(streamContext);
         }
@@ -468,12 +443,6 @@ public class CloudSolrStream extends TupleStream implements Expressible {
         solrStream.close();
       }
     }

-    if ((this.streamContext == null || this.streamContext.getSolrClientCache() == null) &&
-        cloudSolrClient != null) {
-
-      cloudSolrClient.close();
-    }
   }

   /** Return the stream sort - ie, the order in which records are returned */
@@ -263,27 +263,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
     try {
       Object pushStream = ((Expressible) tupleStream).toExpression(streamFactory);

-      ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+      List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);

-      Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true);
-
-      ClusterState clusterState = zkStateReader.getClusterState();
-      Set<String> liveNodes = clusterState.getLiveNodes();
-
-      List<Replica> shuffler = new ArrayList<>();
-      for(Slice slice : slices) {
-        Collection<Replica> replicas = slice.getReplicas();
-        for (Replica replica : replicas) {
-          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
-            shuffler.add(replica);
-        }
-      }
-
-      if(workers > shuffler.size()) {
-        throw new IOException("Number of workers exceeds nodes in the worker collection");
-      }
-
-      Collections.shuffle(shuffler, new Random());
-
       for(int w=0; w<workers; w++) {
         ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
@@ -293,9 +273,8 @@ public class ParallelStream extends CloudSolrStream implements Expressible {

         paramsLoc.set("expr", pushStream.toString());
         paramsLoc.set("qt","/stream");
-        Replica rep = shuffler.get(w);
-        ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
-        String url = zkProps.getCoreUrl();
+        String url = shardUrls.get(w);
         SolrStream solrStream = new SolrStream(url, paramsLoc);
         solrStreams.add(solrStream);
       }
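After this change ParallelStream simply takes the first `workers` entries returned by getShards(...) and sends each worker the serialized expression as an `expr` parameter with `qt=/stream`, instead of picking active replicas out of ClusterState. The same mechanics can be used to run an expression against a single node directly, which is the non-SolrCloud case named in the commit title. A minimal sketch, assuming a placeholder base URL and an illustrative expression (this reuses only the expr/qt/SolrStream pattern visible in the hunk above):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.common.params.ModifiableSolrParams;

public class DirectStreamSketch {

  // baseUrl is a placeholder for a single core URL, e.g. "http://localhost:8983/solr/collection1".
  public static List<Tuple> run(String baseUrl) throws IOException {
    ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
    // Same parameters ParallelStream sets for each worker: the expression plus qt=/stream.
    paramsLoc.set("expr", "search(collection1, q=*:*, fl=\"id\", sort=\"id asc\")");
    paramsLoc.set("qt", "/stream");

    List<Tuple> tuples = new ArrayList<>();
    SolrStream solrStream = new SolrStream(baseUrl, paramsLoc);
    try {
      solrStream.open();
      for (Tuple tuple = solrStream.read(); !tuple.EOF; tuple = solrStream.read()) {
        tuples.add(tuple);
      }
    } finally {
      solrStream.close();
    }
    return tuples;
  }
}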
@@ -50,6 +50,10 @@ public class StreamContext implements Serializable{
     this.entries.put(key, value);
   }

+  public boolean containsKey(Object key) {
+    return entries.containsKey(key);
+  }
+
   public Map getEntries() {
     return this.entries;
   }
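StreamContext gains a containsKey accessor alongside put and getEntries, so code that receives a context (for example the getShards helper the streams above now call) can probe for optional entries before reading them. A tiny usage sketch; the "shards" key and its value are purely illustrative, not an API defined anywhere in this diff:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.solr.client.solrj.io.stream.StreamContext;

public class StreamContextProbe {
  public static void main(String[] args) {
    StreamContext context = new StreamContext();

    // Hypothetical entry: a pre-resolved map of collection -> shard URLs.
    Map<String, List<String>> shardMap = new HashMap<>();
    shardMap.put("collection1", Arrays.asList("http://localhost:8983/solr/collection1"));
    context.put("shards", shardMap);

    if (context.containsKey("shards")) {
      // Read the entry back through getEntries() instead of consulting ZooKeeper.
      Object shards = context.getEntries().get("shards");
      System.out.println("shards entry present: " + shards);
    }
  }
}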
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Locale;

 import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -206,6 +207,10 @@ public class JDBCStreamTest extends SolrCloudTestCase {
       statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('AL', 'Algeria')");
     }

+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
     // Load Solr
     new UpdateRequest()
         .add(id, "0", "code_s", "GB", "name_s", "Great Britian")
@@ -218,17 +223,24 @@ public class JDBCStreamTest extends SolrCloudTestCase {

     List<Tuple> tuples;

+    try {
     // Simple 1
     TupleStream jdbcStream = new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by CODE", new FieldComparator("CODE", ComparatorOrder.ASCENDING));
-    TupleStream selectStream = new SelectStream(jdbcStream, new HashMap<String, String>(){{ put("CODE", "code_s"); put("COUNTRY_NAME", "name_s"); }});
+    TupleStream selectStream = new SelectStream(jdbcStream, new HashMap<String, String>() {{
+      put("CODE", "code_s");
+      put("COUNTRY_NAME", "name_s");
+    }});
     TupleStream searchStream = factory.constructStream("search(" + COLLECTIONORALIAS + ", fl=\"code_s,name_s\",q=\"*:*\",sort=\"code_s asc\")");
-    TupleStream mergeStream = new MergeStream(new FieldComparator("code_s", ComparatorOrder.ASCENDING), new TupleStream[]{selectStream,searchStream});
+    TupleStream mergeStream = new MergeStream(new FieldComparator("code_s", ComparatorOrder.ASCENDING), new TupleStream[]{selectStream, searchStream});
+    mergeStream.setStreamContext(streamContext);
     tuples = getTuples(mergeStream);

     assertEquals(7, tuples.size());
-    assertOrderOf(tuples, "code_s", "AL","CA","GB","NL","NO","NP","US");
+    assertOrderOf(tuples, "code_s", "AL", "CA", "GB", "NL", "NO", "NP", "US");
     assertOrderOf(tuples, "name_s", "Algeria", "Canada", "Great Britian", "Netherlands", "Norway", "Nepal", "United States");
+    } finally {
+      solrClientCache.close();
+    }
   }

   @Test
@@ -277,32 +289,41 @@ public class JDBCStreamTest extends SolrCloudTestCase {
     String expression;
     TupleStream stream;
     List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);

+    try {
     // Basic test
     expression =
         "innerJoin("
        + " select("
        + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
        + " personId_i as personId,"
        + " rating_f as rating"
        + " ),"
        + " select("
        + " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
        + " ID as personId,"
        + " NAME as personName,"
        + " COUNTRY_NAME as country"
        + " ),"
        + " on=\"personId\""
        + ")";

     stream = factory.constructStream(expression);
+    stream.setStreamContext(streamContext);
     tuples = getTuples(stream);

     assertEquals(10, tuples.size());
-    assertOrderOf(tuples, "personId", 11,12,13,14,15,16,17,18,19,20);
-    assertOrderOf(tuples, "rating", 3.5d,5d,2.2d,4.3d,3.5d,3d,3d,4d,4.1d,4.8d);
-    assertOrderOf(tuples, "personName", "Emma","Grace","Hailey","Isabella","Lily","Madison","Mia","Natalie","Olivia","Samantha");
-    assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
+    assertOrderOf(tuples, "personId", 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
+    assertOrderOf(tuples, "rating", 3.5d, 5d, 2.2d, 4.3d, 3.5d, 3d, 3d, 4d, 4.1d, 4.8d);
+    assertOrderOf(tuples, "personName", "Emma", "Grace", "Hailey", "Isabella", "Lily", "Madison", "Mia", "Natalie", "Olivia", "Samantha");
+    assertOrderOf(tuples, "country", "Netherlands", "United States", "Netherlands", "Netherlands", "Netherlands", "United States", "United States", "Netherlands", "Netherlands", "United States");
+    } finally {
+      solrClientCache.close();
+    }
   }

   @Test
@@ -351,58 +372,67 @@ public class JDBCStreamTest extends SolrCloudTestCase {
     String expression;
     TupleStream stream;
     List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);

+    try {
     // Basic test for no alias
     expression =
         "innerJoin("
        + " select("
        + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
        + " personId_i as personId,"
        + " rating_f as rating"
        + " ),"
        + " select("
        + " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
        + " ID as personId,"
        + " NAME as personName,"
        + " COUNTRY_NAME as country"
        + " ),"
        + " on=\"personId\""
        + ")";

     stream = factory.constructStream(expression);
+    stream.setStreamContext(streamContext);
     tuples = getTuples(stream);

     assertEquals(10, tuples.size());
-    assertOrderOf(tuples, "personId", 11,12,13,14,15,16,17,18,19,20);
-    assertOrderOf(tuples, "rating", 3.5d,5d,2.2d,4.3d,3.5d,3d,3d,4d,4.1d,4.8d);
-    assertOrderOf(tuples, "personName", "Emma","Grace","Hailey","Isabella","Lily","Madison","Mia","Natalie","Olivia","Samantha");
-    assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
+    assertOrderOf(tuples, "personId", 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
+    assertOrderOf(tuples, "rating", 3.5d, 5d, 2.2d, 4.3d, 3.5d, 3d, 3d, 4d, 4.1d, 4.8d);
+    assertOrderOf(tuples, "personName", "Emma", "Grace", "Hailey", "Isabella", "Lily", "Madison", "Mia", "Natalie", "Olivia", "Samantha");
+    assertOrderOf(tuples, "country", "Netherlands", "United States", "Netherlands", "Netherlands", "Netherlands", "United States", "United States", "Netherlands", "Netherlands", "United States");

     // Basic test for alias
     expression =
         "innerJoin("
        + " select("
        + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
        + " personId_i as personId,"
        + " rating_f as rating"
        + " ),"
        + " select("
        + " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID as PERSONID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"PERSONID asc\"),"
        + " PERSONID as personId,"
        + " NAME as personName,"
        + " COUNTRY_NAME as country"
        + " ),"
        + " on=\"personId\""
        + ")";

     stream = factory.constructStream(expression);
+    stream.setStreamContext(streamContext);
     tuples = getTuples(stream);

     assertEquals(10, tuples.size());
-    assertOrderOf(tuples, "personId", 11,12,13,14,15,16,17,18,19,20);
-    assertOrderOf(tuples, "rating", 3.5d,5d,2.2d,4.3d,3.5d,3d,3d,4d,4.1d,4.8d);
-    assertOrderOf(tuples, "personName", "Emma","Grace","Hailey","Isabella","Lily","Madison","Mia","Natalie","Olivia","Samantha");
-    assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
+    assertOrderOf(tuples, "personId", 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
+    assertOrderOf(tuples, "rating", 3.5d, 5d, 2.2d, 4.3d, 3.5d, 3d, 3d, 4d, 4.1d, 4.8d);
+    assertOrderOf(tuples, "personName", "Emma", "Grace", "Hailey", "Isabella", "Lily", "Madison", "Mia", "Natalie", "Olivia", "Samantha");
+    assertOrderOf(tuples, "country", "Netherlands", "United States", "Netherlands", "Netherlands", "Netherlands", "United States", "United States", "Netherlands", "Netherlands", "United States");
+    } finally {
+      solrClientCache.close();
+    }
   }

   @Test
@@ -458,49 +488,57 @@ public class JDBCStreamTest extends SolrCloudTestCase {
     TupleStream stream;
     List<Tuple> tuples;

+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
     // Basic test
     expression =
         "rollup("
        + " hashJoin("
        + " hashed=select("
        + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
        + " personId_i as personId,"
        + " rating_f as rating"
        + " ),"
        + " select("
        + " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by COUNTRIES.COUNTRY_NAME\", sort=\"COUNTRIES.COUNTRY_NAME asc\"),"
        + " ID as personId,"
        + " NAME as personName,"
        + " COUNTRY_NAME as country"
        + " ),"
        + " on=\"personId\""
        + " ),"
        + " over=\"country\","
        + " max(rating),"
        + " min(rating),"
        + " avg(rating),"
        + " count(*)"
        + ")";

     stream = factory.constructStream(expression);
+    stream.setStreamContext(streamContext);
     tuples = getTuples(stream);

     assertEquals(2, tuples.size());

     Tuple tuple = tuples.get(0);
-    assertEquals("Netherlands",tuple.getString("country"));
+    assertEquals("Netherlands", tuple.getString("country"));
     assertTrue(4.3D == tuple.getDouble("max(rating)"));
     assertTrue(2.2D == tuple.getDouble("min(rating)"));
     assertTrue(3.6D == tuple.getDouble("avg(rating)"));
     assertTrue(6D == tuple.getDouble("count(*)"));

     tuple = tuples.get(1);
-    assertEquals("United States",tuple.getString("country"));
+    assertEquals("United States", tuple.getString("country"));
     assertTrue(5D == tuple.getDouble("max(rating)"));
     assertTrue(3D == tuple.getDouble("min(rating)"));
     assertTrue(3.95D == tuple.getDouble("avg(rating)"));
     assertTrue(4D == tuple.getDouble("count(*)"));
+    } finally {
+      solrClientCache.close();
+    }
   }

   @Test(expected=IOException.class)
@@ -24,6 +24,7 @@ import java.util.Map;

 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.eval.AddEvaluator;
 import org.apache.solr.client.solrj.io.eval.GreaterThanEvaluator;
@@ -92,6 +93,9 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
     String clause;
     TupleStream stream;
     List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);

     StreamFactory factory = new StreamFactory()
       .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
@@ -101,21 +105,24 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
       .withFunctionName("if", IfThenElseEvaluator.class)
       .withFunctionName("gt", GreaterThanEvaluator.class)
       ;
+    try {
     // Basic test
     clause = "select("
         + "id,"
         + "add(b_i,c_d) as result,"
         + "search(collection1, q=*:*, fl=\"id,a_s,b_i,c_d,d_b\", sort=\"id asc\")"
         + ")";
     stream = factory.constructStream(clause);
+    stream.setStreamContext(streamContext);
     tuples = getTuples(stream);
     assertFields(tuples, "id", "result");
     assertNotFields(tuples, "a_s", "b_i", "c_d", "d_b");
     assertEquals(1, tuples.size());
     assertDouble(tuples.get(0), "result", 4.3);
     assertEquals(4.3, tuples.get(0).get("result"));
+    } finally {
+      solrClientCache.close();
+    }
   }

   protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
[Diffs for two additional files in this commit were suppressed because they are too large to display.]