SOLR-7082: Streaming Aggregation for SolrCloud

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1669164 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Joel Bernstein 2015-03-25 17:17:45 +00:00
parent 220a0ea642
commit e233f14f82
34 changed files with 619 additions and 2553 deletions

View File

@ -69,7 +69,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
StreamContext context = new StreamContext();
context.workerID = worker;
context.numWorkers = numWorkers;
context.clientCache = clientCache;
context.setSolrClientCache(clientCache);
tupleStream.setStreamContext(context);
rsp.add("tuples", tupleStream);
}

View File

@ -1,42 +0,0 @@
package org.apache.solr.client.solrj.io;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.Serializable;
import java.util.Comparator;
public class AscBucketComp implements Comparator<BucketMetrics>, Serializable {
private int ord;
public AscBucketComp(int ord) {
this.ord = ord;
}
public int compare(BucketMetrics b1, BucketMetrics b2) {
double d1 = b1.getMetrics()[ord].getValue();
double d2 = b2.getMetrics()[ord].getValue();
if(d1 > d2) {
return 1;
} else if(d1 < d2) {
return -1;
} else {
return 0;
}
}
}

View File

@ -20,6 +20,12 @@ package org.apache.solr.client.solrj.io;
import java.io.Serializable;
import java.util.Comparator;
/**
* An ascending field Comparator which compares a field of two Tuples and determines sort order.
**/
public class AscFieldComp implements Comparator<Tuple>, Serializable {
private static final long serialVersionUID = 1;

View File

@ -1,39 +0,0 @@
package org.apache.solr.client.solrj.io;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
public class AscMetricComp implements Comparator<Tuple>, Serializable {
private static final long serialVersionUID = 1;
private int ord;
public AscMetricComp(int ord) {
this.ord = ord;
}
public int compare(Tuple t1, Tuple t2) {
List<Double> values1 = (List<Double>)t1.get("metricValues");
List<Double> values2 = (List<Double>)t2.get("metricValues");
return values1.get(ord).compareTo(values2.get(ord));
}
}

View File

@ -1,43 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;
/*
*
*/
import java.io.Serializable;
public class Bucket implements Serializable {
private static final long serialVersionUID = 1;
private String bucketKey;
public Bucket() {
}
public Bucket(String bucketKey) {
this.bucketKey = bucketKey;
}
public String getBucketValue(Tuple tuple) {
return tuple.get(bucketKey).toString();
}
}

View File

@ -1,43 +0,0 @@
package org.apache.solr.client.solrj.io;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.Serializable;
public class BucketMetrics implements Serializable {
private static final long serialVersionUID = 1;
private HashKey key;
private Metric[] metrics;
public BucketMetrics(HashKey key, Metric[] metrics) {
this.key = key;
this.metrics = metrics;
}
public Metric[] getMetrics() {
return metrics;
}
public HashKey getKey() {
return key;
}
}

View File

@ -27,28 +27,24 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.zookeeper.KeeperException;
/**
* Connects to Zookeeper to pick replicas from a specific collection to send the query to.
* SolrStream instances are used to send the query to the replicas.
* SolrStreams are opened using a Thread pool, but a single thread is used to iterate through each stream's tuples.* *
* Under the covers SolrStream instances are used to send the query to the replicas.
* SolrStreams are opened using a Thread pool, but a single thread is used to iterate through each stream's tuples.
**/
public class CloudSolrStream extends TupleStream {
@ -59,24 +55,25 @@ public class CloudSolrStream extends TupleStream {
protected String collection;
protected Map params;
private Map<String, String> fieldMappings;
protected TreeSet<TupleWrapper> tuples;
protected Comparator<Tuple> comp;
protected List<TupleStream> solrStreams = new ArrayList();
private int zkConnectTimeout = 10000;
private int zkClientTimeout = 10000;
protected transient SolrClientCache cache;
protected transient CloudSolrClient cloudSolrClient;
private int numWorkers;
private int workerID;
protected Map<String, Tuple> eofTuples = new HashMap();
private boolean trace;
protected transient Map<String, Tuple> eofTuples;
protected transient SolrClientCache cache;
protected transient CloudSolrClient cloudSolrClient;
protected transient List<TupleStream> solrStreams;
protected transient TreeSet<TupleWrapper> tuples;
protected transient StreamContext streamContext;
public CloudSolrStream(String zkHost, String collection, Map params) {
public CloudSolrStream(String zkHost, String collection, Map params) throws IOException {
this.zkHost = zkHost;
this.collection = collection;
this.params = params;
this.tuples = new TreeSet();
String sort = (String)params.get("sort");
this.comp = parseComp(sort);
this.comp = parseComp(sort, params);
}
//Used by the ParallelStream
@ -92,13 +89,21 @@ public class CloudSolrStream extends TupleStream {
this.fieldMappings = fieldMappings;
}
public void setTrace(boolean trace) {
this.trace = trace;
}
public void setStreamContext(StreamContext context) {
this.numWorkers = context.numWorkers;
this.workerID = context.workerID;
this.cache = context.clientCache;
this.cache = context.getSolrClientCache();
this.streamContext = context;
}
public void open() throws IOException {
this.tuples = new TreeSet();
this.solrStreams = new ArrayList();
this.eofTuples = new HashMap();
if(this.cache != null) {
this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
} else {
@ -110,17 +115,33 @@ public class CloudSolrStream extends TupleStream {
}
public Map getEofTuples() {
return this.eofTuples;
}
public List<TupleStream> children() {
return solrStreams;
}
private Comparator<Tuple> parseComp(String sort) {
private Comparator<Tuple> parseComp(String sort, Map params) throws IOException {
String fl = (String)params.get("fl");
String[] fls = fl.split(",");
HashSet fieldSet = new HashSet();
for(String f : fls) {
fieldSet.add(f.trim()); //Handle spaces in the field list.
}
String[] sorts = sort.split(",");
Comparator[] comps = new Comparator[sorts.length];
for(int i=0; i<sorts.length; i++) {
String s = sorts[i];
String[] spec = s.split(" ");
String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
if(!fieldSet.contains(spec[0])) {
throw new IOException("Fields in the sort spec must be included in the field list:"+spec[0]);
}
if(spec[1].trim().equalsIgnoreCase("asc")) {
comps[i] = new AscFieldComp(spec[0]);
} else {
@ -159,10 +180,9 @@ public class CloudSolrStream extends TupleStream {
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
String url = zkProps.getCoreUrl();
SolrStream solrStream = new SolrStream(url, params);
StreamContext context = new StreamContext();
context.numWorkers = this.numWorkers;
context.workerID = this.workerID;
solrStream.setStreamContext(context);
if(streamContext != null) {
solrStream.setStreamContext(streamContext);
}
solrStream.setFieldMappings(this.fieldMappings);
solrStreams.add(solrStream);
}
@ -173,25 +193,27 @@ public class CloudSolrStream extends TupleStream {
private void openStreams() throws IOException {
ExecutorService service = Executors.newCachedThreadPool(new SolrjNamedThreadFactory("CloudSolrStream"));
List<Future<TupleWrapper>> futures = new ArrayList();
for(TupleStream solrStream : solrStreams) {
StreamOpener so = new StreamOpener((SolrStream)solrStream, comp);
Future<TupleWrapper> future = service.submit(so);
futures.add(future);
}
try {
for(Future<TupleWrapper> f : futures) {
TupleWrapper w = f.get();
if(w != null) {
tuples.add(w);
}
List<Future<TupleWrapper>> futures = new ArrayList();
for (TupleStream solrStream : solrStreams) {
StreamOpener so = new StreamOpener((SolrStream) solrStream, comp);
Future<TupleWrapper> future = service.submit(so);
futures.add(future);
}
} catch (Exception e) {
throw new IOException(e);
}
service.shutdown();
try {
for (Future<TupleWrapper> f : futures) {
TupleWrapper w = f.get();
if (w != null) {
tuples.add(w);
}
}
} catch (Exception e) {
throw new IOException(e);
}
} finally {
service.shutdown();
}
}
public void close() throws IOException {
@ -212,12 +234,21 @@ public class CloudSolrStream extends TupleStream {
TupleWrapper tw = tuples.pollFirst();
if(tw != null) {
Tuple t = tw.getTuple();
if (trace) {
t.put("_COLLECTION_", this.collection);
}
if(tw.next()) {
tuples.add(tw);
}
return t;
} else {
Map m = new HashMap();
if(trace) {
m.put("_COLLECTION_", this.collection);
}
m.put("EOF", true);
return new Tuple(m);

View File

@ -1,58 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;
import java.io.Serializable;
import java.util.Map;
import java.util.HashMap;
public class CountMetric implements Metric, Serializable {
private static final long serialVersionUID = 1;
public static final String COUNT = "count";
private long count;
public String getName() {
return "count";
}
public void update(Tuple tuple) {
++count;
}
public double getValue() {
return count;
}
public Metric newInstance() {
return new CountMetric();
}
public Map<String, Double> metricValues() {
Map m = new HashMap();
double d = (double)count;
m.put(COUNT, d);
return m;
}
public void update(Map<String, Double> metricValues) {
double dcount = metricValues.get(COUNT);
count+=(long)dcount;
}
}

View File

@ -1,42 +0,0 @@
package org.apache.solr.client.solrj.io;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.Serializable;
import java.util.Comparator;
public class DescBucketComp implements Comparator<BucketMetrics>, Serializable {
private int ord;
public DescBucketComp(int ord) {
this.ord = ord;
}
public int compare(BucketMetrics b1, BucketMetrics b2) {
double d1 = b1.getMetrics()[ord].getValue();
double d2 = b2.getMetrics()[ord].getValue();
if(d1 > d2) {
return -1;
} else if(d1 < d2) {
return 1;
} else {
return 0;
}
}
}

View File

@ -20,6 +20,11 @@ package org.apache.solr.client.solrj.io;
import java.io.Serializable;
import java.util.Comparator;
/**
* An descending field Comparator which compares a field of two Tuples and determines sort order.
**/
public class DescFieldComp implements Comparator<Tuple>, Serializable {
private static final long serialVersionUID = 1;
@ -33,11 +38,11 @@ public class DescFieldComp implements Comparator<Tuple>, Serializable {
public int compare(Tuple t1, Tuple t2) {
Comparable o1 = (Comparable)t1.get(field);
Comparable o2 = (Comparable)t2.get(field);
int i = o1.compareTo(o2);
if(i == 0) {
return i;
int c = o1.compareTo(o2);
if(c == 0) {
return 0;
} else {
return -i;
return -c;
}
}
}

View File

@ -1,101 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.ArrayList;
public class FilterStream extends TupleStream {
private static final long serialVersionUID = 1;
private TupleStream streamA;
private TupleStream streamB;
private Comparator<Tuple> comp;
private Tuple a = null;
private Tuple b = null;
/*
* Intersects streamA by streamB based on a Comparator.
* Both streams must be sorted by the fields being compared.
* StreamB must be unique for the fields being compared.
**/
public FilterStream(TupleStream streamA, TupleStream streamB, Comparator<Tuple> comp) {
this.streamA = streamA;
this.streamB = streamB;
this.comp = comp;
}
public void setStreamContext(StreamContext context) {
this.streamA.setStreamContext(context);
this.streamB.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
l.add(streamA);
l.add(streamB);
return l;
}
public void open() throws IOException {
streamA.open();
streamB.open();
}
public void close() throws IOException {
streamA.close();
streamB.close();
}
public Tuple read() throws IOException {
a = streamA.read();
if(b == null) {
b = streamB.read();
}
while(true) {
if(a.EOF) {
return a;
}
if(b.EOF) {
return b;
}
int i = comp.compare(a, b);
if(i == 0) {
return a;
} else if(i < 0) {
// a < b so advance a
a = streamA.read();
} else {
// a > b so advance b
b = streamB.read();
}
}
}
public int getCost() {
return 0;
}
}

View File

@ -1,142 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.PriorityQueue;
/**
* Iterates over a TupleStream Groups The TopN Tuples of a group.
**/
public class GroupByStream extends TupleStream {
private static final long serialVersionUID = 1;
private TupleStream tupleStream;
private Comparator<Tuple> interGroupComp;
private Comparator<Tuple> intraGroupComp;
private Comparator<Tuple> reverseComp;
private Tuple currentTuple;
private int size;
public GroupByStream(TupleStream tupleStream,
Comparator<Tuple> interGroupComp,
Comparator<Tuple> intraGroupComp,
int size) {
this.tupleStream = tupleStream;
this.interGroupComp = interGroupComp;
this.intraGroupComp = intraGroupComp;
this.reverseComp = new ReverseComp(intraGroupComp);
this.size = size;
}
public void setStreamContext(StreamContext context) {
this.tupleStream.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
l.add(tupleStream);
return l;
}
public void open() throws IOException {
tupleStream.open();
currentTuple = tupleStream.read(); //Read the first Tuple so currentTuple is never null;
}
public void close() throws IOException {
tupleStream.close();
}
public Tuple read() throws IOException {
if(currentTuple.EOF) {
return currentTuple;
}
PriorityQueue<Tuple> group = new PriorityQueue<>(size, reverseComp);
group.add(currentTuple);
while(true) {
Tuple t = tupleStream.read();
if(t.EOF) {
currentTuple = t;
break;
}
if(interGroupComp.compare(currentTuple, t) == 0) {
if(group.size() >= size) {
Tuple peek = group.peek();
if(intraGroupComp.compare(t, peek) < 0) {
group.poll();
group.add(t);
}
} else {
group.add(t);
}
} else {
currentTuple = t;
break;
}
}
//We have a finished group so add the Tuples to an array.
Tuple[] members = new Tuple[group.size()];
for(int i=group.size()-1; i>=0; i--) {
Tuple t = group.poll();
members[i] = t;
}
//First Tuple is the group head.
Tuple groupHead = members[0];
if(members.length > 1) {
List groupList = new ArrayList();
for(int i=1; i<members.length; i++) {
groupList.add(members[i].fields);
}
groupHead.set("tuples", groupList);
} else {
groupHead.set("tuples", new ArrayList());
}
return groupHead;
}
public int getCost() {
return 0;
}
class ReverseComp implements Comparator<Tuple>, Serializable {
private Comparator<Tuple> comp;
public ReverseComp(Comparator<Tuple> comp) {
this.comp = comp;
}
public int compare(Tuple t1, Tuple t2) {
return comp.compare(t1, t2)*(-1);
}
}
}

View File

@ -1,121 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
/**
*
*
**/
public class HashJoinStream extends TupleStream {
private static final long serialVersionUID = 1;
private PushBackStream streamA;
private TupleStream streamB;
private String[] keys;
private HashMap<HashKey, List<Tuple>> hashMap = new HashMap();
public HashJoinStream(TupleStream streamA, TupleStream streamB, String[] keys) {
this.streamA = new PushBackStream(streamA);
this.streamB = streamB;
this.keys = keys;
}
public void setStreamContext(StreamContext context) {
this.streamA.setStreamContext(context);
this.streamB.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
l.add(streamA);
l.add(streamB);
return l;
}
public void open() throws IOException {
streamB.open();
while(true) {
Tuple t = streamB.read();
if(t.EOF) {
break;
}
HashKey hashKey = new HashKey(t, keys);
if(hashMap.containsKey(hashKey)) {
List<Tuple> tuples = hashMap.get(hashKey);
tuples.add(t);
} else {
List<Tuple> tuples = new ArrayList();
tuples.add(t);
hashMap.put(hashKey, tuples);
}
}
streamB.close();
streamA.open();
}
public void close() throws IOException {
streamA.close();
}
private LinkedList<Tuple> joinTuples = new LinkedList();
public Tuple read() throws IOException {
while(true) {
Tuple tuple = streamA.read();
if(tuple.EOF) {
return tuple;
}
if(joinTuples.size() > 0) {
Tuple t = tuple.clone();
Tuple j = joinTuples.removeFirst();
t.fields.putAll(j.fields);
if(joinTuples.size() > 0) {
streamA.pushBack(tuple);
}
return t;
} else {
HashKey hashKey = new HashKey(tuple, keys);
if(hashMap.containsKey(hashKey)) {
List<Tuple> joinWith = hashMap.get(hashKey);
for(Tuple jt : joinWith) {
joinTuples.add(jt);
}
streamA.pushBack(tuple);
}
}
}
}
public int getCost() {
return 0;
}
}

View File

@ -1,79 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;
import java.io.Serializable;
public class HashKey implements Serializable {
private static final long serialVersionUID = 1;
private Object[] parts;
public HashKey(String value) {
parts = (Object[])value.split("::");
}
public HashKey(Tuple t, String[] keys) {
this.parts = new Object[keys.length];
for(int i=0; i<keys.length; i++) {
parts[i] = t.get(keys[i]);
}
}
public HashKey(String[] parts) {
this.parts = parts;
}
public Object getParts() {
return parts;
}
public int hashCode() {
int h = 0;
for(Object o : parts) {
h+=o.hashCode();
}
return h;
}
public boolean equals(Object o) {
HashKey h = (HashKey)o;
for(int i=0; i<parts.length; i++) {
if(!parts[i].equals(h.parts[i])) {
return false;
}
}
return true;
}
public String toString() {
StringBuilder buf = new StringBuilder();
for(int i=0; i<parts.length; i++) {
if(i > 0) {
buf.append("::");
}
buf.append(parts[i].toString());
}
return buf.toString();
}
}

View File

@ -1,92 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;
import java.io.Serializable;
import java.util.Map;
import java.util.HashMap;
public class MaxMetric implements Metric, Serializable {
public static final String MAX = "max";
private long longMax = -Long.MAX_VALUE;
private double doubleMax = Double.MAX_VALUE;
private boolean isDouble;
private String column;
public MaxMetric(String column, boolean isDouble) {
this.column = column;
this.isDouble = isDouble;
}
public String getName() {
return "mix:"+column;
}
public double getValue() {
if(isDouble) {
return doubleMax;
} else {
return longMax;
}
}
public void update(Tuple tuple) {
if(isDouble) {
double d = (double)tuple.get(column);
if(d > doubleMax) {
doubleMax = d;
}
} else {
long l = (long)tuple.get(column);
if(l > longMax) {
longMax = l;
}
}
}
public Metric newInstance() {
return new MaxMetric(column, isDouble);
}
public Map<String, Double> metricValues() {
Map m = new HashMap();
if(isDouble) {
m.put(MAX,doubleMax);
} else {
doubleMax = (double)longMax;
m.put(MAX,doubleMax);
}
return m;
}
public void update(Map<String, Double> metricValues) {
if(isDouble) {
double dmax = metricValues.get(MAX);
if(dmax > doubleMax) {
doubleMax = dmax;
}
} else {
double dmax = metricValues.get(MAX);
long lmax = (long) dmax;
if(lmax > longMax) {
longMax = lmax;
}
}
}
}

View File

@ -1,104 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;
import java.io.Serializable;
import java.util.Map;
import java.util.HashMap;
public class MeanMetric implements Metric, Serializable {
private static final long serialVersionUID = 1;
public static final String SUM = "sum";
public static final String COUNT = "count";
public static final String MEAN = "mean";
private String column;
private boolean isDouble;
private double doubleSum;
private long longSum;
private long count;
public MeanMetric(String column, boolean isDouble) {
this.column = column;
this.isDouble = isDouble;
}
public String getName() {
return "mean:"+column;
}
public void update(Tuple tuple) {
++count;
if(isDouble) {
Double d = (Double)tuple.get(column);
doubleSum += d.doubleValue();
} else {
Long l = (Long)tuple.get(column);
longSum += l.doubleValue();
}
}
public Metric newInstance() {
return new MeanMetric(column, isDouble);
}
public double getValue() {
double dcount = (double)count;
if(isDouble) {
double ave = doubleSum/dcount;
return ave;
} else {
double ave = longSum/dcount;
return ave;
}
}
public Map<String, Double> metricValues() {
Map m = new HashMap();
double dcount = (double)count;
m.put(COUNT, dcount);
if(isDouble) {
double ave = doubleSum/dcount;
m.put(MEAN,ave);
m.put(SUM,doubleSum);
} else {
double ave = longSum/dcount;
doubleSum = (double)longSum;
m.put(MEAN,ave);
m.put(SUM,doubleSum);
}
return m;
}
public void update(Map<String, Double> metricValues) {
double dcount = metricValues.get(COUNT);
count += (long)dcount;
if(isDouble) {
double dsum = metricValues.get(SUM);
doubleSum+=dsum;
} else {
double dsum = metricValues.get(SUM);
longSum+=(long)dsum;
}
}
}

View File

@ -1,162 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Comparator;
/**
* Merge Joins streamA with streamB based on the Comparator.
* Supports:
* one-to-one, one-to-many, many-to-one and many-to-many joins
*
**/
public class MergeJoinStream extends TupleStream {
private static final long serialVersionUID = 1;
private PushBackStream streamA;
private PushBackStream streamB;
private Comparator<Tuple> comp;
public MergeJoinStream(TupleStream streamA, TupleStream streamB, Comparator<Tuple> comp) {
this.streamA = new PushBackStream(streamA);
this.streamB = new PushBackStream(streamB);
this.comp = comp;
}
public void setStreamContext(StreamContext context) {
this.streamA.setStreamContext(context);
this.streamB.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
l.add(streamA);
l.add(streamB);
return l;
}
public void open() throws IOException {
streamA.open();
streamB.open();
}
public void close() throws IOException {
streamA.close();
streamB.close();
}
private LinkedList<Tuple> joinTuples = new LinkedList();
private List<Tuple> listA = new ArrayList();
private List<Tuple> listB = new ArrayList();
public Tuple read() throws IOException {
if(joinTuples.size() > 0) {
return joinTuples.removeFirst();
}
OUTER:
while(true) {
if(listA.size() == 0) {
//Stream A needs to be advanced.
Tuple a = streamA.read();
if(a.EOF) {
return a;
}
listA.add(a);
INNERA:
while(true) {
Tuple a1 = streamA.read();
if(a1.EOF) {
streamA.pushBack(a1);
break INNERA;
}
if(comp.compare(a,a1) == 0) {
listA.add(a1);
} else {
streamA.pushBack(a1);
break INNERA;
}
}
}
if(listB.size() == 0) {
//StreamB needs to be advanced.
Tuple b = streamB.read();
if(b.EOF) {
return b;
}
listB.add(b);
INNERA:
while(true) {
Tuple b1 = streamB.read();
if(b1.EOF) {
streamB.pushBack(b1);
break INNERA;
}
if(comp.compare(b,b1) == 0) {
listB.add(b1);
} else {
streamB.pushBack(b1);
break INNERA;
}
}
}
int c = comp.compare(listA.get(0),listB.get(0));
if(c == 0) {
//The Tuple lists match. So build all the Tuple combinations.
for(Tuple aa : listA) {
for(Tuple bb : listB) {
Tuple clone = aa.clone();
clone.fields.putAll(bb.fields);
joinTuples.add(clone);
}
}
//This will advance both streams.
listA.clear();
listB.clear();
return joinTuples.removeFirst();
} else if(c < 0) {
//This will advance streamA
listA.clear();
} else {
//This will advance streamB
listB.clear();
}
}
}
public int getCost() {
return 0;
}
}

View File

@ -23,7 +23,7 @@ import java.util.List;
import java.util.ArrayList;
/**
* Unions streamA with streamB based on a Comparator.
* Unions streamA with streamB ordering the Tuples based on a Comparator.
* Both streams must be sorted by the fields being compared.
**/

View File

@ -1,30 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;
import java.io.Serializable;
import java.util.Map;
public interface Metric extends Serializable {
public String getName();
public double getValue();
public void update(Tuple tuple);
public Metric newInstance();
public Map<String, Double> metricValues();
public void update(Map<String, Double> metricValues);
}

View File

@ -1,280 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;
import java.io.IOException;
import java.io.Serializable;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.PriorityQueue;
/*
String[] buckets = {"a","b"};
Metric
BucketStream bucketStream = new BucketStream(stream,buckets,metrics,"my-metrics","name");
bucketStream.get(
*/
public class MetricStream extends TupleStream {
private static final long serialVersionUID = 1;
private TupleStream tupleStream;
private Bucket[] buckets;
private Metric[] metrics;
private String outKey;
private Map<HashKey, Metric[]> bucketMap;
private BucketMetrics[] bucketMetrics;
private static final HashKey metricsKey = new HashKey("metrics");
private int topN;
private Comparator<BucketMetrics> comp;
private Comparator<BucketMetrics> rcomp;
public MetricStream(TupleStream tupleStream,
Bucket[] buckets,
Metric[] metrics,
String outKey,
Comparator<BucketMetrics> comp,
int topN) {
this.tupleStream = tupleStream;
this.buckets = buckets;
this.metrics = metrics;
this.outKey = outKey;
this.topN = topN;
this.rcomp = new ReverseOrdComp(comp);
this.comp = comp;
this.bucketMap = new HashMap();
}
public MetricStream(TupleStream tupleStream,
Metric[] metrics,
String outKey) {
this.tupleStream = tupleStream;
this.metrics = metrics;
this.outKey = outKey;
this.bucketMap = new HashMap();
}
public String getOutKey() {
return this.outKey;
}
public BucketMetrics[] getBucketMetrics(){
return bucketMetrics;
}
public void setBucketMetrics(BucketMetrics[] bucketMetrics) {
this.bucketMetrics = bucketMetrics;
}
BucketMetrics[] merge(List<Map> all) {
Map<HashKey, Metric[]> bucketAccumulator = new HashMap();
for(Map top : all) {
List<String> ks = (List<String>)top.get("buckets");
List<List<Map<String,Double>>> ms = (List<List<Map<String,Double>>>)top.get("metrics");
for(int i=0; i<ks.size(); i++) {
String key = ks.get(i);
List<Map<String,Double>> bucketMs = ms.get(i);
HashKey hashKey = new HashKey(key);
if(bucketAccumulator.containsKey(hashKey)) {
Metric[] mergeMetrics = bucketAccumulator.get(hashKey);
for(int m=0; m<mergeMetrics.length; m++) {
mergeMetrics[m].update(bucketMs.get(m));
}
} else {
Metric[] mergedMetrics = new Metric[metrics.length];
for(int m=0; m<metrics.length; m++) {
mergedMetrics[m] = metrics[m].newInstance();
mergedMetrics[m].update(bucketMs.get(m));
}
bucketAccumulator.put(hashKey, mergedMetrics);
}
}
}
Iterator<Map.Entry<HashKey,Metric[]>> it = bucketAccumulator.entrySet().iterator();
PriorityQueue<BucketMetrics> priorityQueue = new PriorityQueue(topN, rcomp);
while(it.hasNext()) {
Map.Entry<HashKey, Metric[]> entry = it.next();
BucketMetrics bms = new BucketMetrics(entry.getKey(), entry.getValue());
if(priorityQueue.size() < topN) {
priorityQueue.add(bms);
} else {
BucketMetrics peek = priorityQueue.peek();
if(comp.compare(bms, peek) < 0) {
priorityQueue.poll();
priorityQueue.add(bms);
}
}
}
int s = priorityQueue.size();
BucketMetrics[] bucketMetrics = new BucketMetrics[s];
for(int i=bucketMetrics.length-1; i>=0; i--) {
BucketMetrics b = priorityQueue.poll();
bucketMetrics[i]= b;
}
return bucketMetrics;
}
private class ReverseOrdComp implements Comparator<BucketMetrics>, Serializable {
private Comparator<BucketMetrics> comp;
public ReverseOrdComp(Comparator<BucketMetrics> comp) {
this.comp = comp;
}
public int compare(BucketMetrics e1, BucketMetrics e2) {
return comp.compare(e1,e2)*-1;
}
}
public void setStreamContext(StreamContext context) {
this.tupleStream.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
l.add(tupleStream);
return l;
}
public void open() throws IOException {
tupleStream.open();
}
public void close() throws IOException {
tupleStream.close();
}
public Tuple read() throws IOException {
Tuple tuple = tupleStream.read();
if(tuple.EOF) {
Iterator<Map.Entry<HashKey,Metric[]>> it = bucketMap.entrySet().iterator();
if(comp == null) {
//Handle No bucket constructor
Map.Entry<HashKey, Metric[]> noBucket = it.next();
BucketMetrics bms = new BucketMetrics(noBucket.getKey(), noBucket.getValue());
this.bucketMetrics = new BucketMetrics[1];
this.bucketMetrics[0] = bms;
List<Map<String, Double>> outMetrics = new ArrayList();
List<String> outKeys = new ArrayList();
for(Metric metric : bms.getMetrics()) {
Map<String, Double> outMetricValues = metric.metricValues();
String outKey = metric.getName();
outMetrics.add(outMetricValues);
outKeys.add(outKey);
}
Map outMap = new HashMap();
outMap.put("buckets",outKeys);
outMap.put("metrics",outMetrics);
tuple.set(this.outKey, outMap);
return tuple;
}
PriorityQueue<BucketMetrics> priorityQueue = new PriorityQueue(topN, rcomp);
while(it.hasNext()) {
Map.Entry<HashKey, Metric[]> entry = it.next();
BucketMetrics bms = new BucketMetrics(entry.getKey(), entry.getValue());
if(priorityQueue.size() < topN) {
priorityQueue.add(bms);
} else {
BucketMetrics peek = priorityQueue.peek();
if(comp.compare(bms, peek) < 0) {
priorityQueue.poll();
priorityQueue.add(bms);
}
}
}
int s = priorityQueue.size();
this.bucketMetrics = new BucketMetrics[s];
for(int i=bucketMetrics.length-1; i>=0; i--) {
BucketMetrics b = priorityQueue.poll();
this.bucketMetrics[i]= b;
}
List<List<Map<String, Double>>> outMetrics = new ArrayList();
List<String> outBuckets = new ArrayList();
for(BucketMetrics bms : this.bucketMetrics) {
List outBucketMetrics = new ArrayList();
for(Metric metric : bms.getMetrics()) {
Map<String, Double> outMetricValues = metric.metricValues();
outBucketMetrics.add(outMetricValues);
}
outBuckets.add(bms.getKey().toString());
outMetrics.add(outBucketMetrics);
}
Map outMap = new HashMap();
outMap.put("buckets",outBuckets);
outMap.put("metrics",outMetrics);
tuple.set(this.outKey, outMap);
return tuple;
}
HashKey hashKey = null;
if(buckets != null) {
String[] bucketValues = new String[buckets.length];
for(int i=0; i<buckets.length; i++) {
bucketValues[i] = buckets[i].getBucketValue(tuple);
}
hashKey = new HashKey(bucketValues);
} else {
hashKey = metricsKey;
}
Metric[] bucketMetrics = bucketMap.get(hashKey);
if(bucketMetrics != null) {
for(Metric bucketMetric : bucketMetrics) {
bucketMetric.update(tuple);
}
} else {
bucketMetrics = new Metric[metrics.length];
for(int i=0; i<metrics.length; i++) {
Metric bucketMetric = metrics[i].newInstance();
bucketMetric.update(tuple);
bucketMetrics[i] = bucketMetric;
}
bucketMap.put(hashKey, bucketMetrics);
}
return tuple;
}
public int getCost() {
return 0;
}
}

View File

@ -1,92 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;
import java.io.Serializable;
import java.util.Map;
import java.util.HashMap;
public class MinMetric implements Metric, Serializable {
public static final String MIN = "min";
private long longMin = Long.MAX_VALUE;
private double doubleMin = Double.MAX_VALUE;
private boolean isDouble;
private String column;
public MinMetric(String column, boolean isDouble) {
this.column = column;
this.isDouble = isDouble;
}
public String getName() {
return "min:"+column;
}
public double getValue() {
if(isDouble) {
return doubleMin;
} else {
return longMin;
}
}
public void update(Tuple tuple) {
if(isDouble) {
double d = (double)tuple.get(column);
if(d < doubleMin) {
doubleMin = d;
}
} else {
long l = (long)tuple.get(column);
if(l < longMin) {
longMin = l;
}
}
}
public Metric newInstance() {
return new MinMetric(column, isDouble);
}
public Map<String, Double> metricValues() {
Map m = new HashMap();
if(isDouble) {
m.put(MIN,doubleMin);
} else {
doubleMin = (double)longMin;
m.put(MIN,doubleMin);
}
return m;
}
public void update(Map<String, Double> metricValues) {
if(isDouble) {
double dmin = metricValues.get(MIN);
if(dmin < doubleMin) {
doubleMin = dmin;
}
} else {
double dmin = metricValues.get(MIN);
long lmin = (long) dmin;
if(lmin < longMin) {
longMin = lmin;
}
}
}
}

View File

@ -20,6 +20,11 @@ package org.apache.solr.client.solrj.io;
import java.io.Serializable;
import java.util.Comparator;
/**
* Wraps multiple Comparators to provide sub sorting.
**/
public class MultiComp implements Comparator<Tuple>, Serializable {
private static final long serialVersionUID = 1;

View File

@ -52,7 +52,6 @@ public class ParallelStream extends CloudSolrStream {
private int workers;
private String encoded;
public ParallelStream(String zkHost,
String collection,
TupleStream tupleStream,
@ -78,30 +77,10 @@ public class ParallelStream extends CloudSolrStream {
return l;
}
public void merge(List<MetricStream> metricStreams) {
for(MetricStream metricStream : metricStreams) {
String outKey = metricStream.getOutKey();
Iterator<Tuple> it = eofTuples.values().iterator();
List values = new ArrayList();
while(it.hasNext()) {
Tuple t = it.next();
Map top = (Map)t.get(outKey);
values.add(top);
}
BucketMetrics[] bucketMetrics = metricStream.merge(values);
metricStream.setBucketMetrics(bucketMetrics);
}
}
public Tuple read() throws IOException {
Tuple tuple = _read();
if(tuple.EOF) {
List<MetricStream> metricStreams = new ArrayList();
getMetricStreams(this, metricStreams);
this.merge(metricStreams);
Map m = new HashMap();
m.put("EOF", true);
return new Tuple(m);
@ -110,16 +89,10 @@ public class ParallelStream extends CloudSolrStream {
return tuple;
}
private void getMetricStreams(TupleStream tupleStream,
List<MetricStream> metricStreams) {
if(tupleStream instanceof MetricStream) {
metricStreams.add((MetricStream)tupleStream);
}
List<TupleStream> children = tupleStream.children();
for(TupleStream ts : children) {
getMetricStreams(ts, metricStreams);
}
public void setStreamContext(StreamContext streamContext) {
//Note the parallel stream does not set the StreamContext on it's substream.
//This is because the substream is not actually opened by the ParallelStream.
this.streamContext = streamContext;
}
protected void constructStreams() throws IOException {

View File

@ -29,10 +29,11 @@ public class PushBackStream extends TupleStream {
private TupleStream stream;
private Tuple tuple;
/*
* A TupleStream that allows a single Tuple to be pushed back into Stream after it's been read.
*
**/
/**
* A TupleStream that allows a single Tuple to be pushed back into Stream after it's been read.
* This is a useful class when building streams that maintain the order of the tuples from two or
* more substreams.
**/
public PushBackStream(TupleStream stream) {
this.stream = stream;

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Map;
import java.util.HashMap;
/**
* Iterates over a TupleStream and buffers Tuples that are equal based on a field comparator.
* This allows tuples to be grouped by a common field.
*
* The read() method emits one tuple per group. The top levels fields reflect the first tuple
* encountered in the group.
*
* Use the Tuple.getMaps() method to return the all the Tuples in the group. The method returns
* a list of maps (including the group head), which hold the data for each Tuple in the group.
*
* Note: This ReducerStream requires that the underlying streams be sorted and partitioned by same
* fields as it's comparator.
*
**/
public class ReducerStream extends TupleStream {
private static final long serialVersionUID = 1;
private PushBackStream tupleStream;
private Comparator<Tuple> comp;
private Tuple currentGroupHead;
public ReducerStream(TupleStream tupleStream,
Comparator<Tuple> comp) {
this.tupleStream = new PushBackStream(tupleStream);
this.comp = comp;
}
public void setStreamContext(StreamContext context) {
this.tupleStream.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
l.add(tupleStream);
return l;
}
public void open() throws IOException {
tupleStream.open();
}
public void close() throws IOException {
tupleStream.close();
}
public Tuple read() throws IOException {
List<Map> maps = new ArrayList();
while(true) {
Tuple t = tupleStream.read();
if(t.EOF) {
if(maps.size() > 0) {
tupleStream.pushBack(t);
Map map1 = maps.get(0);
Map map2 = new HashMap();
map2.putAll(map1);
Tuple groupHead = new Tuple(map2);
groupHead.setMaps(maps);
return groupHead;
} else {
return t;
}
}
if(currentGroupHead == null) {
currentGroupHead = t;
maps.add(t.getMap());
} else {
if(comp.compare(currentGroupHead, t) == 0) {
maps.add(t.getMap());
} else {
Tuple groupHead = currentGroupHead.clone();
tupleStream.pushBack(t);
currentGroupHead = null;
groupHead.setMaps(maps);
return groupHead;
}
}
}
}
public int getCost() {
return 0;
}
}

View File

@ -1,138 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
public class RollupStream extends TupleStream {
private static final long serialVersionUID = 1;
private PushBackStream tupleStream;
private Bucket[] buckets;
private Metric[] metrics;
private HashKey currentKey = new HashKey("-");
private Metric[] currentMetrics;
private boolean finished = false;
public RollupStream(TupleStream tupleStream,
Bucket[] buckets,
Metric[] metrics) {
this.tupleStream = new PushBackStream(tupleStream);
this.buckets = buckets;
this.metrics = metrics;
}
public void setStreamContext(StreamContext context) {
this.tupleStream.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
l.add(tupleStream);
return l;
}
public void open() throws IOException {
tupleStream.open();
}
public void close() throws IOException {
tupleStream.close();
}
public Tuple read() throws IOException {
while(true) {
Tuple tuple = tupleStream.read();
if(tuple.EOF) {
if(!finished) {
Map map = new HashMap();
if(currentMetrics != null) {
List<Double> metricValues = new ArrayList();
List<String> metricNames = new ArrayList();
for (Metric metric : currentMetrics) {
metricNames.add(metric.getName());
metricValues.add(metric.getValue());
}
map.put("buckets", currentKey.toString());
map.put("metricNames", metricNames);
map.put("metricValues", metricValues);
Tuple t = new Tuple(map);
tupleStream.pushBack(tuple);
finished = true;
return t;
} else {
return tuple;
}
} else {
return tuple;
}
}
String[] bucketValues = new String[buckets.length];
for(int i=0; i<buckets.length; i++) {
bucketValues[i] = buckets[i].getBucketValue(tuple);
}
HashKey hashKey = new HashKey(bucketValues);
if(hashKey.equals(currentKey)) {
for(Metric bucketMetric : currentMetrics) {
bucketMetric.update(tuple);
}
} else {
Tuple t = null;
if(currentMetrics != null) {
Map map = new HashMap();
List<Double> metricValues = new ArrayList();
List<String> metricNames = new ArrayList();
for(Metric metric : currentMetrics) {
metricNames.add(metric.getName());
metricValues.add(metric.getValue());
}
map.put("buckets", currentKey.toString());
map.put("metricNames", metricNames);
map.put("metricValues", metricValues);
t = new Tuple(map);
}
currentMetrics = new Metric[metrics.length];
currentKey = hashKey;
for(int i=0; i<metrics.length; i++) {
Metric bucketMetric = metrics[i].newInstance();
bucketMetric.update(tuple);
currentMetrics[i] = bucketMetric;
}
if(t != null) {
return t;
}
}
}
}
public int getCost() {
return 0;
}
}

View File

@ -28,6 +28,11 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The SolrClientCache caches SolrClients that they can be reused by different TupleStreams.
**/
public class SolrClientCache implements Serializable {
protected static final Logger log = LoggerFactory.getLogger(SolrClientCache.class);

View File

@ -40,6 +40,7 @@ public class SolrStream extends TupleStream {
private Map params;
private int numWorkers;
private int workerID;
private boolean trace;
private Map<String, String> fieldMappings;
private transient JSONTupleStream jsonTupleStream;
private transient HttpSolrClient client;
@ -65,7 +66,7 @@ public class SolrStream extends TupleStream {
public void setStreamContext(StreamContext context) {
this.numWorkers = context.numWorkers;
this.workerID = context.workerID;
this.cache = context.clientCache;
this.cache = context.getSolrClientCache();
}
/**
@ -87,11 +88,21 @@ public class SolrStream extends TupleStream {
}
}
private SolrParams loadParams(Map params) {
public void setTrace(boolean trace) {
this.trace = trace;
}
private SolrParams loadParams(Map params) throws IOException {
ModifiableSolrParams solrParams = new ModifiableSolrParams();
if(this.numWorkers > 0) {
String partitionFilter = getPartitionFilter();
solrParams.add("fq", partitionFilter);
if(params.containsKey("partitionKeys")) {
if(!params.get("partitionKeys").equals("none")) {
String partitionFilter = getPartitionFilter();
solrParams.add("fq", partitionFilter);
}
} else {
if(numWorkers > 1) {
throw new IOException("When numWorkers > 1 partitionKeys must be set. Set partitionKeys=none to send the entire stream to each worker.");
}
}
Iterator<Map.Entry> it = params.entrySet().iterator();
@ -129,6 +140,11 @@ public class SolrStream extends TupleStream {
public Tuple read() throws IOException {
Map fields = jsonTupleStream.next();
if(trace) {
fields.put("_CORE_", this.baseUrl);
}
if(fields == null) {
//Return the EOF tuple.
Map m = new HashMap();

View File

@ -17,24 +17,26 @@
package org.apache.solr.client.solrj.io;
import java.io.Serializable;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
public class StreamContext {
/**
* The StreamContext is passed to TupleStreams using the TupleStream.setStreamContext() method.
* The StreamContext is used pass shared context info from to concentrically wrapped TupleStreams.
*
* Note: The StreamContext contains the SolrClientCache which is used to cache SolrClients for reuse
* across multiple TupleStreams.
**/
public class StreamContext implements Serializable{
private Map entries = new HashMap();
public int workerID;
public int numWorkers;
public SolrClientCache clientCache;
public SolrClientCache getClientCache() {
return this.clientCache;
}
public void setSolrClientCache(SolrClientCache clientCache) {
this.clientCache = clientCache;
}
private SolrClientCache clientCache;
public Object get(Object key) {
return entries.get(key);
@ -43,4 +45,12 @@ public class StreamContext {
public void put(Object key, Object value) {
this.entries.put(key, value);
}
public void setSolrClientCache(SolrClientCache clientCache) {
this.clientCache = clientCache;
}
public SolrClientCache getSolrClientCache() {
return this.clientCache;
}
}

View File

@ -1,88 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;
import java.io.Serializable;
import java.util.Map;
import java.util.HashMap;
public class SumMetric implements Metric, Serializable {
private static final long serialVersionUID = 1;
public static final String SUM = "sum";
private String column;
private boolean isDouble;
private double doubleSum;
private long longSum;
public SumMetric(String column, boolean isDouble) {
this.column = column;
this.isDouble = isDouble;
}
public String getName() {
return "sum:"+column;
}
public void update(Tuple tuple) {
if(isDouble) {
Double d = (Double)tuple.get(column);
doubleSum += d.doubleValue();
} else {
Long l = (Long)tuple.get(column);
longSum += l.doubleValue();
}
}
public Metric newInstance() {
return new SumMetric(column, isDouble);
}
public Map<String, Double> metricValues() {
Map m = new HashMap();
if(isDouble) {
m.put(SUM,doubleSum);
} else {
doubleSum = (double)longSum;
m.put(SUM,doubleSum);
}
return m;
}
public double getValue() {
if(isDouble) {
return doubleSum;
} else {
return (double)longSum;
}
}
public void update(Map<String, Double> metricValues) {
if(isDouble) {
double dsum = metricValues.get(SUM);
doubleSum+=dsum;
} else {
double dsum = metricValues.get(SUM);
longSum+=(long)dsum;
}
}
}

View File

@ -47,7 +47,7 @@ public class Tuple implements Cloneable {
return this.fields.get(key);
}
public void set(Object key, Object value) {
public void put(Object key, Object value) {
this.fields.put(key, value);
}
@ -63,19 +63,14 @@ public class Tuple implements Cloneable {
return (Double)this.fields.get(key);
}
public List<String> getStrings(Object key) {
return (List<String>)this.fields.get(key);
}
public List<Long> getLongs(Object key) {
return (List<Long>)this.fields.get(key);
}
public List<Double> getDoubles(Object key) {
return (List<Double>)this.fields.get(key);
}
@ -84,6 +79,20 @@ public class Tuple implements Cloneable {
return fields.entrySet().iterator();
}
public Map getMap() {
return this.fields;
}
public List<Map> getMaps() {
return (List<Map>)this.fields.get("_MAPS_");
}
public void setMaps(List<Map> maps) {
this.fields.put("_MAPS_", maps);
}
public Tuple clone() {
HashMap m = new HashMap();
m.putAll(fields);

View File

@ -22,6 +22,13 @@ import java.util.Comparator;
import java.util.List;
import java.util.ArrayList;
/**
* The UniqueStream emits a unique stream of Tuples based on a Comparator.
*
* Note: The sort order of the underlying stream must match the Comparator.
**/
public class UniqueStream extends TupleStream {
private static final long serialVersionUID = 1;

View File

@ -1,5 +1,4 @@
package org.apache.solr.client.solrj.io;
package org.apache.solr.client.solrj.io;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@ -17,23 +16,46 @@ package org.apache.solr.client.solrj.io;
* limitations under the License.
*/
import java.io.IOException;
import java.io.Serializable;
import java.util.Comparator;
import java.util.ArrayList;
import java.util.List;
public class DescMetricComp implements Comparator<Tuple>, Serializable {
public class CountStream extends TupleStream implements Serializable {
private static final long serialVersionUID = 1;
private TupleStream stream;
private int count;
private int ord;
public DescMetricComp(int ord) {
this.ord = ord;
public CountStream(TupleStream stream) {
this.stream = stream;
}
public int compare(Tuple t1, Tuple t2) {
List<Double> values1 = (List<Double>)t1.get("metricValues");
List<Double> values2 = (List<Double>)t2.get("metricValues");
return values1.get(ord).compareTo(values2.get(ord))*-1;
public void close() throws IOException {
this.stream.close();
}
public void open() throws IOException {
this.stream.open();
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
l.add(stream);
return l;
}
public void setStreamContext(StreamContext streamContext) {
stream.setStreamContext(streamContext);
}
public Tuple read() throws IOException {
Tuple t = stream.read();
if(t.EOF) {
t.put("count", count);
return t;
} else {
++count;
return t;
}
}
}