Aggregations: Fix infinite loop in the histogram reduce logic.
The histogram reduce method can run into an infinite loop if the Rounding.nextRoundingValue value is buggy, which happened to be the case for DayTimeZoneRoundingFloor. DayTimeZoneRoundingFloor is fixed, and the histogram reduce method has been changed to fail instead of running into an infinite loop in case of a buffy nextRoundingValue impl. Close #6965
This commit is contained in:
parent
99b32901d2
commit
a9d5c03924
|
@ -23,7 +23,9 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.joda.time.DateTimeConstants;
|
import org.joda.time.DateTimeConstants;
|
||||||
|
import org.joda.time.DateTimeField;
|
||||||
import org.joda.time.DateTimeZone;
|
import org.joda.time.DateTimeZone;
|
||||||
|
import org.joda.time.DurationField;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -128,6 +130,8 @@ public abstract class TimeZoneRounding extends Rounding {
|
||||||
static final byte ID = 1;
|
static final byte ID = 1;
|
||||||
|
|
||||||
private DateTimeUnit unit;
|
private DateTimeUnit unit;
|
||||||
|
private DateTimeField field;
|
||||||
|
private DurationField durationField;
|
||||||
private DateTimeZone preTz;
|
private DateTimeZone preTz;
|
||||||
private DateTimeZone postTz;
|
private DateTimeZone postTz;
|
||||||
|
|
||||||
|
@ -136,6 +140,8 @@ public abstract class TimeZoneRounding extends Rounding {
|
||||||
|
|
||||||
TimeTimeZoneRoundingFloor(DateTimeUnit unit, DateTimeZone preTz, DateTimeZone postTz) {
|
TimeTimeZoneRoundingFloor(DateTimeUnit unit, DateTimeZone preTz, DateTimeZone postTz) {
|
||||||
this.unit = unit;
|
this.unit = unit;
|
||||||
|
field = unit.field();
|
||||||
|
durationField = field.getDurationField();
|
||||||
this.preTz = preTz;
|
this.preTz = preTz;
|
||||||
this.postTz = postTz;
|
this.postTz = postTz;
|
||||||
}
|
}
|
||||||
|
@ -148,7 +154,7 @@ public abstract class TimeZoneRounding extends Rounding {
|
||||||
@Override
|
@Override
|
||||||
public long roundKey(long utcMillis) {
|
public long roundKey(long utcMillis) {
|
||||||
long time = utcMillis + preTz.getOffset(utcMillis);
|
long time = utcMillis + preTz.getOffset(utcMillis);
|
||||||
return unit.field().roundFloor(time);
|
return field.roundFloor(time);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -162,12 +168,14 @@ public abstract class TimeZoneRounding extends Rounding {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long nextRoundingValue(long value) {
|
public long nextRoundingValue(long value) {
|
||||||
return unit.field().roundCeiling(value + 1);
|
return durationField.add(value, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
unit = DateTimeUnit.resolve(in.readByte());
|
unit = DateTimeUnit.resolve(in.readByte());
|
||||||
|
field = unit.field();
|
||||||
|
durationField = field.getDurationField();
|
||||||
preTz = DateTimeZone.forID(in.readSharedString());
|
preTz = DateTimeZone.forID(in.readSharedString());
|
||||||
postTz = DateTimeZone.forID(in.readSharedString());
|
postTz = DateTimeZone.forID(in.readSharedString());
|
||||||
}
|
}
|
||||||
|
@ -185,12 +193,16 @@ public abstract class TimeZoneRounding extends Rounding {
|
||||||
final static byte ID = 2;
|
final static byte ID = 2;
|
||||||
|
|
||||||
private DateTimeUnit unit;
|
private DateTimeUnit unit;
|
||||||
|
private DateTimeField field;
|
||||||
|
private DurationField durationField;
|
||||||
|
|
||||||
UTCTimeZoneRoundingFloor() { // for serialization
|
UTCTimeZoneRoundingFloor() { // for serialization
|
||||||
}
|
}
|
||||||
|
|
||||||
UTCTimeZoneRoundingFloor(DateTimeUnit unit) {
|
UTCTimeZoneRoundingFloor(DateTimeUnit unit) {
|
||||||
this.unit = unit;
|
this.unit = unit;
|
||||||
|
field = unit.field();
|
||||||
|
durationField = field.getDurationField();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -200,7 +212,7 @@ public abstract class TimeZoneRounding extends Rounding {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long roundKey(long utcMillis) {
|
public long roundKey(long utcMillis) {
|
||||||
return unit.field().roundFloor(utcMillis);
|
return field.roundFloor(utcMillis);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -210,12 +222,14 @@ public abstract class TimeZoneRounding extends Rounding {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long nextRoundingValue(long value) {
|
public long nextRoundingValue(long value) {
|
||||||
return unit.field().roundCeiling(value + 1);
|
return durationField.add(value, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
unit = DateTimeUnit.resolve(in.readByte());
|
unit = DateTimeUnit.resolve(in.readByte());
|
||||||
|
field = unit.field();
|
||||||
|
durationField = field.getDurationField();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -229,6 +243,8 @@ public abstract class TimeZoneRounding extends Rounding {
|
||||||
final static byte ID = 3;
|
final static byte ID = 3;
|
||||||
|
|
||||||
private DateTimeUnit unit;
|
private DateTimeUnit unit;
|
||||||
|
private DateTimeField field;
|
||||||
|
private DurationField durationField;
|
||||||
private DateTimeZone preTz;
|
private DateTimeZone preTz;
|
||||||
private DateTimeZone postTz;
|
private DateTimeZone postTz;
|
||||||
|
|
||||||
|
@ -237,6 +253,8 @@ public abstract class TimeZoneRounding extends Rounding {
|
||||||
|
|
||||||
DayTimeZoneRoundingFloor(DateTimeUnit unit, DateTimeZone preTz, DateTimeZone postTz) {
|
DayTimeZoneRoundingFloor(DateTimeUnit unit, DateTimeZone preTz, DateTimeZone postTz) {
|
||||||
this.unit = unit;
|
this.unit = unit;
|
||||||
|
field = unit.field();
|
||||||
|
durationField = field.getDurationField();
|
||||||
this.preTz = preTz;
|
this.preTz = preTz;
|
||||||
this.postTz = postTz;
|
this.postTz = postTz;
|
||||||
}
|
}
|
||||||
|
@ -249,7 +267,7 @@ public abstract class TimeZoneRounding extends Rounding {
|
||||||
@Override
|
@Override
|
||||||
public long roundKey(long utcMillis) {
|
public long roundKey(long utcMillis) {
|
||||||
long time = utcMillis + preTz.getOffset(utcMillis);
|
long time = utcMillis + preTz.getOffset(utcMillis);
|
||||||
return unit.field().roundFloor(time);
|
return field.roundFloor(time);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -262,12 +280,14 @@ public abstract class TimeZoneRounding extends Rounding {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long nextRoundingValue(long value) {
|
public long nextRoundingValue(long value) {
|
||||||
return unit.field().getDurationField().getUnitMillis() + value;
|
return durationField.add(value, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
unit = DateTimeUnit.resolve(in.readByte());
|
unit = DateTimeUnit.resolve(in.readByte());
|
||||||
|
field = unit.field();
|
||||||
|
durationField = field.getDurationField();
|
||||||
preTz = DateTimeZone.forID(in.readSharedString());
|
preTz = DateTimeZone.forID(in.readSharedString());
|
||||||
postTz = DateTimeZone.forID(in.readSharedString());
|
postTz = DateTimeZone.forID(in.readSharedString());
|
||||||
}
|
}
|
||||||
|
|
|
@ -320,10 +320,11 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
||||||
B nextBucket = list.get(iter.nextIndex());
|
B nextBucket = list.get(iter.nextIndex());
|
||||||
if (lastBucket != null) {
|
if (lastBucket != null) {
|
||||||
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
|
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
|
||||||
while (key != nextBucket.key) {
|
while (key < nextBucket.key) {
|
||||||
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
|
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
|
||||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
||||||
}
|
}
|
||||||
|
assert key == nextBucket.key;
|
||||||
}
|
}
|
||||||
lastBucket = iter.next();
|
lastBucket = iter.next();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1299,4 +1299,36 @@ public class DateHistogramTests extends ElasticsearchIntegrationTest {
|
||||||
assertThat(bucket, Matchers.notNullValue());
|
assertThat(bucket, Matchers.notNullValue());
|
||||||
assertThat(bucket.getDocCount(), equalTo(5l));
|
assertThat(bucket.getDocCount(), equalTo(5l));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testIssue6965() {
|
||||||
|
SearchResponse response = client().prepareSearch("idx")
|
||||||
|
.addAggregation(dateHistogram("histo").field("date").preZone("+01:00").interval(DateHistogram.Interval.MONTH).minDocCount(0))
|
||||||
|
.execute().actionGet();
|
||||||
|
|
||||||
|
assertSearchResponse(response);
|
||||||
|
|
||||||
|
|
||||||
|
DateHistogram histo = response.getAggregations().get("histo");
|
||||||
|
assertThat(histo, notNullValue());
|
||||||
|
assertThat(histo.getName(), equalTo("histo"));
|
||||||
|
assertThat(histo.getBuckets().size(), equalTo(3));
|
||||||
|
|
||||||
|
DateTime key = new DateTime(2012, 1, 1, 0, 0, DateTimeZone.UTC);
|
||||||
|
DateHistogram.Bucket bucket = getBucket(histo, key);
|
||||||
|
assertThat(bucket, notNullValue());
|
||||||
|
assertThat(bucket.getKeyAsNumber().longValue(), equalTo(key.getMillis()));
|
||||||
|
assertThat(bucket.getDocCount(), equalTo(1l));
|
||||||
|
|
||||||
|
key = new DateTime(2012, 2, 1, 0, 0, DateTimeZone.UTC);
|
||||||
|
bucket = getBucket(histo, key);
|
||||||
|
assertThat(bucket, notNullValue());
|
||||||
|
assertThat(bucket.getKeyAsNumber().longValue(), equalTo(key.getMillis()));
|
||||||
|
assertThat(bucket.getDocCount(), equalTo(2l));
|
||||||
|
|
||||||
|
key = new DateTime(2012, 3, 1, 0, 0, DateTimeZone.UTC);
|
||||||
|
bucket = getBucket(histo, key);
|
||||||
|
assertThat(bucket, notNullValue());
|
||||||
|
assertThat(bucket.getKeyAsNumber().longValue(), equalTo(key.getMillis()));
|
||||||
|
assertThat(bucket.getDocCount(), equalTo(3l));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue