Remove the ES_TIMESTAMP constant (elastic/elasticsearch#523)

Original commit: elastic/x-pack-elasticsearch@04fa354619
This commit is contained in:
David Kyle 2016-12-12 09:28:42 +00:00 committed by GitHub
parent 30bbbf0f78
commit 45c8aeb0f2
7 changed files with 38 additions and 66 deletions

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import java.util.ArrayDeque;
import java.util.Arrays;
@ -60,7 +61,7 @@ abstract class ElasticsearchBatchedDocumentsIterator<T> implements BatchedDocume
@Override
public BatchedDocumentsIterator<T> timeRange(long startEpochMs, long endEpochMs) {
filterBuilder.timeRange(ElasticsearchMappings.ES_TIMESTAMP, startEpochMs, endEpochMs);
filterBuilder.timeRange(Bucket.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs);
return this;
}

View File

@ -66,14 +66,6 @@ public class ElasticsearchMappings {
static final String TYPE = "type";
static final String DYNAMIC = "dynamic";
/**
* Name of the field used to store the timestamp in Elasticsearch.
* Note the field name is different to {@link org.elasticsearch.xpack.prelert.job.results.Bucket#TIMESTAMP} used by the
* API Bucket Resource, and is chosen for consistency with the default field name used by
* Logstash and Kibana.
*/
static final String ES_TIMESTAMP = "timestamp";
/**
* Name of the Elasticsearch field by which documents are sorted by default
*/
@ -125,7 +117,7 @@ public class ElasticsearchMappings {
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(ES_TIMESTAMP)
.startObject(Bucket.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(Bucket.ANOMALY_SCORE.getPreferredName())
@ -448,7 +440,7 @@ public class ElasticsearchMappings {
.field(ANALYZER, WHITESPACE)
.endObject()
.startObject(PROPERTIES)
.startObject(ES_TIMESTAMP)
.startObject(Quantiles.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(Quantiles.QUANTILE_STATE.getPreferredName())
@ -539,7 +531,7 @@ public class ElasticsearchMappings {
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(ES_TIMESTAMP)
.startObject(ModelSnapshot.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.endObject()
// "description" is analyzed so that it has the same
@ -565,17 +557,14 @@ public class ElasticsearchMappings {
addModelSizeStatsFieldsToMapping(builder);
builder.startObject(ES_TIMESTAMP)
.field(TYPE, DATE)
.endObject()
.endObject()
builder.endObject()
.endObject()
.startObject(Quantiles.TYPE.getPreferredName())
.startObject(PROPERTIES)
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(ES_TIMESTAMP)
.startObject(Quantiles.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(Quantiles.QUANTILE_STATE.getPreferredName())
@ -643,7 +632,7 @@ public class ElasticsearchMappings {
.field(ANALYZER, WHITESPACE)
.endObject()
.startObject(PROPERTIES)
.startObject(ES_TIMESTAMP)
.startObject(Usage.TIMESTAMP)
.field(TYPE, DATE)
.endObject()
.startObject(Usage.INPUT_BYTES)
@ -665,7 +654,7 @@ public class ElasticsearchMappings {
.startObject()
.startObject(AuditMessage.TYPE.getPreferredName())
.startObject(PROPERTIES)
.startObject(ES_TIMESTAMP)
.startObject(AuditMessage.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.endObject()
.endObject()
@ -678,7 +667,7 @@ public class ElasticsearchMappings {
.startObject()
.startObject(AuditActivity.TYPE.getPreferredName())
.startObject(PROPERTIES)
.startObject(ES_TIMESTAMP)
.startObject(AuditActivity.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.endObject()
.endObject()

View File

@ -71,7 +71,7 @@ public class JobDataDeleter {
public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> listener) {
String index = JobResultsPersister.getJobIndexName(jobId);
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(ElasticsearchMappings.ES_TIMESTAMP);
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(Bucket.TIMESTAMP.getPreferredName());
timeRange.gte(cutoffEpochMs);
timeRange.lt(new Date().getTime());

View File

@ -280,13 +280,13 @@ public class JobProvider {
public QueryPage<Bucket> buckets(String jobId, BucketsQuery query)
throws ResourceNotFoundException {
QueryBuilder fb = new ResultsFilterBuilder()
.timeRange(ElasticsearchMappings.ES_TIMESTAMP, query.getStart(), query.getEnd())
.timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
.score(Bucket.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreFilter())
.score(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName(), query.getNormalizedProbability())
.interim(Bucket.IS_INTERIM.getPreferredName(), query.isIncludeInterim())
.build();
SortBuilder<?> sortBuilder = new FieldSortBuilder(esSortField(query.getSortField()))
SortBuilder<?> sortBuilder = new FieldSortBuilder(query.getSortField())
.order(query.isSortDescending() ? SortOrder.DESC : SortOrder.ASC);
QueryPage<Bucket> buckets = buckets(jobId, query.isIncludeInterim(), query.getFrom(), query.getSize(), fb, sortBuilder);
@ -388,7 +388,7 @@ public class JobProvider {
SearchHits hits;
try {
LOGGER.trace("ES API CALL: get Bucket with timestamp {} from index {}", query.getTimestamp(), indexName);
QueryBuilder matchQuery = QueryBuilders.matchQuery(ElasticsearchMappings.ES_TIMESTAMP, query.getTimestamp());
QueryBuilder matchQuery = QueryBuilders.matchQuery(Bucket.TIMESTAMP.getPreferredName(), query.getTimestamp());
QueryBuilder boolQuery = new BoolQueryBuilder()
.filter(matchQuery)
@ -458,7 +458,7 @@ public class JobProvider {
String partitionFieldValue)
throws ResourceNotFoundException {
QueryBuilder timeRangeQuery = new ResultsFilterBuilder()
.timeRange(ElasticsearchMappings.ES_TIMESTAMP, epochStart, epochEnd)
.timeRange(Bucket.TIMESTAMP.getPreferredName(), epochStart, epochEnd)
.build();
QueryBuilder boolQuery = new BoolQueryBuilder()
@ -466,7 +466,7 @@ public class JobProvider {
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), PerPartitionMaxProbabilities.RESULT_TYPE_VALUE))
.filter(new TermsQueryBuilder(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue));
FieldSortBuilder sb = new FieldSortBuilder(ElasticsearchMappings.ES_TIMESTAMP).order(SortOrder.ASC);
FieldSortBuilder sb = new FieldSortBuilder(Bucket.TIMESTAMP.getPreferredName()).order(SortOrder.ASC);
String indexName = JobResultsPersister.getJobIndexName(jobId);
SearchRequestBuilder searchBuilder = client
.prepareSearch(indexName)
@ -566,7 +566,7 @@ public class JobProvider {
// the scenes, and Elasticsearch documentation claims it's significantly
// slower. Here we rely on the record timestamps being identical to the
// bucket timestamp.
QueryBuilder recordFilter = QueryBuilders.termQuery(ElasticsearchMappings.ES_TIMESTAMP, bucket.getTimestamp().getTime());
QueryBuilder recordFilter = QueryBuilders.termQuery(Bucket.TIMESTAMP.getPreferredName(), bucket.getTimestamp().getTime());
recordFilter = new ResultsFilterBuilder(recordFilter)
.interim(AnomalyRecord.IS_INTERIM.getPreferredName(), includeInterim)
@ -575,7 +575,7 @@ public class JobProvider {
FieldSortBuilder sb = null;
if (sortField != null) {
sb = new FieldSortBuilder(esSortField(sortField))
sb = new FieldSortBuilder(sortField)
.missing("_last")
.order(descending ? SortOrder.DESC : SortOrder.ASC);
}
@ -669,7 +669,7 @@ public class JobProvider {
public QueryPage<AnomalyRecord> records(String jobId, RecordsQueryBuilder.RecordsQuery query)
throws ResourceNotFoundException {
QueryBuilder fb = new ResultsFilterBuilder()
.timeRange(ElasticsearchMappings.ES_TIMESTAMP, query.getStart(), query.getEnd())
.timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
.score(AnomalyRecord.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreThreshold())
.score(AnomalyRecord.NORMALIZED_PROBABILITY.getPreferredName(), query.getNormalizedProbabilityThreshold())
.interim(AnomalyRecord.IS_INTERIM.getPreferredName(), query.isIncludeInterim())
@ -685,7 +685,7 @@ public class JobProvider {
throws ResourceNotFoundException {
FieldSortBuilder sb = null;
if (sortField != null) {
sb = new FieldSortBuilder(esSortField(sortField))
sb = new FieldSortBuilder(sortField)
.missing("_last")
.order(descending ? SortOrder.DESC : SortOrder.ASC);
}
@ -714,7 +714,7 @@ public class JobProvider {
.setFetchSource(true); // the field option turns off source so request it explicitly
for (String sortField : secondarySort) {
searchBuilder.addSort(esSortField(sortField), descending ? SortOrder.DESC : SortOrder.ASC);
searchBuilder.addSort(sortField, descending ? SortOrder.DESC : SortOrder.ASC);
}
SearchResponse searchResponse;
@ -756,7 +756,7 @@ public class JobProvider {
*/
public QueryPage<Influencer> influencers(String jobId, InfluencersQuery query) throws ResourceNotFoundException {
QueryBuilder fb = new ResultsFilterBuilder()
.timeRange(ElasticsearchMappings.ES_TIMESTAMP, query.getStart(), query.getEnd())
.timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
.score(Bucket.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreFilter())
.interim(Bucket.IS_INTERIM.getPreferredName(), query.isIncludeInterim())
.build();
@ -771,7 +771,7 @@ public class JobProvider {
LOGGER.trace("ES API CALL: search all of result type {} from index {}{} with filter from {} size {}",
() -> Influencer.RESULT_TYPE_VALUE, () -> indexName,
() -> (sortField != null) ?
" with sort " + (sortDescending ? "descending" : "ascending") + " on field " + esSortField(sortField) : "",
" with sort " + (sortDescending ? "descending" : "ascending") + " on field " + sortField : "",
() -> from, () -> size);
filterBuilder = new BoolQueryBuilder()
@ -784,7 +784,7 @@ public class JobProvider {
.setFrom(from).setSize(size);
FieldSortBuilder sb = sortField == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)
: new FieldSortBuilder(esSortField(sortField)).order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
: new FieldSortBuilder(sortField).order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
searchRequestBuilder.addSort(sb);
SearchResponse response;
@ -913,12 +913,12 @@ public class JobProvider {
return modelSnapshots(jobId, from, size,
(sortField == null || sortField.isEmpty()) ? ModelSnapshot.RESTORE_PRIORITY.getPreferredName() : sortField,
sortDescending, fb.timeRange(
ElasticsearchMappings.ES_TIMESTAMP, startEpochMs, endEpochMs).build());
Bucket.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs).build());
}
private QueryPage<ModelSnapshot> modelSnapshots(String jobId, int from, int size,
String sortField, boolean sortDescending, QueryBuilder fb) {
FieldSortBuilder sb = new FieldSortBuilder(esSortField(sortField))
FieldSortBuilder sb = new FieldSortBuilder(sortField)
.order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
// Wrap in a constant_score because we always want to
@ -928,9 +928,9 @@ public class JobProvider {
SearchResponse searchResponse;
try {
String indexName = JobResultsPersister.getJobIndexName(jobId);
LOGGER.trace("ES API CALL: search all of type " + ModelSnapshot.TYPE +
" from index " + indexName + " sort ascending " + esSortField(sortField) +
" with filter after sort from " + from + " size " + size);
LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} with filter after sort from {} size {}",
ModelSnapshot.TYPE, indexName, sortField, from, size);
searchResponse = client.prepareSearch(indexName)
.setTypes(ModelSnapshot.TYPE.getPreferredName())
.addSort(sb)
@ -945,19 +945,6 @@ public class JobProvider {
List<ModelSnapshot> results = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits().getHits()) {
// Remove the Kibana/Logstash '@timestamp' entry as stored in Elasticsearch,
// and replace using the API 'timestamp' key.
Object timestamp = hit.getSource().remove(ElasticsearchMappings.ES_TIMESTAMP);
hit.getSource().put(ModelSnapshot.TIMESTAMP.getPreferredName(), timestamp);
Object o = hit.getSource().get(ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName());
if (o instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) o;
Object ts = map.remove(ElasticsearchMappings.ES_TIMESTAMP);
map.put(ModelSizeStats.TIMESTAMP_FIELD.getPreferredName(), ts);
}
BytesReference source = hit.getSourceRef();
XContentParser parser;
try {
@ -1116,11 +1103,4 @@ public class JobProvider {
public Auditor audit(String jobId) {
return new Auditor(client, JobResultsPersister.getJobIndexName(jobId), jobId);
}
private String esSortField(String sortField) {
// Beware: There's an assumption here that Bucket.TIMESTAMP,
// AnomalyRecord.TIMESTAMP, Influencer.TIMESTAMP and
// ModelSnapshot.TIMESTAMP are all the same
return sortField.equals(Bucket.TIMESTAMP.getPreferredName()) ? ElasticsearchMappings.ES_TIMESTAMP : sortField;
}
}

View File

@ -20,6 +20,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.usage.Usage;
public class UsagePersister extends AbstractComponent {
@ -35,7 +36,7 @@ public class UsagePersister extends AbstractComponent {
dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXX", Locale.ROOT);
upsertMap = new HashMap<>();
upsertMap.put(ElasticsearchMappings.ES_TIMESTAMP, "");
upsertMap.put(Bucket.TIMESTAMP.getPreferredName(), "");
upsertMap.put(Usage.INPUT_BYTES, null);
}
@ -43,7 +44,7 @@ public class UsagePersister extends AbstractComponent {
ZonedDateTime nowTruncatedToHour = ZonedDateTime.now().truncatedTo(ChronoUnit.HOURS);
String formattedNowTruncatedToHour = nowTruncatedToHour.format(dateTimeFormatter);
String docId = USAGE_DOC_ID_PREFIX + formattedNowTruncatedToHour;
upsertMap.put(ElasticsearchMappings.ES_TIMESTAMP, formattedNowTruncatedToHour);
upsertMap.put(Usage.TIMESTAMP, formattedNowTruncatedToHour);
// update global count
updateDocument(jobId, PRELERT_USAGE_INDEX, docId, bytesRead, fieldsRead, recordsRead);

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.prelert.job.persistence;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
public class ResultsFilterBuilderTests extends ESTestCase {
private static final String TIMESTAMP = "timestamp";
@ -27,11 +28,11 @@ public class ResultsFilterBuilderTests extends ESTestCase {
public void testBuild_GivenOnlyStartTime() {
QueryBuilder expected = QueryBuilders
.rangeQuery(ElasticsearchMappings.ES_TIMESTAMP)
.rangeQuery(Bucket.TIMESTAMP.getPreferredName())
.gte(1000);
QueryBuilder fb = new ResultsFilterBuilder()
.timeRange(ElasticsearchMappings.ES_TIMESTAMP, 1000, null)
.timeRange(Bucket.TIMESTAMP.getPreferredName(), 1000, null)
.build();
assertEquals(expected.toString(), fb.toString());
@ -122,7 +123,7 @@ public class ResultsFilterBuilderTests extends ESTestCase {
public void testBuild_GivenCombination() {
QueryBuilder originalFilter = QueryBuilders.existsQuery("someField");
QueryBuilder timeFilter = QueryBuilders
.rangeQuery(ElasticsearchMappings.ES_TIMESTAMP)
.rangeQuery(Bucket.TIMESTAMP.getPreferredName())
.gte(1000)
.lt(2000);
QueryBuilder score1Filter = new ResultsFilterBuilder()
@ -143,7 +144,7 @@ public class ResultsFilterBuilderTests extends ESTestCase {
.filter(termFilter);
QueryBuilder fb = new ResultsFilterBuilder(originalFilter)
.timeRange(ElasticsearchMappings.ES_TIMESTAMP, 1000, 2000)
.timeRange(Bucket.TIMESTAMP.getPreferredName(), 1000, 2000)
.score("score1", 50.0)
.score("score2", 80.0)
.interim("isInterim", false)

View File

@ -77,7 +77,7 @@ public class UsagePersisterTests extends ESTestCase {
List<Map> capturedUpserts = upsertsCaptor.getAllValues();
assertEquals(2, capturedUpserts.size());
assertEquals(timestamp, capturedUpserts.get(0).get(ElasticsearchMappings.ES_TIMESTAMP).toString());
assertEquals(timestamp, capturedUpserts.get(0).get(Usage.TIMESTAMP).toString());
assertEquals(10L, capturedUpserts.get(0).get(Usage.INPUT_BYTES));
assertEquals(30L, capturedUpserts.get(0).get(Usage.INPUT_FIELD_COUNT));
assertEquals(1L, capturedUpserts.get(0).get(Usage.INPUT_RECORD_COUNT));