EQL: Fix casing for tiebreaker field (#57943)
Use tiebreaker instead of tieBreaker (cherry picked from commit 3c774948a5d5e10fac267cb9a54f5d0559a00c1d)
This commit is contained in:
parent
0a2bd10758
commit
ff0ea62cb8
|
@ -43,11 +43,11 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
|
||||||
private int fetchSize = 50;
|
private int fetchSize = 50;
|
||||||
private SearchAfterBuilder searchAfterBuilder;
|
private SearchAfterBuilder searchAfterBuilder;
|
||||||
private String query;
|
private String query;
|
||||||
private String tieBreakerField;
|
private String tiebreakerField;
|
||||||
|
|
||||||
static final String KEY_FILTER = "filter";
|
static final String KEY_FILTER = "filter";
|
||||||
static final String KEY_TIMESTAMP_FIELD = "timestamp_field";
|
static final String KEY_TIMESTAMP_FIELD = "timestamp_field";
|
||||||
static final String KEY_TIE_BREAKER_FIELD = "tie_breaker_field";
|
static final String KEY_TIEBREAKER_FIELD = "tiebreaker_field";
|
||||||
static final String KEY_EVENT_CATEGORY_FIELD = "event_category_field";
|
static final String KEY_EVENT_CATEGORY_FIELD = "event_category_field";
|
||||||
static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field";
|
static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field";
|
||||||
static final String KEY_SIZE = "size";
|
static final String KEY_SIZE = "size";
|
||||||
|
@ -66,8 +66,8 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
|
||||||
builder.field(KEY_FILTER, filter);
|
builder.field(KEY_FILTER, filter);
|
||||||
}
|
}
|
||||||
builder.field(KEY_TIMESTAMP_FIELD, timestampField());
|
builder.field(KEY_TIMESTAMP_FIELD, timestampField());
|
||||||
if (tieBreakerField != null) {
|
if (tiebreakerField != null) {
|
||||||
builder.field(KEY_TIE_BREAKER_FIELD, tieBreakerField());
|
builder.field(KEY_TIEBREAKER_FIELD, tiebreakerField());
|
||||||
}
|
}
|
||||||
builder.field(KEY_EVENT_CATEGORY_FIELD, eventCategoryField());
|
builder.field(KEY_EVENT_CATEGORY_FIELD, eventCategoryField());
|
||||||
if (implicitJoinKeyField != null) {
|
if (implicitJoinKeyField != null) {
|
||||||
|
@ -112,13 +112,13 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String tieBreakerField() {
|
public String tiebreakerField() {
|
||||||
return this.tieBreakerField;
|
return this.tiebreakerField;
|
||||||
}
|
}
|
||||||
|
|
||||||
public EqlSearchRequest tieBreakerField(String tieBreakerField) {
|
public EqlSearchRequest tiebreakerField(String tiebreakerField) {
|
||||||
Objects.requireNonNull(tieBreakerField, "tie breaker field must not be null");
|
Objects.requireNonNull(tiebreakerField, "tiebreaker field must not be null");
|
||||||
this.tieBreakerField = tieBreakerField;
|
this.tiebreakerField = tiebreakerField;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,7 +195,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
|
||||||
Objects.equals(indicesOptions, that.indicesOptions) &&
|
Objects.equals(indicesOptions, that.indicesOptions) &&
|
||||||
Objects.equals(filter, that.filter) &&
|
Objects.equals(filter, that.filter) &&
|
||||||
Objects.equals(timestampField, that.timestampField) &&
|
Objects.equals(timestampField, that.timestampField) &&
|
||||||
Objects.equals(tieBreakerField, that.tieBreakerField) &&
|
Objects.equals(tiebreakerField, that.tiebreakerField) &&
|
||||||
Objects.equals(eventCategoryField, that.eventCategoryField) &&
|
Objects.equals(eventCategoryField, that.eventCategoryField) &&
|
||||||
Objects.equals(implicitJoinKeyField, that.implicitJoinKeyField) &&
|
Objects.equals(implicitJoinKeyField, that.implicitJoinKeyField) &&
|
||||||
Objects.equals(searchAfterBuilder, that.searchAfterBuilder) &&
|
Objects.equals(searchAfterBuilder, that.searchAfterBuilder) &&
|
||||||
|
@ -210,7 +210,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
|
||||||
filter,
|
filter,
|
||||||
fetchSize,
|
fetchSize,
|
||||||
timestampField,
|
timestampField,
|
||||||
tieBreakerField,
|
tiebreakerField,
|
||||||
eventCategoryField,
|
eventCategoryField,
|
||||||
implicitJoinKeyField,
|
implicitJoinKeyField,
|
||||||
searchAfterBuilder,
|
searchAfterBuilder,
|
||||||
|
|
|
@ -52,7 +52,7 @@ public class EqlSearchRequestTests extends AbstractRequestTestCase<EqlSearchRequ
|
||||||
EqlSearchRequest.timestampField(randomAlphaOfLength(10));
|
EqlSearchRequest.timestampField(randomAlphaOfLength(10));
|
||||||
}
|
}
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
EqlSearchRequest.tieBreakerField(randomAlphaOfLength(10));
|
EqlSearchRequest.tiebreakerField(randomAlphaOfLength(10));
|
||||||
}
|
}
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
EqlSearchRequest.searchAfter(randomArray(1, 4, Object[]::new, () -> randomAlphaOfLength(3)));
|
EqlSearchRequest.searchAfter(randomArray(1, 4, Object[]::new, () -> randomAlphaOfLength(3)));
|
||||||
|
@ -78,7 +78,7 @@ public class EqlSearchRequestTests extends AbstractRequestTestCase<EqlSearchRequ
|
||||||
assertThat(serverInstance.eventCategoryField(), equalTo(clientTestInstance.eventCategoryField()));
|
assertThat(serverInstance.eventCategoryField(), equalTo(clientTestInstance.eventCategoryField()));
|
||||||
assertThat(serverInstance.implicitJoinKeyField(), equalTo(clientTestInstance.implicitJoinKeyField()));
|
assertThat(serverInstance.implicitJoinKeyField(), equalTo(clientTestInstance.implicitJoinKeyField()));
|
||||||
assertThat(serverInstance.timestampField(), equalTo(clientTestInstance.timestampField()));
|
assertThat(serverInstance.timestampField(), equalTo(clientTestInstance.timestampField()));
|
||||||
assertThat(serverInstance.tieBreakerField(), equalTo(clientTestInstance.tieBreakerField()));
|
assertThat(serverInstance.tiebreakerField(), equalTo(clientTestInstance.tiebreakerField()));
|
||||||
assertThat(serverInstance.filter(), equalTo(clientTestInstance.filter()));
|
assertThat(serverInstance.filter(), equalTo(clientTestInstance.filter()));
|
||||||
assertThat(serverInstance.query(), equalTo(clientTestInstance.query()));
|
assertThat(serverInstance.query(), equalTo(clientTestInstance.query()));
|
||||||
assertThat(serverInstance.searchAfter(), equalTo(clientTestInstance.searchAfter()));
|
assertThat(serverInstance.searchAfter(), equalTo(clientTestInstance.searchAfter()));
|
||||||
|
|
|
@ -189,7 +189,7 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase {
|
||||||
|
|
||||||
protected EqlSearchResponse runQuery(String index, String query) throws Exception {
|
protected EqlSearchResponse runQuery(String index, String query) throws Exception {
|
||||||
EqlSearchRequest request = new EqlSearchRequest(testIndexName, query);
|
EqlSearchRequest request = new EqlSearchRequest(testIndexName, query);
|
||||||
request.tieBreakerField("event.sequence");
|
request.tiebreakerField("event.sequence");
|
||||||
return eqlClient().search(request, RequestOptions.DEFAULT);
|
return eqlClient().search(request, RequestOptions.DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,7 +43,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
|
||||||
|
|
||||||
private QueryBuilder filter = null;
|
private QueryBuilder filter = null;
|
||||||
private String timestampField = FIELD_TIMESTAMP;
|
private String timestampField = FIELD_TIMESTAMP;
|
||||||
private String tieBreakerField = null;
|
private String tiebreakerField = null;
|
||||||
private String eventCategoryField = FIELD_EVENT_CATEGORY;
|
private String eventCategoryField = FIELD_EVENT_CATEGORY;
|
||||||
private String implicitJoinKeyField = FIELD_IMPLICIT_JOIN_KEY;
|
private String implicitJoinKeyField = FIELD_IMPLICIT_JOIN_KEY;
|
||||||
private int fetchSize = FETCH_SIZE;
|
private int fetchSize = FETCH_SIZE;
|
||||||
|
@ -53,7 +53,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
|
||||||
|
|
||||||
static final String KEY_FILTER = "filter";
|
static final String KEY_FILTER = "filter";
|
||||||
static final String KEY_TIMESTAMP_FIELD = "timestamp_field";
|
static final String KEY_TIMESTAMP_FIELD = "timestamp_field";
|
||||||
static final String KEY_TIE_BREAKER_FIELD = "tie_breaker_field";
|
static final String KEY_TIEBREAKER_FIELD = "tiebreaker_field";
|
||||||
static final String KEY_EVENT_CATEGORY_FIELD = "event_category_field";
|
static final String KEY_EVENT_CATEGORY_FIELD = "event_category_field";
|
||||||
static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field";
|
static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field";
|
||||||
static final String KEY_SIZE = "size";
|
static final String KEY_SIZE = "size";
|
||||||
|
@ -63,7 +63,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
|
||||||
|
|
||||||
static final ParseField FILTER = new ParseField(KEY_FILTER);
|
static final ParseField FILTER = new ParseField(KEY_FILTER);
|
||||||
static final ParseField TIMESTAMP_FIELD = new ParseField(KEY_TIMESTAMP_FIELD);
|
static final ParseField TIMESTAMP_FIELD = new ParseField(KEY_TIMESTAMP_FIELD);
|
||||||
static final ParseField TIE_BREAKER_FIELD = new ParseField(KEY_TIE_BREAKER_FIELD);
|
static final ParseField TIEBREAKER_FIELD = new ParseField(KEY_TIEBREAKER_FIELD);
|
||||||
static final ParseField EVENT_CATEGORY_FIELD = new ParseField(KEY_EVENT_CATEGORY_FIELD);
|
static final ParseField EVENT_CATEGORY_FIELD = new ParseField(KEY_EVENT_CATEGORY_FIELD);
|
||||||
static final ParseField IMPLICIT_JOIN_KEY_FIELD = new ParseField(KEY_IMPLICIT_JOIN_KEY_FIELD);
|
static final ParseField IMPLICIT_JOIN_KEY_FIELD = new ParseField(KEY_IMPLICIT_JOIN_KEY_FIELD);
|
||||||
static final ParseField SIZE = new ParseField(KEY_SIZE);
|
static final ParseField SIZE = new ParseField(KEY_SIZE);
|
||||||
|
@ -83,7 +83,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
|
||||||
indicesOptions = IndicesOptions.readIndicesOptions(in);
|
indicesOptions = IndicesOptions.readIndicesOptions(in);
|
||||||
filter = in.readOptionalNamedWriteable(QueryBuilder.class);
|
filter = in.readOptionalNamedWriteable(QueryBuilder.class);
|
||||||
timestampField = in.readString();
|
timestampField = in.readString();
|
||||||
tieBreakerField = in.readOptionalString();
|
tiebreakerField = in.readOptionalString();
|
||||||
eventCategoryField = in.readString();
|
eventCategoryField = in.readString();
|
||||||
implicitJoinKeyField = in.readString();
|
implicitJoinKeyField = in.readString();
|
||||||
fetchSize = in.readVInt();
|
fetchSize = in.readVInt();
|
||||||
|
@ -140,8 +140,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
|
||||||
builder.field(KEY_FILTER, filter);
|
builder.field(KEY_FILTER, filter);
|
||||||
}
|
}
|
||||||
builder.field(KEY_TIMESTAMP_FIELD, timestampField());
|
builder.field(KEY_TIMESTAMP_FIELD, timestampField());
|
||||||
if (tieBreakerField != null) {
|
if (tiebreakerField != null) {
|
||||||
builder.field(KEY_TIE_BREAKER_FIELD, tieBreakerField());
|
builder.field(KEY_TIEBREAKER_FIELD, tiebreakerField());
|
||||||
}
|
}
|
||||||
builder.field(KEY_EVENT_CATEGORY_FIELD, eventCategoryField());
|
builder.field(KEY_EVENT_CATEGORY_FIELD, eventCategoryField());
|
||||||
if (implicitJoinKeyField != null) {
|
if (implicitJoinKeyField != null) {
|
||||||
|
@ -168,7 +168,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
|
||||||
parser.declareObject(EqlSearchRequest::filter,
|
parser.declareObject(EqlSearchRequest::filter,
|
||||||
(p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), FILTER);
|
(p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), FILTER);
|
||||||
parser.declareString(EqlSearchRequest::timestampField, TIMESTAMP_FIELD);
|
parser.declareString(EqlSearchRequest::timestampField, TIMESTAMP_FIELD);
|
||||||
parser.declareString(EqlSearchRequest::tieBreakerField, TIE_BREAKER_FIELD);
|
parser.declareString(EqlSearchRequest::tiebreakerField, TIEBREAKER_FIELD);
|
||||||
parser.declareString(EqlSearchRequest::eventCategoryField, EVENT_CATEGORY_FIELD);
|
parser.declareString(EqlSearchRequest::eventCategoryField, EVENT_CATEGORY_FIELD);
|
||||||
parser.declareString(EqlSearchRequest::implicitJoinKeyField, IMPLICIT_JOIN_KEY_FIELD);
|
parser.declareString(EqlSearchRequest::implicitJoinKeyField, IMPLICIT_JOIN_KEY_FIELD);
|
||||||
parser.declareInt(EqlSearchRequest::fetchSize, SIZE);
|
parser.declareInt(EqlSearchRequest::fetchSize, SIZE);
|
||||||
|
@ -199,10 +199,10 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String tieBreakerField() { return this.tieBreakerField; }
|
public String tiebreakerField() { return this.tiebreakerField; }
|
||||||
|
|
||||||
public EqlSearchRequest tieBreakerField(String tieBreakerField) {
|
public EqlSearchRequest tiebreakerField(String tiebreakerField) {
|
||||||
this.tieBreakerField = tieBreakerField;
|
this.tiebreakerField = tiebreakerField;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,7 +265,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
|
||||||
indicesOptions.writeIndicesOptions(out);
|
indicesOptions.writeIndicesOptions(out);
|
||||||
out.writeOptionalNamedWriteable(filter);
|
out.writeOptionalNamedWriteable(filter);
|
||||||
out.writeString(timestampField);
|
out.writeString(timestampField);
|
||||||
out.writeOptionalString(tieBreakerField);
|
out.writeOptionalString(tiebreakerField);
|
||||||
out.writeString(eventCategoryField);
|
out.writeString(eventCategoryField);
|
||||||
out.writeString(implicitJoinKeyField);
|
out.writeString(implicitJoinKeyField);
|
||||||
out.writeVInt(fetchSize);
|
out.writeVInt(fetchSize);
|
||||||
|
@ -288,7 +288,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
|
||||||
Objects.equals(indicesOptions, that.indicesOptions) &&
|
Objects.equals(indicesOptions, that.indicesOptions) &&
|
||||||
Objects.equals(filter, that.filter) &&
|
Objects.equals(filter, that.filter) &&
|
||||||
Objects.equals(timestampField, that.timestampField) &&
|
Objects.equals(timestampField, that.timestampField) &&
|
||||||
Objects.equals(tieBreakerField, that.tieBreakerField) &&
|
Objects.equals(tiebreakerField, that.tiebreakerField) &&
|
||||||
Objects.equals(eventCategoryField, that.eventCategoryField) &&
|
Objects.equals(eventCategoryField, that.eventCategoryField) &&
|
||||||
Objects.equals(implicitJoinKeyField, that.implicitJoinKeyField) &&
|
Objects.equals(implicitJoinKeyField, that.implicitJoinKeyField) &&
|
||||||
Objects.equals(searchAfterBuilder, that.searchAfterBuilder) &&
|
Objects.equals(searchAfterBuilder, that.searchAfterBuilder) &&
|
||||||
|
@ -304,7 +304,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
|
||||||
filter,
|
filter,
|
||||||
fetchSize,
|
fetchSize,
|
||||||
timestampField,
|
timestampField,
|
||||||
tieBreakerField,
|
tiebreakerField,
|
||||||
eventCategoryField,
|
eventCategoryField,
|
||||||
implicitJoinKeyField,
|
implicitJoinKeyField,
|
||||||
searchAfterBuilder,
|
searchAfterBuilder,
|
||||||
|
|
|
@ -30,8 +30,8 @@ public class EqlSearchRequestBuilder extends ActionRequestBuilder<EqlSearchReque
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public EqlSearchRequestBuilder tieBreakerField(String tieBreakerField) {
|
public EqlSearchRequestBuilder tiebreakerField(String tiebreakerField) {
|
||||||
request.tieBreakerField(tieBreakerField);
|
request.tiebreakerField(tiebreakerField);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,14 +18,14 @@ public class Criterion {
|
||||||
private final SearchSourceBuilder searchSource;
|
private final SearchSourceBuilder searchSource;
|
||||||
private final List<HitExtractor> keyExtractors;
|
private final List<HitExtractor> keyExtractors;
|
||||||
private final HitExtractor timestampExtractor;
|
private final HitExtractor timestampExtractor;
|
||||||
private final HitExtractor tieBreakerExtractor;
|
private final HitExtractor tiebreakerExtractor;
|
||||||
|
|
||||||
public Criterion(SearchSourceBuilder searchSource, List<HitExtractor> searchAfterExractors, HitExtractor timestampExtractor,
|
public Criterion(SearchSourceBuilder searchSource, List<HitExtractor> searchAfterExractors, HitExtractor timestampExtractor,
|
||||||
HitExtractor tieBreakerExtractor) {
|
HitExtractor tiebreakerExtractor) {
|
||||||
this.searchSource = searchSource;
|
this.searchSource = searchSource;
|
||||||
this.keyExtractors = searchAfterExractors;
|
this.keyExtractors = searchAfterExractors;
|
||||||
this.timestampExtractor = timestampExtractor;
|
this.timestampExtractor = timestampExtractor;
|
||||||
this.tieBreakerExtractor = tieBreakerExtractor;
|
this.tiebreakerExtractor = tiebreakerExtractor;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SearchSourceBuilder searchSource() {
|
public SearchSourceBuilder searchSource() {
|
||||||
|
@ -40,8 +40,8 @@ public class Criterion {
|
||||||
return timestampExtractor;
|
return timestampExtractor;
|
||||||
}
|
}
|
||||||
|
|
||||||
public HitExtractor tieBreakerExtractor() {
|
public HitExtractor tiebreakerExtractor() {
|
||||||
return tieBreakerExtractor;
|
return tiebreakerExtractor;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long timestamp(SearchHit hit) {
|
public long timestamp(SearchHit hit) {
|
||||||
|
@ -53,11 +53,11 @@ public class Criterion {
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings({ "unchecked" })
|
@SuppressWarnings({ "unchecked" })
|
||||||
public Comparable<Object> tieBreaker(SearchHit hit) {
|
public Comparable<Object> tiebreaker(SearchHit hit) {
|
||||||
if (tieBreakerExtractor == null) {
|
if (tiebreakerExtractor == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
Object tb = tieBreakerExtractor.extract(hit);
|
Object tb = tiebreakerExtractor.extract(hit);
|
||||||
if (tb instanceof Comparable) {
|
if (tb instanceof Comparable) {
|
||||||
return (Comparable<Object>) tb;
|
return (Comparable<Object>) tb;
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,7 +58,7 @@ public class ExecutionManager implements QueryClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Executable assemble(List<List<Attribute>> listOfKeys, List<PhysicalPlan> plans, Attribute timestamp, Attribute tieBreaker) {
|
public Executable assemble(List<List<Attribute>> listOfKeys, List<PhysicalPlan> plans, Attribute timestamp, Attribute tiebreaker) {
|
||||||
FieldExtractorRegistry extractorRegistry = new FieldExtractorRegistry();
|
FieldExtractorRegistry extractorRegistry = new FieldExtractorRegistry();
|
||||||
|
|
||||||
List<Criterion> criteria = new ArrayList<>(plans.size() - 1);
|
List<Criterion> criteria = new ArrayList<>(plans.size() - 1);
|
||||||
|
@ -68,7 +68,7 @@ public class ExecutionManager implements QueryClient {
|
||||||
List<Attribute> keys = listOfKeys.get(i);
|
List<Attribute> keys = listOfKeys.get(i);
|
||||||
// fields
|
// fields
|
||||||
HitExtractor tsExtractor = timestampExtractor(hitExtractor(timestamp, extractorRegistry));
|
HitExtractor tsExtractor = timestampExtractor(hitExtractor(timestamp, extractorRegistry));
|
||||||
HitExtractor tbExtractor = Expressions.isPresent(tieBreaker) ? hitExtractor(tieBreaker, extractorRegistry) : null;
|
HitExtractor tbExtractor = Expressions.isPresent(tiebreaker) ? hitExtractor(tiebreaker, extractorRegistry) : null;
|
||||||
List<HitExtractor> keyExtractors = hitExtractors(keys, extractorRegistry);
|
List<HitExtractor> keyExtractors = hitExtractors(keys, extractorRegistry);
|
||||||
|
|
||||||
PhysicalPlan query = plans.get(i);
|
PhysicalPlan query = plans.get(i);
|
||||||
|
|
|
@ -11,11 +11,11 @@ import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;
|
||||||
class KeyAndOrdinal {
|
class KeyAndOrdinal {
|
||||||
final SequenceKey key;
|
final SequenceKey key;
|
||||||
final long timestamp;
|
final long timestamp;
|
||||||
final Comparable<Object> tieBreaker;
|
final Comparable<Object> tiebreaker;
|
||||||
|
|
||||||
KeyAndOrdinal(SequenceKey key, long timestamp, Comparable<Object> tieBreaker) {
|
KeyAndOrdinal(SequenceKey key, long timestamp, Comparable<Object> tiebreaker) {
|
||||||
this.key = key;
|
this.key = key;
|
||||||
this.timestamp = timestamp;
|
this.timestamp = timestamp;
|
||||||
this.tieBreaker = tieBreaker;
|
this.tiebreaker = tiebreaker;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,8 +37,8 @@ class SequenceRuntime implements Executable {
|
||||||
this.criteria = criteria;
|
this.criteria = criteria;
|
||||||
this.numberOfStages = criteria.size();
|
this.numberOfStages = criteria.size();
|
||||||
this.queryClient = queryClient;
|
this.queryClient = queryClient;
|
||||||
boolean hasTieBreaker = criteria.get(0).tieBreakerExtractor() != null;
|
boolean hasTiebreaker = criteria.get(0).tiebreakerExtractor() != null;
|
||||||
this.stateMachine = new SequenceStateMachine(numberOfStages, hasTieBreaker);
|
this.stateMachine = new SequenceStateMachine(numberOfStages, hasTiebreaker);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -80,19 +80,19 @@ class SequenceRuntime implements Executable {
|
||||||
tMin = lastCriterion.timestamp(hits.get(0));
|
tMin = lastCriterion.timestamp(hits.get(0));
|
||||||
tMax = lastCriterion.timestamp(hits.get(hits.size() - 1));
|
tMax = lastCriterion.timestamp(hits.get(hits.size() - 1));
|
||||||
|
|
||||||
if (lastCriterion.tieBreakerExtractor() != null) {
|
if (lastCriterion.tiebreakerExtractor() != null) {
|
||||||
bMin = lastCriterion.tieBreaker(hits.get(0));
|
bMin = lastCriterion.tiebreaker(hits.get(0));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (SearchHit hit : hits) {
|
for (SearchHit hit : hits) {
|
||||||
KeyAndOrdinal ko = findKey(hit, lastCriterion);
|
KeyAndOrdinal ko = findKey(hit, lastCriterion);
|
||||||
Sequence seq = new Sequence(ko.key, numberOfStages, ko.timestamp, ko.tieBreaker, hit);
|
Sequence seq = new Sequence(ko.key, numberOfStages, ko.timestamp, ko.tiebreaker, hit);
|
||||||
stateMachine.trackSequence(seq, tMin, tMax);
|
stateMachine.trackSequence(seq, tMin, tMax);
|
||||||
}
|
}
|
||||||
stateMachine.setTimestampMarker(0, tMin);
|
stateMachine.setTimestampMarker(0, tMin);
|
||||||
if (bMin != null) {
|
if (bMin != null) {
|
||||||
stateMachine.setTieBreakerMarker(0, bMin);
|
stateMachine.setTiebreakerMarker(0, bMin);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,7 +120,7 @@ class SequenceRuntime implements Executable {
|
||||||
// break the results per key
|
// break the results per key
|
||||||
for (SearchHit hit : hits) {
|
for (SearchHit hit : hits) {
|
||||||
KeyAndOrdinal ko = findKey(hit, currentCriterion);
|
KeyAndOrdinal ko = findKey(hit, currentCriterion);
|
||||||
stateMachine.match(currentStage, ko.key, ko.timestamp, ko.tieBreaker, hit);
|
stateMachine.match(currentStage, ko.key, ko.timestamp, ko.tiebreaker, hit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -138,7 +138,7 @@ class SequenceRuntime implements Executable {
|
||||||
key = new SequenceKey(docKeys);
|
key = new SequenceKey(docKeys);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new KeyAndOrdinal(key, criterion.timestamp(hit), criterion.tieBreaker(hit));
|
return new KeyAndOrdinal(key, criterion.timestamp(hit), criterion.tiebreaker(hit));
|
||||||
}
|
}
|
||||||
|
|
||||||
private Results assembleResults() {
|
private Results assembleResults() {
|
||||||
|
|
|
@ -16,12 +16,12 @@ import java.util.Objects;
|
||||||
class Match {
|
class Match {
|
||||||
|
|
||||||
private final long timestamp;
|
private final long timestamp;
|
||||||
private final Comparable<Object> tieBreaker;
|
private final Comparable<Object> tiebreaker;
|
||||||
private final SearchHit hit;
|
private final SearchHit hit;
|
||||||
|
|
||||||
Match(long timestamp, Comparable<Object> tieBreaker, SearchHit hit) {
|
Match(long timestamp, Comparable<Object> tiebreaker, SearchHit hit) {
|
||||||
this.timestamp = timestamp;
|
this.timestamp = timestamp;
|
||||||
this.tieBreaker = tieBreaker;
|
this.tiebreaker = tiebreaker;
|
||||||
this.hit = hit;
|
this.hit = hit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,8 +29,8 @@ class Match {
|
||||||
return timestamp;
|
return timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
Comparable<Object> tieBreaker() {
|
Comparable<Object> tiebreaker() {
|
||||||
return tieBreaker;
|
return tiebreaker;
|
||||||
}
|
}
|
||||||
|
|
||||||
SearchHit hit() {
|
SearchHit hit() {
|
||||||
|
@ -39,7 +39,7 @@ class Match {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(timestamp, tieBreaker, hit);
|
return Objects.hash(timestamp, tiebreaker, hit);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -54,12 +54,12 @@ class Match {
|
||||||
|
|
||||||
Match other = (Match) obj;
|
Match other = (Match) obj;
|
||||||
return Objects.equals(timestamp, other.timestamp)
|
return Objects.equals(timestamp, other.timestamp)
|
||||||
&& Objects.equals(tieBreaker, other.tieBreaker)
|
&& Objects.equals(tiebreaker, other.tiebreaker)
|
||||||
&& Objects.equals(hit, other.hit);
|
&& Objects.equals(hit, other.hit);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return timestamp + "[" + (tieBreaker != null ? tieBreaker : "") + "]->" + hit.getId();
|
return timestamp + "[" + (tiebreaker != null ? tiebreaker : "") + "]->" + hit.getId();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,19 +31,19 @@ public class Sequence {
|
||||||
|
|
||||||
private int currentStage = 0;
|
private int currentStage = 0;
|
||||||
|
|
||||||
public Sequence(SequenceKey key, int stages, long timestamp, Comparable<Object> tieBreaker, SearchHit firstHit) {
|
public Sequence(SequenceKey key, int stages, long timestamp, Comparable<Object> tiebreaker, SearchHit firstHit) {
|
||||||
Check.isTrue(stages >= 2, "A sequence requires at least 2 criteria, given [{}]", stages);
|
Check.isTrue(stages >= 2, "A sequence requires at least 2 criteria, given [{}]", stages);
|
||||||
this.key = key;
|
this.key = key;
|
||||||
this.stages = stages;
|
this.stages = stages;
|
||||||
this.matches = new Match[stages];
|
this.matches = new Match[stages];
|
||||||
this.matches[0] = new Match(timestamp, tieBreaker, firstHit);
|
this.matches[0] = new Match(timestamp, tiebreaker, firstHit);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int putMatch(int stage, SearchHit hit, long timestamp, Comparable<Object> tieBreaker) {
|
public int putMatch(int stage, SearchHit hit, long timestamp, Comparable<Object> tiebreaker) {
|
||||||
if (stage == currentStage + 1) {
|
if (stage == currentStage + 1) {
|
||||||
int previousStage = currentStage;
|
int previousStage = currentStage;
|
||||||
currentStage = stage;
|
currentStage = stage;
|
||||||
matches[currentStage] = new Match(timestamp, tieBreaker, hit);
|
matches[currentStage] = new Match(timestamp, tiebreaker, hit);
|
||||||
return previousStage;
|
return previousStage;
|
||||||
}
|
}
|
||||||
throw new EqlIllegalArgumentException("Incorrect stage [{}] specified for Sequence[key={}, stage=]", stage, key, currentStage);
|
throw new EqlIllegalArgumentException("Incorrect stage [{}] specified for Sequence[key={}, stage=]", stage, key, currentStage);
|
||||||
|
@ -61,8 +61,8 @@ public class Sequence {
|
||||||
return matches[currentStage].timestamp();
|
return matches[currentStage].timestamp();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Comparable<Object> currentTieBreaker() {
|
public Comparable<Object> currentTiebreaker() {
|
||||||
return matches[currentStage].tieBreaker();
|
return matches[currentStage].tiebreaker();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long timestamp(int stage) {
|
public long timestamp(int stage) {
|
||||||
|
|
|
@ -49,7 +49,7 @@ public class SequenceFrame {
|
||||||
* Returns the latest Sequence from the group that has its timestamp
|
* Returns the latest Sequence from the group that has its timestamp
|
||||||
* less than the given argument alongside its position in the list.
|
* less than the given argument alongside its position in the list.
|
||||||
*/
|
*/
|
||||||
public Tuple<Sequence, Integer> before(long timestamp, Comparable<Object> tieBreaker) {
|
public Tuple<Sequence, Integer> before(long timestamp, Comparable<Object> tiebreaker) {
|
||||||
Sequence matchSeq = null;
|
Sequence matchSeq = null;
|
||||||
int matchPos = -1;
|
int matchPos = -1;
|
||||||
int position = -1;
|
int position = -1;
|
||||||
|
@ -61,9 +61,9 @@ public class SequenceFrame {
|
||||||
matchPos = position;
|
matchPos = position;
|
||||||
}
|
}
|
||||||
// apply tiebreaker (null first, that is null is less than any value)
|
// apply tiebreaker (null first, that is null is less than any value)
|
||||||
else if (tieBreaker != null && sequence.currentTimestamp() == timestamp) {
|
else if (tiebreaker != null && sequence.currentTimestamp() == timestamp) {
|
||||||
Comparable<Object> tb = sequence.currentTieBreaker();
|
Comparable<Object> tb = sequence.currentTiebreaker();
|
||||||
if (tb == null || tb.compareTo(tieBreaker) < 0) {
|
if (tb == null || tb.compareTo(tiebreaker) < 0) {
|
||||||
matchSeq = sequence;
|
matchSeq = sequence;
|
||||||
matchPos = position;
|
matchPos = position;
|
||||||
}
|
}
|
||||||
|
@ -78,7 +78,7 @@ public class SequenceFrame {
|
||||||
* Returns the first Sequence from the group that has its timestamp
|
* Returns the first Sequence from the group that has its timestamp
|
||||||
* greater than the given argument alongside its position in the list.
|
* greater than the given argument alongside its position in the list.
|
||||||
*/
|
*/
|
||||||
public Tuple<Sequence, Integer> after(long timestamp, Comparable<Object> tieBreaker) {
|
public Tuple<Sequence, Integer> after(long timestamp, Comparable<Object> tiebreaker) {
|
||||||
Sequence matchSeq = null;
|
Sequence matchSeq = null;
|
||||||
int matchPos = -1;
|
int matchPos = -1;
|
||||||
int position = -1;
|
int position = -1;
|
||||||
|
@ -90,9 +90,9 @@ public class SequenceFrame {
|
||||||
matchPos = position;
|
matchPos = position;
|
||||||
}
|
}
|
||||||
// apply tiebreaker (null first, that is null is less than any value)
|
// apply tiebreaker (null first, that is null is less than any value)
|
||||||
else if (tieBreaker != null && sequence.currentTimestamp() == timestamp) {
|
else if (tiebreaker != null && sequence.currentTimestamp() == timestamp) {
|
||||||
Comparable<Object> tb = sequence.currentTieBreaker();
|
Comparable<Object> tb = sequence.currentTiebreaker();
|
||||||
if (tb == null || tb.compareTo(tieBreaker) > 0) {
|
if (tb == null || tb.compareTo(tiebreaker) > 0) {
|
||||||
matchSeq = sequence;
|
matchSeq = sequence;
|
||||||
matchPos = position;
|
matchPos = position;
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,7 @@ public class SequenceStateMachine {
|
||||||
/** this ignores the key */
|
/** this ignores the key */
|
||||||
private final long[] timestampMarkers;
|
private final long[] timestampMarkers;
|
||||||
|
|
||||||
private final Comparable<Object>[] tieBreakerMarkers;
|
private final Comparable<Object>[] tiebreakerMarkers;
|
||||||
private final boolean hasTieBreaker;
|
private final boolean hasTieBreaker;
|
||||||
|
|
||||||
private final int completionStage;
|
private final int completionStage;
|
||||||
|
@ -36,16 +36,16 @@ public class SequenceStateMachine {
|
||||||
private final List<Sequence> completed;
|
private final List<Sequence> completed;
|
||||||
|
|
||||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||||
public SequenceStateMachine(int stages, boolean hasTieBreaker) {
|
public SequenceStateMachine(int stages, boolean hasTiebreaker) {
|
||||||
this.completionStage = stages - 1;
|
this.completionStage = stages - 1;
|
||||||
|
|
||||||
this.stageToKeys = new StageToKeys(completionStage);
|
this.stageToKeys = new StageToKeys(completionStage);
|
||||||
this.keyToSequences = new KeyToSequences(completionStage);
|
this.keyToSequences = new KeyToSequences(completionStage);
|
||||||
this.timestampMarkers = new long[completionStage];
|
this.timestampMarkers = new long[completionStage];
|
||||||
this.tieBreakerMarkers = new Comparable[completionStage];
|
this.tiebreakerMarkers = new Comparable[completionStage];
|
||||||
this.completed = new LinkedList<>();
|
this.completed = new LinkedList<>();
|
||||||
|
|
||||||
this.hasTieBreaker = hasTieBreaker;
|
this.hasTieBreaker = hasTiebreaker;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<Sequence> completeSequences() {
|
public List<Sequence> completeSequences() {
|
||||||
|
@ -56,21 +56,21 @@ public class SequenceStateMachine {
|
||||||
return timestampMarkers[stage];
|
return timestampMarkers[stage];
|
||||||
}
|
}
|
||||||
|
|
||||||
public Comparable<?> getTieBreakerMarker(int stage) {
|
public Comparable<?> getTiebreakerMarker(int stage) {
|
||||||
return tieBreakerMarkers[stage];
|
return tiebreakerMarkers[stage];
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setTimestampMarker(int stage, long timestamp) {
|
public void setTimestampMarker(int stage, long timestamp) {
|
||||||
timestampMarkers[stage] = timestamp;
|
timestampMarkers[stage] = timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setTieBreakerMarker(int stage, Comparable<Object> tieBreaker) {
|
public void setTiebreakerMarker(int stage, Comparable<Object> tiebreaker) {
|
||||||
tieBreakerMarkers[stage] = tieBreaker;
|
tiebreakerMarkers[stage] = tiebreaker;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Object[] getMarkers(int stage) {
|
public Object[] getMarkers(int stage) {
|
||||||
long ts = timestampMarkers[stage];
|
long ts = timestampMarkers[stage];
|
||||||
Comparable<Object> tb = tieBreakerMarkers[stage];
|
Comparable<Object> tb = tiebreakerMarkers[stage];
|
||||||
return hasTieBreaker ? new Object[] { ts, tb } : new Object[] { ts };
|
return hasTieBreaker ? new Object[] { ts, tb } : new Object[] { ts };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,10 +84,10 @@ public class SequenceStateMachine {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Match the given hit (based on key and timestamp and potential tieBreaker) with any potential sequence from the previous
|
* Match the given hit (based on key and timestamp and potential tiebreaker) with any potential sequence from the previous
|
||||||
* given stage. If that's the case, update the sequence and the rest of the references.
|
* given stage. If that's the case, update the sequence and the rest of the references.
|
||||||
*/
|
*/
|
||||||
public boolean match(int stage, SequenceKey key, long timestamp, Comparable<Object> tieBreaker, SearchHit hit) {
|
public boolean match(int stage, SequenceKey key, long timestamp, Comparable<Object> tiebreaker, SearchHit hit) {
|
||||||
int previousStage = stage - 1;
|
int previousStage = stage - 1;
|
||||||
// check key presence to avoid creating a collection
|
// check key presence to avoid creating a collection
|
||||||
SequenceFrame frame = keyToSequences.frameIfPresent(previousStage, key);
|
SequenceFrame frame = keyToSequences.frameIfPresent(previousStage, key);
|
||||||
|
@ -95,7 +95,7 @@ public class SequenceStateMachine {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
// pick the sequence with the highest timestamp lower than current match timestamp
|
// pick the sequence with the highest timestamp lower than current match timestamp
|
||||||
Tuple<Sequence, Integer> before = frame.before(timestamp, tieBreaker);
|
Tuple<Sequence, Integer> before = frame.before(timestamp, tiebreaker);
|
||||||
if (before == null) {
|
if (before == null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -103,7 +103,7 @@ public class SequenceStateMachine {
|
||||||
// eliminate the match and all previous values from the frame
|
// eliminate the match and all previous values from the frame
|
||||||
frame.trim(before.v2() + 1);
|
frame.trim(before.v2() + 1);
|
||||||
// update sequence
|
// update sequence
|
||||||
sequence.putMatch(stage, hit, timestamp, tieBreaker);
|
sequence.putMatch(stage, hit, timestamp, tiebreaker);
|
||||||
|
|
||||||
// remove the frame and keys early (as the key space is large)
|
// remove the frame and keys early (as the key space is large)
|
||||||
if (frame.isEmpty()) {
|
if (frame.isEmpty()) {
|
||||||
|
|
|
@ -17,21 +17,21 @@ import static java.util.Collections.singletonList;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Time ordinal for a given event.
|
* Time ordinal for a given event.
|
||||||
* It is an internal construct that wraps the mandatory timestamp attribute and the optional application tie-breaker.
|
* It is an internal construct that wraps the mandatory timestamp attribute and the optional application tiebreaker.
|
||||||
*/
|
*/
|
||||||
public class TimeOrdinal implements Resolvable {
|
public class TimeOrdinal implements Resolvable {
|
||||||
|
|
||||||
private final Attribute timestamp;
|
private final Attribute timestamp;
|
||||||
private final Attribute tieBreaker;
|
private final Attribute tiebreaker;
|
||||||
|
|
||||||
public TimeOrdinal(Attribute timestamp, Attribute tieBreaker) {
|
public TimeOrdinal(Attribute timestamp, Attribute tiebreaker) {
|
||||||
this.timestamp = timestamp;
|
this.timestamp = timestamp;
|
||||||
this.tieBreaker = tieBreaker;
|
this.tiebreaker = tiebreaker;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(timestamp, tieBreaker);
|
return Objects.hash(timestamp, tiebreaker);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -46,15 +46,15 @@ public class TimeOrdinal implements Resolvable {
|
||||||
|
|
||||||
TimeOrdinal other = (TimeOrdinal) obj;
|
TimeOrdinal other = (TimeOrdinal) obj;
|
||||||
return Objects.equals(timestamp, other.timestamp) &&
|
return Objects.equals(timestamp, other.timestamp) &&
|
||||||
Objects.equals(tieBreaker, other.tieBreaker);
|
Objects.equals(tiebreaker, other.tiebreaker);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean resolved() {
|
public boolean resolved() {
|
||||||
return timestamp.resolved() && (tieBreaker == null || tieBreaker.resolved());
|
return timestamp.resolved() && (tiebreaker == null || tiebreaker.resolved());
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<Attribute> output() {
|
public List<Attribute> output() {
|
||||||
return tieBreaker == null ? singletonList(timestamp) : asList(timestamp, tieBreaker);
|
return tiebreaker == null ? singletonList(timestamp) : asList(timestamp, tiebreaker);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,8 +58,8 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
|
||||||
return new UnresolvedAttribute(Source.EMPTY, params.fieldTimestamp());
|
return new UnresolvedAttribute(Source.EMPTY, params.fieldTimestamp());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Attribute fieldTieBreaker() {
|
private Attribute fieldTiebreaker() {
|
||||||
return params.fieldTieBreaker() != null ? new UnresolvedAttribute(Source.EMPTY, params.fieldTieBreaker()) : UNSPECIFIED_FIELD;
|
return params.fieldTiebreaker() != null ? new UnresolvedAttribute(Source.EMPTY, params.fieldTiebreaker()) : UNSPECIFIED_FIELD;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -88,10 +88,10 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
|
||||||
|
|
||||||
// TODO: add implicit sorting - when pipes are added, this would better sit there (as a default pipe)
|
// TODO: add implicit sorting - when pipes are added, this would better sit there (as a default pipe)
|
||||||
orders.add(new Order(source, fieldTimestamp(), Order.OrderDirection.ASC, Order.NullsPosition.FIRST));
|
orders.add(new Order(source, fieldTimestamp(), Order.OrderDirection.ASC, Order.NullsPosition.FIRST));
|
||||||
// make sure to add the tieBreaker as well
|
// make sure to add the tiebreaker as well
|
||||||
Attribute tieBreaker = fieldTieBreaker();
|
Attribute tiebreaker = fieldTiebreaker();
|
||||||
if (Expressions.isPresent(tieBreaker)) {
|
if (Expressions.isPresent(tiebreaker)) {
|
||||||
orders.add(new Order(source, tieBreaker, Order.OrderDirection.ASC, Order.NullsPosition.FIRST));
|
orders.add(new Order(source, tiebreaker, Order.OrderDirection.ASC, Order.NullsPosition.FIRST));
|
||||||
}
|
}
|
||||||
|
|
||||||
OrderBy orderBy = new OrderBy(source, filter, orders);
|
OrderBy orderBy = new OrderBy(source, filter, orders);
|
||||||
|
@ -133,7 +133,7 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
|
||||||
until = defaultUntil(source);
|
until = defaultUntil(source);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new Join(source, queries, until, fieldTimestamp(), fieldTieBreaker());
|
return new Join(source, queries, until, fieldTimestamp(), fieldTiebreaker());
|
||||||
}
|
}
|
||||||
|
|
||||||
private KeyedFilter defaultUntil(Source source) {
|
private KeyedFilter defaultUntil(Source source) {
|
||||||
|
@ -150,13 +150,13 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
|
||||||
LogicalPlan eventQuery = visitEventFilter(subqueryCtx.eventFilter());
|
LogicalPlan eventQuery = visitEventFilter(subqueryCtx.eventFilter());
|
||||||
|
|
||||||
List<Attribute> output = CollectionUtils.combine(keys, fieldTimestamp());
|
List<Attribute> output = CollectionUtils.combine(keys, fieldTimestamp());
|
||||||
Attribute fieldTieBreaker = fieldTieBreaker();
|
Attribute fieldTieBreaker = fieldTiebreaker();
|
||||||
if (Expressions.isPresent(fieldTieBreaker)) {
|
if (Expressions.isPresent(fieldTieBreaker)) {
|
||||||
output = CollectionUtils.combine(output, fieldTieBreaker);
|
output = CollectionUtils.combine(output, fieldTieBreaker);
|
||||||
}
|
}
|
||||||
LogicalPlan child = new Project(source(ctx), eventQuery, output);
|
LogicalPlan child = new Project(source(ctx), eventQuery, output);
|
||||||
|
|
||||||
return new KeyedFilter(source(ctx), child, keys, fieldTimestamp(), fieldTieBreaker());
|
return new KeyedFilter(source(ctx), child, keys, fieldTimestamp(), fieldTiebreaker());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -199,7 +199,7 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
|
||||||
until = defaultUntil(source);
|
until = defaultUntil(source);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new Sequence(source, queries, until, maxSpan, fieldTimestamp(), fieldTieBreaker());
|
return new Sequence(source, queries, until, maxSpan, fieldTimestamp(), fieldTiebreaker());
|
||||||
}
|
}
|
||||||
|
|
||||||
public KeyedFilter visitSequenceTerm(SequenceTermContext ctx, List<Attribute> joinKeys) {
|
public KeyedFilter visitSequenceTerm(SequenceTermContext ctx, List<Attribute> joinKeys) {
|
||||||
|
|
|
@ -19,7 +19,7 @@ public class ParserParams {
|
||||||
private final ZoneId zoneId;
|
private final ZoneId zoneId;
|
||||||
private String fieldEventCategory = FIELD_EVENT_CATEGORY;
|
private String fieldEventCategory = FIELD_EVENT_CATEGORY;
|
||||||
private String fieldTimestamp = FIELD_TIMESTAMP;
|
private String fieldTimestamp = FIELD_TIMESTAMP;
|
||||||
private String fieldTieBreaker = null;
|
private String fieldTiebreaker = null;
|
||||||
private String implicitJoinKey = FIELD_IMPLICIT_JOIN_KEY;
|
private String implicitJoinKey = FIELD_IMPLICIT_JOIN_KEY;
|
||||||
private List<Object> queryParams = emptyList();
|
private List<Object> queryParams = emptyList();
|
||||||
|
|
||||||
|
@ -45,12 +45,12 @@ public class ParserParams {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String fieldTieBreaker() {
|
public String fieldTiebreaker() {
|
||||||
return fieldTieBreaker;
|
return fieldTiebreaker;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ParserParams fieldTieBreaker(String fieldTieBreaker) {
|
public ParserParams fieldTiebreaker(String fieldTiebreaker) {
|
||||||
this.fieldTieBreaker = fieldTieBreaker;
|
this.fieldTiebreaker = fieldTiebreaker;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,18 +27,18 @@ public class Join extends LogicalPlan {
|
||||||
private final List<KeyedFilter> queries;
|
private final List<KeyedFilter> queries;
|
||||||
private final KeyedFilter until;
|
private final KeyedFilter until;
|
||||||
private final Attribute timestamp;
|
private final Attribute timestamp;
|
||||||
private final Attribute tieBreaker;
|
private final Attribute tiebreaker;
|
||||||
|
|
||||||
public Join(Source source, List<KeyedFilter> queries, KeyedFilter until, Attribute timestamp, Attribute tieBreaker) {
|
public Join(Source source, List<KeyedFilter> queries, KeyedFilter until, Attribute timestamp, Attribute tiebreaker) {
|
||||||
super(source, CollectionUtils.combine(queries, until));
|
super(source, CollectionUtils.combine(queries, until));
|
||||||
this.queries = queries;
|
this.queries = queries;
|
||||||
this.until = until;
|
this.until = until;
|
||||||
this.timestamp = timestamp;
|
this.timestamp = timestamp;
|
||||||
this.tieBreaker = tieBreaker;
|
this.tiebreaker = tiebreaker;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Join(Source source, List<LogicalPlan> queries, LogicalPlan until, Attribute timestamp, Attribute tieBreaker) {
|
private Join(Source source, List<LogicalPlan> queries, LogicalPlan until, Attribute timestamp, Attribute tiebreaker) {
|
||||||
this(source, asKeyed(queries), asKeyed(until), timestamp, tieBreaker);
|
this(source, asKeyed(queries), asKeyed(until), timestamp, tiebreaker);
|
||||||
}
|
}
|
||||||
|
|
||||||
static List<KeyedFilter> asKeyed(List<LogicalPlan> list) {
|
static List<KeyedFilter> asKeyed(List<LogicalPlan> list) {
|
||||||
|
@ -59,7 +59,7 @@ public class Join extends LogicalPlan {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected NodeInfo<? extends Join> info() {
|
protected NodeInfo<? extends Join> info() {
|
||||||
return NodeInfo.create(this, Join::new, queries, until, timestamp, tieBreaker);
|
return NodeInfo.create(this, Join::new, queries, until, timestamp, tiebreaker);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -68,7 +68,7 @@ public class Join extends LogicalPlan {
|
||||||
throw new EqlIllegalArgumentException("expected at least [2] children but received [{}]", newChildren.size());
|
throw new EqlIllegalArgumentException("expected at least [2] children but received [{}]", newChildren.size());
|
||||||
}
|
}
|
||||||
int lastIndex = newChildren.size() - 1;
|
int lastIndex = newChildren.size() - 1;
|
||||||
return new Join(source(), newChildren.subList(0, lastIndex), newChildren.get(lastIndex), timestamp, tieBreaker);
|
return new Join(source(), newChildren.subList(0, lastIndex), newChildren.get(lastIndex), timestamp, tiebreaker);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -76,8 +76,8 @@ public class Join extends LogicalPlan {
|
||||||
List<Attribute> out = new ArrayList<>();
|
List<Attribute> out = new ArrayList<>();
|
||||||
|
|
||||||
out.add(timestamp);
|
out.add(timestamp);
|
||||||
if (Expressions.isPresent(tieBreaker)) {
|
if (Expressions.isPresent(tiebreaker)) {
|
||||||
out.add(tieBreaker);
|
out.add(tiebreaker);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (KeyedFilter query : queries) {
|
for (KeyedFilter query : queries) {
|
||||||
|
@ -88,7 +88,7 @@ public class Join extends LogicalPlan {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean expressionsResolved() {
|
public boolean expressionsResolved() {
|
||||||
return timestamp.resolved() && tieBreaker.resolved() && until.resolved() && Resolvables.resolved(queries);
|
return timestamp.resolved() && tiebreaker.resolved() && until.resolved() && Resolvables.resolved(queries);
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<KeyedFilter> queries() {
|
public List<KeyedFilter> queries() {
|
||||||
|
@ -103,13 +103,13 @@ public class Join extends LogicalPlan {
|
||||||
return timestamp;
|
return timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Attribute tieBreaker() {
|
public Attribute tiebreaker() {
|
||||||
return tieBreaker;
|
return tiebreaker;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(timestamp, tieBreaker, queries, until);
|
return Objects.hash(timestamp, tiebreaker, queries, until);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -126,7 +126,7 @@ public class Join extends LogicalPlan {
|
||||||
return Objects.equals(queries, other.queries)
|
return Objects.equals(queries, other.queries)
|
||||||
&& Objects.equals(until, other.until)
|
&& Objects.equals(until, other.until)
|
||||||
&& Objects.equals(timestamp, other.timestamp)
|
&& Objects.equals(timestamp, other.timestamp)
|
||||||
&& Objects.equals(tieBreaker, other.tieBreaker);
|
&& Objects.equals(tiebreaker, other.tiebreaker);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -25,23 +25,23 @@ public class KeyedFilter extends UnaryPlan {
|
||||||
|
|
||||||
private final List<? extends NamedExpression> keys;
|
private final List<? extends NamedExpression> keys;
|
||||||
private final Attribute timestamp;
|
private final Attribute timestamp;
|
||||||
private final Attribute tieBreaker;
|
private final Attribute tiebreaker;
|
||||||
|
|
||||||
public KeyedFilter(Source source, LogicalPlan child, List<? extends NamedExpression> keys, Attribute timestamp, Attribute tieBreaker) {
|
public KeyedFilter(Source source, LogicalPlan child, List<? extends NamedExpression> keys, Attribute timestamp, Attribute tiebreaker) {
|
||||||
super(source, child);
|
super(source, child);
|
||||||
this.keys = keys;
|
this.keys = keys;
|
||||||
this.timestamp = timestamp;
|
this.timestamp = timestamp;
|
||||||
this.tieBreaker = tieBreaker;
|
this.tiebreaker = tiebreaker;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected NodeInfo<KeyedFilter> info() {
|
protected NodeInfo<KeyedFilter> info() {
|
||||||
return NodeInfo.create(this, KeyedFilter::new, child(), keys, timestamp, tieBreaker);
|
return NodeInfo.create(this, KeyedFilter::new, child(), keys, timestamp, tiebreaker);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected KeyedFilter replaceChild(LogicalPlan newChild) {
|
protected KeyedFilter replaceChild(LogicalPlan newChild) {
|
||||||
return new KeyedFilter(source(), newChild, keys, timestamp, tieBreaker);
|
return new KeyedFilter(source(), newChild, keys, timestamp, tiebreaker);
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<? extends NamedExpression> keys() {
|
public List<? extends NamedExpression> keys() {
|
||||||
|
@ -52,18 +52,18 @@ public class KeyedFilter extends UnaryPlan {
|
||||||
return timestamp;
|
return timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Attribute tieBreaker() {
|
public Attribute tiebreaker() {
|
||||||
return tieBreaker;
|
return tiebreaker;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean expressionsResolved() {
|
public boolean expressionsResolved() {
|
||||||
return Resolvables.resolved(keys) && timestamp.resolved() && tieBreaker.resolved();
|
return Resolvables.resolved(keys) && timestamp.resolved() && tiebreaker.resolved();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(keys, timestamp, tieBreaker, child());
|
return Objects.hash(keys, timestamp, tiebreaker, child());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -79,7 +79,7 @@ public class KeyedFilter extends UnaryPlan {
|
||||||
|
|
||||||
return Objects.equals(keys, other.keys)
|
return Objects.equals(keys, other.keys)
|
||||||
&& Objects.equals(timestamp, other.timestamp)
|
&& Objects.equals(timestamp, other.timestamp)
|
||||||
&& Objects.equals(tieBreaker, other.tieBreaker)
|
&& Objects.equals(tiebreaker, other.tiebreaker)
|
||||||
&& Objects.equals(child(), other.child());
|
&& Objects.equals(child(), other.child());
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -23,20 +23,20 @@ public class Sequence extends Join {
|
||||||
private final TimeValue maxSpan;
|
private final TimeValue maxSpan;
|
||||||
|
|
||||||
public Sequence(Source source, List<KeyedFilter> queries, KeyedFilter until, TimeValue maxSpan, Attribute timestamp,
|
public Sequence(Source source, List<KeyedFilter> queries, KeyedFilter until, TimeValue maxSpan, Attribute timestamp,
|
||||||
Attribute tieBreaker) {
|
Attribute tiebreaker) {
|
||||||
super(source, queries, until, timestamp, tieBreaker);
|
super(source, queries, until, timestamp, tiebreaker);
|
||||||
this.maxSpan = maxSpan;
|
this.maxSpan = maxSpan;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Sequence(Source source, List<LogicalPlan> queries, LogicalPlan until, TimeValue maxSpan, Attribute timestamp,
|
private Sequence(Source source, List<LogicalPlan> queries, LogicalPlan until, TimeValue maxSpan, Attribute timestamp,
|
||||||
Attribute tieBreaker) {
|
Attribute tiebreaker) {
|
||||||
super(source, asKeyed(queries), asKeyed(until), timestamp, tieBreaker);
|
super(source, asKeyed(queries), asKeyed(until), timestamp, tiebreaker);
|
||||||
this.maxSpan = maxSpan;
|
this.maxSpan = maxSpan;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected NodeInfo<Sequence> info() {
|
protected NodeInfo<Sequence> info() {
|
||||||
return NodeInfo.create(this, Sequence::new, queries(), until(), maxSpan, timestamp(), tieBreaker());
|
return NodeInfo.create(this, Sequence::new, queries(), until(), maxSpan, timestamp(), tiebreaker());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -45,7 +45,7 @@ public class Sequence extends Join {
|
||||||
throw new EqlIllegalArgumentException("expected at least [2] children but received [{}]", newChildren.size());
|
throw new EqlIllegalArgumentException("expected at least [2] children but received [{}]", newChildren.size());
|
||||||
}
|
}
|
||||||
int lastIndex = newChildren.size() - 1;
|
int lastIndex = newChildren.size() - 1;
|
||||||
return new Sequence(source(), newChildren.subList(0, lastIndex), newChildren.get(lastIndex), maxSpan, timestamp(), tieBreaker());
|
return new Sequence(source(), newChildren.subList(0, lastIndex), newChildren.get(lastIndex), maxSpan, timestamp(), tiebreaker());
|
||||||
}
|
}
|
||||||
|
|
||||||
public TimeValue maxSpan() {
|
public TimeValue maxSpan() {
|
||||||
|
|
|
@ -28,7 +28,7 @@ public class SequenceExec extends PhysicalPlan {
|
||||||
|
|
||||||
private final List<List<Attribute>> keys;
|
private final List<List<Attribute>> keys;
|
||||||
private final Attribute timestamp;
|
private final Attribute timestamp;
|
||||||
private final Attribute tieBreaker;
|
private final Attribute tiebreaker;
|
||||||
|
|
||||||
public SequenceExec(Source source,
|
public SequenceExec(Source source,
|
||||||
List<List<Attribute>> keys,
|
List<List<Attribute>> keys,
|
||||||
|
@ -36,20 +36,20 @@ public class SequenceExec extends PhysicalPlan {
|
||||||
List<Attribute> untilKeys,
|
List<Attribute> untilKeys,
|
||||||
PhysicalPlan until,
|
PhysicalPlan until,
|
||||||
Attribute timestamp,
|
Attribute timestamp,
|
||||||
Attribute tieBreaker) {
|
Attribute tiebreaker) {
|
||||||
this(source, combine(matches, until), combine(keys, singletonList(untilKeys)), timestamp, tieBreaker);
|
this(source, combine(matches, until), combine(keys, singletonList(untilKeys)), timestamp, tiebreaker);
|
||||||
}
|
}
|
||||||
|
|
||||||
private SequenceExec(Source source, List<PhysicalPlan> children, List<List<Attribute>> keys, Attribute ts, Attribute tb) {
|
private SequenceExec(Source source, List<PhysicalPlan> children, List<List<Attribute>> keys, Attribute ts, Attribute tb) {
|
||||||
super(source, children);
|
super(source, children);
|
||||||
this.keys = keys;
|
this.keys = keys;
|
||||||
this.timestamp = ts;
|
this.timestamp = ts;
|
||||||
this.tieBreaker = tb;
|
this.tiebreaker = tb;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected NodeInfo<SequenceExec> info() {
|
protected NodeInfo<SequenceExec> info() {
|
||||||
return NodeInfo.create(this, SequenceExec::new, children(), keys, timestamp, tieBreaker);
|
return NodeInfo.create(this, SequenceExec::new, children(), keys, timestamp, tiebreaker);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -59,15 +59,15 @@ public class SequenceExec extends PhysicalPlan {
|
||||||
children().size(),
|
children().size(),
|
||||||
newChildren.size());
|
newChildren.size());
|
||||||
}
|
}
|
||||||
return new SequenceExec(source(), newChildren, keys, timestamp, tieBreaker);
|
return new SequenceExec(source(), newChildren, keys, timestamp, tiebreaker);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Attribute> output() {
|
public List<Attribute> output() {
|
||||||
List<Attribute> attrs = new ArrayList<>();
|
List<Attribute> attrs = new ArrayList<>();
|
||||||
attrs.add(timestamp);
|
attrs.add(timestamp);
|
||||||
if (Expressions.isPresent(tieBreaker)) {
|
if (Expressions.isPresent(tiebreaker)) {
|
||||||
attrs.add(tieBreaker);
|
attrs.add(tiebreaker);
|
||||||
}
|
}
|
||||||
for (List<? extends NamedExpression> ne : keys) {
|
for (List<? extends NamedExpression> ne : keys) {
|
||||||
attrs.addAll(Expressions.asAttributes(ne));
|
attrs.addAll(Expressions.asAttributes(ne));
|
||||||
|
@ -83,18 +83,18 @@ public class SequenceExec extends PhysicalPlan {
|
||||||
return timestamp;
|
return timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Attribute tieBreaker() {
|
public Attribute tiebreaker() {
|
||||||
return tieBreaker;
|
return tiebreaker;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(EqlSession session, ActionListener<Results> listener) {
|
public void execute(EqlSession session, ActionListener<Results> listener) {
|
||||||
new ExecutionManager(session).assemble(keys(), children(), timestamp(), tieBreaker()).execute(listener);
|
new ExecutionManager(session).assemble(keys(), children(), timestamp(), tiebreaker()).execute(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(timestamp, tieBreaker, keys, children());
|
return Objects.hash(timestamp, tiebreaker, keys, children());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -109,7 +109,7 @@ public class SequenceExec extends PhysicalPlan {
|
||||||
|
|
||||||
SequenceExec other = (SequenceExec) obj;
|
SequenceExec other = (SequenceExec) obj;
|
||||||
return Objects.equals(timestamp, other.timestamp)
|
return Objects.equals(timestamp, other.timestamp)
|
||||||
&& Objects.equals(tieBreaker, other.tieBreaker)
|
&& Objects.equals(tiebreaker, other.tiebreaker)
|
||||||
&& Objects.equals(children(), other.children())
|
&& Objects.equals(children(), other.children())
|
||||||
&& Objects.equals(keys, other.keys);
|
&& Objects.equals(keys, other.keys);
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,7 +73,7 @@ class Mapper extends RuleExecutor<PhysicalPlan> {
|
||||||
Expressions.asAttributes(s.until().keys()),
|
Expressions.asAttributes(s.until().keys()),
|
||||||
map(s.until().child()),
|
map(s.until().child()),
|
||||||
s.timestamp(),
|
s.timestamp(),
|
||||||
s.tieBreaker());
|
s.tiebreaker());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p instanceof LocalRelation) {
|
if (p instanceof LocalRelation) {
|
||||||
|
|
|
@ -67,7 +67,7 @@ public class TransportEqlSearchAction extends HandledTransportAction<EqlSearchRe
|
||||||
ParserParams params = new ParserParams(zoneId)
|
ParserParams params = new ParserParams(zoneId)
|
||||||
.fieldEventCategory(request.eventCategoryField())
|
.fieldEventCategory(request.eventCategoryField())
|
||||||
.fieldTimestamp(request.timestampField())
|
.fieldTimestamp(request.timestampField())
|
||||||
.fieldTieBreaker(request.tieBreakerField())
|
.fieldTiebreaker(request.tiebreakerField())
|
||||||
.implicitJoinKey(request.implicitJoinKeyField());
|
.implicitJoinKey(request.implicitJoinKeyField());
|
||||||
|
|
||||||
EqlConfiguration cfg = new EqlConfiguration(request.indices(), zoneId, username, clusterName, filter, timeout, request.fetchSize(),
|
EqlConfiguration cfg = new EqlConfiguration(request.indices(), zoneId, username, clusterName, filter, timeout, request.fetchSize(),
|
||||||
|
|
Loading…
Reference in New Issue