SOLR-9077: Streaming expressions should support collection alias

Kevin Risden 2016-10-16 13:12:00 -04:00
parent 782923b894
commit ace423e958
12 changed files with 581 additions and 531 deletions

solr/CHANGES.txt

@ -118,6 +118,8 @@ New Features
* SOLR-9666: SolrJ LukeResponse support dynamic fields (Fengtan via Kevin Risden)
* SOLR-9077: Streaming expressions should support collection alias (Kevin Risden)
Optimizations
----------------------
* SOLR-9704: Facet Module / JSON Facet API: Optimize blockChildren facets that have

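For reference, a minimal sketch of what this entry enables, assuming a SolrCloud ensemble at ZooKeeper address localhost:9983 and an alias named "logs" pointing at a real collection (both names are hypothetical):

// Minimal sketch: opening a CloudSolrStream against an alias name.
import java.io.IOException;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.common.params.ModifiableSolrParams;
public class AliasStreamSketch {
  public static void main(String[] args) throws IOException {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("q", "*:*");
    params.set("fl", "id");
    params.set("sort", "id asc");
    // "logs" is an alias rather than a concrete collection; before this commit
    // opening the stream failed with "Collection not found".
    CloudSolrStream stream = new CloudSolrStream("localhost:9983", "logs", params);
    try {
      stream.open();
      for (Tuple tuple = stream.read(); !tuple.EOF; tuple = stream.read()) {
        System.out.println(tuple.getString("id"));
      }
    } finally {
      stream.close();
    }
  }
}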
StatementImpl.java

@ -28,8 +28,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@ -78,12 +78,7 @@ class StatementImpl implements Statement {
protected SolrStream constructStream(String sql) throws IOException {
try {
ZkStateReader zkStateReader = this.connection.getClient().getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
Collection<Slice> slices = clusterState.getActiveSlices(this.connection.getCollection());
if(slices == null) {
throw new Exception("Collection not found:"+this.connection.getCollection());
}
Collection<Slice> slices = CloudSolrStream.getSlices(this.connection.getCollection(), zkStateReader, true);
List<Replica> shuffler = new ArrayList<>();
for(Slice slice : slices) {

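The change above routes the JDBC driver's shard lookup through CloudSolrStream.getSlices with alias checking enabled, so a JDBC URL may now name an alias. A hedged sketch (the ZooKeeper address and the "logs" alias are hypothetical):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
public class JdbcAliasSketch {
  public static void main(String[] args) throws Exception {
    // The collection parameter here is an alias, not a concrete collection.
    String url = "jdbc:solr://localhost:9983?collection=logs";
    try (Connection con = DriverManager.getConnection(url);
         Statement stmt = con.createStatement();
         ResultSet rs = stmt.executeQuery("select id from logs order by id asc limit 10")) {
      while (rs.next()) {
        System.out.println(rs.getString("id"));
      }
    }
  }
}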
CloudSolrStream.java

@ -49,6 +49,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
@ -60,6 +61,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
/**
* Connects to Zookeeper to pick replicas from a specific collection to send the query to.
@ -352,37 +354,57 @@ public class CloudSolrStream extends TupleStream implements Expressible {
}
}
public static Collection<Slice> getSlicesIgnoreCase(String name, ClusterState clusterState) {
for (String coll : clusterState.getCollectionStates().keySet()) {
if (coll.equalsIgnoreCase(name)) {
DocCollection collection = clusterState.getCollectionOrNull(coll);
if (collection != null) return collection.getActiveSlices();
public static Collection<Slice> getSlices(String collectionName, ZkStateReader zkStateReader, boolean checkAlias) throws IOException {
ClusterState clusterState = zkStateReader.getClusterState();
Map<String, DocCollection> collectionsMap = clusterState.getCollectionsMap();
// Check collection case sensitive
if(collectionsMap.containsKey(collectionName)) {
return collectionsMap.get(collectionName).getActiveSlices();
}
// Check collection case insensitive
for(String collectionMapKey : collectionsMap.keySet()) {
if(collectionMapKey.equalsIgnoreCase(collectionName)) {
return collectionsMap.get(collectionMapKey).getActiveSlices();
}
}
return null;
if(checkAlias) {
// check for collection alias
Aliases aliases = zkStateReader.getAliases();
String alias = aliases.getCollectionAlias(collectionName);
if (alias != null) {
Collection<Slice> slices = new ArrayList<>();
List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
for (String aliasCollectionName : aliasList) {
// Add all active slices for this alias collection
slices.addAll(collectionsMap.get(aliasCollectionName).getActiveSlices());
}
return slices;
}
}
throw new IOException("Slices not found for " + collectionName);
}
protected void constructStreams() throws IOException {
try {
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
//System.out.println("Connected to zk and got cluster state.");
Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
if (slices == null) slices = getSlicesIgnoreCase(this.collection, clusterState);
if (slices == null) {
throw new Exception("Collection not found:" + this.collection);
}
Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true);
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams.set("distrib", "false"); // We are the aggregator.
Set<String> liveNodes = clusterState.getLiveNodes();
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
List<Replica> shuffler = new ArrayList();
List<Replica> shuffler = new ArrayList<>();
for(Replica replica : replicas) {
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
shuffler.add(replica);

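The new getSlices method tries three lookups in order: the exact collection name, a case-insensitive match, and finally (when checkAlias is true) a comma-separated alias expansion, throwing IOException if all fail. A self-contained model of that order, with plain maps standing in for the ZooKeeper-backed cluster state:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SliceLookupModel {
  static List<String> getSlices(String name, Map<String, List<String>> collections,
                                Map<String, String> aliases, boolean checkAlias) {
    if (collections.containsKey(name)) {            // 1) case-sensitive match
      return collections.get(name);
    }
    for (String key : collections.keySet()) {       // 2) case-insensitive match
      if (key.equalsIgnoreCase(name)) {
        return collections.get(key);
      }
    }
    if (checkAlias) {                               // 3) alias expansion
      String alias = aliases.get(name);             // e.g. "logs_2015,logs_2016"
      if (alias != null) {
        List<String> slices = new ArrayList<>();
        for (String coll : alias.split(",")) {      // an alias may span several collections
          slices.addAll(collections.get(coll.trim()));
        }
        return slices;
      }
    }
    throw new IllegalArgumentException("Slices not found for " + name);
  }
  public static void main(String[] args) {
    Map<String, List<String>> collections = new HashMap<>();
    collections.put("logs_2016", Arrays.asList("shard1", "shard2"));
    Map<String, String> aliases = Collections.singletonMap("logs", "logs_2016");
    System.out.println(getSlices("logs", collections, aliases, true)); // [shard1, shard2]
  }
}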
FeaturesSelectionStream.java

@ -250,17 +250,15 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible{
}
private List<String> getShardUrls() throws IOException {
try {
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
List<String> baseUrls = new ArrayList<>();
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
List<Replica> shuffler = new ArrayList<>();

ParallelStream.java

@ -257,15 +257,17 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
}
protected void constructStreams() throws IOException {
try {
Object pushStream = ((Expressible) tupleStream).toExpression(streamFactory);
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true);
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
List<Replica> shuffler = new ArrayList();
List<Replica> shuffler = new ArrayList<>();
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {

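ParallelStream resolves its worker collection through the same getSlices call with checkAlias=true, so both the data collection and the worker collection may be aliases. A hedged construction sketch (zkHost and the "logs" alias are hypothetical):

import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.ParallelStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.params.ModifiableSolrParams;
public class ParallelAliasSketch {
  public static ParallelStream build(String zkHost) throws Exception {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("q", "*:*");
    params.set("fl", "id,a_i");
    params.set("sort", "a_i asc");
    params.set("partitionKeys", "a_i"); // workers must see disjoint partitions
    CloudSolrStream search = new CloudSolrStream(zkHost, "logs", params);
    ParallelStream parallel = new ParallelStream(zkHost, "logs", search, 2,
        new FieldComparator("a_i", ComparatorOrder.ASCENDING));
    // The inner stream is serialized to an expression for the workers, so a
    // factory with the function names must be attached before open().
    parallel.setStreamFactory(new StreamFactory()
        .withCollectionZkHost("logs", zkHost)
        .withFunctionName("search", CloudSolrStream.class)
        .withFunctionName("parallel", ParallelStream.class));
    return parallel;
  }
}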
SolrStream.java

@ -115,8 +115,6 @@ public class SolrStream extends TupleStream {
**/
public void open() throws IOException {
if(cache == null) {
client = new HttpSolrClient.Builder(baseUrl).build();
} else {

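Aside from the alias work, open() above builds its HttpSolrClient via the 6.x Builder; for reference, a minimal sketch with a hypothetical base URL:

import org.apache.solr.client.solrj.impl.HttpSolrClient;
public class HttpClientSketch {
  public static HttpSolrClient build() {
    return new HttpSolrClient.Builder("http://localhost:8983/solr/collection1").build();
  }
}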
TextLogitStream.java

@ -332,19 +332,18 @@ public class TextLogitStream extends TupleStream implements Expressible {
}
protected List<String> getShardUrls() throws IOException {
try {
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
List baseUrls = new ArrayList();
List<String> baseUrls = new ArrayList<>();
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
List<Replica> shuffler = new ArrayList();
List<Replica> shuffler = new ArrayList<>();
for(Replica replica : replicas) {
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
shuffler.add(replica);
@ -359,7 +358,6 @@ public class TextLogitStream extends TupleStream implements Expressible {
}
return baseUrls;
} catch (Exception e) {
throw new IOException(e);
}

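getShardUrls above (and its twin in FeaturesSelectionStream) filters each slice down to replicas that are ACTIVE and hosted on a live node before picking one. A self-contained model of that filter, with a plain class standing in for Solr's Replica:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
public class ReplicaPickerModel {
  static class Replica {
    final String coreUrl;
    final String nodeName;
    final boolean active;
    Replica(String coreUrl, String nodeName, boolean active) {
      this.coreUrl = coreUrl;
      this.nodeName = nodeName;
      this.active = active;
    }
  }
  // Keep only usable replicas, then choose one at random, as the shuffler loop does.
  static String pickBaseUrl(List<Replica> replicas, Set<String> liveNodes, Random random) {
    List<Replica> shuffler = new ArrayList<>();
    for (Replica replica : replicas) {
      if (replica.active && liveNodes.contains(replica.nodeName)) {
        shuffler.add(replica);
      }
    }
    return shuffler.get(random.nextInt(shuffler.size())).coreUrl;
  }
}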
TopicStream.java

@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -407,18 +406,21 @@ public class TopicStream extends CloudSolrStream implements Expressible {
}
private void getCheckpoints() throws IOException {
this.checkpoints = new HashMap();
this.checkpoints = new HashMap<>();
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
ClusterState clusterState = zkStateReader.getClusterState();
Collection<Slice> slices = clusterState.getActiveSlices(collection);
Set<String> liveNodes = clusterState.getLiveNodes();
for(Slice slice : slices) {
String sliceName = slice.getName();
long checkpoint = 0;
long checkpoint;
if(initialCheckpoint > -1) {
checkpoint = initialCheckpoint;
} else {
checkpoint = getCheckpoint(slice, clusterState.getLiveNodes());
checkpoint = getCheckpoint(slice, liveNodes);
}
this.checkpoints.put(sliceName, checkpoint);
@ -482,21 +484,19 @@ public class TopicStream extends CloudSolrStream implements Expressible {
}
private void getPersistedCheckpoints() throws IOException {
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
Collection<Slice> slices = CloudSolrStream.getSlices(checkpointCollection, zkStateReader, false);
ClusterState clusterState = zkStateReader.getClusterState();
Collection<Slice> slices = clusterState.getActiveSlices(checkpointCollection);
Set<String> liveNodes = clusterState.getLiveNodes();
OUTER:
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
for(Replica replica : replicas) {
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())){
HttpSolrClient httpClient = streamContext.getSolrClientCache().getHttpSolrClient(replica.getCoreUrl());
try {
SolrDocument doc = httpClient.getById(id);
if(doc != null) {
List<String> checkpoints = (List<String>)doc.getFieldValue("checkpoint_ss");
@ -505,7 +505,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
this.checkpoints.put(pair[0], Long.parseLong(pair[1]));
}
}
}catch (Exception e) {
} catch (Exception e) {
throw new IOException(e);
}
break OUTER;
@ -515,22 +515,10 @@ public class TopicStream extends CloudSolrStream implements Expressible {
}
protected void constructStreams() throws IOException {
try {
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
//System.out.println("Connected to zk and got cluster state.");
Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
if (slices == null) slices = getSlicesIgnoreCase(this.collection, clusterState);
if (slices == null) {
throw new Exception("Collection not found:" + this.collection);
}
Iterator<String> iterator = params.getParameterNamesIterator();
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams.set("distrib", "false"); // We are the aggregator.
String fl = mParams.get("fl");
@ -542,12 +530,15 @@ public class TopicStream extends CloudSolrStream implements Expressible {
Random random = new Random();
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
for(Slice slice : slices) {
ModifiableSolrParams localParams = new ModifiableSolrParams(mParams);
long checkpoint = checkpoints.get(slice.getName());
Collection<Replica> replicas = slice.getReplicas();
List<Replica> shuffler = new ArrayList();
List<Replica> shuffler = new ArrayList<>();
for(Replica replica : replicas) {
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
shuffler.add(replica);

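TopicStream keeps one checkpoint per slice: an explicit initialCheckpoint (> -1) wins, otherwise the slice is asked for its current high-water mark. A rough model of that initialization (fetchHighWaterMark is a hypothetical stand-in for the real getCheckpoint call):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CheckpointModel {
  static Map<String, Long> initCheckpoints(List<String> sliceNames, long initialCheckpoint) {
    Map<String, Long> checkpoints = new HashMap<>();
    for (String sliceName : sliceNames) {
      long checkpoint;
      if (initialCheckpoint > -1) {
        checkpoint = initialCheckpoint; // caller pinned a starting point
      } else {
        checkpoint = fetchHighWaterMark(sliceName); // stand-in for getCheckpoint(slice, liveNodes)
      }
      checkpoints.put(sliceName, checkpoint);
    }
    return checkpoints;
  }
  private static long fetchHighWaterMark(String sliceName) {
    return 0; // placeholder; the real code queries a replica of the slice
  }
}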
JdbcTest.java

@ -48,12 +48,10 @@ import org.junit.Test;
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40", "Lucene41", "Lucene42", "Lucene45"})
public class JdbcTest extends SolrCloudTestCase {
private static final String COLLECTION = "collection1";
private static final String COLLECTIONORALIAS = "collection1";
private static final String id = "id";
private static final int TIMEOUT = 30;
private static String zkHost;
@BeforeClass
@ -62,9 +60,18 @@ public class JdbcTest extends SolrCloudTestCase {
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
.configure();
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
String collection;
boolean useAlias = random().nextBoolean();
if(useAlias) {
collection = COLLECTIONORALIAS + "_collection";
CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
} else {
collection = COLLECTIONORALIAS;
}
CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
false, true, DEFAULT_TIMEOUT);
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "testnull_i", null)
@ -77,7 +84,7 @@ public class JdbcTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8", "testnull_i", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9", "testnull_i", null)
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10", "testnull_i", "10")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), collection);
zkHost = cluster.getZkServer().getZkAddress();
}
@ -87,9 +94,9 @@ public class JdbcTest extends SolrCloudTestCase {
Properties props = new Properties();
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=collection1", props)) {
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=" + COLLECTIONORALIAS, props)) {
try (Statement stmt = con.createStatement()) {
try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from collection1 order by a_i desc limit 2")) {
try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from " + COLLECTIONORALIAS + " order by a_i desc limit 2")) {
assertTrue(rs.next());
assertEquals(14, rs.getLong("a_i"));
@ -112,7 +119,7 @@ public class JdbcTest extends SolrCloudTestCase {
}
//Test statement reuse
try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from collection1 order by a_i asc limit 2")) {
try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from " + COLLECTIONORALIAS + " order by a_i asc limit 2")) {
assertTrue(rs.next());
assertEquals(0, rs.getLong("a_i"));
@ -137,7 +144,7 @@ public class JdbcTest extends SolrCloudTestCase {
//Test connection reuse
try (Statement stmt = con.createStatement()) {
try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from collection1 order by a_i desc limit 2")) {
try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from " + COLLECTIONORALIAS + " order by a_i desc limit 2")) {
assertTrue(rs.next());
assertEquals(14, rs.getLong("a_i"));
@ -153,7 +160,7 @@ public class JdbcTest extends SolrCloudTestCase {
//Test statement reuse
stmt.setMaxRows(2);
try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from collection1 order by a_i asc")) {
try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from " + COLLECTIONORALIAS + " order by a_i asc")) {
assertTrue(rs.next());
assertEquals(0, rs.getLong("a_i"));
@ -168,7 +175,7 @@ public class JdbcTest extends SolrCloudTestCase {
}
//Test simple loop. Since limit is set it will override the statement maxRows.
try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from collection1 order by a_i asc LIMIT 100")) {
try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from " + COLLECTIONORALIAS + " order by a_i asc LIMIT 100")) {
int count = 0;
while (rs.next()) {
++count;
@ -186,9 +193,9 @@ public class JdbcTest extends SolrCloudTestCase {
//Test facet aggregation
Properties props = new Properties();
props.put("aggregationMode", "facet");
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=collection1", props)) {
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=" + COLLECTIONORALIAS, props)) {
try (Statement stmt = con.createStatement()) {
try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from collection1 group by a_s " +
try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from " + COLLECTIONORALIAS + " group by a_s " +
"order by sum(a_f) desc")) {
assertTrue(rs.next());
@ -226,9 +233,9 @@ public class JdbcTest extends SolrCloudTestCase {
Properties props = new Properties();
props.put("aggregationMode", "map_reduce");
props.put("numWorkers", "2");
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=collection1", props)) {
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=" + COLLECTIONORALIAS, props)) {
try (Statement stmt = con.createStatement()) {
try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from collection1 group by a_s " +
try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from " + COLLECTIONORALIAS + " group by a_s " +
"order by sum(a_f) desc")) {
assertTrue(rs.next());
@ -264,7 +271,7 @@ public class JdbcTest extends SolrCloudTestCase {
//Test params on the url
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost +
"?collection=collection1&aggregationMode=map_reduce&numWorkers=2")) {
"?collection=" + COLLECTIONORALIAS + "&aggregationMode=map_reduce&numWorkers=2")) {
Properties p = ((ConnectionImpl) con).getProperties();
@ -272,7 +279,7 @@ public class JdbcTest extends SolrCloudTestCase {
assert (p.getProperty("numWorkers").equals("2"));
try (Statement stmt = con.createStatement()) {
try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from collection1 group by a_s " +
try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from " + COLLECTIONORALIAS + " group by a_s " +
"order by sum(a_f) desc")) {
assertTrue(rs.next());
@ -308,7 +315,7 @@ public class JdbcTest extends SolrCloudTestCase {
// Test JDBC parameters in URL
try (Connection con = DriverManager.getConnection(
"jdbc:solr://" + zkHost + "?collection=collection1&username=&password=&testKey1=testValue&testKey2")) {
"jdbc:solr://" + zkHost + "?collection=" + COLLECTIONORALIAS + "&username=&password=&testKey1=testValue&testKey2")) {
Properties p = ((ConnectionImpl) con).getProperties();
assertEquals("", p.getProperty("username"));
@ -317,7 +324,7 @@ public class JdbcTest extends SolrCloudTestCase {
assertEquals("", p.getProperty("testKey2"));
try (Statement stmt = con.createStatement()) {
try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from collection1 group by a_s " +
try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from " + COLLECTIONORALIAS + " group by a_s " +
"order by sum(a_f) desc")) {
assertTrue(rs.next());
@ -353,7 +360,7 @@ public class JdbcTest extends SolrCloudTestCase {
// Test JDBC parameters in properties
Properties providedProperties = new Properties();
providedProperties.put("collection", "collection1");
providedProperties.put("collection", COLLECTIONORALIAS);
providedProperties.put("username", "");
providedProperties.put("password", "");
providedProperties.put("testKey1", "testValue");
@ -367,7 +374,7 @@ public class JdbcTest extends SolrCloudTestCase {
assert (p.getProperty("testKey2").equals(""));
try (Statement stmt = con.createStatement()) {
try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from collection1 group by a_s " +
try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from " + COLLECTIONORALIAS + " group by a_s " +
"order by sum(a_f) desc")) {
assertTrue(rs.next());
@ -402,9 +409,9 @@ public class JdbcTest extends SolrCloudTestCase {
//Test error propagation
Properties props = new Properties();
props.put("aggregationMode", "facet");
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=collection1", props)) {
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=" + COLLECTIONORALIAS, props)) {
try (Statement stmt = con.createStatement()) {
try (ResultSet rs = stmt.executeQuery("select crap from collection1 group by a_s " +
try (ResultSet rs = stmt.executeQuery("select crap from " + COLLECTIONORALIAS + " group by a_s " +
"order by sum(a_f) desc")) {
} catch (Exception e) {
String errorMessage = e.getMessage();
@ -416,7 +423,7 @@ public class JdbcTest extends SolrCloudTestCase {
@Test
public void testSQLExceptionThrownWhenQueryAndConnUseDiffCollections() throws Exception {
String badCollection = COLLECTION + "bad";
String badCollection = COLLECTIONORALIAS + "bad";
String connectionString = "jdbc:solr://" + zkHost + "?collection=" + badCollection;
String sql = "select id, a_i, a_s, a_f from " + badCollection + " order by a_i desc limit 2";
@ -434,7 +441,7 @@ public class JdbcTest extends SolrCloudTestCase {
@Test
public void testDriverMetadata() throws Exception {
String collection = COLLECTION;
String collection = COLLECTIONORALIAS;
String connectionString1 = "jdbc:solr://" + zkHost + "?collection=" + collection +
"&username=&password=&testKey1=testValue&testKey2";

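The setup above randomizes between a plain collection and an alias, so every test body exercises both paths. For reference, a minimal sketch of creating an alias with SolrJ, as the setup does (the names are hypothetical):

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
public class AliasAdminSketch {
  // Point the alias "logs" at the concrete collection "logs_2016".
  public static void createLogsAlias(SolrClient client) throws Exception {
    CollectionAdminRequest.createAlias("logs", "logs_2016").process(client);
  }
}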
JDBCStreamTest.java

@ -50,7 +50,7 @@ import org.junit.Test;
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
public class JDBCStreamTest extends SolrCloudTestCase {
private static final String COLLECTION = "jdbc";
private static final String COLLECTIONORALIAS = "jdbc";
private static final int TIMEOUT = 30;
@ -62,8 +62,17 @@ public class JDBCStreamTest extends SolrCloudTestCase {
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
.configure();
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
String collection;
boolean useAlias = random().nextBoolean();
if(useAlias) {
collection = COLLECTIONORALIAS + "_collection";
CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
} else {
collection = COLLECTIONORALIAS;
}
CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
}
@ -99,7 +108,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
public void cleanIndex() throws Exception {
new UpdateRequest()
.deleteByQuery("*:*")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
}
@Before
@ -200,10 +209,10 @@ public class JDBCStreamTest extends SolrCloudTestCase {
new UpdateRequest()
.add(id, "0", "code_s", "GB", "name_s", "Great Britian")
.add(id, "1", "code_s", "CA", "name_s", "Canada")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class);
List<Tuple> tuples;
@ -211,7 +220,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
// Simple 1
TupleStream jdbcStream = new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by CODE", new FieldComparator("CODE", ComparatorOrder.ASCENDING));
TupleStream selectStream = new SelectStream(jdbcStream, new HashMap<String, String>(){{ put("CODE", "code_s"); put("COUNTRY_NAME", "name_s"); }});
TupleStream searchStream = factory.constructStream("search(" + COLLECTION + ", fl=\"code_s,name_s\",q=\"*:*\",sort=\"code_s asc\")");
TupleStream searchStream = factory.constructStream("search(" + COLLECTIONORALIAS + ", fl=\"code_s,name_s\",q=\"*:*\",sort=\"code_s asc\")");
TupleStream mergeStream = new MergeStream(new FieldComparator("code_s", ComparatorOrder.ASCENDING), new TupleStream[]{selectStream,searchStream});
tuples = getTuples(mergeStream);
@ -225,7 +234,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
public void testJDBCSolrInnerJoinExpression() throws Exception{
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("innerJoin", InnerJoinStream.class)
@ -262,7 +271,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
.add(id, "8", "rating_f", "4", "personId_i", "18")
.add(id, "9", "rating_f", "4.1", "personId_i", "19")
.add(id, "10", "rating_f", "4.8", "personId_i", "20")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String expression;
TupleStream stream;
@ -272,7 +281,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
expression =
"innerJoin("
+ " select("
+ " search(" + COLLECTION + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"
@ -299,7 +308,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
public void testJDBCSolrInnerJoinExpressionWithProperties() throws Exception{
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("innerJoin", InnerJoinStream.class)
@ -336,7 +345,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
.add(id, "8", "rating_f", "4", "personId_i", "18")
.add(id, "9", "rating_f", "4.1", "personId_i", "19")
.add(id, "10", "rating_f", "4.8", "personId_i", "20")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String expression;
TupleStream stream;
@ -349,7 +358,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
expression =
"innerJoin("
+ " select("
+ " search(" + COLLECTION + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"
@ -378,7 +387,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
expression =
"innerJoin("
+ " select("
+ " search(" + COLLECTION + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"
@ -405,7 +414,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
public void testJDBCSolrInnerJoinRollupExpression() throws Exception{
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("hashJoin", HashJoinStream.class)
@ -448,7 +457,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
.add(id, "6", "rating_f", "3", "personId_i", "16")
.add(id, "7", "rating_f", "3", "personId_i", "17")
.add(id, "10", "rating_f", "4.8", "personId_i", "20")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String expression;
TupleStream stream;
@ -459,7 +468,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
"rollup("
+ " hashJoin("
+ " hashed=select("
+ " search(" + COLLECTION + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"

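The expression-based streams in these tests go through StreamFactory, which only maps a name to a zkHost; the alias itself is resolved later when CloudSolrStream opens. A hedged sketch of that route (the "logs" name, which may be a collection or an alias, is hypothetical):

import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class ExpressionAliasSketch {
  public static TupleStream build(String zkHost) throws Exception {
    StreamFactory factory = new StreamFactory()
        .withCollectionZkHost("logs", zkHost) // "logs" may be a collection or an alias
        .withFunctionName("search", CloudSolrStream.class);
    return factory.constructStream("search(logs, fl=\"id\", q=\"*:*\", sort=\"id asc\")");
  }
}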
StreamingTest.java

@ -51,253 +51,262 @@ import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
/**
* All base tests will be done with CloudSolrStream. Under the covers CloudSolrStream uses SolrStream so
* SolrStream will get fully exercised through these tests.
*
**/
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
public class StreamingTest extends SolrCloudTestCase {
public static final int TIMEOUT = 30;
public static final String COLLECTION = "streams";
public static final String COLLECTIONORALIAS = "streams";
private static final StreamFactory streamFactory = new StreamFactory()
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("merge", MergeStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class)
.withFunctionName("reduce", ReducerStream.class)
.withFunctionName("group", GroupOperation.class)
.withFunctionName("rollup", RollupStream.class)
.withFunctionName("parallel", ParallelStream.class);
private static String zkHost;
private static int numShards;
private static int numWorkers;
private static boolean useAlias;
@BeforeClass
public static void configureCluster() throws Exception {
numShards = random().nextInt(2) + 1; //1 - 2
numWorkers = numShards > 2 ? random().nextInt(numShards - 1) + 1 : numShards;
configureCluster(numShards)
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
.configure();
CollectionAdminRequest.createCollection(COLLECTION, "conf", numShards, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), false, true, TIMEOUT);
zkHost = cluster.getZkServer().getZkAddress();
streamFactory.withCollectionZkHost(COLLECTION, zkHost);
String collection;
useAlias = random().nextBoolean();
if(useAlias) {
collection = COLLECTIONORALIAS + "_collection";
CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
} else {
collection = COLLECTIONORALIAS;
}
CollectionAdminRequest.createCollection(collection, "conf", numShards, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(), false, true, DEFAULT_TIMEOUT);
zkHost = cluster.getZkServer().getZkAddress();
streamFactory.withCollectionZkHost(COLLECTIONORALIAS, zkHost);
}
private static final String id = "id";
@Before
public void clearCollection() throws Exception {
new UpdateRequest()
.deleteByQuery("*:*")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
}
@Test
public void testUniqueStream() throws Exception {
//Test CloudSolrStream and UniqueStream
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
.add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
SolrParams sParams = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
List<Tuple> tuples = getTuples(ustream);
assertEquals(4, tuples.size());
assertOrder(tuples, 0,1,3,4);
}
@Test
public void testSpacesInParams() throws Exception {
SolrParams sParams = StreamingTest.mapParams("q", "*:*", "fl", "id , a_s , a_i , a_f", "sort", "a_f asc , a_i asc");
//CloudSolrStream compares the values of the sort with the fl field.
//The constructor will throw an exception if the sort fields do not have
//a value in the field list.
CloudSolrStream stream = new CloudSolrStream("", "collection1", sParams);
}
@Test
public void testNonePartitionKeys() throws Exception {
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
.add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
.add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
.add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
ParallelStream pstream = parallelStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == (10 * numWorkers)); // Each tuple will be double counted.
}
@Test
public void testParallelUniqueStream() throws Exception {
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
.add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
.add(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1")
.add(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5")
.add(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5")
.add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
ParallelStream pstream = parallelStream(ustream, new FieldComparator("a_f", ComparatorOrder.ASCENDING));
attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream);
assertEquals(5, tuples.size());
assertOrder(tuples, 0, 1, 3, 4, 6);
//Test the eofTuples
Map<String,Tuple> eofTuples = pstream.getEofTuples();
assertEquals(numWorkers, eofTuples.size()); //There should be an EOF tuple for each worker.
}
@Test
public void testMultipleFqClauses() throws Exception {
new UpdateRequest()
.add(id, "0", "a_ss", "hello0", "a_ss", "hello1", "a_i", "0", "a_f", "0")
.add(id, "2", "a_ss", "hello2", "a_i", "2", "a_f", "0")
.add(id, "3", "a_ss", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_ss", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_ss", "hello1", "a_i", "1", "a_f", "1")
.add(id, "5", "a_ss", "hello1", "a_i", "10", "a_f", "1")
.add(id, "6", "a_ss", "hello1", "a_i", "11", "a_f", "5")
.add(id, "7", "a_ss", "hello1", "a_i", "12", "a_f", "5")
.add(id, "8", "a_ss", "hello1", "a_i", "13", "a_f", "4")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
streamFactory.withCollectionZkHost(COLLECTION, zkHost);
streamFactory.withCollectionZkHost(COLLECTIONORALIAS, zkHost);
ModifiableSolrParams params = new ModifiableSolrParams(mapParams("q", "*:*", "fl", "id,a_i",
"sort", "a_i asc", "fq", "a_ss:hello0", "fq", "a_ss:hello1"));
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, params);
List<Tuple> tuples = getTuples(stream);
assertEquals("Multiple fq clauses should have been honored", 1, tuples.size());
assertEquals("should only have gotten back document 0", "0", tuples.get(0).getString("id"));
}
@Test
public void testRankStream() throws Exception {
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
.add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
List<Tuple> tuples = getTuples(rstream);
assertEquals(3, tuples.size());
assertOrder(tuples, 4,3,2);
}
@Test
public void testParallelRankStream() throws Exception {
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
.add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "5", "a_s", "hello1", "a_i", "5", "a_f", "1")
.add(id, "6", "a_s", "hello1", "a_i", "6", "a_f", "1")
.add(id, "7", "a_s", "hello1", "a_i", "7", "a_f", "1")
.add(id, "8", "a_s", "hello1", "a_i", "8", "a_f", "1")
.add(id, "9", "a_s", "hello1", "a_i", "9", "a_f", "1")
.add(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream);
assertEquals(10, tuples.size());
assertOrder(tuples, 10,9,8,7,6,5,4,3,2,0);
}
@Test
public void testTrace() throws Exception {
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
.add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
.add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
.add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
//Test with spaces in the parameter lists.
SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,a_f", "sort", "a_s asc,a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
stream.setTrace(true);
List<Tuple> tuples = getTuples(stream);
assertEquals(COLLECTION, tuples.get(0).get("_COLLECTION_"));
assertEquals(COLLECTION, tuples.get(1).get("_COLLECTION_"));
assertEquals(COLLECTION, tuples.get(2).get("_COLLECTION_"));
assertEquals(COLLECTION, tuples.get(3).get("_COLLECTION_"));
assertEquals(COLLECTIONORALIAS, tuples.get(0).get("_COLLECTION_"));
assertEquals(COLLECTIONORALIAS, tuples.get(1).get("_COLLECTION_"));
assertEquals(COLLECTIONORALIAS, tuples.get(2).get("_COLLECTION_"));
assertEquals(COLLECTIONORALIAS, tuples.get(3).get("_COLLECTION_"));
}
@Test
@ -314,11 +323,11 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
//Test with spaces in the parameter lists.
SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
@ -341,7 +350,7 @@ public class StreamingTest extends SolrCloudTestCase {
//Test with spaces in the parameter lists using a comparator
sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
rstream = new ReducerStream(stream,
new FieldComparator("a_s", ComparatorOrder.ASCENDING),
new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
@ -379,11 +388,11 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
//Test with spaces in the parameter lists.
SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
@ -408,10 +417,10 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
@ -437,7 +446,7 @@ public class StreamingTest extends SolrCloudTestCase {
//Test Descending with Ascending subsort
sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
@ -477,11 +486,11 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
//Test an error that originates from the /select handler
SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
ExceptionStream estream = new ExceptionStream(stream);
Tuple t = getTuple(estream);
assertTrue(t.EOF);
@ -490,7 +499,7 @@ public class StreamingTest extends SolrCloudTestCase {
//Test an error that originates from the /export handler
sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt", "/export");
stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
estream = new ExceptionStream(stream);
t = getTuple(estream);
assertTrue(t.EOF);
@ -514,11 +523,11 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
ParallelStream pstream = new ParallelStream(zkHost, COLLECTIONORALIAS, stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
ExceptionStream estream = new ExceptionStream(pstream);
Tuple t = getTuple(estream);
assertTrue(t.EOF);
@ -529,8 +538,8 @@ public class StreamingTest extends SolrCloudTestCase {
//Test an error that originates from the /select handler
sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc", "partitionKeys", "a_s");
stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
pstream = new ParallelStream(zkHost, COLLECTIONORALIAS, stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
estream = new ExceptionStream(pstream);
t = getTuple(estream);
assertTrue(t.EOF);
@ -540,8 +549,8 @@ public class StreamingTest extends SolrCloudTestCase {
//Test an error that originates from the /export handler
sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt", "/export", "partitionKeys", "a_s");
stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
pstream = new ParallelStream(zkHost, COLLECTIONORALIAS, stream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
estream = new ExceptionStream(pstream);
t = getTuple(estream);
assertTrue(t.EOF);
@ -564,7 +573,7 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
SolrParams sParamsA = mapParams("q", "*:*");
@ -578,7 +587,7 @@ public class StreamingTest extends SolrCloudTestCase {
new MeanMetric("a_f"),
new CountMetric()};
StatsStream statsStream = new StatsStream(zkHost, COLLECTION, sParamsA, metrics);
StatsStream statsStream = new StatsStream(zkHost, COLLECTIONORALIAS, sParamsA, metrics);
List<Tuple> tuples = getTuples(statsStream);
@ -624,7 +633,7 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
@ -643,7 +652,7 @@ public class StreamingTest extends SolrCloudTestCase {
FieldComparator[] sorts = {new FieldComparator("sum(a_i)",
ComparatorOrder.ASCENDING)};
FacetStream facetStream = new FacetStream(zkHost, COLLECTION, sParamsA, buckets, metrics, sorts, 100);
FacetStream facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
List<Tuple> tuples = getTuples(facetStream);
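// FacetStream (constructed above) pushes the rollup into the JSON Facet API: buckets
// are the group-by dimensions, metrics the per-bucket aggregations, sorts the bucket
// ordering, and the trailing 100 caps how many buckets are fetched.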
@ -725,7 +734,7 @@ public class StreamingTest extends SolrCloudTestCase {
sorts[0] = new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING);
facetStream = new FacetStream(zkHost, COLLECTION, sParamsA, buckets, metrics, sorts, 100);
facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
tuples = getTuples(facetStream);
@ -808,7 +817,7 @@ public class StreamingTest extends SolrCloudTestCase {
sorts[0] = new FieldComparator("a_s", ComparatorOrder.DESCENDING);
facetStream = new FacetStream(zkHost, COLLECTION, sParamsA, buckets, metrics, sorts, 100);
facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
tuples = getTuples(facetStream);
@ -889,7 +898,7 @@ public class StreamingTest extends SolrCloudTestCase {
sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);
facetStream = new FacetStream(zkHost, COLLECTION, sParamsA, buckets, metrics, sorts, 100);
facetStream = new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
tuples = getTuples(facetStream);
@ -1015,7 +1024,7 @@ public class StreamingTest extends SolrCloudTestCase {
// }
// }
// SolrParams exportParams = mapParams("q", "*:*", "qt", "/export", "fl", "id," + field, "sort", field + " " + sortDir + ",id asc");
// try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTION, exportParams)) {
// try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, exportParams)) {
// List<Tuple> tuples = getTuples(solrStream);
// assertEquals("There should be exactly 32 responses returned", 32, tuples.size());
// // Since the getTuples method doesn't return the EOF tuple, these two entries should be the same size.
@ -1031,7 +1040,7 @@ public class StreamingTest extends SolrCloudTestCase {
List<String> selectOrder = ("asc".equals(sortDir)) ? Arrays.asList(ascOrder) : Arrays.asList(descOrder);
List<String> selectOrderBool = ("asc".equals(sortDir)) ? Arrays.asList(ascOrderBool) : Arrays.asList(descOrderBool);
SolrParams exportParams = mapParams("q", "*:*", "qt", "/export", "fl", "id," + field, "sort", field + " " + sortDir + ",id asc");
try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTION, exportParams)) {
try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, exportParams)) {
List<Tuple> tuples = getTuples(solrStream);
assertEquals("There should be exactly 32 responses returned", 32, tuples.size());
// Since the getTuples method doesn't return the EOF tuple, these two entries should be the same size.
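// A hedged sketch of what getTuples presumably does (hypothetical readAllTuples shown
// here; the real helper lives elsewhere in this class): open the stream, collect every
// tuple up to the EOF marker, and drop the marker itself.
static List<Tuple> readAllTuples(TupleStream tupleStream) throws IOException {
  List<Tuple> tuples = new ArrayList<>();
  tupleStream.open();
  try {
    for (Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) {
      tuples.add(t); // the EOF tuple is never added, hence the size assertions above
    }
  } finally {
    tupleStream.close();
  }
  return tuples;
}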
@ -1070,7 +1079,7 @@ public class StreamingTest extends SolrCloudTestCase {
}
SolrParams sParams = mapParams("q", "*:*", "qt", "/export", "fl", fl.toString(), "sort", "id asc");
try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTION, sParams)) {
try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams)) {
List<Tuple> tuples = getTuples(solrStream);
assertEquals("There should be exactly 32 responses returned", 32, tuples.size());
@ -1185,7 +1194,7 @@ public class StreamingTest extends SolrCloudTestCase {
.add(docPairs(8, "aaa"))
.add(docPairs(8, "ooo"))
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
@ -1216,7 +1225,7 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "7", "level1_s", "hello3", "level2_s", "b", "a_i", "12", "a_f", "8")
.add(id, "8", "level1_s", "hello3", "level2_s", "b", "a_i", "13", "a_f", "9")
.add(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_i,a_f");
@ -1229,7 +1238,7 @@ public class StreamingTest extends SolrCloudTestCase {
FacetStream facetStream = new FacetStream(
zkHost,
COLLECTION,
COLLECTIONORALIAS,
sParamsA,
buckets,
metrics,
@ -1309,7 +1318,7 @@ public class StreamingTest extends SolrCloudTestCase {
sorts[1] = new FieldComparator("level2_s", ComparatorOrder.DESCENDING );
facetStream = new FacetStream(
zkHost,
COLLECTION,
COLLECTIONORALIAS,
sParamsA,
buckets,
metrics,
@ -1401,10 +1410,10 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
Bucket[] buckets = {new Bucket("a_s")};
@ -1501,10 +1510,10 @@ public class StreamingTest extends SolrCloudTestCase {
//Test with a null value in the grouping field
new UpdateRequest()
.add(id, "12", "a_s", null, "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "qt", "/export");
stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
Bucket[] buckets1 = {new Bucket("a_s")};
@ -1549,6 +1558,7 @@ public class StreamingTest extends SolrCloudTestCase {
@Test
public void testDaemonTopicStream() throws Exception {
Assume.assumeTrue(!useAlias);
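// Skipped in alias mode: topic streams persist checkpoints against a concrete
// checkpoint collection, and alias support for TopicStream is presumably out of
// scope for this change (an assumption; the diff only adds this Assume guard).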
StreamContext context = new StreamContext();
SolrClientCache cache = new SolrClientCache();
@ -1557,8 +1567,8 @@ public class StreamingTest extends SolrCloudTestCase {
SolrParams sParams = mapParams("q", "a_s:hello0", "rows", "500", "fl", "id");
TopicStream topicStream = new TopicStream(zkHost,
COLLECTION,
COLLECTION,
COLLECTIONORALIAS,
COLLECTIONORALIAS,
"50000000",
-1,
1000000, sParams);
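// The two COLLECTIONORALIAS arguments are presumably the checkpoint collection and the
// topic collection; "50000000" is the topic id, and -1 / 1000000 look like the initial
// checkpoint and checkpoint frequency (argument roles are an assumption, not shown here).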
@ -1575,7 +1585,7 @@ public class StreamingTest extends SolrCloudTestCase {
SolrParams sParams1 = mapParams("qt", "/get", "ids", "50000000", "fl", "id");
int count = 0;
while(count == 0) {
SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTION, sParams1);
SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTIONORALIAS, sParams1);
List<Tuple> tuples = getTuples(solrStream);
count = tuples.size();
if(count > 0) {
@ -1592,7 +1602,7 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
for(int i=0; i<5; i++) {
daemonStream.read();
@ -1601,7 +1611,7 @@ public class StreamingTest extends SolrCloudTestCase {
new UpdateRequest()
.add(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4")
.add(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
for(int i=0; i<2; i++) {
daemonStream.read();
@ -1631,10 +1641,10 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
Bucket[] buckets = {new Bucket("a_s")};
@ -1742,10 +1752,10 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
new GroupOperation(new FieldComparator("a_s", ComparatorOrder.ASCENDING), 2));
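// ReducerStream expects its input sorted by the equalitor field (a_s): it folds each
// run of equal tuples into one group, and GroupOperation keeps the top 2 tuples per
// group according to its comparator.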
@ -1762,10 +1772,10 @@ public class StreamingTest extends SolrCloudTestCase {
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "5.1", "s_multi", "a", "s_multi", "b", "i_multi",
"1", "i_multi", "2", "f_multi", "1.2", "f_multi", "1.3")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f,s_multi,i_multi,f_multi", "sort", "a_s asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
List<Tuple> tuples = getTuples(stream);
Tuple tuple = tuples.get(0);
@ -1803,14 +1813,14 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
//Test ascending
SolrParams sParamsA = mapParams("q", "id:(4 1)", "fl", "id,a_s,a_i", "sort", "a_i asc");
CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
SolrParams sParamsB = mapParams("q", "id:(0 2 3)", "fl", "id,a_s,a_i", "sort", "a_i asc");
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(mstream);
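// MergeStream performs a streaming merge of two already-sorted inputs; both substreams
// must be sorted on the fields the comparator inspects (a_i here) for the merge to hold.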
@ -1820,10 +1830,10 @@ public class StreamingTest extends SolrCloudTestCase {
//Test descending
sParamsA = mapParams("q", "id:(4 1)", "fl", "id,a_s,a_i", "sort", "a_i desc");
streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
sParamsB = mapParams("q", "id:(0 2 3)", "fl", "id,a_s,a_i", "sort", "a_i desc");
streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);
mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
tuples = getTuples(mstream);
@ -1834,10 +1844,10 @@ public class StreamingTest extends SolrCloudTestCase {
//Test compound sort
sParamsA = mapParams("q", "id:(2 4 1)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
sParamsB = mapParams("q", "id:(0 3)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);
mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.ASCENDING)));
tuples = getTuples(mstream);
@ -1846,10 +1856,10 @@ public class StreamingTest extends SolrCloudTestCase {
assertOrder(tuples, 0,2,1,3,4);
sParamsA = mapParams("q", "id:(2 4 1)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i desc");
streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
sParamsB = mapParams("q", "id:(0 3)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i desc");
streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);
mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.DESCENDING)));
tuples = getTuples(mstream);
@ -1873,14 +1883,14 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3")
.add(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4")
.add(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
//Test ascending
SolrParams sParamsA = mapParams("q", "id:(4 1 8 7 9)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
SolrParams sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
ParallelStream pstream = parallelStream(mstream, new FieldComparator("a_i", ComparatorOrder.ASCENDING));
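// In parallel mode the partitionKeys=a_i parameter hashes tuples across the workers so
// each worker sees a disjoint partition; ParallelStream then re-merges the worker
// streams using the supplied comparator.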
@ -1892,10 +1902,10 @@ public class StreamingTest extends SolrCloudTestCase {
//Test descending
sParamsA = mapParams("q", "id:(4 1 8 9)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i");
streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i");
streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);
mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
pstream = parallelStream(mstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
@ -1921,14 +1931,14 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3")
.add(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4")
.add(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
//Test ascending
SolrParams sParamsA = mapParams("q", "id:(4 1 8 7 9)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
SolrParams sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
ParallelStream pstream = parallelStream(mstream, new FieldComparator("a_i", ComparatorOrder.ASCENDING));
@ -1950,13 +1960,13 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
//Basic CloudSolrStream Test with Descending Sort
SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i desc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
List<Tuple> tuples = getTuples(stream);
assertEquals(5, tuples.size());
@ -1964,7 +1974,7 @@ public class StreamingTest extends SolrCloudTestCase {
//With Ascending Sort
sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
tuples = getTuples(stream);
assertEquals(5, tuples.size());
@ -1973,7 +1983,7 @@ public class StreamingTest extends SolrCloudTestCase {
//Test compound sort
sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i desc");
stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
tuples = getTuples(stream);
assertEquals(5, tuples.size());
@ -1981,7 +1991,7 @@ public class StreamingTest extends SolrCloudTestCase {
sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
tuples = getTuples(stream);
assertEquals(5, tuples.size());
@ -1998,7 +2008,7 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "2", "b_sing", "false", "dt_sing", "1981-04-04T01:02:03.78Z")
.add(id, "1", "b_sing", "true", "dt_sing", "1980-04-04T01:02:03.78Z")
.add(id, "4", "b_sing", "true", "dt_sing", "1980-04-04T01:02:03.78Z")
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
trySortWithQt("/export");
@ -2008,7 +2018,7 @@ public class StreamingTest extends SolrCloudTestCase {
//Basic CloudSolrStream Test bools asc
SolrParams sParams = mapParams("q", "*:*", "qt", which, "fl", "id,b_sing", "sort", "b_sing asc,id asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
try {
List<Tuple> tuples = getTuples(stream);
@ -2017,7 +2027,7 @@ public class StreamingTest extends SolrCloudTestCase {
//Basic CloudSolrStream Test bools desc
sParams = mapParams("q", "*:*", "qt", which, "fl", "id,b_sing", "sort", "b_sing desc,id desc");
stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
tuples = getTuples(stream);
assertEquals(5, tuples.size());
@ -2025,7 +2035,7 @@ public class StreamingTest extends SolrCloudTestCase {
//Basic CloudSolrStream Test dates desc
sParams = mapParams("q", "*:*", "qt", which, "fl", "id,dt_sing", "sort", "dt_sing desc,id asc");
stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
tuples = getTuples(stream);
assertEquals(5, tuples.size());
@ -2033,7 +2043,7 @@ public class StreamingTest extends SolrCloudTestCase {
//Basic CloudSolrStream Test dates asc
sParams = mapParams("q", "*:*", "qt", which, "fl", "id,dt_sing", "sort", "dt_sing asc,id desc");
stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
tuples = getTuples(stream);
assertEquals(5, tuples.size());
@ -2062,7 +2072,7 @@ public class StreamingTest extends SolrCloudTestCase {
"dt_sing", "1980-01-02T11:11:33.89Z", "dt_multi", "1981-03-04T01:02:03.78Z", "dt_multi", "1981-05-24T04:05:06.99Z",
"b_sing", "true", "b_multi", "false", "b_multi", "true"
)
.commit(cluster.getSolrClient(), COLLECTION);
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
tryWithQt("/export");
tryWithQt("/select");
@ -2073,7 +2083,7 @@ public class StreamingTest extends SolrCloudTestCase {
SolrParams sParams = StreamingTest.mapParams("q", "*:*", "qt", which, "fl",
"id,i_sing,i_multi,l_sing,l_multi,f_sing,f_multi,d_sing,d_multi,dt_sing,dt_multi,s_sing,s_multi,b_sing,b_multi",
"sort", "i_sing asc");
try (CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams)) {
try (CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams)) {
Tuple tuple = getTuple(stream); // All I really care about is that all the fields are returned. There's
@ -2208,7 +2218,7 @@ public class StreamingTest extends SolrCloudTestCase {
}
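// Wraps any TupleStream in a ParallelStream that runs it on numWorkers workers against
// the target collection (or alias) and re-merges the partial streams with the comparator.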
private ParallelStream parallelStream(TupleStream stream, FieldComparator comparator) throws IOException {
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, stream, numWorkers, comparator);
ParallelStream pstream = new ParallelStream(zkHost, COLLECTIONORALIAS, stream, numWorkers, comparator);
return pstream;
}
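// Hedged sketch (assumption, not shown in this diff): when useAlias is true, the suite
// presumably registers COLLECTIONORALIAS as an alias of COLLECTION once during setup,
// e.g. via a hypothetical helper like this:
private static void maybeCreateAlias() throws Exception {
  if (useAlias) {
    CollectionAdminRequest.createAlias(COLLECTIONORALIAS, COLLECTION)
        .process(cluster.getSolrClient());
  }
}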