SOLR-7535: Add UpdateStream to Streaming API and Streaming Expression

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1722990 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Joel Bernstein 2016-01-05 00:32:29 +00:00
parent d478d7a293
commit 26a1d1ea34
12 changed files with 555 additions and 21 deletions

View File

@ -123,6 +123,8 @@ New Features
* SOLR-8436: Real-time get now supports filters. (yonik)
* SOLR-7535: Add UpdateStream to Streaming API and Streaming Expression (Jason Gerlowski, Joel Bernstein)
Bug Fixes
----------------------
* SOLR-8386: Add field option in the new admin UI schema page loads up even when no schemaFactory has been

View File

@ -47,6 +47,7 @@ import org.apache.solr.client.solrj.io.stream.StatsStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.UniqueStream;
import org.apache.solr.client.solrj.io.stream.UpdateStream;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
@ -74,6 +75,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
static SolrClientCache clientCache = new SolrClientCache();
private StreamFactory streamFactory = new StreamFactory();
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String coreName;
public void inform(SolrCore core) {
@ -90,11 +92,13 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
String defaultCollection = null;
String defaultZkhost = null;
CoreContainer coreContainer = core.getCoreDescriptor().getCoreContainer();
this.coreName = core.getName();
if(coreContainer.isZooKeeperAware()) {
defaultCollection = core.getCoreDescriptor().getCollectionName();
defaultZkhost = core.getCoreDescriptor().getCoreContainer().getZkController().getZkServerAddress();
streamFactory.withCollectionZkHost(defaultCollection, defaultZkhost);
streamFactory.withDefaultZkHost(defaultZkhost);
}
streamFactory
@ -113,6 +117,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
.withFunctionName("hashJoin", HashJoinStream.class)
.withFunctionName("outerHashJoin", OuterHashJoinStream.class)
.withFunctionName("facet", FacetStream.class)
.withFunctionName("update", UpdateStream.class)
// metrics
.withFunctionName("min", MinMetric.class)
@ -167,6 +172,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
context.workerID = worker;
context.numWorkers = numWorkers;
context.setSolrClientCache(clientCache);
context.put("core", this.coreName);
tupleStream.setStreamContext(context);
rsp.add("result-set", new TimerStream(new ExceptionStream(tupleStream)));
}

View File

