diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index 98a7f8f9463..d301024aa46 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -69,7 +69,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { StreamContext context = new StreamContext(); context.workerID = worker; context.numWorkers = numWorkers; - context.clientCache = clientCache; + context.setSolrClientCache(clientCache); tupleStream.setStreamContext(context); rsp.add("tuples", tupleStream); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/AscBucketComp.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/AscBucketComp.java deleted file mode 100644 index 5428bc3eb5b..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/AscBucketComp.java +++ /dev/null @@ -1,42 +0,0 @@ -package org.apache.solr.client.solrj.io; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.Serializable; -import java.util.Comparator; - -public class AscBucketComp implements Comparator, Serializable { - - private int ord; - - public AscBucketComp(int ord) { - this.ord = ord; - } - - public int compare(BucketMetrics b1, BucketMetrics b2) { - double d1 = b1.getMetrics()[ord].getValue(); - double d2 = b2.getMetrics()[ord].getValue(); - if(d1 > d2) { - return 1; - } else if(d1 < d2) { - return -1; - } else { - return 0; - } - } -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/AscFieldComp.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/AscFieldComp.java index 802d7e6844e..6fdfdf8dd7c 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/AscFieldComp.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/AscFieldComp.java @@ -20,6 +20,12 @@ package org.apache.solr.client.solrj.io; import java.io.Serializable; import java.util.Comparator; + +/** + * An ascending field Comparator which compares a field of two Tuples and determines sort order. + **/ + + public class AscFieldComp implements Comparator, Serializable { private static final long serialVersionUID = 1; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/AscMetricComp.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/AscMetricComp.java deleted file mode 100644 index c6d5a1afc11..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/AscMetricComp.java +++ /dev/null @@ -1,39 +0,0 @@ -package org.apache.solr.client.solrj.io; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.Serializable; -import java.util.Comparator; -import java.util.List; - -public class AscMetricComp implements Comparator, Serializable { - - private static final long serialVersionUID = 1; - - private int ord; - - public AscMetricComp(int ord) { - this.ord = ord; - } - - public int compare(Tuple t1, Tuple t2) { - List values1 = (List)t1.get("metricValues"); - List values2 = (List)t2.get("metricValues"); - return values1.get(ord).compareTo(values2.get(ord)); - } -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Bucket.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Bucket.java deleted file mode 100644 index b6cd02f8391..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Bucket.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.solr.client.solrj.io; - -/* -* -*/ - -import java.io.Serializable; - -public class Bucket implements Serializable { - - private static final long serialVersionUID = 1; - - private String bucketKey; - - public Bucket() { - - } - - public Bucket(String bucketKey) { - this.bucketKey = bucketKey; - } - - public String getBucketValue(Tuple tuple) { - return tuple.get(bucketKey).toString(); - } -} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/BucketMetrics.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/BucketMetrics.java deleted file mode 100644 index f0818b23d80..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/BucketMetrics.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.apache.solr.client.solrj.io; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.Serializable; - -public class BucketMetrics implements Serializable { - - private static final long serialVersionUID = 1; - - private HashKey key; - private Metric[] metrics; - - public BucketMetrics(HashKey key, Metric[] metrics) { - this.key = key; - this.metrics = metrics; - } - - public Metric[] getMetrics() { - return metrics; - } - - public HashKey getKey() { - return key; - } - - -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java index 62bb394894a..ab992342395 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java @@ -27,28 +27,24 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeoutException; import org.apache.solr.client.solrj.impl.CloudSolrClient; -import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ClusterState; -import org.apache.solr.common.cloud.ZooKeeperException; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.util.SolrjNamedThreadFactory; -import org.apache.zookeeper.KeeperException; /** * Connects to Zookeeper to pick replicas from a specific collection to send the query to. -* SolrStream instances are used to send the query to the replicas. -* SolrStreams are opened using a Thread pool, but a single thread is used to iterate through each stream's tuples.* * +* Under the covers SolrStream instances are used to send the query to the replicas. +* SolrStreams are opened using a Thread pool, but a single thread is used to iterate through each stream's tuples. 
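+* +* A minimal usage sketch (the ZooKeeper address, collection name and field names here are hypothetical): +* +* Map params = new HashMap(); +* params.put("q", "*:*"); +* params.put("fl", "id"); +* params.put("sort", "id asc"); +* CloudSolrStream stream = new CloudSolrStream("zkHost:2181", "collection1", params); +* try { +* stream.open(); +* Tuple tuple = stream.read(); +* } finally { +* stream.close(); +* }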
**/ public class CloudSolrStream extends TupleStream { @@ -59,24 +55,25 @@ public class CloudSolrStream extends TupleStream { protected String collection; protected Map params; private Map fieldMappings; - protected TreeSet tuples; protected Comparator comp; - protected List solrStreams = new ArrayList(); private int zkConnectTimeout = 10000; private int zkClientTimeout = 10000; - protected transient SolrClientCache cache; - protected transient CloudSolrClient cloudSolrClient; private int numWorkers; private int workerID; - protected Map eofTuples = new HashMap(); + private boolean trace; + protected transient Map eofTuples; + protected transient SolrClientCache cache; + protected transient CloudSolrClient cloudSolrClient; + protected transient List solrStreams; + protected transient TreeSet tuples; + protected transient StreamContext streamContext; - public CloudSolrStream(String zkHost, String collection, Map params) { + public CloudSolrStream(String zkHost, String collection, Map params) throws IOException { this.zkHost = zkHost; this.collection = collection; this.params = params; - this.tuples = new TreeSet(); String sort = (String)params.get("sort"); - this.comp = parseComp(sort); + this.comp = parseComp(sort, params); } //Used by the ParallelStream @@ -92,13 +89,21 @@ public class CloudSolrStream extends TupleStream { this.fieldMappings = fieldMappings; } + public void setTrace(boolean trace) { + this.trace = trace; + } + public void setStreamContext(StreamContext context) { this.numWorkers = context.numWorkers; this.workerID = context.workerID; - this.cache = context.clientCache; + this.cache = context.getSolrClientCache(); + this.streamContext = context; } public void open() throws IOException { + this.tuples = new TreeSet(); + this.solrStreams = new ArrayList(); + this.eofTuples = new HashMap(); if(this.cache != null) { this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost); } else { @@ -110,17 +115,33 @@ public class CloudSolrStream extends TupleStream { } + public Map getEofTuples() { + return this.eofTuples; + } public List children() { return solrStreams; } - private Comparator parseComp(String sort) { + private Comparator parseComp(String sort, Map params) throws IOException { + + String fl = (String)params.get("fl"); + String[] fls = fl.split(","); + HashSet fieldSet = new HashSet(); + for(String f : fls) { + fieldSet.add(f.trim()); //Handle spaces in the field list. 
+ } + String[] sorts = sort.split(","); Comparator[] comps = new Comparator[sorts.length]; for(int i=0; i> futures = new ArrayList(); - for(TupleStream solrStream : solrStreams) { - StreamOpener so = new StreamOpener((SolrStream)solrStream, comp); - Future future = service.submit(so); - futures.add(future); - } - try { - for(Future f : futures) { - TupleWrapper w = f.get(); - if(w != null) { - tuples.add(w); - } + List> futures = new ArrayList(); + for (TupleStream solrStream : solrStreams) { + StreamOpener so = new StreamOpener((SolrStream) solrStream, comp); + Future future = service.submit(so); + futures.add(future); } - } catch (Exception e) { - throw new IOException(e); - } - service.shutdown(); + try { + for (Future f : futures) { + TupleWrapper w = f.get(); + if (w != null) { + tuples.add(w); + } + } + } catch (Exception e) { + throw new IOException(e); + } + } finally { + service.shutdown(); + } } public void close() throws IOException { @@ -212,12 +234,21 @@ public class CloudSolrStream extends TupleStream { TupleWrapper tw = tuples.pollFirst(); if(tw != null) { Tuple t = tw.getTuple(); + + if (trace) { + t.put("_COLLECTION_", this.collection); + } + if(tw.next()) { tuples.add(tw); } return t; } else { Map m = new HashMap(); + if(trace) { + m.put("_COLLECTION_", this.collection); + } + m.put("EOF", true); return new Tuple(m); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/CountMetric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/CountMetric.java deleted file mode 100644 index 5ab4aaa2128..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/CountMetric.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.solr.client.solrj.io; - -import java.io.Serializable; -import java.util.Map; -import java.util.HashMap; - -public class CountMetric implements Metric, Serializable { - - private static final long serialVersionUID = 1; - - public static final String COUNT = "count"; - private long count; - - public String getName() { - return "count"; - } - - public void update(Tuple tuple) { - ++count; - } - - public double getValue() { - return count; - } - - public Metric newInstance() { - return new CountMetric(); - } - - public Map metricValues() { - Map m = new HashMap(); - double d = (double)count; - m.put(COUNT, d); - return m; - } - - public void update(Map metricValues) { - double dcount = metricValues.get(COUNT); - count+=(long)dcount; - } -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/DescBucketComp.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/DescBucketComp.java deleted file mode 100644 index cf5dbfcccc9..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/DescBucketComp.java +++ /dev/null @@ -1,42 +0,0 @@ -package org.apache.solr.client.solrj.io; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.Serializable; -import java.util.Comparator; - -public class DescBucketComp implements Comparator, Serializable { - - private int ord; - - public DescBucketComp(int ord) { - this.ord = ord; - } - - public int compare(BucketMetrics b1, BucketMetrics b2) { - double d1 = b1.getMetrics()[ord].getValue(); - double d2 = b2.getMetrics()[ord].getValue(); - if(d1 > d2) { - return -1; - } else if(d1 < d2) { - return 1; - } else { - return 0; - } - } -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/DescFieldComp.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/DescFieldComp.java index 4c69181e745..38d7f42fb34 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/DescFieldComp.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/DescFieldComp.java @@ -20,6 +20,11 @@ package org.apache.solr.client.solrj.io; import java.io.Serializable; import java.util.Comparator; +/** + * A descending field Comparator which compares a field of two Tuples and determines sort order.
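+ * + * A minimal sketch (assuming the single field-name constructor, and Tuples that carry a comparable "price" field): + * + * Comparator comp = new DescFieldComp("price"); + * int c = comp.compare(t1, t2); // negative when t1's "price" is larger, so higher values sort first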
+ **/ + + public class DescFieldComp implements Comparator, Serializable { private static final long serialVersionUID = 1; @@ -33,11 +38,11 @@ public class DescFieldComp implements Comparator, Serializable { public int compare(Tuple t1, Tuple t2) { Comparable o1 = (Comparable)t1.get(field); Comparable o2 = (Comparable)t2.get(field); - int i = o1.compareTo(o2); - if(i == 0) { - return i; + int c = o1.compareTo(o2); + if(c == 0) { + return 0; } else { - return -i; + return -c; } } } \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/FilterStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/FilterStream.java deleted file mode 100644 index ef26a0ccde4..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/FilterStream.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.solr.client.solrj.io; - -import java.io.IOException; -import java.util.Comparator; -import java.util.List; -import java.util.ArrayList; - -public class FilterStream extends TupleStream { - - private static final long serialVersionUID = 1; - - private TupleStream streamA; - private TupleStream streamB; - private Comparator comp; - private Tuple a = null; - private Tuple b = null; - - /* - * Intersects streamA by streamB based on a Comparator. - * Both streams must be sorted by the fields being compared. - * StreamB must be unique for the fields being compared. 
- **/ - - public FilterStream(TupleStream streamA, TupleStream streamB, Comparator comp) { - this.streamA = streamA; - this.streamB = streamB; - this.comp = comp; - } - - public void setStreamContext(StreamContext context) { - this.streamA.setStreamContext(context); - this.streamB.setStreamContext(context); - } - - public List children() { - List l = new ArrayList(); - l.add(streamA); - l.add(streamB); - return l; - } - - public void open() throws IOException { - streamA.open(); - streamB.open(); - } - - public void close() throws IOException { - streamA.close(); - streamB.close(); - } - - public Tuple read() throws IOException { - a = streamA.read(); - - if(b == null) { - b = streamB.read(); - } - - while(true) { - if(a.EOF) { - return a; - } - - if(b.EOF) { - return b; - } - - int i = comp.compare(a, b); - if(i == 0) { - return a; - } else if(i < 0) { - // a < b so advance a - a = streamA.read(); - } else { - // a > b so advance b - b = streamB.read(); - } - } - } - - public int getCost() { - return 0; - } -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/GroupByStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/GroupByStream.java deleted file mode 100644 index 1b84dff4990..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/GroupByStream.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.solr.client.solrj.io; - -import java.io.IOException; -import java.io.Serializable; -import java.util.List; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.PriorityQueue; - -/** - * Iterates over a TupleStream Groups The TopN Tuples of a group. 
- **/ - -public class GroupByStream extends TupleStream { - - private static final long serialVersionUID = 1; - - private TupleStream tupleStream; - private Comparator interGroupComp; - private Comparator intraGroupComp; - private Comparator reverseComp; - private Tuple currentTuple; - private int size; - - public GroupByStream(TupleStream tupleStream, - Comparator interGroupComp, - Comparator intraGroupComp, - int size) { - this.tupleStream = tupleStream; - this.interGroupComp = interGroupComp; - this.intraGroupComp = intraGroupComp; - this.reverseComp = new ReverseComp(intraGroupComp); - this.size = size; - } - - public void setStreamContext(StreamContext context) { - this.tupleStream.setStreamContext(context); - } - - public List children() { - List l = new ArrayList(); - l.add(tupleStream); - return l; - } - - public void open() throws IOException { - tupleStream.open(); - currentTuple = tupleStream.read(); //Read the first Tuple so currentTuple is never null; - } - - public void close() throws IOException { - tupleStream.close(); - } - - public Tuple read() throws IOException { - - if(currentTuple.EOF) { - return currentTuple; - } - - PriorityQueue group = new PriorityQueue<>(size, reverseComp); - group.add(currentTuple); - while(true) { - Tuple t = tupleStream.read(); - - if(t.EOF) { - currentTuple = t; - break; - } - - if(interGroupComp.compare(currentTuple, t) == 0) { - if(group.size() >= size) { - Tuple peek = group.peek(); - if(intraGroupComp.compare(t, peek) < 0) { - group.poll(); - group.add(t); - } - } else { - group.add(t); - } - } else { - currentTuple = t; - break; - } - } - - //We have a finished group so add the Tuples to an array. - Tuple[] members = new Tuple[group.size()]; - for(int i=group.size()-1; i>=0; i--) { - Tuple t = group.poll(); - members[i] = t; - } - - //First Tuple is the group head. - Tuple groupHead = members[0]; - if(members.length > 1) { - List groupList = new ArrayList(); - for(int i=1; i, Serializable { - - private Comparator comp; - - public ReverseComp(Comparator comp) { - this.comp = comp; - } - - public int compare(Tuple t1, Tuple t2) { - return comp.compare(t1, t2)*(-1); - } - } -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/HashJoinStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/HashJoinStream.java deleted file mode 100644 index f89b164ea18..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/HashJoinStream.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.solr.client.solrj.io; - -import java.io.IOException; -import java.util.List; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; - -/** - * - * - **/ - -public class HashJoinStream extends TupleStream { - - private static final long serialVersionUID = 1; - - private PushBackStream streamA; - private TupleStream streamB; - private String[] keys; - private HashMap> hashMap = new HashMap(); - - public HashJoinStream(TupleStream streamA, TupleStream streamB, String[] keys) { - this.streamA = new PushBackStream(streamA); - this.streamB = streamB; - this.keys = keys; - } - - public void setStreamContext(StreamContext context) { - this.streamA.setStreamContext(context); - this.streamB.setStreamContext(context); - } - - public List children() { - List l = new ArrayList(); - l.add(streamA); - l.add(streamB); - return l; - } - - public void open() throws IOException { - streamB.open(); - while(true) { - Tuple t = streamB.read(); - if(t.EOF) { - break; - } - - HashKey hashKey = new HashKey(t, keys); - if(hashMap.containsKey(hashKey)) { - List tuples = hashMap.get(hashKey); - tuples.add(t); - } else { - List tuples = new ArrayList(); - tuples.add(t); - hashMap.put(hashKey, tuples); - } - } - - streamB.close(); - streamA.open(); - } - - public void close() throws IOException { - streamA.close(); - } - - private LinkedList joinTuples = new LinkedList(); - - public Tuple read() throws IOException { - while(true) { - Tuple tuple = streamA.read(); - - if(tuple.EOF) { - return tuple; - } - - if(joinTuples.size() > 0) { - Tuple t = tuple.clone(); - Tuple j = joinTuples.removeFirst(); - t.fields.putAll(j.fields); - if(joinTuples.size() > 0) { - streamA.pushBack(tuple); - } - - return t; - } else { - HashKey hashKey = new HashKey(tuple, keys); - - if(hashMap.containsKey(hashKey)) { - List joinWith = hashMap.get(hashKey); - for(Tuple jt : joinWith) { - joinTuples.add(jt); - } - streamA.pushBack(tuple); - } - } - } - } - - public int getCost() { - return 0; - } -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/HashKey.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/HashKey.java deleted file mode 100644 index e475edbda9b..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/HashKey.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.solr.client.solrj.io; - -import java.io.Serializable; - -public class HashKey implements Serializable { - - private static final long serialVersionUID = 1; - - private Object[] parts; - - - public HashKey(String value) { - parts = (Object[])value.split("::"); - } - - public HashKey(Tuple t, String[] keys) { - this.parts = new Object[keys.length]; - for(int i=0; i 0) { - buf.append("::"); - } - buf.append(parts[i].toString()); - } - - return buf.toString(); - } -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/MaxMetric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/MaxMetric.java deleted file mode 100644 index 800b14a9fc1..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/MaxMetric.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.solr.client.solrj.io; - -import java.io.Serializable; -import java.util.Map; -import java.util.HashMap; - -public class MaxMetric implements Metric, Serializable { - - public static final String MAX = "max"; - private long longMax = -Long.MAX_VALUE; - private double doubleMax = Double.MAX_VALUE; - private boolean isDouble; - private String column; - - public MaxMetric(String column, boolean isDouble) { - this.column = column; - this.isDouble = isDouble; - } - - public String getName() { - return "mix:"+column; - } - - public double getValue() { - if(isDouble) { - return doubleMax; - } else { - return longMax; - } - } - - public void update(Tuple tuple) { - if(isDouble) { - double d = (double)tuple.get(column); - if(d > doubleMax) { - doubleMax = d; - } - } else { - long l = (long)tuple.get(column); - if(l > longMax) { - longMax = l; - } - } - } - - public Metric newInstance() { - return new MaxMetric(column, isDouble); - } - - public Map metricValues() { - Map m = new HashMap(); - if(isDouble) { - m.put(MAX,doubleMax); - } else { - doubleMax = (double)longMax; - m.put(MAX,doubleMax); - } - return m; - } - - public void update(Map metricValues) { - if(isDouble) { - double dmax = metricValues.get(MAX); - if(dmax > doubleMax) { - doubleMax = dmax; - } - } else { - double dmax = metricValues.get(MAX); - long lmax = (long) dmax; - if(lmax > longMax) { - longMax = lmax; - } - } - } -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/MeanMetric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/MeanMetric.java deleted file mode 100644 index b3372ac103a..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/MeanMetric.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.solr.client.solrj.io; - -import java.io.Serializable; -import java.util.Map; -import java.util.HashMap; - -public class MeanMetric implements Metric, Serializable { - - private static final long serialVersionUID = 1; - - public static final String SUM = "sum"; - public static final String COUNT = "count"; - public static final String MEAN = "mean"; - - private String column; - private boolean isDouble; - private double doubleSum; - private long longSum; - private long count; - - public MeanMetric(String column, boolean isDouble) { - this.column = column; - this.isDouble = isDouble; - } - - public String getName() { - return "mean:"+column; - } - - public void update(Tuple tuple) { - ++count; - if(isDouble) { - Double d = (Double)tuple.get(column); - doubleSum += d.doubleValue(); - } else { - Long l = (Long)tuple.get(column); - longSum += l.doubleValue(); - } - } - - public Metric newInstance() { - return new MeanMetric(column, isDouble); - } - - public double getValue() { - double dcount = (double)count; - if(isDouble) { - double ave = doubleSum/dcount; - return ave; - - } else { - double ave = longSum/dcount; - return ave; - } - } - - public Map metricValues() { - Map m = new HashMap(); - double dcount = (double)count; - m.put(COUNT, dcount); - if(isDouble) { - double ave = doubleSum/dcount; - m.put(MEAN,ave); - m.put(SUM,doubleSum); - - } else { - double ave = longSum/dcount; - doubleSum = (double)longSum; - m.put(MEAN,ave); - m.put(SUM,doubleSum); - } - - return m; - } - - public void update(Map metricValues) { - double dcount = metricValues.get(COUNT); - count += (long)dcount; - if(isDouble) { - double dsum = metricValues.get(SUM); - doubleSum+=dsum; - } else { - double dsum = metricValues.get(SUM); - longSum+=(long)dsum; - } - } -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/MergeJoinStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/MergeJoinStream.java deleted file mode 100644 index 5fb07f86c54..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/MergeJoinStream.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.solr.client.solrj.io; - -import java.io.IOException; -import java.util.List; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.Comparator; - - -/** - * Merge Joins streamA with streamB based on the Comparator. - * Supports: - * one-to-one, one-to-many, many-to-one and many-to-many joins - * - **/ - -public class MergeJoinStream extends TupleStream { - - private static final long serialVersionUID = 1; - - private PushBackStream streamA; - private PushBackStream streamB; - private Comparator comp; - - public MergeJoinStream(TupleStream streamA, TupleStream streamB, Comparator comp) { - this.streamA = new PushBackStream(streamA); - this.streamB = new PushBackStream(streamB); - this.comp = comp; - } - - public void setStreamContext(StreamContext context) { - this.streamA.setStreamContext(context); - this.streamB.setStreamContext(context); - } - - public List children() { - List l = new ArrayList(); - l.add(streamA); - l.add(streamB); - return l; - } - - public void open() throws IOException { - streamA.open(); - streamB.open(); - } - - public void close() throws IOException { - streamA.close(); - streamB.close(); - } - - private LinkedList joinTuples = new LinkedList(); - private List listA = new ArrayList(); - private List listB = new ArrayList(); - - public Tuple read() throws IOException { - - if(joinTuples.size() > 0) { - return joinTuples.removeFirst(); - } - - OUTER: - while(true) { - if(listA.size() == 0) { - //Stream A needs to be advanced. - Tuple a = streamA.read(); - if(a.EOF) { - return a; - } - - listA.add(a); - INNERA: - while(true) { - Tuple a1 = streamA.read(); - if(a1.EOF) { - streamA.pushBack(a1); - break INNERA; - } - - if(comp.compare(a,a1) == 0) { - listA.add(a1); - } else { - streamA.pushBack(a1); - break INNERA; - } - } - } - - if(listB.size() == 0) { - //StreamB needs to be advanced. - Tuple b = streamB.read(); - if(b.EOF) { - return b; - } - - listB.add(b); - INNERA: - while(true) { - Tuple b1 = streamB.read(); - - if(b1.EOF) { - streamB.pushBack(b1); - break INNERA; - } - - if(comp.compare(b,b1) == 0) { - listB.add(b1); - } else { - streamB.pushBack(b1); - break INNERA; - } - } - } - - int c = comp.compare(listA.get(0),listB.get(0)); - if(c == 0) { - //The Tuple lists match. So build all the Tuple combinations. - for(Tuple aa : listA) { - for(Tuple bb : listB) { - Tuple clone = aa.clone(); - clone.fields.putAll(bb.fields); - joinTuples.add(clone); - } - } - - //This will advance both streams. - listA.clear(); - listB.clear(); - - return joinTuples.removeFirst(); - } else if(c < 0) { - //This will advance streamA - listA.clear(); - } else { - //This will advance streamB - listB.clear(); - } - } - } - - public int getCost() { - return 0; - } -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/MergeStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/MergeStream.java index 604c63c36d0..9675ce6d9ce 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/MergeStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/MergeStream.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.ArrayList; /** -* Unions streamA with streamB based on a Comparator. +* Unions streamA with streamB ordering the Tuples based on a Comparator. * Both streams must be sorted by the fields being compared. 
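+* For example (a sketch, assuming both substreams are sorted by "id" ascending): +* +* MergeStream merged = new MergeStream(streamA, streamB, new AscFieldComp("id")); +* +* The merged stream emits the Tuples of both substreams in a single stream ordered by "id".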
**/ diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Metric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Metric.java deleted file mode 100644 index cce08090a13..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Metric.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.solr.client.solrj.io; - -import java.io.Serializable; -import java.util.Map; - -public interface Metric extends Serializable { - public String getName(); - public double getValue(); - public void update(Tuple tuple); - public Metric newInstance(); - public Map metricValues(); - public void update(Map metricValues); -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/MetricStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/MetricStream.java deleted file mode 100644 index e98c7d3d6d9..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/MetricStream.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.solr.client.solrj.io; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.ArrayList; -import java.util.PriorityQueue; - -/* -String[] buckets = {"a","b"}; -Metric -BucketStream bucketStream = new BucketStream(stream,buckets,metrics,"my-metrics","name"); - -bucketStream.get( -*/ - -public class MetricStream extends TupleStream { - - private static final long serialVersionUID = 1; - - private TupleStream tupleStream; - private Bucket[] buckets; - private Metric[] metrics; - private String outKey; - private Map bucketMap; - private BucketMetrics[] bucketMetrics; - private static final HashKey metricsKey = new HashKey("metrics"); - private int topN; - private Comparator comp; - private Comparator rcomp; - - public MetricStream(TupleStream tupleStream, - Bucket[] buckets, - Metric[] metrics, - String outKey, - Comparator comp, - int topN) { - this.tupleStream = tupleStream; - this.buckets = buckets; - this.metrics = metrics; - this.outKey = outKey; - this.topN = topN; - this.rcomp = new ReverseOrdComp(comp); - this.comp = comp; - this.bucketMap = new HashMap(); - } - - public MetricStream(TupleStream tupleStream, - Metric[] metrics, - String outKey) { - this.tupleStream = tupleStream; - this.metrics = metrics; - this.outKey = outKey; - this.bucketMap = new HashMap(); - } - - public String getOutKey() { - return this.outKey; - } - - public BucketMetrics[] getBucketMetrics(){ - return bucketMetrics; - } - - public void setBucketMetrics(BucketMetrics[] bucketMetrics) { - this.bucketMetrics = bucketMetrics; - } - - BucketMetrics[] merge(List all) { - Map bucketAccumulator = new HashMap(); - - for(Map top : all) { - List ks = (List)top.get("buckets"); - List>> ms = (List>>)top.get("metrics"); - for(int i=0; i> bucketMs = ms.get(i); - - HashKey hashKey = new HashKey(key); - if(bucketAccumulator.containsKey(hashKey)) { - Metric[] mergeMetrics = bucketAccumulator.get(hashKey); - for(int m=0; m> it = bucketAccumulator.entrySet().iterator(); - - PriorityQueue priorityQueue = new PriorityQueue(topN, rcomp); - - while(it.hasNext()) { - Map.Entry entry = it.next(); - BucketMetrics bms = new BucketMetrics(entry.getKey(), entry.getValue()); - if(priorityQueue.size() < topN) { - priorityQueue.add(bms); - } else { - BucketMetrics peek = priorityQueue.peek(); - if(comp.compare(bms, peek) < 0) { - priorityQueue.poll(); - priorityQueue.add(bms); - } - } - } - - int s = priorityQueue.size(); - BucketMetrics[] bucketMetrics = new BucketMetrics[s]; - - for(int i=bucketMetrics.length-1; i>=0; i--) { - BucketMetrics b = priorityQueue.poll(); - bucketMetrics[i]= b; - } - return bucketMetrics; - } - - private class ReverseOrdComp implements Comparator, Serializable { - private Comparator comp; - - public ReverseOrdComp(Comparator comp) { - this.comp = comp; - } - - public int compare(BucketMetrics e1, BucketMetrics e2) { - return comp.compare(e1,e2)*-1; - } - } - - public void setStreamContext(StreamContext context) { - this.tupleStream.setStreamContext(context); - } - - public List children() { - List l = new ArrayList(); - l.add(tupleStream); - return l; - } - - public void open() throws IOException { - tupleStream.open(); - } - - public void close() throws IOException { - tupleStream.close(); - } - - public Tuple read() throws IOException { - - Tuple tuple = tupleStream.read(); - if(tuple.EOF) { - Iterator> it = 
bucketMap.entrySet().iterator(); - - if(comp == null) { - //Handle No bucket constructor - Map.Entry noBucket = it.next(); - BucketMetrics bms = new BucketMetrics(noBucket.getKey(), noBucket.getValue()); - this.bucketMetrics = new BucketMetrics[1]; - this.bucketMetrics[0] = bms; - List> outMetrics = new ArrayList(); - List outKeys = new ArrayList(); - for(Metric metric : bms.getMetrics()) { - Map outMetricValues = metric.metricValues(); - String outKey = metric.getName(); - outMetrics.add(outMetricValues); - outKeys.add(outKey); - } - Map outMap = new HashMap(); - outMap.put("buckets",outKeys); - outMap.put("metrics",outMetrics); - tuple.set(this.outKey, outMap); - return tuple; - } - - PriorityQueue priorityQueue = new PriorityQueue(topN, rcomp); - - while(it.hasNext()) { - Map.Entry entry = it.next(); - BucketMetrics bms = new BucketMetrics(entry.getKey(), entry.getValue()); - if(priorityQueue.size() < topN) { - priorityQueue.add(bms); - } else { - BucketMetrics peek = priorityQueue.peek(); - - if(comp.compare(bms, peek) < 0) { - priorityQueue.poll(); - priorityQueue.add(bms); - } - - } - } - - int s = priorityQueue.size(); - this.bucketMetrics = new BucketMetrics[s]; - - for(int i=bucketMetrics.length-1; i>=0; i--) { - BucketMetrics b = priorityQueue.poll(); - this.bucketMetrics[i]= b; - } - - List>> outMetrics = new ArrayList(); - List outBuckets = new ArrayList(); - - for(BucketMetrics bms : this.bucketMetrics) { - List outBucketMetrics = new ArrayList(); - for(Metric metric : bms.getMetrics()) { - Map outMetricValues = metric.metricValues(); - outBucketMetrics.add(outMetricValues); - } - outBuckets.add(bms.getKey().toString()); - outMetrics.add(outBucketMetrics); - } - - Map outMap = new HashMap(); - outMap.put("buckets",outBuckets); - outMap.put("metrics",outMetrics); - tuple.set(this.outKey, outMap); - return tuple; - } - - HashKey hashKey = null; - if(buckets != null) { - String[] bucketValues = new String[buckets.length]; - for(int i=0; i metricValues() { - Map m = new HashMap(); - if(isDouble) { - m.put(MIN,doubleMin); - } else { - doubleMin = (double)longMin; - m.put(MIN,doubleMin); - } - return m; - } - - public void update(Map metricValues) { - if(isDouble) { - double dmin = metricValues.get(MIN); - if(dmin < doubleMin) { - doubleMin = dmin; - } - } else { - double dmin = metricValues.get(MIN); - long lmin = (long) dmin; - if(lmin < longMin) { - longMin = lmin; - } - } - } -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/MultiComp.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/MultiComp.java index 45808da9077..28c9529241c 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/MultiComp.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/MultiComp.java @@ -20,6 +20,11 @@ package org.apache.solr.client.solrj.io; import java.io.Serializable; import java.util.Comparator; + +/** + * Wraps multiple Comparators to provide sub sorting. 
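+ * + * A minimal sketch (assuming the varargs constructor; the field names are hypothetical): + * + * Comparator comp = new MultiComp(new AscFieldComp("year"), new DescFieldComp("price")); + * + * Tuples are ordered by "year" ascending, with ties on "year" broken by "price" descending.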
+ **/ + public class MultiComp implements Comparator, Serializable { private static final long serialVersionUID = 1; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java index a2bffb7b99a..9c2111428a7 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java @@ -52,7 +52,6 @@ public class ParallelStream extends CloudSolrStream { private int workers; private String encoded; - public ParallelStream(String zkHost, String collection, TupleStream tupleStream, @@ -78,30 +77,10 @@ public class ParallelStream extends CloudSolrStream { return l; } - public void merge(List metricStreams) { - for(MetricStream metricStream : metricStreams) { - String outKey = metricStream.getOutKey(); - Iterator it = eofTuples.values().iterator(); - List values = new ArrayList(); - - while(it.hasNext()) { - Tuple t = it.next(); - Map top = (Map)t.get(outKey); - values.add(top); - } - - BucketMetrics[] bucketMetrics = metricStream.merge(values); - metricStream.setBucketMetrics(bucketMetrics); - } - } - public Tuple read() throws IOException { Tuple tuple = _read(); if(tuple.EOF) { - List metricStreams = new ArrayList(); - getMetricStreams(this, metricStreams); - this.merge(metricStreams); Map m = new HashMap(); m.put("EOF", true); return new Tuple(m); @@ -110,16 +89,10 @@ public class ParallelStream extends CloudSolrStream { return tuple; } - private void getMetricStreams(TupleStream tupleStream, - List metricStreams) { - if(tupleStream instanceof MetricStream) { - metricStreams.add((MetricStream)tupleStream); - } - - List children = tupleStream.children(); - for(TupleStream ts : children) { - getMetricStreams(ts, metricStreams); - } + public void setStreamContext(StreamContext streamContext) { + //Note the parallel stream does not set the StreamContext on its substream. + //This is because the substream is not actually opened by the ParallelStream. + this.streamContext = streamContext; } protected void constructStreams() throws IOException { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/PushBackStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/PushBackStream.java index cb15eac01c5..5687406df61 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/PushBackStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/PushBackStream.java @@ -29,10 +29,11 @@ public class PushBackStream extends TupleStream { private TupleStream stream; private Tuple tuple; - /* - * A TupleStream that allows a single Tuple to be pushed back into Stream after it's been read. - * - **/ + /** + * A TupleStream that allows a single Tuple to be pushed back into the Stream after it's been read. + * This is a useful class when building streams that maintain the order of the tuples from two or + * more substreams. + **/ public PushBackStream(TupleStream stream) { this.stream = stream; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ReducerStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ReducerStream.java new file mode 100644 index 00000000000..f22e66de3f8 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ReducerStream.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io; + +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Map; +import java.util.HashMap; + +/** + * Iterates over a TupleStream and buffers Tuples that are equal based on a field comparator. + * This allows tuples to be grouped by a common field. + * + * The read() method emits one tuple per group. The top-level fields reflect the first tuple + * encountered in the group. + * + * Use the Tuple.getMaps() method to return all the Tuples in the group. The method returns + * a list of maps (including the group head), which hold the data for each Tuple in the group. + * + * Note: This ReducerStream requires that the underlying streams be sorted and partitioned by the same + * fields as its comparator. + * + **/ + +public class ReducerStream extends TupleStream { + + private static final long serialVersionUID = 1; + + private PushBackStream tupleStream; + private Comparator comp; + + private Tuple currentGroupHead; + + public ReducerStream(TupleStream tupleStream, + Comparator comp) { + this.tupleStream = new PushBackStream(tupleStream); + this.comp = comp; + } + + public void setStreamContext(StreamContext context) { + this.tupleStream.setStreamContext(context); + } + + public List children() { + List l = new ArrayList(); + l.add(tupleStream); + return l; + } + + public void open() throws IOException { + tupleStream.open(); + } + + public void close() throws IOException { + tupleStream.close(); + } + + public Tuple read() throws IOException { + + List maps = new ArrayList(); + while(true) { + Tuple t = tupleStream.read(); + + if(t.EOF) { + if(maps.size() > 0) { + tupleStream.pushBack(t); + Map map1 = maps.get(0); + Map map2 = new HashMap(); + map2.putAll(map1); + Tuple groupHead = new Tuple(map2); + groupHead.setMaps(maps); + return groupHead; + } else { + return t; + } + } + + if(currentGroupHead == null) { + currentGroupHead = t; + maps.add(t.getMap()); + } else { + if(comp.compare(currentGroupHead, t) == 0) { + maps.add(t.getMap()); + } else { + Tuple groupHead = currentGroupHead.clone(); + tupleStream.pushBack(t); + currentGroupHead = null; + groupHead.setMaps(maps); + return groupHead; + } + } + } + } + + public int getCost() { + return 0; + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/RollupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/RollupStream.java deleted file mode 100644 index 7afd4aa77c3..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/RollupStream.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.solr.client.solrj.io; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.ArrayList; - -public class RollupStream extends TupleStream { - - private static final long serialVersionUID = 1; - - private PushBackStream tupleStream; - private Bucket[] buckets; - private Metric[] metrics; - private HashKey currentKey = new HashKey("-"); - private Metric[] currentMetrics; - private boolean finished = false; - - public RollupStream(TupleStream tupleStream, - Bucket[] buckets, - Metric[] metrics) { - this.tupleStream = new PushBackStream(tupleStream); - this.buckets = buckets; - this.metrics = metrics; - } - - public void setStreamContext(StreamContext context) { - this.tupleStream.setStreamContext(context); - } - - public List children() { - List l = new ArrayList(); - l.add(tupleStream); - return l; - } - - public void open() throws IOException { - tupleStream.open(); - } - - public void close() throws IOException { - tupleStream.close(); - } - - public Tuple read() throws IOException { - - while(true) { - Tuple tuple = tupleStream.read(); - if(tuple.EOF) { - if(!finished) { - Map map = new HashMap(); - if(currentMetrics != null) { - List metricValues = new ArrayList(); - List metricNames = new ArrayList(); - for (Metric metric : currentMetrics) { - metricNames.add(metric.getName()); - metricValues.add(metric.getValue()); - } - map.put("buckets", currentKey.toString()); - map.put("metricNames", metricNames); - map.put("metricValues", metricValues); - Tuple t = new Tuple(map); - tupleStream.pushBack(tuple); - finished = true; - return t; - } else { - return tuple; - } - } else { - return tuple; - } - } - - String[] bucketValues = new String[buckets.length]; - for(int i=0; i metricValues = new ArrayList(); - List metricNames = new ArrayList(); - for(Metric metric : currentMetrics) { - metricNames.add(metric.getName()); - metricValues.add(metric.getValue()); - } - map.put("buckets", currentKey.toString()); - map.put("metricNames", metricNames); - map.put("metricValues", metricValues); - t = new Tuple(map); - } - - currentMetrics = new Metric[metrics.length]; - currentKey = hashKey; - for(int i=0; i fieldMappings; private transient JSONTupleStream jsonTupleStream; private transient HttpSolrClient client; @@ -65,7 +66,7 @@ public class SolrStream extends TupleStream { public void setStreamContext(StreamContext context) { this.numWorkers = context.numWorkers; this.workerID = context.workerID; - this.cache = context.clientCache; + this.cache = context.getSolrClientCache(); } /** @@ -87,11 +88,21 @@ public class SolrStream extends TupleStream { } } - private SolrParams loadParams(Map params) { + public void setTrace(boolean trace) { + this.trace = trace; + } + + private SolrParams loadParams(Map params) throws IOException { 
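+ //A note on the contract enforced below: partitionKeys names the fields used to build the per-worker partition filter. + //The special value "none" skips partition filtering so each worker receives the entire stream, and omitting + //partitionKeys entirely is only valid when there is a single worker.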
ModifiableSolrParams solrParams = new ModifiableSolrParams(); - if(this.numWorkers > 0) { - String partitionFilter = getPartitionFilter(); - solrParams.add("fq", partitionFilter); + if(params.containsKey("partitionKeys")) { + if(!params.get("partitionKeys").equals("none")) { + String partitionFilter = getPartitionFilter(); + solrParams.add("fq", partitionFilter); + } + } else { + if(numWorkers > 1) { + throw new IOException("When numWorkers > 1 partitionKeys must be set. Set partitionKeys=none to send the entire stream to each worker."); + } } Iterator it = params.entrySet().iterator(); @@ -129,6 +140,11 @@ public class SolrStream extends TupleStream { public Tuple read() throws IOException { Map fields = jsonTupleStream.next(); + + if(fields != null && trace) { + fields.put("_CORE_", this.baseUrl); + } + if(fields == null) { //Return the EOF tuple. Map m = new HashMap(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/StreamContext.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/StreamContext.java index a138d5a2cc8..5e1fd6e7ee9 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/StreamContext.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/StreamContext.java @@ -17,24 +17,26 @@ package org.apache.solr.client.solrj.io; +import java.io.Serializable; import java.util.Map; import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; -public class StreamContext { +/** + * The StreamContext is passed to TupleStreams using the TupleStream.setStreamContext() method. + * The StreamContext is used to pass shared context info to concentrically wrapped TupleStreams. + * + * Note: The StreamContext contains the SolrClientCache which is used to cache SolrClients for reuse + * across multiple TupleStreams. + **/ + + +public class StreamContext implements Serializable { private Map entries = new HashMap(); public int workerID; public int numWorkers; - public SolrClientCache clientCache; - - public SolrClientCache getClientCache() { - return this.clientCache; - } - - public void setSolrClientCache(SolrClientCache clientCache) { - this.clientCache = clientCache; - } + private SolrClientCache clientCache; public Object get(Object key) { return entries.get(key); @@ -43,4 +45,12 @@ public class StreamContext { public void put(Object key, Object value) { this.entries.put(key, value); } + + public void setSolrClientCache(SolrClientCache clientCache) { + this.clientCache = clientCache; + } + + public SolrClientCache getSolrClientCache() { + return this.clientCache; + } } \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/SumMetric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/SumMetric.java deleted file mode 100644 index af4e55ff5e3..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/SumMetric.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.solr.client.solrj.io; - -import java.io.Serializable; -import java.util.Map; -import java.util.HashMap; - -public class SumMetric implements Metric, Serializable { - - private static final long serialVersionUID = 1; - - public static final String SUM = "sum"; - - private String column; - private boolean isDouble; - private double doubleSum; - private long longSum; - - public SumMetric(String column, boolean isDouble) { - this.column = column; - this.isDouble = isDouble; - } - - public String getName() { - return "sum:"+column; - } - - public void update(Tuple tuple) { - if(isDouble) { - Double d = (Double)tuple.get(column); - doubleSum += d.doubleValue(); - } else { - Long l = (Long)tuple.get(column); - longSum += l.doubleValue(); - } - } - - public Metric newInstance() { - return new SumMetric(column, isDouble); - } - - public Map metricValues() { - Map m = new HashMap(); - if(isDouble) { - m.put(SUM,doubleSum); - - } else { - doubleSum = (double)longSum; - m.put(SUM,doubleSum); - } - - return m; - } - - public double getValue() { - if(isDouble) { - return doubleSum; - } else { - return (double)longSum; - } - } - - public void update(Map metricValues) { - if(isDouble) { - double dsum = metricValues.get(SUM); - doubleSum+=dsum; - } else { - double dsum = metricValues.get(SUM); - longSum+=(long)dsum; - } - } -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java index 77e9efdc52f..cbcea6464b6 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java @@ -47,7 +47,7 @@ public class Tuple implements Cloneable { return this.fields.get(key); } - public void set(Object key, Object value) { + public void put(Object key, Object value) { this.fields.put(key, value); } @@ -63,19 +63,14 @@ public class Tuple implements Cloneable { return (Double)this.fields.get(key); } - - public List getStrings(Object key) { return (List)this.fields.get(key); } - - public List getLongs(Object key) { return (List)this.fields.get(key); } - public List getDoubles(Object key) { return (List)this.fields.get(key); } @@ -84,6 +79,20 @@ public class Tuple implements Cloneable { return fields.entrySet().iterator(); } + public Map getMap() { + return this.fields; + } + + public List getMaps() { + return (List)this.fields.get("_MAPS_"); + } + + public void setMaps(List maps) { + this.fields.put("_MAPS_", maps); + + } + + public Tuple clone() { HashMap m = new HashMap(); m.putAll(fields); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/UniqueStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/UniqueStream.java index 883b46809a3..d28dad509ff 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/UniqueStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/UniqueStream.java @@ -22,6 +22,13 @@ import java.util.Comparator; import java.util.List; import java.util.ArrayList; + +/** + * The UniqueStream emits a unique stream of Tuples based 
on a Comparator. + * + * Note: The sort order of the underlying stream must match the Comparator. + **/ + public class UniqueStream extends TupleStream { private static final long serialVersionUID = 1; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/DescMetricComp.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/CountStream.java similarity index 50% rename from solr/solrj/src/java/org/apache/solr/client/solrj/io/DescMetricComp.java rename to solr/solrj/src/test/org/apache/solr/client/solrj/io/CountStream.java index 2b514247553..95ecdc3fb90 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/DescMetricComp.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/CountStream.java @@ -1,5 +1,4 @@ -package org.apache.solr.client.solrj.io; - +package org.apache.solr.client.solrj.io; /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -17,23 +16,46 @@ package org.apache.solr.client.solrj.io; * limitations under the License. */ +import java.io.IOException; import java.io.Serializable; -import java.util.Comparator; +import java.util.ArrayList; import java.util.List; -public class DescMetricComp implements Comparator, Serializable { +public class CountStream extends TupleStream implements Serializable { - private static final long serialVersionUID = 1; + private TupleStream stream; + private int count; - private int ord; - - public DescMetricComp(int ord) { - this.ord = ord; + public CountStream(TupleStream stream) { + this.stream = stream; } - public int compare(Tuple t1, Tuple t2) { - List values1 = (List)t1.get("metricValues"); - List values2 = (List)t2.get("metricValues"); - return values1.get(ord).compareTo(values2.get(ord))*-1; + public void close() throws IOException { + this.stream.close(); + } + + public void open() throws IOException { + this.stream.open(); + } + + public List children() { + List l = new ArrayList(); + l.add(stream); + return l; + } + + public void setStreamContext(StreamContext streamContext) { + stream.setStreamContext(streamContext); + } + + public Tuple read() throws IOException { + Tuple t = stream.read(); + if(t.EOF) { + t.put("count", count); + return t; + } else { + ++count; + return t; + } } } \ No newline at end of file diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java index 71ea266a9ac..461e977a98d 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java @@ -19,6 +19,7 @@ package org.apache.solr.client.solrj.io; import java.io.File; import java.io.IOException; +import java.io.Serializable; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -127,323 +128,85 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { } - private void testHashJoinStream() throws Exception { - indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); - indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "join_i", "1000"); - indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); - indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); - indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); - indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "0", "join_i", "2000"); - indexr(id, "7", "a_s", "hello7", "a_i", "1", "a_f", "0"); - - commit(); + private void testSpacesInParams() throws Exception 
{ String zkHost = zkServer.getZkAddress(); - //Test one-to-one - Map paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_s desc"); - CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA); + Map params = mapParams("q","*:*","fl","id , a_s , a_i , a_f","sort", "a_f asc , a_i asc"); - Map fieldMappings = new HashMap(); - fieldMappings.put("id","streamB.id"); + //CloudSolrStream compares the values of the sort with the fl field. + //The constructor will throw an exception if the sort fields do not have + //a value in the field list. - Map paramsB = mapParams("q","id:(2)","fl","id,a_s,a_f,join_i", "sort", "a_s desc"); - CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - streamB.setFieldMappings(fieldMappings); - - String[] keys = {"a_f"}; - - HashJoinStream fstream = new HashJoinStream(streamA, streamB, keys); - List tuples = getTuples(fstream); - - assert(tuples.size() == 1); - assertOrder(tuples, 0); - assertLong(tuples.get(0), "join_i", 1000); - - - //Test one-to-many - - paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_s desc"); - streamA = new CloudSolrStream(zkHost, "collection1", paramsA); - - fieldMappings = new HashMap(); - fieldMappings.put("id","streamB.id"); - - paramsB = mapParams("q","id:(2 6)","fl","id,a_s,a_f,join_i", "sort", "a_s desc"); - streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - streamB.setFieldMappings(fieldMappings); - - - fstream = new HashJoinStream(streamA, streamB, keys); - tuples = getTuples(fstream); - - assert(tuples.size() == 2); - assertOrder(tuples, 0,0); - assertLong(tuples.get(0), "join_i", 1000); - assertLong(tuples.get(1), "join_i", 2000); - - //Test many-to-one - - paramsA = mapParams("q","id:(0 2 1 3 4) ","fl","id,a_s,a_f", "sort", "a_s desc"); - streamA = new CloudSolrStream(zkHost, "collection1", paramsA); - - fieldMappings = new HashMap(); - fieldMappings.put("id","streamB.id"); - - paramsB = mapParams("q","id:(6)","fl","id,a_s,a_f,join_i", "sort", "a_s desc"); - streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - streamB.setFieldMappings(fieldMappings); - - - fstream = new HashJoinStream(streamA, streamB, keys); - tuples = getTuples(fstream); - - assert(tuples.size() == 2); - assertOrder(tuples, 2,0); - assertLong(tuples.get(0), "join_i", 2000); - assertLong(tuples.get(1), "join_i", 2000); - - //Test many-to-many - - paramsA = mapParams("q","id:(0 7 1 3 4) ","fl","id,a_s,a_f", "sort", "a_s desc"); - streamA = new CloudSolrStream(zkHost, "collection1", paramsA); - - fieldMappings = new HashMap(); - fieldMappings.put("id","streamB.id"); - - paramsB = mapParams("q","id:(6 2)","fl","id,a_s,a_f,join_i", "sort", "a_s desc"); - streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - streamB.setFieldMappings(fieldMappings); - - - fstream = new HashJoinStream(streamA, streamB, keys); - tuples = getTuples(fstream); - - assert(tuples.size() == 4); - assertOrder(tuples, 7,7,0,0); - assertLong(tuples.get(0), "join_i", 1000); - assertLong(tuples.get(1), "join_i", 2000); - assertLong(tuples.get(2), "join_i", 1000); - assertLong(tuples.get(3), "join_i", 2000); + CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); del("*:*"); commit(); } - private void testMergeJoinStream() throws Exception { + private void testNonePartitionKeys() throws Exception { - indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); - indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "join_i", "1000"); + indexr(id, "0", "a_s",
"hello0", "a_i", "0", "a_f", "1"); + indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2"); indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); - indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); - indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "0", "join_i", "2000"); - indexr(id, "7", "a_s", "hello7", "a_i", "1", "a_f", "0"); + indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5"); + indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6"); + indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7"); + indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8"); + indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9"); + indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10"); commit(); String zkHost = zkServer.getZkAddress(); - //Test one-to-one - Map paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc"); - CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA); + Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "none"); + CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new AscFieldComp("a_s")); - Map fieldMappings = new HashMap(); - fieldMappings.put("id","streamB.id"); + List tuples = getTuples(pstream); - Map paramsB = mapParams("q","id:(2)","fl","id,a_s,a_f,join_i", "sort", "a_f desc"); - CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - streamB.setFieldMappings(fieldMappings); - - String[] keys = {"a_f"}; - - MergeJoinStream fstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f")); - List tuples = getTuples(fstream); - - assert(tuples.size() == 1); - assertOrder(tuples, 0); - assertLong(tuples.get(0), "join_i", 1000); - - - //Test one-to-many - - paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc"); - streamA = new CloudSolrStream(zkHost, "collection1", paramsA); - - fieldMappings = new HashMap(); - fieldMappings.put("id","streamB.id"); - - paramsB = mapParams("q","id:(2 6)","fl","id,a_s,a_f,join_i", "sort", "a_f desc"); - streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - streamB.setFieldMappings(fieldMappings); - - - fstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f")); - tuples = getTuples(fstream); - - assert(tuples.size() == 2); - assertOrder(tuples, 0,0); - assertLong(tuples.get(0), "join_i", 1000); - assertLong(tuples.get(1), "join_i", 2000); - - //Test many-to-one - - paramsA = mapParams("q","id:(0 2 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc"); - streamA = new CloudSolrStream(zkHost, "collection1", paramsA); - - fieldMappings = new HashMap(); - fieldMappings.put("id","streamB.id"); - - paramsB = mapParams("q","id:(6)","fl","id,a_s,a_f,join_i", "sort", "a_f desc"); - streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - streamB.setFieldMappings(fieldMappings); - - - fstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f")); - tuples = getTuples(fstream); - - assert(tuples.size() == 2); - assertOrder(tuples, 2,0); - assertLong(tuples.get(0), "join_i", 2000); - assertLong(tuples.get(1), "join_i", 2000); - - //Test many-to-many - - paramsA = mapParams("q","id:(0 7 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc"); - streamA = new CloudSolrStream(zkHost, "collection1", paramsA); - - fieldMappings = new HashMap(); - fieldMappings.put("id","streamB.id"); 
- - paramsB = mapParams("q","id:(6 2)","fl","id,a_s,a_f,join_i", "sort", "a_f desc"); - streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - streamB.setFieldMappings(fieldMappings); - - - fstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f")); - tuples = getTuples(fstream); - - assert(tuples.size() == 4); - assertOrder(tuples, 7,7,0,0); - assertLong(tuples.get(0), "join_i", 1000); - assertLong(tuples.get(1), "join_i", 2000); - assertLong(tuples.get(2), "join_i", 1000); - assertLong(tuples.get(3), "join_i", 2000); + assert(tuples.size() == 20); // Each tuple will be double counted. del("*:*"); commit(); } - private void testParallelMergeJoinStream() throws Exception { + + + + private void testParallelUniqueStream() throws Exception { indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); - indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "join_i", "1000"); + indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0"); indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); - indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "0", "join_i", "2000"); - indexr(id, "7", "a_s", "hello7", "a_i", "1", "a_f", "0"); + indexr(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1"); + indexr(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5"); + indexr(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5"); + indexr(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4"); commit(); String zkHost = zkServer.getZkAddress(); - //Test one-to-one - Map paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f"); - CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA); + Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc", "partitionKeys", "a_f"); + CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); + UniqueStream ustream = new UniqueStream(stream, new AscFieldComp("a_f")); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new AscFieldComp("a_f")); + List tuples = getTuples(pstream); + assert(tuples.size() == 5); + assertOrder(tuples, 0,1,3,4,6); - Map fieldMappings = new HashMap(); - fieldMappings.put("id","streamB.id"); + //Test the eofTuples - Map paramsB = mapParams("q","id:(2)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f"); - CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - streamB.setFieldMappings(fieldMappings); - - String[] keys = {"a_f"}; - - MergeJoinStream mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f")); - ParallelStream fstream = new ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f")); - - List tuples = getTuples(fstream); - - assert(tuples.size() == 1); - assertOrder(tuples, 0); - assertLong(tuples.get(0), "join_i", 1000); - - - //Test one-to-many - - paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f"); - streamA = new CloudSolrStream(zkHost, "collection1", paramsA); - - fieldMappings = new HashMap(); - fieldMappings.put("id","streamB.id"); - - paramsB = mapParams("q","id:(2 6)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f"); - streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - streamB.setFieldMappings(fieldMappings); - - - mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f")); - fstream = new 
ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f")); - - tuples = getTuples(fstream); - - assert(tuples.size() == 2); - assertOrder(tuples, 0,0); - assertLong(tuples.get(0), "join_i", 1000); - assertLong(tuples.get(1), "join_i", 2000); - - //Test many-to-one - - paramsA = mapParams("q","id:(0 2 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f"); - streamA = new CloudSolrStream(zkHost, "collection1", paramsA); - - fieldMappings = new HashMap(); - fieldMappings.put("id","streamB.id"); - - paramsB = mapParams("q","id:(6)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f"); - streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - streamB.setFieldMappings(fieldMappings); - - - mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f")); - fstream = new ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f")); - - tuples = getTuples(fstream); - - assert(tuples.size() == 2); - assertOrder(tuples, 2,0); - assertLong(tuples.get(0), "join_i", 2000); - assertLong(tuples.get(1), "join_i", 2000); - - //Test many-to-many - - paramsA = mapParams("q","id:(0 7 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f"); - streamA = new CloudSolrStream(zkHost, "collection1", paramsA); - - fieldMappings = new HashMap(); - fieldMappings.put("id","streamB.id"); - - paramsB = mapParams("q","id:(6 2)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f"); - streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - streamB.setFieldMappings(fieldMappings); - - - mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f")); - fstream = new ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f")); - - tuples = getTuples(fstream); - - assert(tuples.size() == 4); - assertOrder(tuples, 7,7,0,0); - assertLong(tuples.get(0), "join_i", 1000); - assertLong(tuples.get(1), "join_i", 2000); - assertLong(tuples.get(2), "join_i", 1000); - assertLong(tuples.get(3), "join_i", 2000); + Map eofTuples = pstream.getEofTuples(); + assert(eofTuples.size() == 2); //There should be an EOF tuple for each worker. 
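The eofTuples assertion above relies on ParallelStream collecting one terminal EOF Tuple per worker. The getTuples() helper these tests call is defined elsewhere in the class; a minimal sketch of the read loop it implies, using only the TupleStream contract visible in this patch (open(), read(), close(), and the Tuple.EOF marker), would look like this:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Drain a TupleStream: read Tuples until the EOF marker appears.
static List<Tuple> drain(TupleStream stream) throws IOException {
  List<Tuple> tuples = new ArrayList<Tuple>();
  stream.open();                             // open the underlying stream(s)
  try {
    for (Tuple t = stream.read(); !t.EOF; t = stream.read()) {
      tuples.add(t);                         // collect every non-EOF Tuple
    }
  } finally {
    stream.close();                          // always release connections
  }
  return tuples;
}

The EOF Tuple itself is not collected, which is why the per-worker counts surface through getEofTuples() rather than in the returned list.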
del("*:*"); commit(); @@ -478,495 +241,188 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { commit(); } - private void testRollupStream() throws Exception { - indexr(id, "0", "a_s", "hello0", "a_i", "100", "a_f", "0"); - indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0"); + private void testParallelRankStream() throws Exception { + + + indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); + indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0"); indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); - indexr(id, "4", "a_s", "hello3", "a_i", "4", "a_f", "4"); - indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); - indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "1"); - indexr(id, "7", "a_s", "hello1", "a_i", "1", "a_f", "1"); + indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); + indexr(id, "5", "a_s", "hello1", "a_i", "5", "a_f", "1"); + indexr(id, "6", "a_s", "hello1", "a_i", "6", "a_f", "1"); + indexr(id, "7", "a_s", "hello1", "a_i", "7", "a_f", "1"); + indexr(id, "8", "a_s", "hello1", "a_i", "8", "a_f", "1"); + indexr(id, "9", "a_s", "hello1", "a_i", "9", "a_f", "1"); + indexr(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1"); commit(); String zkHost = zkServer.getZkAddress(); - Bucket[] buckets = {new Bucket("a_s")}; - Metric[] metrics = {new SumMetric("a_i", false), - new MeanMetric("a_i", false), - new CountMetric(), - new MinMetric("a_i", false), - new MaxMetric("a_i", false)}; - - Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc"); - CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); - RollupStream rstream = new RollupStream(stream, buckets, metrics); - rstream.open(); - Tuple tuple = rstream.read(); - String b = (String)tuple.get("buckets"); - List values = (List)tuple.get("metricValues"); - assert(b.equals("hello0")); - assert(values.get(0) == 102.0d); - assert(values.get(1) == 51.0d); - assert(values.get(2) == 2.0d); - assert(values.get(3) == 2.0d); - assert(values.get(4) == 100.0d); - - tuple = rstream.read(); - b = (String)tuple.get("buckets"); - values = (List)tuple.get("metricValues"); - assert(b.equals("hello1")); - assert(values.get(0) == 3.0d); - assert(values.get(1) == 1.0d); - assert(values.get(2) == 3.0d); - assert(values.get(3) == 1.0d); - assert(values.get(4) == 1.0d); - - - tuple = rstream.read(); - b = (String)tuple.get("buckets"); - values = (List)tuple.get("metricValues"); - assert(b.equals("hello3")); - assert(values.get(0) == 7.0d); - assert(values.get(1) == 3.5d); - assert(values.get(2) == 2.0d); - assert(values.get(3) == 3.0d); - assert(values.get(4) == 4.0d); - - tuple = rstream.read(); - assert(tuple.EOF); - - rstream.close(); - del("*:*"); - commit(); - } - - private void testParallelRollupStream() throws Exception { - indexr(id, "0", "a_s", "hello0", "a_i", "100", "a_f", "0"); - indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0"); - indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); - indexr(id, "4", "a_s", "hello3", "a_i", "4", "a_f", "4"); - indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); - indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "1"); - indexr(id, "7", "a_s", "hello1", "a_i", "1", "a_f", "1"); - - commit(); - - String zkHost = zkServer.getZkAddress(); - - Bucket[] buckets = {new Bucket("a_s")}; - Metric[] metrics = {new SumMetric("a_i", false), - new MeanMetric("a_i", false), - new CountMetric(), - new MinMetric("a_i", false), - new MaxMetric("a_i", false)}; - - Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", 
"a_s asc","partitionKeys","a_s"); - CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); - RollupStream rostream = new RollupStream(stream, buckets, metrics); - ParallelStream rstream = new ParallelStream(zkHost,"collection1", rostream, 2, new AscFieldComp("buckets")); - - rstream.open(); - Tuple tuple = rstream.read(); - String b = (String)tuple.get("buckets"); - List values = (List)tuple.get("metricValues"); - assert(b.equals("hello0")); - assert(values.get(0) == 102.0d); - assert(values.get(1) == 51.0d); - assert(values.get(2) == 2.0d); - assert(values.get(3) == 2.0d); - assert(values.get(4) == 100.0d); - - tuple = rstream.read(); - b = (String)tuple.get("buckets"); - values = (List)tuple.get("metricValues"); - assert(b.equals("hello1")); - assert(values.get(0) == 3.0d); - assert(values.get(1) == 1.0d); - assert(values.get(2) == 3.0d); - assert(values.get(3) == 1.0d); - assert(values.get(4) == 1.0d); - - - tuple = rstream.read(); - b = (String)tuple.get("buckets"); - values = (List)tuple.get("metricValues"); - assert(b.equals("hello3")); - assert(values.get(0) == 7.0d); - assert(values.get(1) == 3.5d); - assert(values.get(2) == 2.0d); - assert(values.get(3) == 3.0d); - assert(values.get(4) == 4.0d); - - tuple = rstream.read(); - assert(tuple.EOF); - - rstream.close(); - del("*:*"); - commit(); - } - - - - private void testMetricStream() throws Exception { - - indexr(id, "0", "a_s", "hello0", "a_i", "100", "a_f", "0"); - indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0"); - indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); - indexr(id, "4", "a_s", "hello3", "a_i", "4", "a_f", "4"); - indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); - indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "1"); - indexr(id, "7", "a_s", "hello1", "a_i", "1", "a_f", "1"); - - commit(); - - String zkHost = zkServer.getZkAddress(); - - Bucket[] buckets = {new Bucket("a_s")}; - Metric[] metrics = {new SumMetric("a_i", false), - new MeanMetric("a_i", false), - new CountMetric(), - new MinMetric("a_i", false), - new MaxMetric("a_i", false)}; - - Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); - CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); - MetricStream mstream = new MetricStream(stream, buckets, metrics, "metric1", new DescBucketComp(0),5); - getTuples(mstream); - - BucketMetrics[] bucketMetrics = mstream.getBucketMetrics(); - assert(bucketMetrics.length == 3); - - //Bucket should be is descending order based on Metric 0, which is the SumMetric. 
- - assert(bucketMetrics[0].getKey().toString().equals("hello0")); - assert(bucketMetrics[1].getKey().toString().equals("hello3")); - assert(bucketMetrics[2].getKey().toString().equals("hello1")); - - assertMetric(bucketMetrics[0].getMetrics()[0], 102.0d); //Test the first Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[1], 51.0d); //Test the second Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[2], 2.0d); //Test the third Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[3], 2.0d); //Test the fourth Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[4], 100.0d); //Test the fifth Metric of the first BucketMetrics - - - assertMetric(bucketMetrics[1].getMetrics()[0], 7.0d); - assertMetric(bucketMetrics[2].getMetrics()[0], 3.0d); - - - params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); - stream = new CloudSolrStream(zkHost, "collection1", params); - mstream = new MetricStream(stream, buckets, metrics, "metric1", new AscBucketComp(0),5); - getTuples(mstream); - - bucketMetrics = mstream.getBucketMetrics(); - - assertMetric(bucketMetrics[0].getMetrics()[0], 3.0d); //Test the first Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[1], 1.0d); //Test the second Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[2], 3.0d); //Test the third Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[3], 1.0d); //Test the fourth Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[4], 1.0d); //Test the fifth Metric of the first BucketMetrics - - assertMetric(bucketMetrics[1].getMetrics()[0], 7.0d); - assertMetric(bucketMetrics[2].getMetrics()[0], 102.0d); - - indexr(id, "8", "a_s", "hello4", "a_i", "1000", "a_f", "1"); //Add a fourth record. - commit(); - - //Test desc comp with more buckets then priority queue can hold. - params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); - stream = new CloudSolrStream(zkHost, "collection1", params); - mstream = new MetricStream(stream, buckets, metrics, "metric1", new DescBucketComp(0),3); - getTuples(mstream); - - bucketMetrics = mstream.getBucketMetrics(); - assert(bucketMetrics.length == 3); - assert(bucketMetrics[0].getKey().toString().equals("hello4")); - assert(bucketMetrics[1].getKey().toString().equals("hello0")); - assert(bucketMetrics[2].getKey().toString().equals("hello3")); - - //Test asc comp with more buckets then priority queue can hold. 
- params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); - stream = new CloudSolrStream(zkHost, "collection1", params); - mstream = new MetricStream(stream, buckets, metrics, "metric1", new AscBucketComp(0),3); - getTuples(mstream); - - bucketMetrics = mstream.getBucketMetrics(); - assert(bucketMetrics.length == 3); - assert(bucketMetrics[0].getKey().toString().equals("hello1")); - assert(bucketMetrics[1].getKey().toString().equals("hello3")); - assert(bucketMetrics[2].getKey().toString().equals("hello0")); - - - //Test with no buckets - params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); - stream = new CloudSolrStream(zkHost, "collection1", params); - mstream = new MetricStream(stream, metrics, "metric1"); - getTuples(mstream); - - bucketMetrics = mstream.getBucketMetrics(); - assert(bucketMetrics.length == 1); - assert(bucketMetrics[0].getKey().toString().equals("metrics")); - assertMetric(bucketMetrics[0].getMetrics()[0], 1112.0d); //Test the first Metric of the first BucketMetrics - - del("*:*"); - commit(); - } - - - private void testParallelMetricStream() throws Exception { - - indexr(id, "0", "a_s", "hello0", "a_i", "100", "a_f", "0"); - indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0"); - indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); - indexr(id, "4", "a_s", "hello3", "a_i", "4", "a_f", "4"); - indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); - indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "1"); - indexr(id, "7", "a_s", "hello1", "a_i", "1", "a_f", "1"); - - commit(); - - String zkHost = zkServer.getZkAddress(); - - Bucket[] buckets = {new Bucket("a_s")}; - Metric[] metrics = {new SumMetric("a_i", false), - new MeanMetric("a_i", false), - new CountMetric(), - new MinMetric("a_i", false), - new MaxMetric("a_i", false)}; - Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); - MetricStream mstream = new MetricStream(stream, buckets, metrics, "metric1", new DescBucketComp(0),5); - ParallelStream pstream = new ParallelStream(zkHost,"collection1",mstream,2,new AscFieldComp("a_i")); - getTuples(pstream); + RankStream rstream = new RankStream(stream, 11, new DescFieldComp("a_i")); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new DescFieldComp("a_i")); + List tuples = getTuples(pstream); - BucketMetrics[] bucketMetrics = mstream.getBucketMetrics(); - assert(bucketMetrics.length == 3); - - //Bucket should be is descending order based on Metric 0, which is the SumMetric. 
- - assert(bucketMetrics[0].getKey().toString().equals("hello0")); - assert(bucketMetrics[1].getKey().toString().equals("hello3")); - assert(bucketMetrics[2].getKey().toString().equals("hello1")); - - assertMetric(bucketMetrics[0].getMetrics()[0], 102.0d); //Test the first Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[1], 51.0d); //Test the second Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[2], 2.0d); //Test the third Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[3], 2.0d); //Test the fourth Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[4], 100.0d); //Test the fifth Metric of the first BucketMetrics - - - assertMetric(bucketMetrics[1].getMetrics()[0], 7.0d); - assertMetric(bucketMetrics[2].getMetrics()[0], 3.0d); - - - params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); - stream = new CloudSolrStream(zkHost, "collection1", params); - mstream = new MetricStream(stream, buckets, metrics, "metric1", new AscBucketComp(0),5); - getTuples(mstream); - - bucketMetrics = mstream.getBucketMetrics(); - - assertMetric(bucketMetrics[0].getMetrics()[0], 3.0d); //Test the first Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[1], 1.0d); //Test the second Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[2], 3.0d); //Test the third Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[3], 1.0d); //Test the fourth Metric of the first BucketMetrics - assertMetric(bucketMetrics[0].getMetrics()[4], 1.0d); //Test the fifth Metric of the first BucketMetrics - - assertMetric(bucketMetrics[1].getMetrics()[0], 7.0d); - assertMetric(bucketMetrics[2].getMetrics()[0], 102.0d); - - indexr(id, "8", "a_s", "hello4", "a_i", "1000", "a_f", "1"); //Add a fourth record. - commit(); - - //Test desc comp with more buckets then priority queue can hold. - params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); - stream = new CloudSolrStream(zkHost, "collection1", params); - mstream = new MetricStream(stream, buckets, metrics, "metric1", new DescBucketComp(0),3); - getTuples(mstream); - - bucketMetrics = mstream.getBucketMetrics(); - assert(bucketMetrics.length == 3); - assert(bucketMetrics[0].getKey().toString().equals("hello4")); - assert(bucketMetrics[1].getKey().toString().equals("hello0")); - assert(bucketMetrics[2].getKey().toString().equals("hello3")); - - //Test asc comp with more buckets then priority queue can hold. 
- params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); - stream = new CloudSolrStream(zkHost, "collection1", params); - mstream = new MetricStream(stream, buckets, metrics, "metric1", new AscBucketComp(0),3); - getTuples(mstream); - - bucketMetrics = mstream.getBucketMetrics(); - assert(bucketMetrics.length == 3); - assert(bucketMetrics[0].getKey().toString().equals("hello1")); - assert(bucketMetrics[1].getKey().toString().equals("hello3")); - assert(bucketMetrics[2].getKey().toString().equals("hello0")); - - - //Test with no buckets - params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); - stream = new CloudSolrStream(zkHost, "collection1", params); - mstream = new MetricStream(stream, metrics, "metric1"); - getTuples(mstream); - - bucketMetrics = mstream.getBucketMetrics(); - assert(bucketMetrics.length == 1); - assert(bucketMetrics[0].getKey().toString().equals("metrics")); - assertMetric(bucketMetrics[0].getMetrics()[0], 1112.0d); //Test the first Metric of the first BucketMetrics + assert(tuples.size() == 10); + assertOrder(tuples, 10,9,8,7,6,5,4,3,2,0); del("*:*"); commit(); } - private void testGroupByStream() throws Exception { + private void testTrace() throws Exception { - indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); - indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0"); + indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1"); + indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2"); indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); - indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "1"); + indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5"); + indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6"); + indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7"); + indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8"); + indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9"); + indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10"); commit(); - //Test CloudSolrStream and SumStream over an int field String zkHost = zkServer.getZkAddress(); - Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc"); + //Test with spaces in the parameter lists. 
+ Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); - GroupByStream gstream = new GroupByStream(stream, new AscFieldComp("a_s"), new DescFieldComp("a_i"), 5); + stream.setTrace(true); + List tuples = getTuples(stream); + assert(tuples.get(0).get("_COLLECTION_").equals("collection1")); + assert(tuples.get(1).get("_COLLECTION_").equals("collection1")); + assert(tuples.get(2).get("_COLLECTION_").equals("collection1")); + assert(tuples.get(3).get("_COLLECTION_").equals("collection1")); - List tuples = getTuples(gstream); + del("*:*"); + commit(); + } + + + + + private void testReducerStream() throws Exception { + + indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1"); + indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2"); + indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); + indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); + indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5"); + indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6"); + indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7"); + indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8"); + indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9"); + indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10"); + + commit(); + + String zkHost = zkServer.getZkAddress(); + + //Test with spaces in the parameter lists. + Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc"); + CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); + ReducerStream rstream = new ReducerStream(stream, new AscFieldComp("a_s")); + + List tuples = getTuples(rstream); assert(tuples.size() == 3); - assertOrder(tuples, 2,3,4); - assertGroupOrder(tuples.get(0), 1, 0); + assertOrder(tuples, 0,3,4); + + Tuple t0 = tuples.get(0); + List maps0 = t0.getMaps(); + assertMaps(maps0, 0, 2,1, 9); + + Tuple t1 = tuples.get(1); + List maps1 = t1.getMaps(); + assertMaps(maps1, 3, 5, 7, 8); + + Tuple t2 = tuples.get(2); + List maps2 = t2.getMaps(); + assertMaps(maps2, 4, 6); + + del("*:*"); commit(); } - private void testFilterStream() throws Exception { + private void testParallelReducerStream() throws Exception { - indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); - indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0"); + indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1"); + indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2"); indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); - indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); - - commit(); - - //Test CloudSolrStream and SumStream over an int field - String zkHost = zkServer.getZkAddress(); - - Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc"); - CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA); - - Map paramsB = mapParams("q","id:(0 2)","fl","a_s","sort", "a_s asc"); - CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - - - FilterStream fstream = new FilterStream(streamA, streamB, new AscFieldComp("a_s")); - List tuples = getTuples(fstream); - - assert(tuples.size() == 2); - assertOrder(tuples, 0,2); - - del("*:*"); - commit(); - } - - private void testParallelStream() throws Exception { - - indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); - indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0"); - indexr(id, "3", "a_s", "hello3", "a_i", 
"3", "a_f", "3"); - indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); - indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); + indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5"); + indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6"); + indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7"); + indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8"); + indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9"); + indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10"); commit(); String zkHost = zkServer.getZkAddress(); - Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc", "partitionKeys","a_s"); - CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA); + Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s"); + CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); + ReducerStream rstream = new ReducerStream(stream, new AscFieldComp("a_s")); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new AscFieldComp("a_s")); - Map paramsB = mapParams("q","id:(0 2)","fl","a_s","sort", "a_s asc", "partitionKeys","a_s"); - CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - - FilterStream fstream = new FilterStream(streamA, streamB, new AscFieldComp("a_s")); - ParallelStream pstream = new ParallelStream(zkHost,"collection1", fstream, 2, new AscFieldComp("a_s")); - List tuples = getTuples(pstream); - - assert(tuples.size() == 2); - assertOrder(tuples, 0,2); - - del("*:*"); - commit(); - } - - private void testParallelStreamSingleWorker() throws Exception { - - indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); - indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0"); - indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); - indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); - indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); - - commit(); - - String zkHost = zkServer.getZkAddress(); - - Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc", "partitionKeys","a_s"); - CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA); - - Map paramsB = mapParams("q","id:(0 2)","fl","a_s","sort", "a_s asc", "partitionKeys","a_s"); - CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - - FilterStream fstream = new FilterStream(streamA, streamB, new AscFieldComp("a_s")); - ParallelStream pstream = new ParallelStream(zkHost,"collection1", fstream, 1, new AscFieldComp("a_s")); - List tuples = getTuples(pstream); - - assert(tuples.size() == 2); - assertOrder(tuples, 0,2); - - del("*:*"); - commit(); - } - - - private void testParallelHashJoinStream() { - - } - - private void testParallelGroupByStream() throws Exception { - - indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); - indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0"); - indexr(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3"); - indexr(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4"); - indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); - - commit(); - - String zkHost = zkServer.getZkAddress(); - - Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc", "partitionKeys","a_s"); - CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); - GroupByStream gstream = new GroupByStream(stream, new AscFieldComp("a_s"), new AscFieldComp("a_i"),5); - ParallelStream pstream = new ParallelStream(zkHost,"collection1", 
gstream, 2, new AscFieldComp("a_s")); List tuples = getTuples(pstream); assert(tuples.size() == 3); - assertOrder(tuples, 0,1,2); - assertGroupOrder(tuples.get(0),3,4); + assertOrder(tuples, 0,3,4); + + Tuple t0 = tuples.get(0); + List maps0 = t0.getMaps(); + assertMaps(maps0, 0, 2, 1, 9); + + Tuple t1 = tuples.get(1); + List maps1 = t1.getMaps(); + assertMaps(maps1, 3, 5, 7, 8); + + Tuple t2 = tuples.get(2); + List maps2 = t2.getMaps(); + assertMaps(maps2, 4, 6); + + //Test Descending with Ascending subsort + + paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s"); + stream = new CloudSolrStream(zkHost, "collection1", paramsA); + rstream = new ReducerStream(stream, new DescFieldComp("a_s")); + pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new DescFieldComp("a_s")); + + tuples = getTuples(pstream); + + assert(tuples.size() == 3); + assertOrder(tuples, 4,3,0); + + t0 = tuples.get(0); + maps0 = t0.getMaps(); + assertMaps(maps0, 4, 6); + + + t1 = tuples.get(1); + maps1 = t1.getMaps(); + assertMaps(maps1, 3, 5, 7, 8); + + + t2 = tuples.get(2); + maps2 = t2.getMaps(); + assertMaps(maps2, 0, 2, 1, 9); + + + del("*:*"); commit(); } - private void testTuple() throws Exception { indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "5.1", "s_multi", "a", "s_multi", "b", "i_multi", "1", "i_multi", "2", "f_multi", "1.2", "f_multi", "1.3"); @@ -975,7 +431,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { String zkHost = zkServer.getZkAddress(); - Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f,s_multi,i_multi,f_multi","sort", "a_s asc", "partitionKeys","a_s"); + Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f,s_multi,i_multi,f_multi","sort", "a_s asc"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); List tuples = getTuples(stream); Tuple tuple = tuples.get(0); @@ -1006,7 +462,6 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { commit(); } - private void testMergeStream() throws Exception { indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); @@ -1076,6 +531,104 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { } + private void testParallelMergeStream() throws Exception { + + indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); + indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0"); + indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); + indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); + indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); + indexr(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0"); + indexr(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0"); + indexr(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3"); + indexr(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4"); + indexr(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1"); + + commit(); + + String zkHost = zkServer.getZkAddress(); + + //Test ascending + Map paramsA = mapParams("q","id:(4 1 8 7 9)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i"); + CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA); + + Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i"); + CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB); + + MergeStream mstream = new MergeStream(streamA, streamB, new AscFieldComp("a_i")); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new AscFieldComp("a_i")); + List tuples = 
getTuples(pstream); + + assert(tuples.size() == 9); + assertOrder(tuples, 0,1,2,3,4,7,6,8,9); + + //Test descending + paramsA = mapParams("q","id:(4 1 8 9)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i"); + streamA = new CloudSolrStream(zkHost, "collection1", paramsA); + + paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i"); + streamB = new CloudSolrStream(zkHost, "collection1", paramsB); + + mstream = new MergeStream(streamA, streamB, new DescFieldComp("a_i")); + pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new DescFieldComp("a_i")); + tuples = getTuples(pstream); + + assert(tuples.size() == 8); + assertOrder(tuples, 9,8,6,4,3,2,1,0); + + del("*:*"); + commit(); + } + + private void testParallelEOF() throws Exception { + + indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); + indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0"); + indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); + indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); + indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); + indexr(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0"); + indexr(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0"); + indexr(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3"); + indexr(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4"); + indexr(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1"); + + commit(); + + String zkHost = zkServer.getZkAddress(); + + //Test ascending + Map paramsA = mapParams("q","id:(4 1 8 7 9)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i"); + CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA); + + Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i"); + CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB); + + MergeStream mstream = new MergeStream(streamA, streamB, new AscFieldComp("a_i")); + CountStream cstream = new CountStream(mstream); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", cstream, 2, new AscFieldComp("a_i")); + List tuples = getTuples(pstream); + + assert(tuples.size() == 9); + Map eofTuples = pstream.getEofTuples(); + assert(eofTuples.size() == 2); // There should be an EOF Tuple for each worker. 
+ + long totalCount = 0; + + Iterator it = eofTuples.values().iterator(); + while(it.hasNext()) { + Tuple t = it.next(); + totalCount += t.getLong("count"); + } + + assert(tuples.size() == totalCount); + + del("*:*"); + commit(); + } + + + @Test public void streamTests() throws Exception { assertNotNull(cloudClient); @@ -1100,7 +653,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { String zkHost = zkServer.getZkAddress(); Map params = null; - //Basic CloudSolrStream Test with Ascending Sort + //Basic CloudSolrStream Test with Descending Sort params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i desc"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); @@ -1109,7 +662,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { assert(tuples.size() == 5); assertOrder(tuples, 4, 3, 2, 1, 0); - //With Descending Sort + //With Ascending Sort params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); stream = new CloudSolrStream(zkHost, "collection1", params); tuples = getTuples(stream); @@ -1138,22 +691,18 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { commit(); testTuple(); + testSpacesInParams(); + testNonePartitionKeys(); + testTrace(); testUniqueStream(); - testMetricStream(); - testRollupStream(); testRankStream(); - testFilterStream(); - testGroupByStream(); - testHashJoinStream(); - testMergeJoinStream(); testMergeStream(); - testParallelStreamSingleWorker(); - testParallelStream(); - testParallelRollupStream(); - testParallelMetricStream(); - testParallelGroupByStream(); - testParallelHashJoinStream(); - testParallelMergeJoinStream(); + testReducerStream(); + testParallelEOF(); + testParallelUniqueStream(); + testParallelRankStream(); + testParallelMergeStream(); + testParallelReducerStream(); } protected Map mapParams(String... vals) { @@ -1208,6 +757,23 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { return true; } + protected boolean assertMaps(List maps, int... ids) throws Exception { + if(maps.size() != ids.length) { + throw new Exception("Expected id count != actual map count:"+ids.length+":"+maps.size()); + } + + int i=0; + for(int val : ids) { + Map t = maps.get(i); + Long tip = (Long)t.get("id"); + if(tip.intValue() != val) { + throw new Exception("Found value:"+tip.intValue()+" expecting:"+val); + } + ++i; + } + return true; + } + public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception { long lv = (long)tuple.get(fieldName); if(lv != l) { @@ -1217,15 +783,6 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { return true; } - public boolean assertMetric(Metric metric, double value) throws Exception { - Double d = metric.getValue(); - if(d.doubleValue() != value) { - throw new Exception("Unexpected Metric "+d+"!="+value); - } - - return true; - } - @Override protected void indexr(Object... fields) throws Exception { SolrInputDocument doc = getDoc(fields);
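Taken together, the API changes in this patch compose as in the sketch below. This is illustrative only: it assumes a running SolrCloud collection named "collection1", a zkHost address, and a no-arg SolrClientCache constructor, none of which are established by this diff; getTuples() is the test helper referenced above.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

Map params = new HashMap();
params.put("q", "*:*");
params.put("fl", "id,a_s,a_i");
params.put("sort", "a_i asc");
params.put("partitionKeys", "a_i"); // required once numWorkers > 1; "none" sends the whole stream to each worker

CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
stream.setTrace(true);              // each Tuple gets tagged with its origin (e.g. _CORE_)

StreamContext context = new StreamContext();
context.workerID = 0;
context.numWorkers = 1;
context.setSolrClientCache(new SolrClientCache()); // the accessor replacing the old public clientCache field
stream.setStreamContext(context);

List tuples = getTuples(stream);    // see the drain() sketch earlier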