diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 2d3aebb320d..2e30ff474f2 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -120,6 +120,7 @@ public abstract class StreamInput extends InputStream { * only if you must differentiate null from empty. Use {@link StreamInput#readBytesReference()} and * {@link StreamOutput#writeBytesReference(BytesReference)} if you do not. */ + @Nullable public BytesReference readOptionalBytesReference() throws IOException { int length = readVInt() - 1; if (length < 0) { @@ -275,6 +276,14 @@ public abstract class StreamInput extends InputStream { return BitUtil.zigZagDecode(accumulator | (currentByte << i)); } + @Nullable + public Long readOptionalLong() throws IOException { + if (readBoolean()) { + return readLong(); + } + return null; + } + @Nullable public Text readOptionalText() throws IOException { int length = readInt(); @@ -355,6 +364,7 @@ public abstract class StreamInput extends InputStream { return Double.longBitsToDouble(readLong()); } + @Nullable public final Double readOptionalDouble() throws IOException { if (readBoolean()) { return readDouble(); @@ -402,6 +412,7 @@ public abstract class StreamInput extends InputStream { return ret; } + @Nullable public String[] readOptionalStringArray() throws IOException { if (readBoolean()) { return readStringArray(); @@ -635,6 +646,7 @@ public abstract class StreamInput extends InputStream { /** * Serializes a potential null value. */ + @Nullable public T readOptionalStreamable(Supplier supplier) throws IOException { if (readBoolean()) { T streamable = supplier.get(); @@ -645,6 +657,7 @@ public abstract class StreamInput extends InputStream { } } + @Nullable public T readOptionalWriteable(Writeable.Reader reader) throws IOException { if (readBoolean()) { T t = reader.read(this); @@ -769,6 +782,7 @@ public abstract class StreamInput extends InputStream { /** * Reads an optional {@link NamedWriteable}. */ + @Nullable public C readOptionalNamedWriteable(Class categoryClass) throws IOException { if (readBoolean()) { return readNamedWriteable(categoryClass); diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index 64ed6ed3904..3ee6e94a4af 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -237,6 +237,15 @@ public abstract class StreamOutput extends OutputStream { writeByte((byte) (value & 0x7F)); } + public void writeOptionalLong(@Nullable Long l) throws IOException { + if (l == null) { + writeBoolean(false); + } else { + writeBoolean(true); + writeLong(l); + } + } + public void writeOptionalString(@Nullable String str) throws IOException { if (str == null) { writeBoolean(false); @@ -314,7 +323,7 @@ public abstract class StreamOutput extends OutputStream { writeLong(Double.doubleToLongBits(v)); } - public void writeOptionalDouble(Double v) throws IOException { + public void writeOptionalDouble(@Nullable Double v) throws IOException { if (v == null) { writeBoolean(false); } else { @@ -798,7 +807,7 @@ public abstract class StreamOutput extends OutputStream { /** * Write an optional {@linkplain DateTimeZone} to the stream. */ - public void writeOptionalTimeZone(DateTimeZone timeZone) throws IOException { + public void writeOptionalTimeZone(@Nullable DateTimeZone timeZone) throws IOException { if (timeZone == null) { writeBoolean(false); } else { diff --git a/core/src/main/java/org/elasticsearch/common/xcontent/ObjectParser.java b/core/src/main/java/org/elasticsearch/common/xcontent/ObjectParser.java index 3e595ae0d18..44d9e6e1993 100644 --- a/core/src/main/java/org/elasticsearch/common/xcontent/ObjectParser.java +++ b/core/src/main/java/org/elasticsearch/common/xcontent/ObjectParser.java @@ -416,6 +416,7 @@ public final class ObjectParser PARSER = new ConstructingObjectParser<>( + "extended_bounds", a -> { + assert a.length == 2; + Long min = null; + Long max = null; + String minAsStr = null; + String maxAsStr = null; + if (a[0] == null) { + // nothing to do with it + } else if (a[0] instanceof Long) { + min = (Long) a[0]; + } else if (a[0] instanceof String) { + minAsStr = (String) a[0]; + } else { + throw new IllegalArgumentException("Unknown field type [" + a[0].getClass() + "]"); + } + if (a[1] == null) { + // nothing to do with it + } else if (a[1] instanceof Long) { + max = (Long) a[1]; + } else if (a[1] instanceof String) { + maxAsStr = (String) a[1]; + } else { + throw new IllegalArgumentException("Unknown field type [" + a[1].getClass() + "]"); + } + return new ExtendedBounds(min, max, minAsStr, maxAsStr); + }); + static { + NoContextParser longOrString = p -> { + if (p.currentToken() == Token.VALUE_NUMBER) { + return p.longValue(false); + } + if (p.currentToken() == Token.VALUE_STRING) { + return p.text(); + } + if (p.currentToken() == Token.VALUE_NULL) { + return null; + } + throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); + }; + PARSER.declareField(optionalConstructorArg(), longOrString, MIN_FIELD, ValueType.LONG_OR_NULL); + PARSER.declareField(optionalConstructorArg(), longOrString, MAX_FIELD, ValueType.LONG_OR_NULL); } + /** + * Parsed min value. If this is null and {@linkplain #minAsStr} isn't then this must be parsed from {@linkplain #minAsStr}. If this is + * null and {@linkplain #minAsStr} is also null then there is no lower bound. + */ + private final Long min; + /** + * Parsed min value. If this is null and {@linkplain #maxAsStr} isn't then this must be parsed from {@linkplain #maxAsStr}. If this is + * null and {@linkplain #maxAsStr} is also null then there is no lower bound. + */ + private final Long max; + + private final String minAsStr; + private final String maxAsStr; + + /** + * Construct with parsed bounds. + */ + public ExtendedBounds(Long min, Long max) { + this(min, max, null, null); + } + + /** + * Construct with unparsed bounds. + */ public ExtendedBounds(String minAsStr, String maxAsStr) { + this(null, null, minAsStr, maxAsStr); + } + + /** + * Construct with all possible information. + */ + private ExtendedBounds(Long min, Long max, String minAsStr, String maxAsStr) { + this.min = min; + this.max = max; this.minAsStr = minAsStr; this.maxAsStr = maxAsStr; } @@ -64,85 +131,45 @@ public class ExtendedBounds implements ToXContent, Writeable { * Read from a stream. */ public ExtendedBounds(StreamInput in) throws IOException { - if (in.readBoolean()) { - min = in.readLong(); - } - if (in.readBoolean()) { - max = in.readLong(); - } + min = in.readOptionalLong(); + max = in.readOptionalLong(); minAsStr = in.readOptionalString(); maxAsStr = in.readOptionalString(); } @Override public void writeTo(StreamOutput out) throws IOException { - if (min != null) { - out.writeBoolean(true); - out.writeLong(min); - } else { - out.writeBoolean(false); - } - if (max != null) { - out.writeBoolean(true); - out.writeLong(max); - } else { - out.writeBoolean(false); - } + out.writeOptionalLong(min); + out.writeOptionalLong(max); out.writeOptionalString(minAsStr); out.writeOptionalString(maxAsStr); } - - void processAndValidate(String aggName, SearchContext context, DocValueFormat format) { + /** + * Parse the bounds and perform any delayed validation. Returns the result of the parsing. + */ + ExtendedBounds parseAndValidate(String aggName, SearchContext context, DocValueFormat format) { + Long min = this.min; + Long max = this.max; assert format != null; if (minAsStr != null) { - min = format.parseLong(minAsStr, false, context.nowCallable()); + min = format.parseLong(minAsStr, false, context::nowInMillis); } if (maxAsStr != null) { // TODO: Should we rather pass roundUp=true? - max = format.parseLong(maxAsStr, false, context.nowCallable()); + max = format.parseLong(maxAsStr, false, context::nowInMillis); } if (min != null && max != null && min.compareTo(max) > 0) { throw new SearchParseException(context, "[extended_bounds.min][" + min + "] cannot be greater than " + "[extended_bounds.max][" + max + "] for histogram aggregation [" + aggName + "]", null); } + return new ExtendedBounds(min, max, minAsStr, maxAsStr); } ExtendedBounds round(Rounding rounding) { return new ExtendedBounds(min != null ? rounding.round(min) : null, max != null ? rounding.round(max) : null); } - public static ExtendedBounds fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher, String aggregationName) - throws IOException { - XContentParser.Token token = null; - String currentFieldName = null; - ExtendedBounds extendedBounds = new ExtendedBounds(); - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.VALUE_STRING) { - if ("min".equals(currentFieldName)) { - extendedBounds.minAsStr = parser.text(); - } else if ("max".equals(currentFieldName)) { - extendedBounds.maxAsStr = parser.text(); - } else { - throw new ParsingException(parser.getTokenLocation(), "Unknown extended_bounds key for a " + token - + " in aggregation [" + aggregationName + "]: [" + currentFieldName + "]."); - } - } else if (token == XContentParser.Token.VALUE_NUMBER) { - if (parseFieldMatcher.match(currentFieldName, MIN_FIELD)) { - extendedBounds.min = parser.longValue(true); - } else if (parseFieldMatcher.match(currentFieldName, MAX_FIELD)) { - extendedBounds.max = parser.longValue(true); - } else { - throw new ParsingException(parser.getTokenLocation(), "Unknown extended_bounds key for a " + token - + " in aggregation [" + aggregationName + "]: [" + currentFieldName + "]."); - } - } - } - return extendedBounds; - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(EXTENDED_BOUNDS_FIELD.getPreferredName()); @@ -162,7 +189,7 @@ public class ExtendedBounds implements ToXContent, Writeable { @Override public int hashCode() { - return Objects.hash(min, max); + return Objects.hash(min, max, minAsStr, maxAsStr); } @Override @@ -175,6 +202,43 @@ public class ExtendedBounds implements ToXContent, Writeable { } ExtendedBounds other = (ExtendedBounds) obj; return Objects.equals(min, other.min) - && Objects.equals(min, other.min); + && Objects.equals(max, other.max) + && Objects.equals(minAsStr, other.minAsStr) + && Objects.equals(maxAsStr, other.maxAsStr); + } + + public Long getMin() { + return min; + } + + public Long getMax() { + return max; + } + + @Override + public String toString() { + StringBuilder b = new StringBuilder(); + if (min != null) { + b.append(min); + if (minAsStr != null) { + b.append('(').append(minAsStr).append(')'); + } + } else { + if (minAsStr != null) { + b.append(minAsStr); + } + } + b.append("--"); + if (max != null) { + b.append(min); + if (maxAsStr != null) { + b.append('(').append(maxAsStr).append(')'); + } + } else { + if (maxAsStr != null) { + b.append(maxAsStr); + } + } + return b.toString(); } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramParser.java index 371dfad07d7..6897fd2e7f3 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramParser.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramParser.java @@ -127,8 +127,11 @@ public class HistogramParser extends NumericValuesSourceParser { otherOptions.put(HistogramAggregator.ORDER_FIELD, order); return true; } else if (parseFieldMatcher.match(currentFieldName, ExtendedBounds.EXTENDED_BOUNDS_FIELD)) { - ExtendedBounds extendedBounds = ExtendedBounds.fromXContent(parser, parseFieldMatcher, aggregationName); - otherOptions.put(ExtendedBounds.EXTENDED_BOUNDS_FIELD, extendedBounds); + try { + otherOptions.put(ExtendedBounds.EXTENDED_BOUNDS_FIELD, ExtendedBounds.PARSER.apply(parser, () -> parseFieldMatcher)); + } catch (Exception e) { + throw new ParsingException(parser.getTokenLocation(), "Error parsing [{}]", e, aggregationName); + } return true; } else { return false; diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index 9bf2f10576d..8891f75d4bd 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -411,9 +411,9 @@ public class InternalHistogram extends Inter if (bounds != null) { B firstBucket = iter.hasNext() ? list.get(iter.nextIndex()) : null; if (firstBucket == null) { - if (bounds.min != null && bounds.max != null) { - long key = bounds.min; - long max = bounds.max; + if (bounds.getMin() != null && bounds.getMax() != null) { + long key = bounds.getMin(); + long max = bounds.getMax(); while (key <= max) { iter.add(getFactory().createBucket(key, 0, reducedEmptySubAggs, @@ -422,8 +422,8 @@ public class InternalHistogram extends Inter } } } else { - if (bounds.min != null) { - long key = bounds.min; + if (bounds.getMin() != null) { + long key = bounds.getMin(); if (key < firstBucket.key) { while (key < firstBucket.key) { iter.add(getFactory().createBucket(key, 0, @@ -454,9 +454,9 @@ public class InternalHistogram extends Inter } // finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user) - if (bounds != null && lastBucket != null && bounds.max != null && bounds.max > lastBucket.key) { + if (bounds != null && lastBucket != null && bounds.getMax() != null && bounds.getMax() > lastBucket.key) { long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key); - long max = bounds.max; + long max = bounds.getMax(); while (key <= max) { iter.add(getFactory().createBucket(key, 0, reducedEmptySubAggs, keyed, diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java index 3aa5fdca17f..c83e2d2c721 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java @@ -119,10 +119,10 @@ public class RangeAggregator extends BucketsAggregator { Double from = this.from; Double to = this.to; if (fromAsStr != null) { - from = parser.parseDouble(fromAsStr, false, context.nowCallable()); + from = parser.parseDouble(fromAsStr, false, context::nowInMillis); } if (toAsStr != null) { - to = parser.parseDouble(toAsStr, false, context.nowCallable()); + to = parser.parseDouble(toAsStr, false, context::nowInMillis); } return new Range(key, from, fromAsStr, to, toAsStr); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java b/core/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java index f956d311a68..79549f87392 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java @@ -97,7 +97,7 @@ public class AggregationContext { } else { if (config.fieldContext() != null && config.fieldContext().fieldType() != null) { missing = config.fieldContext().fieldType().docValueFormat(null, DateTimeZone.UTC) - .parseDouble(config.missing().toString(), false, context.nowCallable()); + .parseDouble(config.missing().toString(), false, context::nowInMillis); } else { missing = Double.parseDouble(config.missing().toString()); } diff --git a/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java index b711748b7a8..3112212dc51 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -143,15 +143,6 @@ public abstract class SearchContext implements Releasable { return nowInMillisImpl(); } - public final Callable nowCallable() { - return new Callable() { - @Override - public Long call() throws Exception { - return nowInMillis(); - } - }; - }; - public final boolean nowInMillisUsed() { return nowInMillisUsed; } diff --git a/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java index f0cc617299a..846c8dca373 100644 --- a/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java +++ b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java @@ -266,6 +266,7 @@ public class BytesStreamsTests extends ESTestCase { out.writeVInt(2); out.writeLong(-3); out.writeVLong(4); + out.writeOptionalLong(11234234L); out.writeFloat(1.1f); out.writeDouble(2.2); int[] intArray = {1, 2, 3}; @@ -299,8 +300,9 @@ public class BytesStreamsTests extends ESTestCase { assertThat(in.readShort(), equalTo((short)-1)); assertThat(in.readInt(), equalTo(-1)); assertThat(in.readVInt(), equalTo(2)); - assertThat(in.readLong(), equalTo((long)-3)); - assertThat(in.readVLong(), equalTo((long)4)); + assertThat(in.readLong(), equalTo(-3L)); + assertThat(in.readVLong(), equalTo(4L)); + assertThat(in.readOptionalLong(), equalTo(11234234L)); assertThat((double)in.readFloat(), closeTo(1.1, 0.0001)); assertThat(in.readDouble(), closeTo(2.2, 0.0001)); assertThat(in.readGenericValue(), equalTo((Object) intArray)); diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/DateHistogramTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/DateHistogramTests.java index 74ea18cc1d1..ab196632a20 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/DateHistogramTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/DateHistogramTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.search.aggregations.BaseAggregationTestCase; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds; +import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBoundsTests; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order; public class DateHistogramTests extends BaseAggregationTestCase { @@ -62,9 +63,7 @@ public class DateHistogramTests extends BaseAggregationTestCase new ExtendedBounds(100L, 90L).parseAndValidate("test", context, format)); + assertEquals("[extended_bounds.min][100] cannot be greater than [extended_bounds.max][90] for histogram aggregation [test]", + e.getMessage()); + + e = expectThrows(SearchParseException.class, + () -> unparsed(new ExtendedBounds(100L, 90L)).parseAndValidate("test", context, format)); + assertEquals("[extended_bounds.min][100] cannot be greater than [extended_bounds.max][90] for histogram aggregation [test]", + e.getMessage()); + } + + public void testTransportRoundTrip() throws IOException { + ExtendedBounds orig = randomExtendedBounds(); + + BytesReference origBytes; + try (BytesStreamOutput out = new BytesStreamOutput()) { + orig.writeTo(out); + origBytes = out.bytes(); + } + + ExtendedBounds read; + try (StreamInput in = origBytes.streamInput()) { + read = new ExtendedBounds(in); + assertEquals("read fully", 0, in.available()); + } + assertEquals(orig, read); + + BytesReference readBytes; + try (BytesStreamOutput out = new BytesStreamOutput()) { + read.writeTo(out); + readBytes = out.bytes(); + } + + assertEquals(origBytes, readBytes); + } + + public void testXContentRoundTrip() throws Exception { + ExtendedBounds orig = randomExtendedBounds(); + + try (XContentBuilder out = JsonXContent.contentBuilder()) { + orig.toXContent(out, ToXContent.EMPTY_PARAMS); + try (XContentParser in = JsonXContent.jsonXContent.createParser(out.bytes())) { + in.nextToken(); + ExtendedBounds read = ExtendedBounds.PARSER.apply(in, () -> ParseFieldMatcher.STRICT); + assertEquals(orig, read); + } catch (Exception e) { + throw new Exception("Error parsing [" + out.bytes().utf8ToString() + "]", e); + } + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 37ba9b66242..db004ca0c3b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -664,8 +664,16 @@ public class ElasticsearchAssertions { newInstance.readFrom(input); assertThat("Stream should be fully read with version [" + version + "] for streamable [" + streamable + "]", input.available(), equalTo(0)); - assertThat("Serialization failed with version [" + version + "] bytes should be equal for streamable [" + streamable + "]", - serialize(version, streamable), equalTo(orig)); + BytesReference newBytes = serialize(version, streamable); + if (false == orig.equals(newBytes)) { + // The bytes are different. That is a failure. Lets try to throw a useful exception for debugging. + String message = "Serialization failed with version [" + version + "] bytes should be equal for streamable [" + streamable + + "]"; + // If the bytes are different then comparing BytesRef's toStrings will show you *where* they are different + assertEquals(message, orig.toBytesRef().toString(), newBytes.toBytesRef().toString()); + // They bytes aren't different. Very very weird. + fail(message); + } } catch (Exception ex) { throw new RuntimeException("failed to check serialization - version [" + version + "] for streamable [" + streamable + "]", ex); }