@ -117,7 +117,10 @@ public class SortingResponseWriter implements QueryResponseWriter {
fields = fl.split(",");
for(int i=0;i<fields.length; i++) {
if(fl.trim().equals("score")) {
fields[i] = fields[i].trim();
if(fields[i].equals("score")) {
exception = new IOException(new SyntaxError("Scoring is not currently supported with xsort."));
break;
}

View File

@ -138,6 +138,9 @@ public class CloudSolrStream extends TupleStream implements Expressible {
String zkHost = null;
if(null == zkHostExpression){
zkHost = factory.getCollectionZkHost(collectionName);
if(zkHost == null) {
zkHost = factory.getDefaultZkHost();
}
}
else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();

View File

@ -166,6 +166,9 @@ public class FacetStream extends TupleStream implements Expressible {
String zkHost = null;
if(null == zkHostExpression){
zkHost = factory.getCollectionZkHost(collectionName);
if(zkHost == null) {
zkHost = factory.getDefaultZkHost();
}
}
else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();

View File

@ -131,6 +131,9 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
String zkHost = null;
if(null == zkHostExpression){
zkHost = factory.getCollectionZkHost(collectionName);
if(zkHost == null) {
zkHost = factory.getDefaultZkHost();
}
}
else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();

View File

@ -33,7 +33,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
* substreams.
**/
public class PushBackStream extends TupleStream {
public class PushBackStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1;

View File

@ -103,6 +103,9 @@ public class StatsStream extends TupleStream implements Expressible {
String zkHost = null;
if(null == zkHostExpression){
zkHost = factory.getCollectionZkHost(collectionName);
if(zkHost == null) {
zkHost = factory.getDefaultZkHost();
}
}
else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();

View File

@ -0,0 +1,279 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Sends tuples emitted by a wrapped {@link TupleStream} as updates to a SolrCloud collection.
*/
public class UpdateStream extends TupleStream implements Expressible {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String collection;
private String zkHost;
private int updateBatchSize;
private int batchNumber;
private long totalDocsIndex;
private PushBackStream tupleSource;
private transient SolrClientCache cache;
private transient CloudSolrClient cloudSolrClient;
private List<SolrInputDocument> documentBatch = new ArrayList();
private String coreName;
public UpdateStream(StreamExpression expression, StreamFactory factory) throws IOException {
String collectionName = factory.getValueOperand(expression, 0);
verifyCollectionName(collectionName, expression);
String zkHost = findZkHost(factory, collectionName, expression);
verifyZkHost(zkHost, collectionName, expression);
int updateBatchSize = extractBatchSize(expression, factory);
//Extract underlying TupleStream.
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
if (1 != streamExpressions.size()) {
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
}
StreamExpression sourceStreamExpression = streamExpressions.get(0);
init(collectionName, factory.constructStream(sourceStreamExpression), zkHost, updateBatchSize);
}
public UpdateStream(String collectionName, TupleStream tupleSource, String zkHost, int updateBatchSize) throws IOException {
if (updateBatchSize <= 0) {
throw new IOException(String.format(Locale.ROOT,"batchSize '%d' must be greater than 0.", updateBatchSize));
}
init(collectionName, tupleSource, zkHost, updateBatchSize);
}
private void init(String collectionName, TupleStream tupleSource, String zkHost, int updateBatchSize) {
this.collection = collectionName;
this.zkHost = zkHost;
this.updateBatchSize = updateBatchSize;
this.tupleSource = new PushBackStream(tupleSource);
}
@Override
public void open() throws IOException {
setCloudSolrClient();
tupleSource.open();
}
@Override
public Tuple read() throws IOException {
for (int i = 0; i < updateBatchSize; i++) {
Tuple tuple = tupleSource.read();
if (tuple.EOF) {
if (documentBatch.isEmpty()) {
return tuple;
} else {
tupleSource.pushBack(tuple);
uploadBatchToCollection(documentBatch);
int b = documentBatch.size();
documentBatch.clear();
return createBatchSummaryTuple(b);
}
}
documentBatch.add(convertTupleToSolrDocument(tuple));
}
uploadBatchToCollection(documentBatch);
int b = documentBatch.size();
documentBatch.clear();
return createBatchSummaryTuple(b);
}
@Override
public void close() throws IOException {
if(cache == null && cloudSolrClient != null) {
cloudSolrClient.close();
}
tupleSource.close();
}
@Override
public StreamComparator getStreamSort() {
return tupleSource.getStreamSort();
}
@Override
public List<TupleStream> children() {
ArrayList<TupleStream> sourceList = new ArrayList<TupleStream>(1);
sourceList.add(tupleSource);
return sourceList;
}
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
expression.addParameter(collection);
expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
expression.addParameter(new StreamExpressionNamedParameter("batchSize", Integer.toString(updateBatchSize)));
if(tupleSource instanceof Expressible){
expression.addParameter(((Expressible)tupleSource).toExpression(factory));
} else {
throw new IOException("This ParallelStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
return expression;
}
@Override
public void setStreamContext(StreamContext context) {
this.cache = context.getSolrClientCache();
this.coreName = (String)context.get("core");
this.tupleSource.setStreamContext(context);
}
private void verifyCollectionName(String collectionName, StreamExpression expression) throws IOException {
if(null == collectionName){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
}
}
private String findZkHost(StreamFactory factory, String collectionName, StreamExpression expression) {
StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
if(null == zkHostExpression){
String zkHost = factory.getCollectionZkHost(collectionName);
if(zkHost == null) {
return factory.getDefaultZkHost();
} else {
return zkHost;
}
} else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
return ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
}
return null;
}
private void verifyZkHost(String zkHost, String collectionName, StreamExpression expression) throws IOException {
if(null == zkHost){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
}
}
private int extractBatchSize(StreamExpression expression, StreamFactory factory) throws IOException {
StreamExpressionNamedParameter batchSizeParam = factory.getNamedOperand(expression, "batchSize");
if(null == batchSizeParam || null == batchSizeParam.getParameter() || !(batchSizeParam.getParameter() instanceof StreamExpressionValue)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a 'batchSize' parameter of type positive integer but didn't find one",expression));
}
String batchSizeStr = ((StreamExpressionValue)batchSizeParam.getParameter()).getValue();
return parseBatchSize(batchSizeStr, expression);
}
private int parseBatchSize(String batchSizeStr, StreamExpression expression) throws IOException {
try{
int batchSize = Integer.parseInt(batchSizeStr);
if(batchSize <= 0){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - batchSize '%d' must be greater than 0.",expression, batchSize));
}
return batchSize;
}
catch(NumberFormatException e){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - batchSize '%s' is not a valid integer.",expression, batchSizeStr));
}
}
private void setCloudSolrClient() {
if(this.cache != null) {
this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
} else {
this.cloudSolrClient = new CloudSolrClient(zkHost);
this.cloudSolrClient.connect();
}
}
private SolrInputDocument convertTupleToSolrDocument(Tuple tuple) {
SolrInputDocument doc = new SolrInputDocument();
for (Object field : tuple.fields.keySet()) {
if (! ((String)field).equals("_version_")) {
Object value = tuple.get(field);
if (value instanceof List) {
addMultivaluedField(doc, (String)field, (List<Object>)value);
} else {
doc.addField((String)field, tuple.get(field));
}
}
}
LOG.debug("Tuple [{}] was converted into SolrInputDocument [{}].", tuple, doc);
return doc;
}
private void addMultivaluedField(SolrInputDocument doc, String fieldName, List<Object> values) {
for (Object value : values) {
doc.addField(fieldName, value);
}
}
private void uploadBatchToCollection(List<SolrInputDocument> documentBatch) throws IOException {
if (documentBatch.size() == 0) {
return;
}
try {
cloudSolrClient.add(collection, documentBatch);
} catch (SolrServerException | IOException e) {
LOG.warn("Unable to add documents to collection due to unexpected error.", e);
String className = e.getClass().getName();
String message = e.getMessage();
throw new IOException(String.format(Locale.ROOT,"Unexpected error when adding documents to collection %s- %s:%s", collection, className, message));
}
}
private Tuple createBatchSummaryTuple(int batchSize) {
assert batchSize > 0;
Map m = new HashMap();
this.totalDocsIndex += batchSize;
++batchNumber;
m.put("batchIndexed", batchSize);
m.put("totalIndexed", this.totalDocsIndex);
m.put("batchNumber", batchNumber);
if(coreName != null) {
m.put("worker", coreName);
}
return new Tuple(m);
}
}

View File

@ -44,6 +44,7 @@ public class StreamFactory implements Serializable {
private transient HashMap<String,String> collectionZkHosts;
private transient HashMap<String,Class> functionNames;
private transient String defaultZkHost;
public StreamFactory(){
collectionZkHosts = new HashMap<String,String>();
@ -54,6 +55,16 @@ public class StreamFactory implements Serializable {
this.collectionZkHosts.put(collectionName, zkHost);
return this;
}
public StreamFactory withDefaultZkHost(String zkHost) {
this.defaultZkHost = zkHost;
return this;
}
public String getDefaultZkHost() {
return this.defaultZkHost;
}
public String getCollectionZkHost(String collectionName){
if(this.collectionZkHosts.containsKey(collectionName)){
return this.collectionZkHosts.get(collectionName);

View File

@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.ops.GroupOperation;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
@ -144,6 +145,8 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
testSelectStream();
testFacetStream();
testSubFacetStream();
testUpdateStream();
testParallelUpdateStream();
}
private void testCloudSolrStream() throws Exception {
@ -313,9 +316,9 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
// Basic test
expression = StreamExpressionParser.parse("merge("
+ "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
+ "search(collection1, q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
+ "on=\"a_f asc\")");
+ "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
+ "search(collection1, q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"),"
+ "on=\"a_f asc\")");
stream = new MergeStream(expression, factory);
tuples = getTuples(stream);
@ -324,9 +327,9 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
// Basic test desc
expression = StreamExpressionParser.parse("merge("
+ "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+ "search(collection1, q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+ "on=\"a_f desc\")");
+ "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+ "search(collection1, q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+ "on=\"a_f desc\")");
stream = new MergeStream(expression, factory);
tuples = getTuples(stream);
@ -335,35 +338,35 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
// Basic w/multi comp
expression = StreamExpressionParser.parse("merge("
+ "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "search(collection1, q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "on=\"a_f asc, a_s asc\")");
+ "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "search(collection1, q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "on=\"a_f asc, a_s asc\")");
stream = new MergeStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 5);
assertOrder(tuples, 0,2,1,3,4);
assertOrder(tuples, 0, 2, 1, 3, 4);
// full factory w/multi comp
stream = factory.constructStream("merge("
+ "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "search(collection1, q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "on=\"a_f asc, a_s asc\")");
+ "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "search(collection1, q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "on=\"a_f asc, a_s asc\")");
tuples = getTuples(stream);
assert(tuples.size() == 5);
assertOrder(tuples, 0,2,1,3,4);
assertOrder(tuples, 0, 2, 1, 3, 4);
// full factory w/multi streams
stream = factory.constructStream("merge("
+ "search(collection1, q=\"id:(0 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "search(collection1, q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "search(collection1, q=\"id:(2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "on=\"a_f asc\")");
+ "search(collection1, q=\"id:(0 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "search(collection1, q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "search(collection1, q=\"id:(2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "on=\"a_f asc\")");
tuples = getTuples(stream);
assert(tuples.size() == 4);
assertOrder(tuples, 0,2,1,4);
assertOrder(tuples, 0, 2, 1, 4);
del("*:*");
commit();
@ -1996,6 +1999,185 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
commit();
}
private void testUpdateStream() throws Exception {
CloudSolrClient destinationCollectionClient = createCloudClient("destinationCollection");
createCollection("destinationCollection", destinationCollectionClient, 2, 2);
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7");
indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777");
indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777");
indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777");
commit();
waitForRecoveriesToFinish("destinationCollection", false);
StreamExpression expression;
TupleStream stream;
Tuple t;
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withCollectionZkHost("destinationCollection", zkServer.getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("update", UpdateStream.class);
//Copy all docs to destinationCollection
expression = StreamExpressionParser.parse("update(destinationCollection, batchSize=5, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\"))");
stream = new UpdateStream(expression, factory);
List<Tuple> tuples = getTuples(stream);
destinationCollectionClient.commit();
//Ensure that all UpdateStream tuples indicate the correct number of copied/indexed docs
assert(tuples.size() == 1);
t = tuples.get(0);
assert(t.EOF == false);
assertEquals(5, t.get("batchIndexed"));
//Ensure that destinationCollection actually has the new docs.
expression = StreamExpressionParser.parse("search(destinationCollection, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
assertEquals(5, tuples.size());
Tuple tuple = tuples.get(0);
assert(tuple.getLong("id") == 0);
assert(tuple.get("a_s").equals("hello0"));
assert(tuple.getLong("a_i") == 0);
assert(tuple.getDouble("a_f") == 0.0);
assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb");
assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7"));
tuple = tuples.get(1);
assert(tuple.getLong("id") == 1);
assert(tuple.get("a_s").equals("hello1"));
assert(tuple.getLong("a_i") == 1);
assert(tuple.getDouble("a_f") == 1.0);
assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4");
assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777"));
tuple = tuples.get(2);
assert(tuple.getLong("id") == 2);
assert(tuple.get("a_s").equals("hello2"));
assert(tuple.getLong("a_i") == 2);
assert(tuple.getDouble("a_f") == 0.0);
assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1");
assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77"));
tuple = tuples.get(3);
assert(tuple.getLong("id") == 3);
assert(tuple.get("a_s").equals("hello3"));
assert(tuple.getLong("a_i") == 3);
assert(tuple.getDouble("a_f") == 3.0);
assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2");
assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777"));
tuple = tuples.get(4);
assert(tuple.getLong("id") == 4);
assert(tuple.get("a_s").equals("hello4"));
assert(tuple.getLong("a_i") == 4);
assert(tuple.getDouble("a_f") == 4.0);
assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3");
assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777"));
destinationCollectionClient.deleteByQuery("*:*");
destinationCollectionClient.commit();
destinationCollectionClient.close();
del("*:*");
commit();
}
private void testParallelUpdateStream() throws Exception {
CloudSolrClient destinationCollectionClient = createCloudClient("parallelDestinationCollection");
createCollection("parallelDestinationCollection", destinationCollectionClient, 2, 2);
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7");
indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777");
indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777");
indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777");
commit();
waitForRecoveriesToFinish("parallelDestinationCollection", false);
StreamExpression expression;
TupleStream stream;
Tuple t;
String zkHost = zkServer.getZkAddress();
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withCollectionZkHost("parallelDestinationCollection", zkServer.getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("update", UpdateStream.class)
.withFunctionName("parallel", ParallelStream.class);
//Copy all docs to destinationCollection
String updateExpression = "update(parallelDestinationCollection, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"))";
TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\""+zkHost+"\", sort=\"batchNumber asc\")");
List<Tuple> tuples = getTuples(parallelUpdateStream);
destinationCollectionClient.commit();
//Ensure that all UpdateStream tuples indicate the correct number of copied/indexed docs
long count = 0;
for(Tuple tuple : tuples) {
count+=tuple.getLong("batchIndexed");
}
assert(count == 5);
//Ensure that destinationCollection actually has the new docs.
expression = StreamExpressionParser.parse("search(parallelDestinationCollection, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
assertEquals(5, tuples.size());
Tuple tuple = tuples.get(0);
assert(tuple.getLong("id") == 0);
assert(tuple.get("a_s").equals("hello0"));
assert(tuple.getLong("a_i") == 0);
assert(tuple.getDouble("a_f") == 0.0);
assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb");
assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7"));
tuple = tuples.get(1);
assert(tuple.getLong("id") == 1);
assert(tuple.get("a_s").equals("hello1"));
assert(tuple.getLong("a_i") == 1);
assert(tuple.getDouble("a_f") == 1.0);
assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4");
assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777"));
tuple = tuples.get(2);
assert(tuple.getLong("id") == 2);
assert(tuple.get("a_s").equals("hello2"));
assert(tuple.getLong("a_i") == 2);
assert(tuple.getDouble("a_f") == 0.0);
assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1");
assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77"));
tuple = tuples.get(3);
assert(tuple.getLong("id") == 3);
assert(tuple.get("a_s").equals("hello3"));
assert(tuple.getLong("a_i") == 3);
assert(tuple.getDouble("a_f") == 3.0);
assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2");
assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777"));
tuple = tuples.get(4);
assert(tuple.getLong("id") == 4);
assert(tuple.get("a_s").equals("hello4"));
assert(tuple.getLong("a_i") == 4);
assert(tuple.getDouble("a_f") == 4.0);
assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3");
assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777"));
destinationCollectionClient.deleteByQuery("*:*");
destinationCollectionClient.commit();
destinationCollectionClient.close();
del("*:*");
commit();
}
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
tupleStream.open();
List<Tuple> tuples = new ArrayList<Tuple>();
@ -2109,6 +2291,23 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
return true;
}
private boolean assertList(List list, Object... vals) throws Exception {
if(list.size() != vals.length) {
throw new Exception("Lists are not the same size:"+list.size() +" : "+vals.length);
}
for(int i=0; i<list.size(); i++) {
Object a = list.get(i);
Object b = vals[i];
if(!a.equals(b)) {
throw new Exception("List items not equals:"+a+" : "+b);
}
}
return true;
}
@Override
protected void indexr(Object... fields) throws Exception {

View File

@ -19,6 +19,7 @@ package org.apache.solr.client.solrj.io.stream;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.io.ops.GroupOperation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
@ -41,12 +42,14 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase {
factory = new StreamFactory()
.withCollectionZkHost("collection1", "testhost:1234")
.withCollectionZkHost("collection2", "testhost:1234")
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("merge", MergeStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class)
.withFunctionName("reduce", ReducerStream.class)
.withFunctionName("group", GroupOperation.class)
.withFunctionName("update", UpdateStream.class)
.withFunctionName("stats", StatsStream.class)
.withFunctionName("facet", FacetStream.class)
.withFunctionName("count", CountMetric.class)
@ -164,6 +167,25 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase {
assertTrue(expressionString.contains("by=a_s"));
}
@Test
public void testUpdateStream() throws Exception {
StreamExpression expression = StreamExpressionParser.parse("update("
+ "collection2, "
+ "batchSize=5, "
+ "search("
+ "collection1, "
+ "q=*:*, "
+ "fl=\"id,a_s,a_i,a_f\", "
+ "sort=\"a_f asc, a_i asc\"))");
UpdateStream updateStream = new UpdateStream(expression, factory);
String expressionString = updateStream.toExpression(factory).toString();
assertTrue(expressionString.contains("update(collection2"));
assertTrue(expressionString.contains("batchSize=5"));
assertTrue(expressionString.contains("search(collection1"));
}
@Test
public void testFacetStream() throws Exception {