diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index 4af565ae6ad..f2446f3128d 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -2533,16 +2533,16 @@ public class StreamExpressionTest extends SolrCloudTestCase { public void testParallelTopicStream() throws Exception { new UpdateRequest() - .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1") - .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2") - .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3") - .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4") - .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5") - .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6") - .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7") - .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8") - .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9") - .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10") + .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1", "subject", "ha ha bla blah0") + .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2", "subject", "ha ha bla blah2") + .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3", "subject", "ha ha bla blah3") + .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4", "subject", "ha ha bla blah4") + .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5", "subject", "ha ha bla blah5") + .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6","subject", "ha ha bla blah6") + .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7","subject", "ha ha bla blah7") + .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8", "subject", "ha ha bla blah8") + .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9", "subject", "ha ha bla blah9") + .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10", "subject", "ha ha bla blah10") .commit(cluster.getSolrClient(), COLLECTION); StreamFactory factory = new StreamFactory() @@ -2653,6 +2653,37 @@ public class StreamExpressionTest extends SolrCloudTestCase { context.setSolrClientCache(cache); stream.setStreamContext(context); assertTopicRun(stream, "12","13"); + + //Test text extraction + + expression = StreamExpressionParser.parse("parallel(collection1, " + + "workers=\"2\", " + + "sort=\"_version_ asc\"," + + "topic(collection1, " + + "collection1, " + + "q=\"subject:bla\", " + + "fl=\"subject\", " + + "id=\"3000000\", " + + "initialCheckpoint=\"0\", " + + "partitionKeys=\"id\"))"); + + stream = factory.constructStream(expression); + context = new StreamContext(); + context.setSolrClientCache(cache); + stream.setStreamContext(context); + + assertTopicSubject(stream, "ha ha bla blah0", + "ha ha bla blah1", + "ha ha bla blah2", + "ha ha bla blah3", + "ha ha bla blah4", + "ha ha bla blah5", + "ha ha bla blah6", + "ha ha bla blah7", + "ha ha bla blah8", + "ha ha bla blah9", + "ha ha bla blah10"); + } finally { cache.close(); } @@ -3314,4 +3345,32 @@ public class StreamExpressionTest extends SolrCloudTestCase { throw new Exception("Wrong count in topic run:"+count); } } + + private void assertTopicSubject(TupleStream stream, String... textArray) throws Exception { + long version = -1; + int count = 0; + List texts = new ArrayList(); + for(String text : textArray) { + texts.add(text); + } + + try { + stream.open(); + while (true) { + Tuple tuple = stream.read(); + if (tuple.EOF) { + break; + } else { + ++count; + String subject = tuple.getString("subject"); + if (!texts.contains(subject)) { + throw new Exception("Expecting subject in topic run not found:" + subject); + } + } + } + } finally { + stream.close(); + } + } + }