SOLR-10770: Add date formatting to timeseries Streaming Expression

This commit is contained in:
Joel Bernstein 2017-05-30 14:38:50 -04:00
parent d14ca98df8
commit 520762913a
2 changed files with 114 additions and 3 deletions

View File

@ -17,6 +17,10 @@
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -53,6 +57,8 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
private String end;
private String gap;
private String field;
private DateTimeFormatter formatter;
private SimpleDateFormat ISOFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'");
private Metric[] metrics;
private List<Tuple> tuples = new ArrayList();
@ -70,8 +76,9 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
String field,
String start,
String end,
String gap) throws IOException {
init(collection, params, field, metrics, start, end, gap, zkHost);
String gap,
String format) throws IOException {
init(collection, params, field, metrics, start, end, gap, format, zkHost);
}
public TimeSeriesStream(StreamExpression expression, StreamFactory factory) throws IOException{
@ -82,9 +89,17 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
StreamExpressionNamedParameter endExpression = factory.getNamedOperand(expression, "end");
StreamExpressionNamedParameter fieldExpression = factory.getNamedOperand(expression, "field");
StreamExpressionNamedParameter gapExpression = factory.getNamedOperand(expression, "gap");
StreamExpressionNamedParameter formatExpression = factory.getNamedOperand(expression, "format");
StreamExpressionNamedParameter qExpression = factory.getNamedOperand(expression, "q");
StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
List<StreamExpression> metricExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, Metric.class);
if(qExpression == null) {
throw new IOException("The timeseries expression requires the q parameter");
}
String start = null;
if(startExpression != null) {
start = ((StreamExpressionValue)startExpression.getParameter()).getValue();
@ -105,6 +120,11 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
field = ((StreamExpressionValue)fieldExpression.getParameter()).getValue();
}
String format = null;
if(formatExpression != null) {
format = ((StreamExpressionValue)formatExpression.getParameter()).getValue();
}
// Collection Name
if(null == collectionName){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
@ -149,7 +169,7 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
}
// We've got all the required items
init(collectionName, params, field, metrics, start, end, gap , zkHost);
init(collectionName, params, field, metrics, start, end, gap, format, zkHost);
}
public String getCollection() {
@ -163,6 +183,7 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
String start,
String end,
String gap,
String format,
String zkHost) throws IOException {
this.zkHost = zkHost;
this.collection = collection;
@ -175,6 +196,9 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
this.field = field;
this.params = params;
this.end = end;
if(format != null) {
formatter = DateTimeFormatter.ofPattern(format);
}
}
@Override
@ -348,6 +372,12 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
for(int b=0; b<allBuckets.size(); b++) {
NamedList bucket = (NamedList)allBuckets.get(b);
Object val = bucket.get("val");
if(formatter != null) {
LocalDateTime localDateTime = LocalDateTime.ofInstant(((java.util.Date) val).toInstant(), ZoneOffset.UTC);
val = localDateTime.format(formatter);
}
Tuple t = currentTuple.clone();
t.put(field, val);
int m = 0;

View File

@ -5368,6 +5368,87 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertTrue(tuples.get(3).getDouble("max(price_f)").equals(400D));
assertTrue(tuples.get(3).getDouble("min(price_f)").equals(400D));
expr = "timeseries("+COLLECTIONORALIAS+", q=\"*:*\", start=\"2013-01-01T01:00:00.000Z\", " +
"end=\"2016-12-01T01:00:00.000Z\", " +
"gap=\"+1YEAR\", " +
"field=\"test_dt\", " +
"format=\"yyyy\", " +
"count(*), sum(price_f), max(price_f), min(price_f))";
paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", expr);
paramsLoc.set("qt", "/stream");
solrStream = new SolrStream(url, paramsLoc);
solrStream.setStreamContext(context);
tuples = getTuples(solrStream);
assertTrue(tuples.size() == 4);
assertTrue(tuples.get(0).get("test_dt").equals("2013"));
assertTrue(tuples.get(0).getLong("count(*)").equals(100L));
assertTrue(tuples.get(0).getDouble("sum(price_f)").equals(10000D));
assertTrue(tuples.get(0).getDouble("max(price_f)").equals(100D));
assertTrue(tuples.get(0).getDouble("min(price_f)").equals(100D));
assertTrue(tuples.get(1).get("test_dt").equals("2014"));
assertTrue(tuples.get(1).getLong("count(*)").equals(50L));
assertTrue(tuples.get(1).getDouble("sum(price_f)").equals(25000D));
assertTrue(tuples.get(1).getDouble("max(price_f)").equals(500D));
assertTrue(tuples.get(1).getDouble("min(price_f)").equals(500D));
assertTrue(tuples.get(2).get("test_dt").equals("2015"));
assertTrue(tuples.get(2).getLong("count(*)").equals(50L));
assertTrue(tuples.get(2).getDouble("sum(price_f)").equals(15000D));
assertTrue(tuples.get(2).getDouble("max(price_f)").equals(300D));
assertTrue(tuples.get(2).getDouble("min(price_f)").equals(300D));
assertTrue(tuples.get(3).get("test_dt").equals("2016"));
assertTrue(tuples.get(3).getLong("count(*)").equals(50L));
assertTrue(tuples.get(3).getDouble("sum(price_f)").equals(20000D));
assertTrue(tuples.get(3).getDouble("max(price_f)").equals(400D));
assertTrue(tuples.get(3).getDouble("min(price_f)").equals(400D));
expr = "timeseries("+COLLECTIONORALIAS+", q=\"*:*\", start=\"2013-01-01T01:00:00.000Z\", " +
"end=\"2016-12-01T01:00:00.000Z\", " +
"gap=\"+1YEAR\", " +
"field=\"test_dt\", " +
"format=\"yyyy-MM\", " +
"count(*), sum(price_f), max(price_f), min(price_f))";
paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", expr);
paramsLoc.set("qt", "/stream");
solrStream = new SolrStream(url, paramsLoc);
solrStream.setStreamContext(context);
tuples = getTuples(solrStream);
assertTrue(tuples.size() == 4);
assertTrue(tuples.get(0).get("test_dt").equals("2013-01"));
assertTrue(tuples.get(0).getLong("count(*)").equals(100L));
assertTrue(tuples.get(0).getDouble("sum(price_f)").equals(10000D));
assertTrue(tuples.get(0).getDouble("max(price_f)").equals(100D));
assertTrue(tuples.get(0).getDouble("min(price_f)").equals(100D));
assertTrue(tuples.get(1).get("test_dt").equals("2014-01"));
assertTrue(tuples.get(1).getLong("count(*)").equals(50L));
assertTrue(tuples.get(1).getDouble("sum(price_f)").equals(25000D));
assertTrue(tuples.get(1).getDouble("max(price_f)").equals(500D));
assertTrue(tuples.get(1).getDouble("min(price_f)").equals(500D));
assertTrue(tuples.get(2).get("test_dt").equals("2015-01"));
assertTrue(tuples.get(2).getLong("count(*)").equals(50L));
assertTrue(tuples.get(2).getDouble("sum(price_f)").equals(15000D));
assertTrue(tuples.get(2).getDouble("max(price_f)").equals(300D));
assertTrue(tuples.get(2).getDouble("min(price_f)").equals(300D));
assertTrue(tuples.get(3).get("test_dt").equals("2016-01"));
assertTrue(tuples.get(3).getLong("count(*)").equals(50L));
assertTrue(tuples.get(3).getDouble("sum(price_f)").equals(20000D));
assertTrue(tuples.get(3).getDouble("max(price_f)").equals(400D));
assertTrue(tuples.get(3).getDouble("min(price_f)").equals(400D));
}
@Test