From fc9fb64ad5f9b16f9d3f8b88c5dbdc428ab5d1cd Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Fri, 3 Aug 2018 11:13:25 -0400 Subject: [PATCH] [Rollup] Improve ID scheme for rollup documents (#32558) Previously, we were using a simple CRC32 for the IDs of rollup documents. This is a very poor choice however, since 32bit IDs leads to collisions between documents very quickly. This commit moves Rollups over to a 128bit ID. The ID is a concatenation of all the keys in the document (similar to the rolling CRC before), hashed with 128bit Murmur3, then base64 encoded. Finally, the job ID and a delimiter (`$`) are prepended to the ID. This gurantees that there are 128bits per-job. 128bits should essentially remove all chances of collisions, and the prepended job ID means that _if_ there is a collision, it stays "within" the job. BWC notes: We can only upgrade the ID scheme after we know there has been a good checkpoint during indexing. We don't rely on a STARTED/STOPPED status since we can't guarantee that resulted from a real checkpoint, or other state. So we only upgrade the ID after we have reached a checkpoint state during an active index run, and only after the checkpoint has been confirmed. Once a job has been upgraded and checkpointed, the version increments and the new ID is used in the future. All new jobs use the new ID from the start --- .../docs/en/rest-api/rollup/get-job.asciidoc | 9 +- .../core/rollup/job/RollupJobStatus.java | 37 ++- .../job/JobWrapperSerializingTests.java | 2 +- .../core/rollup/job/RollupJobStatusTests.java | 4 +- .../elasticsearch/xpack/rollup/Rollup.java | 9 +- .../action/TransportRollupSearchAction.java | 4 +- .../xpack/rollup/job/IndexerUtils.java | 58 ++-- .../xpack/rollup/job/RollupIDGenerator.java | 178 +++++++++++ .../xpack/rollup/job/RollupIndexer.java | 17 +- .../xpack/rollup/job/RollupJobTask.java | 56 +++- .../action/PutJobStateMachineTests.java | 18 +- .../rollup/action/SearchActionTests.java | 3 +- .../xpack/rollup/job/IndexerUtilsTests.java | 119 ++++++- .../job/RollupIndexerIndexingTests.java | 32 +- .../rollup/job/RollupIndexerStateTests.java | 95 +++--- .../xpack/rollup/job/RollupJobTaskTests.java | 189 ++++++++--- .../rest-api-spec/test/rollup/delete_job.yml | 3 + .../rest-api-spec/test/rollup/get_jobs.yml | 3 + .../rest-api-spec/test/rollup/put_job.yml | 1 + .../xpack/restart/FullClusterRestartIT.java | 94 ++++++ .../upgrades/RollupIDUpgradeIT.java | 293 ++++++++++++++++++ 21 files changed, 1054 insertions(+), 170 deletions(-) create mode 100644 x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIDGenerator.java create mode 100644 x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RollupIDUpgradeIT.java diff --git a/x-pack/docs/en/rest-api/rollup/get-job.asciidoc b/x-pack/docs/en/rest-api/rollup/get-job.asciidoc index 7a7db9258b8..96053dbfea6 100644 --- a/x-pack/docs/en/rest-api/rollup/get-job.asciidoc +++ b/x-pack/docs/en/rest-api/rollup/get-job.asciidoc @@ -93,7 +93,8 @@ Which will yield the following response: "page_size" : 1000 }, "status" : { - "job_state" : "stopped" + "job_state" : "stopped", + "upgraded_doc_id": true }, "stats" : { "pages_processed" : 0, @@ -212,7 +213,8 @@ Which will yield the following response: "page_size" : 1000 }, "status" : { - "job_state" : "stopped" + "job_state" : "stopped", + "upgraded_doc_id": true }, "stats" : { "pages_processed" : 0, @@ -260,7 +262,8 @@ Which will yield the following response: "page_size" : 1000 }, "status" : { - "job_state" : "stopped" + "job_state" : "stopped", + "upgraded_doc_id": true }, "stats" : { "pages_processed" : 0, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java index 4cbd5a3b455..c48d75946be 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.rollup.job; +import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; @@ -39,12 +40,19 @@ public class RollupJobStatus implements Task.Status, PersistentTaskState { @Nullable private final TreeMap currentPosition; + // Flag holds the state of the ID scheme, e.g. if it has been upgraded to the + // concatenation scheme. See #32372 for more details + private boolean upgradedDocumentID; + private static final ParseField STATE = new ParseField("job_state"); private static final ParseField CURRENT_POSITION = new ParseField("current_position"); + private static final ParseField UPGRADED_DOC_ID = new ParseField("upgraded_doc_id"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, - args -> new RollupJobStatus((IndexerState) args[0], (HashMap) args[1])); + args -> new RollupJobStatus((IndexerState) args[0], + (HashMap) args[1], + (Boolean)args[2])); static { PARSER.declareField(constructorArg(), p -> { @@ -62,16 +70,28 @@ public class RollupJobStatus implements Task.Status, PersistentTaskState { } throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); }, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY); + + // Optional to accommodate old versions of state + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), UPGRADED_DOC_ID); } - public RollupJobStatus(IndexerState state, @Nullable Map position) { + public RollupJobStatus(IndexerState state, @Nullable Map position, + @Nullable Boolean upgradedDocumentID) { this.state = state; this.currentPosition = position == null ? null : new TreeMap<>(position); + this.upgradedDocumentID = upgradedDocumentID != null ? upgradedDocumentID : false; //default to false if missing } public RollupJobStatus(StreamInput in) throws IOException { state = IndexerState.fromStream(in); currentPosition = in.readBoolean() ? new TreeMap<>(in.readMap()) : null; + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { //TODO change this after backport + upgradedDocumentID = in.readBoolean(); + } else { + // If we're getting this job from a pre-6.4.0 node, + // it is using the old ID scheme + upgradedDocumentID = false; + } } public IndexerState getIndexerState() { @@ -82,6 +102,10 @@ public class RollupJobStatus implements Task.Status, PersistentTaskState { return currentPosition; } + public boolean isUpgradedDocumentID() { + return upgradedDocumentID; + } + public static RollupJobStatus fromXContent(XContentParser parser) { try { return PARSER.parse(parser, null); @@ -97,6 +121,7 @@ public class RollupJobStatus implements Task.Status, PersistentTaskState { if (currentPosition != null) { builder.field(CURRENT_POSITION.getPreferredName(), currentPosition); } + builder.field(UPGRADED_DOC_ID.getPreferredName(), upgradedDocumentID); builder.endObject(); return builder; } @@ -113,6 +138,9 @@ public class RollupJobStatus implements Task.Status, PersistentTaskState { if (currentPosition != null) { out.writeMap(currentPosition); } + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { //TODO change this after backport + out.writeBoolean(upgradedDocumentID); + } } @Override @@ -128,11 +156,12 @@ public class RollupJobStatus implements Task.Status, PersistentTaskState { RollupJobStatus that = (RollupJobStatus) other; return Objects.equals(this.state, that.state) - && Objects.equals(this.currentPosition, that.currentPosition); + && Objects.equals(this.currentPosition, that.currentPosition) + && Objects.equals(this.upgradedDocumentID, that.upgradedDocumentID); } @Override public int hashCode() { - return Objects.hash(state, currentPosition); + return Objects.hash(state, currentPosition, upgradedDocumentID); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/JobWrapperSerializingTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/JobWrapperSerializingTests.java index 5515aeaaf48..fa9767b51a3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/JobWrapperSerializingTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/JobWrapperSerializingTests.java @@ -41,6 +41,6 @@ public class JobWrapperSerializingTests extends AbstractSerializingTestCase processBuckets(CompositeAggregation agg, String rollupIndex, RollupJobStats stats, - GroupConfig groupConfig, String jobId) { + GroupConfig groupConfig, String jobId, boolean isUpgradedDocID) { logger.debug("Buckets: [" + agg.getBuckets().size() + "][" + jobId + "]"); return agg.getBuckets().stream().map(b ->{ @@ -57,24 +58,30 @@ class IndexerUtils { TreeMap keys = new TreeMap<>(b.getKey()); List metrics = b.getAggregations().asList(); + RollupIDGenerator idGenerator; + if (isUpgradedDocID) { + idGenerator = new RollupIDGenerator.Murmur3(jobId); + } else { + idGenerator = new RollupIDGenerator.CRC(); + } Map doc = new HashMap<>(keys.size() + metrics.size()); - CRC32 docId = processKeys(keys, doc, b.getDocCount(), groupConfig); - byte[] vs = jobId.getBytes(StandardCharsets.UTF_8); - docId.update(vs, 0, vs.length); + + processKeys(keys, doc, b.getDocCount(), groupConfig, idGenerator); + idGenerator.add(jobId); processMetrics(metrics, doc); - doc.put(RollupField.ROLLUP_META + "." + RollupField.VERSION_FIELD, Rollup.ROLLUP_VERSION); + doc.put(RollupField.ROLLUP_META + "." + RollupField.VERSION_FIELD, + isUpgradedDocID ? Rollup.CURRENT_ROLLUP_VERSION : Rollup.ROLLUP_VERSION_V1); doc.put(RollupField.ROLLUP_META + "." + RollupField.ID.getPreferredName(), jobId); - IndexRequest request = new IndexRequest(rollupIndex, RollupField.TYPE_NAME, String.valueOf(docId.getValue())); + IndexRequest request = new IndexRequest(rollupIndex, RollupField.TYPE_NAME, idGenerator.getID()); request.source(doc); return request; }).collect(Collectors.toList()); } - private static CRC32 processKeys(Map keys, Map doc, long count, GroupConfig groupConfig) { - CRC32 docID = new CRC32(); - + private static void processKeys(Map keys, Map doc, + long count, GroupConfig groupConfig, RollupIDGenerator idGenerator) { keys.forEach((k, v) -> { // Also add a doc count for each key. This will duplicate data, but makes search easier later doc.put(k + "." + RollupField.COUNT_FIELD, count); @@ -83,37 +90,34 @@ class IndexerUtils { assert v != null; doc.put(k + "." + RollupField.TIMESTAMP, v); doc.put(k + "." + RollupField.INTERVAL, groupConfig.getDateHisto().getInterval()); - doc.put(k + "." + DateHistogramGroupConfig.TIME_ZONE, groupConfig.getDateHisto().getTimeZone()); - docID.update(Numbers.longToBytes((Long)v), 0, 8); + doc.put(k + "." + DateHistogramGroupConfig.TIME_ZONE, groupConfig.getDateHisto().getTimeZone().toString()); + idGenerator.add((Long)v); } else if (k.endsWith("." + HistogramAggregationBuilder.NAME)) { doc.put(k + "." + RollupField.VALUE, v); doc.put(k + "." + RollupField.INTERVAL, groupConfig.getHisto().getInterval()); if (v == null) { - // Arbitrary value to update the doc ID with for nulls - docID.update(19); + idGenerator.addNull(); } else { - docID.update(Numbers.doubleToBytes((Double) v), 0, 8); + idGenerator.add((Double) v); } } else if (k.endsWith("." + TermsAggregationBuilder.NAME)) { doc.put(k + "." + RollupField.VALUE, v); if (v == null) { - // Arbitrary value to update the doc ID with for nulls - docID.update(19); + idGenerator.addNull(); } else if (v instanceof String) { - byte[] vs = ((String) v).getBytes(StandardCharsets.UTF_8); - docID.update(vs, 0, vs.length); + idGenerator.add((String)v); } else if (v instanceof Long) { - docID.update(Numbers.longToBytes((Long)v), 0, 8); + idGenerator.add((Long)v); } else if (v instanceof Double) { - docID.update(Numbers.doubleToBytes((Double)v), 0, 8); + idGenerator.add((Double)v); } else { - throw new RuntimeException("Encountered value of type [" + v.getClass() + "], which was unable to be processed."); + throw new RuntimeException("Encountered value of type [" + + v.getClass() + "], which was unable to be processed."); } } else { throw new ElasticsearchException("Could not identify key in agg [" + k + "]"); } }); - return docID; } private static void processMetrics(List metrics, Map doc) { diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIDGenerator.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIDGenerator.java new file mode 100644 index 00000000000..3f9fc738739 --- /dev/null +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIDGenerator.java @@ -0,0 +1,178 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.rollup.job; + +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefBuilder; +import org.elasticsearch.common.Numbers; +import org.elasticsearch.common.hash.MurmurHash3; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.zip.CRC32; + +/** + * The ID Generator creates a deterministic document ID to be used for rollup docs. + * It does this by accepting values (e.g. composite keys for the rollup bucket) and + * hashes those together in a deterministic manner. + * + * Once the ID has generated, the generator instance becomes locked and will not + * accept any more values. This is a safety mechanism to prevent accidentally + * changing the ID at runtime. + * + * NOTE: this class is not thread safe; there is no synchronization on the "generated" + * flag and it is unsafe to use from multiple threads. + */ +public abstract class RollupIDGenerator { + public abstract void add(Integer v); + public abstract void add(Long v); + public abstract void add(Double v); + public abstract void add(String v); + public abstract void addNull(); + public abstract String getID(); + + private boolean generated = false; + + final boolean check(Object v) { + if (generated) { + throw new RuntimeException("Cannot update ID as it has already been generated."); + } + if (v == null) { + addNull(); + return false; + } + return true; + } + + final void setFlag() { + if (generated) { + throw new RuntimeException("Cannot generate ID as it has already been generated."); + } + generated = true; + } + + /** + * The "old" style ID used in Rollup V1. A rolling 32 bit CRC. + * + * Null values are hashed as (int)19. + */ + @Deprecated + public static class CRC extends RollupIDGenerator { + private final CRC32 crc = new CRC32(); + + @Override + public void add(Integer v) { + if (check(v)) { + crc.update(v); + } + } + + @Override + public void add(Long v) { + if (check(v)) { + crc.update(Numbers.longToBytes(v), 0, 8); + } + } + + @Override + public void add(Double v) { + if (check(v)) { + crc.update(Numbers.doubleToBytes(v), 0, 8); + } + } + + @Override + public void add(String v) { + if (check(v)) { + byte[] vs = (v).getBytes(StandardCharsets.UTF_8); + crc.update(vs, 0, vs.length); + } + } + + @Override + public void addNull() { + // Old ID scheme used (int)19 as the null placeholder. + // Not great but we're stuck with it :( + crc.update(19); + } + + @Override + public String getID() { + setFlag(); + return String.valueOf(crc.getValue()); + } + } + + /** + * The "new" style ID, used in Rollup V2. A 128 bit Murmur3 hash of + * all the keys concatenated together, base64-encoded, then prepended + * with the job ID and a `$` delimiter + * + * Null values are hashed as a (hopefully) unique string `__NULL_PLACEHOLDER__830f1de2__` + */ + public static class Murmur3 extends RollupIDGenerator { + private static final long SEED = 19; + private static final BytesRef DELIM = new BytesRef("$"); + private static final BytesRef NULL_PLACEHOLDER = new BytesRef("__NULL_PLACEHOLDER__830f1de2__"); + private final BytesRefBuilder id = new BytesRefBuilder(); + private final String jobId; + + Murmur3(String jobId) { + this.jobId = jobId; + } + + @Override + public void add(Integer v) { + if (check(v)) { + update(Numbers.intToBytes(v)); + } + } + + @Override + public void add(Long v) { + if (check(v)) { + update(Numbers.longToBytes(v)); + } + } + + @Override + public void add(Double v) { + if (check(v)) { + update(Numbers.doubleToBytes(v)); + } + } + + @Override + public void add(String v) { + if (check(v)) { + update((v).getBytes(StandardCharsets.UTF_8)); + } + } + + @Override + public void addNull() { + // New ID scheme uses a (hopefully) unique placeholder for null + update(NULL_PLACEHOLDER.bytes); + } + + private void update(byte[] v) { + id.append(v, 0, v.length); + id.append(DELIM); + } + + @Override + public String getID() { + setFlag(); + MurmurHash3.Hash128 hasher + = MurmurHash3.hash128(id.bytes(), 0, id.length(), SEED, new MurmurHash3.Hash128()); + byte[] hashedBytes = new byte[16]; + System.arraycopy(Numbers.longToBytes(hasher.h1), 0, hashedBytes, 0, 8); + System.arraycopy(Numbers.longToBytes(hasher.h2), 0, hashedBytes, 8, 8); + return jobId + "$" + Base64.getUrlEncoder().withoutPadding().encodeToString(hashedBytes); + + } + } +} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index 4366f110b13..308def9c22f 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** @@ -54,6 +55,7 @@ public abstract class RollupIndexer { private final AtomicReference state; private final AtomicReference> position; private final Executor executor; + protected final AtomicBoolean upgradedDocumentID; private final CompositeAggregationBuilder compositeBuilder; private long maxBoundary; @@ -65,13 +67,15 @@ public abstract class RollupIndexer { * @param initialState Initial state for the indexer * @param initialPosition The last indexed bucket of the task */ - RollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, Map initialPosition) { + RollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + Map initialPosition, AtomicBoolean upgradedDocumentID) { this.executor = executor; this.job = job; this.stats = new RollupJobStats(); this.state = initialState; this.position = new AtomicReference<>(initialPosition); this.compositeBuilder = createCompositeBuilder(job.getConfig()); + this.upgradedDocumentID = upgradedDocumentID; } /** @@ -140,6 +144,13 @@ public abstract class RollupIndexer { return stats; } + /** + * Returns if this job has upgraded it's ID scheme yet or not + */ + public boolean isUpgradedDocumentID() { + return upgradedDocumentID.get(); + } + /** * Sets the internal state to {@link IndexerState#STARTED} if the previous state was {@link IndexerState#STOPPED}. Setting the state to * STARTED allows a job to run in the background when {@link #maybeTriggerAsyncJob(long)} is called. @@ -312,8 +323,10 @@ public abstract class RollupIndexer { } final BulkRequest bulkRequest = new BulkRequest(); + // Indexer is single-threaded, and only place that the ID scheme can get upgraded is doSaveState(), so + // we can pass down the boolean value rather than the atomic here final List docs = IndexerUtils.processBuckets(response, job.getConfig().getRollupIndex(), - stats, job.getConfig().getGroupConfig(), job.getConfig().getId()); + stats, job.getConfig().getGroupConfig(), job.getConfig().getId(), upgradedDocumentID.get()); docs.forEach(bulkRequest::add); assert bulkRequest.requests().size() > 0; doNextBulk(bulkRequest, diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index 50b3f21800d..65362f9ad9d 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -37,6 +37,7 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.rollup.Rollup; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** @@ -97,8 +98,10 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE private final Client client; private final RollupJob job; - ClientRollupPageManager(RollupJob job, IndexerState initialState, Map initialPosition, Client client) { - super(threadPool.executor(ThreadPool.Names.GENERIC), job, new AtomicReference<>(initialState), initialPosition); + ClientRollupPageManager(RollupJob job, IndexerState initialState, Map initialPosition, + Client client, AtomicBoolean upgradedDocumentID) { + super(threadPool.executor(ThreadPool.Names.GENERIC), job, new AtomicReference<>(initialState), + initialPosition, upgradedDocumentID); this.client = client; this.job = job; } @@ -122,9 +125,16 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE next.run(); } else { // Otherwise, attempt to persist our state - final RollupJobStatus state = new RollupJobStatus(indexerState, getPosition()); + // Upgrade to the new ID scheme while we are at it + boolean oldState = upgradedDocumentID.getAndSet(true); + final RollupJobStatus state = new RollupJobStatus(indexerState, getPosition(), upgradedDocumentID.get()); logger.debug("Updating persistent state of job [" + job.getConfig().getId() + "] to [" + indexerState.toString() + "]"); - updatePersistentTaskState(state, ActionListener.wrap(task -> next.run(), exc -> next.run())); + updatePersistentTaskState(state, ActionListener.wrap(task -> next.run(), exc -> { + // We failed to update the persistent task for some reason, + // set our flag back to what it was before + upgradedDocumentID.set(oldState); + next.run(); + })); } } @@ -148,6 +158,7 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE private final SchedulerEngine schedulerEngine; private final ThreadPool threadPool; private final RollupIndexer indexer; + private AtomicBoolean upgradedDocumentID; RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus state, Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map headers) { @@ -156,6 +167,9 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE this.schedulerEngine = schedulerEngine; this.threadPool = threadPool; + // We can assume the new ID scheme only for new jobs + this.upgradedDocumentID = new AtomicBoolean(true); + // If status is not null, we are resuming rather than starting fresh. Map initialPosition = null; IndexerState initialState = IndexerState.STOPPED; @@ -169,25 +183,35 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE * it is indexing but without the actual indexing thread running. */ initialState = IndexerState.STARTED; + } else if (existingState.equals(IndexerState.ABORTING) || existingState.equals(IndexerState.STOPPING)) { // It shouldn't be possible to persist ABORTING, but if for some reason it does, // play it safe and restore the job as STOPPED. An admin will have to clean it up, // but it won't be running, and won't delete itself either. Safest option. // If we were STOPPING, that means it persisted but was killed before finally stopped... so ok - // to restore as STOPEPD + // to restore as STOPPED initialState = IndexerState.STOPPED; } else { initialState = existingState; } initialPosition = state.getPosition(); + + // Since we have state, we are resuming a job/checkpoint. Although we are resuming + // from something that was checkpointed, we can't guarantee it was the _final_ checkpoint + // before the job ended (e.g. it could have been STOPPING, still indexing and killed, leaving + // us with an interval of time partially indexed). + // + // To be safe, if we are resuming any job, use it's ID upgrade status. It will only + // be true if it actually finished a full checkpoint. + this.upgradedDocumentID.set(state.isUpgradedDocumentID()); } this.indexer = new ClientRollupPageManager(job, initialState, initialPosition, - new ParentTaskAssigningClient(client, new TaskId(getPersistentTaskId()))); + new ParentTaskAssigningClient(client, new TaskId(getPersistentTaskId())), upgradedDocumentID); } @Override public Status getStatus() { - return new RollupJobStatus(indexer.getState(), indexer.getPosition()); + return new RollupJobStatus(indexer.getState(), indexer.getPosition(), upgradedDocumentID.get()); } /** @@ -223,13 +247,16 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE + " state was [" + prevState + "]")); return; } + final IndexerState newState = indexer.start(); if (newState != IndexerState.STARTED) { listener.onFailure(new ElasticsearchException("Cannot start task for Rollup Job [" + job.getConfig().getId() + "] because" + " state was [" + newState + "]")); return; } - final RollupJobStatus state = new RollupJobStatus(IndexerState.STARTED, indexer.getPosition()); + + + final RollupJobStatus state = new RollupJobStatus(IndexerState.STARTED, indexer.getPosition(), upgradedDocumentID.get()); logger.debug("Updating state for rollup job [" + job.getConfig().getId() + "] to [" + state.getIndexerState() + "][" + state.getPosition() + "]"); updatePersistentTaskState(state, @@ -240,6 +267,8 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE listener.onResponse(new StartRollupJobAction.Response(true)); }, (exc) -> { + // We were unable to update the persistent status, so we need to shutdown the indexer too. + indexer.stop(); listener.onFailure( new ElasticsearchException("Error while updating state for rollup job [" + job.getConfig().getId() + "] to [" + state.getIndexerState() + "].", exc) @@ -261,6 +290,7 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE * @param listener The listener that is requesting the stop, so that we can signal completion/failure */ public synchronized void stop(ActionListener listener) { + final IndexerState newState = indexer.stop(); switch (newState) { case STOPPED: @@ -268,9 +298,13 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE break; case STOPPING: - // update the persistent state only if there is no background job running, - // otherwise the state is updated by the indexer when the background job detects the STOPPING state. - RollupJobStatus state = new RollupJobStatus(IndexerState.STOPPED, indexer.getPosition()); + // update the persistent state to STOPPED. There are two scenarios and both are safe: + // 1. we persist STOPPED now, indexer continues a bit then sees the flag and checkpoints another + // STOPPED with the more recent position. That will also upgrade the ID scheme + // 2. we persist STOPPED now, indexer continues a bit but then dies. When/if we resume we'll pick up + // at last checkpoint, overwrite some docs and eventually checkpoint. At that time we'll also + // upgrade the ID scheme + RollupJobStatus state = new RollupJobStatus(IndexerState.STOPPED, indexer.getPosition(), upgradedDocumentID.get()); updatePersistentTaskState(state, ActionListener.wrap( (task) -> { diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java index 58fa9d4533b..d7bc2786646 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java @@ -181,7 +181,7 @@ public class PutJobStateMachineTests extends ESTestCase { fail("Listener success should not have been triggered."); }, e -> { assertThat(e.getMessage(), equalTo("Expected to find _meta key in mapping of rollup index [" - + job.getConfig().getRollupIndex() + "] but not found.")); + + job.getConfig().getRollupIndex() + "] but not found.")); }); Logger logger = mock(Logger.class); @@ -283,11 +283,11 @@ public class PutJobStateMachineTests extends ESTestCase { @SuppressWarnings("unchecked") public void testAddJobToMapping() { RollupJobConfig unrelatedJob = ConfigTestHelpers.getRollupJob(ESTestCase.randomAlphaOfLength(10)) - .setIndexPattern("foo").setRollupIndex("rollup_index_foo").build(); + .setIndexPattern("foo").setRollupIndex("rollup_index_foo").build(); RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo") - .setIndexPattern("foo") - .setRollupIndex("rollup_index_foo") - .build(), Collections.emptyMap()); + .setIndexPattern("foo") + .setRollupIndex("rollup_index_foo") + .build(), Collections.emptyMap()); ActionListener testListener = ActionListener.wrap(response -> { fail("Listener success should not have been triggered."); }, e -> { @@ -346,7 +346,7 @@ public class PutJobStateMachineTests extends ESTestCase { requestCaptor.getValue().onFailure(new ResourceAlreadyExistsException(job.getConfig().getRollupIndex())); return null; }).when(tasksService).sendStartRequest(eq(job.getConfig().getId()), - eq(RollupField.TASK_NAME), eq(job), requestCaptor.capture()); + eq(RollupField.TASK_NAME), eq(job), requestCaptor.capture()); TransportPutRollupJobAction.startPersistentTask(job, testListener, tasksService); verify(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), any()); @@ -367,14 +367,14 @@ public class PutJobStateMachineTests extends ESTestCase { ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ActionListener.class); doAnswer(invocation -> { PersistentTasksCustomMetaData.PersistentTask response - = new PersistentTasksCustomMetaData.PersistentTask<>(job.getConfig().getId(), RollupField.TASK_NAME, job, 123, - mock(PersistentTasksCustomMetaData.Assignment.class)); + = new PersistentTasksCustomMetaData.PersistentTask<>(job.getConfig().getId(), RollupField.TASK_NAME, job, 123, + mock(PersistentTasksCustomMetaData.Assignment.class)); requestCaptor.getValue().onResponse(response); return null; }).when(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), requestCaptor.capture()); ArgumentCaptor requestCaptor2 - = ArgumentCaptor.forClass(PersistentTasksService.WaitForPersistentTaskListener.class); + = ArgumentCaptor.forClass(PersistentTasksService.WaitForPersistentTaskListener.class); doAnswer(invocation -> { // Bail here with an error, further testing will happen through tests of #startPersistentTask requestCaptor2.getValue().onFailure(new RuntimeException("Ending")); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java index 6843f957104..12c88823e2d 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java @@ -541,7 +541,8 @@ public class SearchActionTests extends ESTestCase { BoolQueryBuilder bool1 = new BoolQueryBuilder() .must(TransportRollupSearchAction.rewriteQuery(request.source().query(), caps)) .filter(new TermQueryBuilder(RollupField.formatMetaField(RollupField.ID.getPreferredName()), "foo")) - .filter(new TermQueryBuilder(RollupField.formatMetaField(RollupField.VERSION_FIELD), Rollup.ROLLUP_VERSION)); + .filter(new TermsQueryBuilder(RollupField.formatMetaField(RollupField.VERSION_FIELD), + new long[]{Rollup.ROLLUP_VERSION_V1, Rollup.ROLLUP_VERSION_V2})); assertThat(msearch.requests().get(1).source().query(), equalTo(bool1)); } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java index 872a4024647..51a53db713b 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java @@ -10,6 +10,7 @@ import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.RandomIndexWriter; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; @@ -112,7 +113,7 @@ public class IndexerUtilsTests extends AggregatorTestCase { directory.close(); List docs = IndexerUtils.processBuckets(composite, indexName, stats, - ConfigTestHelpers.getGroupConfig().build(), "foo"); + ConfigTestHelpers.getGroupConfig().build(), "foo", randomBoolean()); assertThat(docs.size(), equalTo(numDocs)); for (IndexRequest doc : docs) { @@ -179,7 +180,7 @@ public class IndexerUtilsTests extends AggregatorTestCase { directory.close(); List docs = IndexerUtils.processBuckets(composite, indexName, stats, - ConfigTestHelpers.getGroupConfig().build(), "foo"); + ConfigTestHelpers.getGroupConfig().build(), "foo", randomBoolean()); assertThat(docs.size(), equalTo(numDocs)); for (IndexRequest doc : docs) { @@ -235,7 +236,7 @@ public class IndexerUtilsTests extends AggregatorTestCase { directory.close(); List docs = IndexerUtils.processBuckets(composite, indexName, stats, - ConfigTestHelpers.getGroupConfig().build(), "foo"); + ConfigTestHelpers.getGroupConfig().build(), "foo", randomBoolean()); assertThat(docs.size(), equalTo(numDocs)); for (IndexRequest doc : docs) { @@ -301,7 +302,7 @@ public class IndexerUtilsTests extends AggregatorTestCase { directory.close(); List docs = IndexerUtils.processBuckets(composite, indexName, stats, - ConfigTestHelpers.getGroupConfig().build(), "foo"); + ConfigTestHelpers.getGroupConfig().build(), "foo", randomBoolean()); assertThat(docs.size(), equalTo(numDocs)); for (IndexRequest doc : docs) { @@ -312,7 +313,7 @@ public class IndexerUtilsTests extends AggregatorTestCase { } } - public void testKeyOrdering() { + public void testKeyOrderingOldID() { CompositeAggregation composite = mock(CompositeAggregation.class); when(composite.getBuckets()).thenAnswer((Answer>) invocationOnMock -> { @@ -355,11 +356,112 @@ public class IndexerUtilsTests extends AggregatorTestCase { GroupConfig.Builder groupConfig = ConfigTestHelpers.getGroupConfig(); groupConfig.setHisto(new HistogramGroupConfig(123L, "abc")); - List docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), groupConfig.build(), "foo"); + List docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), + groupConfig.build(), "foo", false); assertThat(docs.size(), equalTo(1)); assertThat(docs.get(0).id(), equalTo("1237859798")); } + public void testKeyOrderingNewID() { + CompositeAggregation composite = mock(CompositeAggregation.class); + + when(composite.getBuckets()).thenAnswer((Answer>) invocationOnMock -> { + List foos = new ArrayList<>(); + + CompositeAggregation.Bucket bucket = mock(CompositeAggregation.Bucket.class); + LinkedHashMap keys = new LinkedHashMap<>(3); + keys.put("foo.date_histogram", 123L); + keys.put("bar.terms", "baz"); + keys.put("abc.histogram", 1.9); + keys = shuffleMap(keys, Collections.emptySet()); + when(bucket.getKey()).thenReturn(keys); + + List list = new ArrayList<>(3); + InternalNumericMetricsAggregation.SingleValue mockAgg = mock(InternalNumericMetricsAggregation.SingleValue.class); + when(mockAgg.getName()).thenReturn("123"); + list.add(mockAgg); + + InternalNumericMetricsAggregation.SingleValue mockAgg2 = mock(InternalNumericMetricsAggregation.SingleValue.class); + when(mockAgg2.getName()).thenReturn("abc"); + list.add(mockAgg2); + + InternalNumericMetricsAggregation.SingleValue mockAgg3 = mock(InternalNumericMetricsAggregation.SingleValue.class); + when(mockAgg3.getName()).thenReturn("yay"); + list.add(mockAgg3); + + Collections.shuffle(list, random()); + + Aggregations aggs = new Aggregations(list); + when(bucket.getAggregations()).thenReturn(aggs); + when(bucket.getDocCount()).thenReturn(1L); + + foos.add(bucket); + + return foos; + }); + + GroupConfig.Builder groupConfig = ConfigTestHelpers.getGroupConfig(); + groupConfig.setHisto(new HistogramGroupConfig(1, "abc")); + + List docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), + groupConfig.build(), "foo", true); + assertThat(docs.size(), equalTo(1)); + assertThat(docs.get(0).id(), equalTo("foo$c9LcrFqeFW92uN_Z7sv1hA")); + } + + /* + A test to make sure very long keys don't break the hash + */ + public void testKeyOrderingNewIDLong() { + CompositeAggregation composite = mock(CompositeAggregation.class); + + when(composite.getBuckets()).thenAnswer((Answer>) invocationOnMock -> { + List foos = new ArrayList<>(); + + CompositeAggregation.Bucket bucket = mock(CompositeAggregation.Bucket.class); + LinkedHashMap keys = new LinkedHashMap<>(3); + keys.put("foo.date_histogram", 123L); + + char[] charArray = new char[IndexWriter.MAX_TERM_LENGTH]; + Arrays.fill(charArray, 'a'); + keys.put("bar.terms", new String(charArray)); + keys.put("abc.histogram", 1.9); + keys = shuffleMap(keys, Collections.emptySet()); + when(bucket.getKey()).thenReturn(keys); + + List list = new ArrayList<>(3); + InternalNumericMetricsAggregation.SingleValue mockAgg = mock(InternalNumericMetricsAggregation.SingleValue.class); + when(mockAgg.getName()).thenReturn("123"); + list.add(mockAgg); + + InternalNumericMetricsAggregation.SingleValue mockAgg2 = mock(InternalNumericMetricsAggregation.SingleValue.class); + when(mockAgg2.getName()).thenReturn("abc"); + list.add(mockAgg2); + + InternalNumericMetricsAggregation.SingleValue mockAgg3 = mock(InternalNumericMetricsAggregation.SingleValue.class); + when(mockAgg3.getName()).thenReturn("yay"); + list.add(mockAgg3); + + Collections.shuffle(list, random()); + + Aggregations aggs = new Aggregations(list); + when(bucket.getAggregations()).thenReturn(aggs); + when(bucket.getDocCount()).thenReturn(1L); + + foos.add(bucket); + + return foos; + }); + + GroupConfig.Builder groupConfig = ConfigTestHelpers.getGroupConfig(); + groupConfig.setHisto(new HistogramGroupConfig(1, "abc")); + + List docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), + groupConfig.build(), "foo", true); + assertThat(docs.size(), equalTo(1)); + assertThat(docs.get(0).id(), equalTo("foo$VAFKZpyaEqYRPLyic57_qw")); + } + public void testNullKeys() { CompositeAggregation composite = mock(CompositeAggregation.class); @@ -384,7 +486,8 @@ public class IndexerUtilsTests extends AggregatorTestCase { GroupConfig.Builder groupConfig = ConfigTestHelpers.getGroupConfig(); groupConfig.setHisto(randomHistogramGroupConfig(random())); - List docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), groupConfig.build(), "foo"); + List docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), + groupConfig.build(), "foo", randomBoolean()); assertThat(docs.size(), equalTo(1)); assertFalse(Strings.isNullOrEmpty(docs.get(0).id())); } @@ -446,7 +549,7 @@ public class IndexerUtilsTests extends AggregatorTestCase { directory.close(); List docs = IndexerUtils.processBuckets(composite, indexName, stats, - ConfigTestHelpers.getGroupConfig().build(), "foo"); + ConfigTestHelpers.getGroupConfig().build(), "foo", randomBoolean()); assertThat(docs.size(), equalTo(6)); for (IndexRequest doc : docs) { diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java index adb4241e36f..5799eb401f6 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java @@ -71,6 +71,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -81,6 +82,7 @@ import static org.hamcrest.number.OrderingComparison.greaterThan; public class RollupIndexerIndexingTests extends AggregatorTestCase { private QueryShardContext queryShardContext; private IndexSettings settings; + private final boolean newIDScheme = randomBoolean(); @Before private void setup() { @@ -111,7 +113,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { assertThat(request.type(), equalTo("_doc")); assertThat(request.sourceAsMap(), equalTo( asMap( - "_rollup.version", 1, + "_rollup.version", newIDScheme ? 2 : 1, "the_histo.date_histogram.timestamp", 3, "the_histo.date_histogram.interval", "1ms", "the_histo.date_histogram._count", 2, @@ -124,7 +126,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { assertThat(request.type(), equalTo("_doc")); assertThat(request.sourceAsMap(), equalTo( asMap( - "_rollup.version", 1, + "_rollup.version", newIDScheme ? 2 : 1, "the_histo.date_histogram.timestamp", 7, "the_histo.date_histogram.interval", "1ms", "the_histo.date_histogram._count", 1, @@ -170,7 +172,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { assertThat(request.type(), equalTo("_doc")); assertThat(request.sourceAsMap(), equalTo( asMap( - "_rollup.version", 1, + "_rollup.version", newIDScheme ? 2 : 1, "the_histo.date_histogram.timestamp", asLong("2015-03-31T03:00:00"), "the_histo.date_histogram.interval", "1h", "the_histo.date_histogram._count", 3, @@ -188,7 +190,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { assertThat(request.type(), equalTo("_doc")); assertThat(request.sourceAsMap(), equalTo( asMap( - "_rollup.version", 1, + "_rollup.version", newIDScheme ? 2 : 1, "the_histo.date_histogram.timestamp", asLong("2015-03-31T04:00:00"), "the_histo.date_histogram.interval", "1h", "the_histo.date_histogram._count", 3, @@ -206,7 +208,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { assertThat(request.type(), equalTo("_doc")); assertThat(request.sourceAsMap(), equalTo( asMap( - "_rollup.version", 1, + "_rollup.version", newIDScheme ? 2 : 1, "the_histo.date_histogram.timestamp", asLong("2015-03-31T05:00:00"), "the_histo.date_histogram.interval", "1h", "the_histo.date_histogram._count", 4, @@ -224,7 +226,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { assertThat(request.type(), equalTo("_doc")); assertThat(request.sourceAsMap(), equalTo( asMap( - "_rollup.version", 1, + "_rollup.version", newIDScheme ? 2 : 1, "the_histo.date_histogram.timestamp", asLong("2015-03-31T06:00:00"), "the_histo.date_histogram.interval", "1h", "the_histo.date_histogram._count", 3, @@ -242,7 +244,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { assertThat(request.type(), equalTo("_doc")); assertThat(request.sourceAsMap(), equalTo( asMap( - "_rollup.version", 1, + "_rollup.version", newIDScheme ? 2 : 1, "the_histo.date_histogram.timestamp", asLong("2015-03-31T07:00:00"), "the_histo.date_histogram.interval", "1h", "the_histo.date_histogram._count", 3, @@ -289,7 +291,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { assertThat(request.type(), equalTo("_doc")); assertThat(request.sourceAsMap(), equalTo( asMap( - "_rollup.version", 1, + "_rollup.version", newIDScheme ? 2 : 1, "the_histo.date_histogram.timestamp", rounding.round(now - TimeValue.timeValueHours(5).getMillis()), "the_histo.date_histogram.interval", "1m", "the_histo.date_histogram._count", 2, @@ -302,7 +304,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { assertThat(request.type(), equalTo("_doc")); assertThat(request.sourceAsMap(), equalTo( asMap( - "_rollup.version", 1, + "_rollup.version", newIDScheme ? 2 : 1, "the_histo.date_histogram.timestamp", rounding.round(now - TimeValue.timeValueMinutes(75).getMillis()), "the_histo.date_histogram.interval", "1m", "the_histo.date_histogram._count", 2, @@ -315,7 +317,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { assertThat(request.type(), equalTo("_doc")); assertThat(request.sourceAsMap(), equalTo( asMap( - "_rollup.version", 1, + "_rollup.version", newIDScheme ? 2 : 1, "the_histo.date_histogram.timestamp", rounding.round(now - TimeValue.timeValueMinutes(61).getMillis()), "the_histo.date_histogram.interval", "1m", "the_histo.date_histogram._count", 1, @@ -355,7 +357,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { assertThat(request.type(), equalTo("_doc")); assertThat(request.sourceAsMap(), equalTo( asMap( - "_rollup.version", 1, + "_rollup.version", newIDScheme ? 2 : 1, "the_histo.date_histogram.timestamp", asLong("2015-03-31T03:00:00"), "the_histo.date_histogram.interval", "1d", "the_histo.date_histogram._count", 2, @@ -374,7 +376,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { assertThat(request.type(), equalTo("_doc")); assertThat(request.sourceAsMap(), equalTo( asMap( - "_rollup.version", 1, + "_rollup.version", newIDScheme ? 2 : 1, "the_histo.date_histogram.timestamp", asLong("2015-03-31T03:00:00"), "the_histo.date_histogram.interval", "1d", "the_histo.date_histogram._count", 2, @@ -387,7 +389,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { assertThat(request.type(), equalTo("_doc")); assertThat(request.sourceAsMap(), equalTo( asMap( - "_rollup.version", 1, + "_rollup.version", newIDScheme ? 2 : 1, "the_histo.date_histogram.timestamp", asLong("2015-04-01T03:00:00"), "the_histo.date_histogram.interval", "1d", "the_histo.date_histogram._count", 5, @@ -425,7 +427,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { Map source = ((IndexRequest) request).sourceAsMap(); - assertThat(source.get("_rollup.version"), equalTo(1)); + assertThat(source.get("_rollup.version"), equalTo(newIDScheme ? 2 : 1)); assertThat(source.get("ts.date_histogram.interval"), equalTo(timeInterval.toString())); assertNotNull(source.get("the_avg.avg._count")); assertNotNull(source.get("the_avg.avg.value")); @@ -580,7 +582,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { SyncRollupIndexer(Executor executor, RollupJob job, IndexSearcher searcher, MappedFieldType[] fieldTypes, MappedFieldType timestampField) { - super(executor, job, new AtomicReference<>(IndexerState.STARTED), null); + super(executor, job, new AtomicReference<>(IndexerState.STARTED), null, new AtomicBoolean(newIDScheme)); this.searcher = searcher; this.fieldTypes = fieldTypes; this.timestampField = timestampField; diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java index 6669622b10c..c645a0e3005 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java @@ -20,11 +20,11 @@ import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.job.GroupConfig; import org.elasticsearch.xpack.core.rollup.job.IndexerState; import org.elasticsearch.xpack.core.rollup.job.RollupJob; -import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig; import org.mockito.stubbing.Answer; @@ -49,14 +49,16 @@ import static org.mockito.Mockito.when; public class RollupIndexerStateTests extends ESTestCase { private static class EmptyRollupIndexer extends RollupIndexer { EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, - Map initialPosition) { - super(executor, job, initialState, initialPosition); + Map initialPosition, boolean upgraded) { + super(executor, job, initialState, initialPosition, new AtomicBoolean(upgraded)); } - EmptyRollupIndexer(Executor executor, RollupJob job, IndexerState initialState, Map initialPosition) { - this(executor, job, new AtomicReference<>(initialState), initialPosition); + EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + Map initialPosition) { + this(executor, job, initialState, initialPosition, randomBoolean()); } + @Override protected void doNextSearch(SearchRequest request, ActionListener nextPhase) { // TODO Should use InternalComposite constructor but it is package protected in core. @@ -92,9 +94,9 @@ public class RollupIndexerStateTests extends ESTestCase { } })); final SearchResponseSections sections = new SearchResponseSections(new SearchHits(new SearchHit[0], 0, 0), - aggs, null, false, null, null, 1); + aggs, null, false, null, null, 1); final SearchResponse response = new SearchResponse(sections, null, 1, 1, 0, 0, - new ShardSearchFailure[0], null); + new ShardSearchFailure[0], null); nextPhase.onResponse(response); } @@ -127,13 +129,13 @@ public class RollupIndexerStateTests extends ESTestCase { protected CountDownLatch latch; DelayedEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, - Map initialPosition) { - super(executor, job, initialState, initialPosition); + Map initialPosition, boolean upgraded) { + super(executor, job, initialState, initialPosition, upgraded); } - DelayedEmptyRollupIndexer(Executor executor, RollupJob job, IndexerState initialState, + DelayedEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, Map initialPosition) { - this(executor, job, new AtomicReference<>(initialState), initialPosition); + super(executor, job, initialState, initialPosition, randomBoolean()); } private CountDownLatch newLatch() { @@ -161,7 +163,7 @@ public class RollupIndexerStateTests extends ESTestCase { NonEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, Map initialPosition, Function searchFunction, Function bulkFunction, Consumer failureConsumer) { - super(executor, job, initialState, initialPosition); + super(executor, job, initialState, initialPosition, new AtomicBoolean(randomBoolean())); this.searchFunction = searchFunction; this.bulkFunction = bulkFunction; this.failureConsumer = failureConsumer; @@ -206,7 +208,7 @@ public class RollupIndexerStateTests extends ESTestCase { @Override protected void onFailure(Exception exc) { - failureConsumer.accept(exc); + failureConsumer.accept(exc); } @Override @@ -215,11 +217,12 @@ public class RollupIndexerStateTests extends ESTestCase { public void testStarted() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(ESTestCase.randomAlphaOfLengthBetween(1, 10)).build(), - Collections.emptyMap()); - IndexerState state = IndexerState.STOPPED; + Collections.emptyMap()); + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); try { - RollupIndexer indexer = new EmptyRollupIndexer(executor, job, state, null); + RollupIndexer indexer = new EmptyRollupIndexer(executor, job, state, null, true); + assertTrue(indexer.isUpgradedDocumentID()); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); @@ -234,8 +237,8 @@ public class RollupIndexerStateTests extends ESTestCase { public void testIndexing() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(ESTestCase.randomAlphaOfLengthBetween(1, 10)).build(), - Collections.emptyMap()); - IndexerState state = IndexerState.STOPPED; + Collections.emptyMap()); + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); try { AtomicBoolean isFinished = new AtomicBoolean(false); @@ -302,7 +305,7 @@ public class RollupIndexerStateTests extends ESTestCase { public void testAbortDuringSearch() throws Exception { final AtomicBoolean aborted = new AtomicBoolean(false); RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(ESTestCase.randomAlphaOfLengthBetween(1, 10)).build(), - Collections.emptyMap()); + Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); final CountDownLatch latch = new CountDownLatch(1); @@ -347,7 +350,7 @@ public class RollupIndexerStateTests extends ESTestCase { public void testAbortAfterCompletion() throws Exception { final AtomicBoolean aborted = new AtomicBoolean(false); RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(ESTestCase.randomAlphaOfLengthBetween(1, 10)).build(), - Collections.emptyMap()); + Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); @@ -404,9 +407,9 @@ public class RollupIndexerStateTests extends ESTestCase { } })); final SearchResponseSections sections = new SearchResponseSections(new SearchHits(new SearchHit[0], 0, 0), - aggs, null, false, null, null, 1); + aggs, null, false, null, null, 1); final SearchResponse response = new SearchResponse(sections, null, 1, 1, 0, 0, - ShardSearchFailure.EMPTY_ARRAY, null); + ShardSearchFailure.EMPTY_ARRAY, null); nextPhase.onResponse(response); } @@ -433,8 +436,8 @@ public class RollupIndexerStateTests extends ESTestCase { public void testStopIndexing() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(ESTestCase.randomAlphaOfLengthBetween(1, 10)).build(), - Collections.emptyMap()); - IndexerState state = IndexerState.STOPPED; + Collections.emptyMap()); + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); try { DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null); @@ -456,8 +459,8 @@ public class RollupIndexerStateTests extends ESTestCase { public void testAbortIndexing() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(ESTestCase.randomAlphaOfLengthBetween(1, 10)).build(), - Collections.emptyMap()); - IndexerState state = IndexerState.STOPPED; + Collections.emptyMap()); + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); try { final AtomicBoolean isAborted = new AtomicBoolean(false); @@ -484,8 +487,8 @@ public class RollupIndexerStateTests extends ESTestCase { public void testAbortStarted() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(ESTestCase.randomAlphaOfLengthBetween(1, 10)).build(), - Collections.emptyMap()); - IndexerState state = IndexerState.STOPPED; + Collections.emptyMap()); + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); try { final AtomicBoolean isAborted = new AtomicBoolean(false); @@ -511,8 +514,8 @@ public class RollupIndexerStateTests extends ESTestCase { public void testMultipleJobTriggering() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(ESTestCase.randomAlphaOfLengthBetween(1, 10)).build(), - Collections.emptyMap()); - IndexerState state = IndexerState.STOPPED; + Collections.emptyMap()); + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); try { final AtomicBoolean isAborted = new AtomicBoolean(false); @@ -552,7 +555,7 @@ public class RollupIndexerStateTests extends ESTestCase { public void testUnknownKey() throws Exception { AtomicBoolean isFinished = new AtomicBoolean(false); RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(ESTestCase.randomAlphaOfLengthBetween(1, 10)).build(), - Collections.emptyMap()); + Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); Function searchFunction = searchRequest -> { Aggregations aggs = new Aggregations(Collections.singletonList(new CompositeAggregation() { @@ -614,9 +617,9 @@ public class RollupIndexerStateTests extends ESTestCase { } })); final SearchResponseSections sections = new SearchResponseSections(new SearchHits(new SearchHit[0], 0, 0), - aggs, null, false, null, null, 1); + aggs, null, false, null, null, 1); return new SearchResponse(sections, null, 1, 1, 0, 0, - ShardSearchFailure.EMPTY_ARRAY, null); + ShardSearchFailure.EMPTY_ARRAY, null); }; Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); @@ -630,7 +633,7 @@ public class RollupIndexerStateTests extends ESTestCase { try { NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, - searchFunction, bulkFunction, failureConsumer); + searchFunction, bulkFunction, failureConsumer); final CountDownLatch latch = indexer.newLatch(1); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); @@ -657,7 +660,7 @@ public class RollupIndexerStateTests extends ESTestCase { AtomicBoolean isFinished = new AtomicBoolean(false); RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(ESTestCase.randomAlphaOfLengthBetween(1, 10)).build(), - Collections.emptyMap()); + Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); Function searchFunction = searchRequest -> { Aggregations aggs = new Aggregations(Collections.singletonList(new CompositeAggregation() { @@ -720,9 +723,9 @@ public class RollupIndexerStateTests extends ESTestCase { } })); final SearchResponseSections sections = new SearchResponseSections(new SearchHits(new SearchHit[0], 0, 0), - aggs, null, false, null, null, 1); + aggs, null, false, null, null, 1); return new SearchResponse(sections, null, 1, 1, 0, 0, - ShardSearchFailure.EMPTY_ARRAY, null); + ShardSearchFailure.EMPTY_ARRAY, null); }; Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); @@ -736,7 +739,7 @@ public class RollupIndexerStateTests extends ESTestCase { try { NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, - searchFunction, bulkFunction, failureConsumer); + searchFunction, bulkFunction, failureConsumer); final CountDownLatch latch = indexer.newLatch(1); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); @@ -760,12 +763,12 @@ public class RollupIndexerStateTests extends ESTestCase { public void testSearchShardFailure() throws Exception { AtomicBoolean isFinished = new AtomicBoolean(false); RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(ESTestCase.randomAlphaOfLengthBetween(1, 10)).build(), - Collections.emptyMap()); + Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); Function searchFunction = searchRequest -> { ShardSearchFailure[] failures = new ShardSearchFailure[]{new ShardSearchFailure(new RuntimeException("failed"))}; return new SearchResponse(null, null, 1, 1, 0, 0, - failures, null); + failures, null); }; Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); @@ -779,7 +782,7 @@ public class RollupIndexerStateTests extends ESTestCase { try { NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, - searchFunction, bulkFunction, failureConsumer); + searchFunction, bulkFunction, failureConsumer); final CountDownLatch latch = indexer.newLatch(1); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); @@ -804,7 +807,7 @@ public class RollupIndexerStateTests extends ESTestCase { public void testBulkFailure() throws Exception { AtomicBoolean isFinished = new AtomicBoolean(false); RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(ESTestCase.randomAlphaOfLengthBetween(1, 10)).build(), - Collections.emptyMap()); + Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); Function searchFunction = searchRequest -> { Aggregations aggs = new Aggregations(Collections.singletonList(new CompositeAggregation() { @@ -866,9 +869,9 @@ public class RollupIndexerStateTests extends ESTestCase { } })); final SearchResponseSections sections = new SearchResponseSections(new SearchHits(new SearchHit[0], 0, 0), - aggs, null, false, null, null, 1); + aggs, null, false, null, null, 1); return new SearchResponse(sections, null, 1, 1, 0, 0, - ShardSearchFailure.EMPTY_ARRAY, null); + ShardSearchFailure.EMPTY_ARRAY, null); }; Function bulkFunction = bulkRequest -> { @@ -885,7 +888,7 @@ public class RollupIndexerStateTests extends ESTestCase { try { NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, - searchFunction, bulkFunction, failureConsumer) { + searchFunction, bulkFunction, failureConsumer) { @Override protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { nextPhase.onFailure(new RuntimeException("failed")); @@ -911,4 +914,4 @@ public class RollupIndexerStateTests extends ESTestCase { executor.shutdownNow(); } } -} \ No newline at end of file +} diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java index ffcae267340..df7c12f47fa 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java @@ -59,12 +59,12 @@ public class RollupJobTaskTests extends ESTestCase { public void testInitialStatusStopped() { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(), Collections.emptyMap()); - RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, Collections.singletonMap("foo", "bar")); + RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - status, client, schedulerEngine, pool, Collections.emptyMap()); + status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -72,12 +72,12 @@ public class RollupJobTaskTests extends ESTestCase { public void testInitialStatusAborting() { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(), Collections.emptyMap()); - RollupJobStatus status = new RollupJobStatus(IndexerState.ABORTING, Collections.singletonMap("foo", "bar")); + RollupJobStatus status = new RollupJobStatus(IndexerState.ABORTING, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - status, client, schedulerEngine, pool, Collections.emptyMap()); + status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -85,12 +85,12 @@ public class RollupJobTaskTests extends ESTestCase { public void testInitialStatusStopping() { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(), Collections.emptyMap()); - RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPING, Collections.singletonMap("foo", "bar")); + RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPING, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - status, client, schedulerEngine, pool, Collections.emptyMap()); + status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -98,28 +98,43 @@ public class RollupJobTaskTests extends ESTestCase { public void testInitialStatusStarted() { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(), Collections.emptyMap()); - RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, Collections.singletonMap("foo", "bar")); + RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - status, client, schedulerEngine, pool, Collections.emptyMap()); + status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); } - public void testInitialStatusIndexing() { + public void testInitialStatusIndexingOldID() { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(), Collections.emptyMap()); - RollupJobStatus status = new RollupJobStatus(IndexerState.INDEXING, Collections.singletonMap("foo", "bar")); + RollupJobStatus status = new RollupJobStatus(IndexerState.INDEXING, Collections.singletonMap("foo", "bar"), false); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - status, client, schedulerEngine, pool, Collections.emptyMap()); + status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); + assertFalse(((RollupJobStatus) task.getStatus()).isUpgradedDocumentID()); + } + + public void testInitialStatusIndexingNewID() { + RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(), Collections.emptyMap()); + RollupJobStatus status = new RollupJobStatus(IndexerState.INDEXING, Collections.singletonMap("foo", "bar"), true); + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + status, client, schedulerEngine, pool, Collections.emptyMap()); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); + assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); + assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); + assertTrue(((RollupJobStatus) task.getStatus()).isUpgradedDocumentID()); } public void testNoInitialStatus() { @@ -128,19 +143,20 @@ public class RollupJobTaskTests extends ESTestCase { when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - null, client, schedulerEngine, pool, Collections.emptyMap()); + null, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); + assertTrue(((RollupJobStatus) task.getStatus()).isUpgradedDocumentID()); } public void testStartWhenStarted() throws InterruptedException { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(), Collections.emptyMap()); - RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, Collections.singletonMap("foo", "bar")); + RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - status, client, schedulerEngine, pool, Collections.emptyMap()); + status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -155,7 +171,7 @@ public class RollupJobTaskTests extends ESTestCase { @Override public void onFailure(Exception e) { assertThat(e.getMessage(), equalTo("Cannot start task for Rollup Job [" - + job.getConfig().getId() + "] because state was [STARTED]")); + + job.getConfig().getId() + "] because state was [STARTED]")); latch.countDown(); } }); @@ -171,7 +187,7 @@ public class RollupJobTaskTests extends ESTestCase { AtomicInteger counter = new AtomicInteger(0); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - null, client, schedulerEngine, pool, Collections.emptyMap()) { + null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, ActionListener> listener) { @@ -185,7 +201,7 @@ public class RollupJobTaskTests extends ESTestCase { fail("Should not have updated persistent statuse > 2 times"); } listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, - new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); + new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); counter.incrementAndGet(); } }; @@ -234,7 +250,7 @@ public class RollupJobTaskTests extends ESTestCase { @Override public void onFailure(Exception e) { assertThat(e.getMessage(), equalTo("Cannot start task for Rollup Job [" - + job.getConfig().getId() + "] because state was [STOPPING]")); + + job.getConfig().getId() + "] because state was [STOPPING]")); latch2.countDown(); } }); @@ -243,19 +259,19 @@ public class RollupJobTaskTests extends ESTestCase { public void testStartWhenStopped() throws InterruptedException { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(), Collections.emptyMap()); - RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, Collections.singletonMap("foo", "bar")); + RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - status, client, schedulerEngine, pool, Collections.emptyMap()) { + status, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, ActionListener> listener) { assertThat(taskState, instanceOf(RollupJobStatus.class)); assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, - new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); + new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } }; assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); @@ -281,19 +297,19 @@ public class RollupJobTaskTests extends ESTestCase { public void testTriggerUnrelated() throws InterruptedException { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(), Collections.emptyMap()); - RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, Collections.singletonMap("foo", "bar")); + RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - status, client, schedulerEngine, pool, Collections.emptyMap()) { + status, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, ActionListener> listener) { assertThat(taskState, instanceOf(RollupJobStatus.class)); assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, - new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); + new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } }; assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); @@ -327,14 +343,14 @@ public class RollupJobTaskTests extends ESTestCase { when(client.threadPool()).thenReturn(pool); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - null, client, schedulerEngine, pool, Collections.emptyMap()) { + null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, ActionListener> listener) { assertThat(taskState, instanceOf(RollupJobStatus.class)); assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, - new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); + new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } }; assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); @@ -395,7 +411,7 @@ public class RollupJobTaskTests extends ESTestCase { SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - null, client, schedulerEngine, pool, Collections.emptyMap()) { + null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, ActionListener> listener) { @@ -404,8 +420,12 @@ public class RollupJobTaskTests extends ESTestCase { assertThat(taskState, instanceOf(RollupJobStatus.class)); assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, - new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); + new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } else if (counterValue == 1) { + // When we get here, doSaveState() was just invoked so we will have + // have upgraded IDs + RollupJobStatus s = (RollupJobStatus)this.getStatus(); + assertTrue(s.isUpgradedDocumentID()); finished.set(true); } @@ -479,7 +499,7 @@ public class RollupJobTaskTests extends ESTestCase { SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - null, client, schedulerEngine, pool, Collections.emptyMap()) { + null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, ActionListener> listener) { @@ -488,8 +508,101 @@ public class RollupJobTaskTests extends ESTestCase { assertThat(taskState, instanceOf(RollupJobStatus.class)); assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, - new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); + new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } else if (counterValue == 1) { + // When we get here, doSaveState() was just invoked so we will have + // have upgraded IDs + RollupJobStatus s = (RollupJobStatus)this.getStatus(); + assertTrue(s.isUpgradedDocumentID()); + finished.set(true); + } + + } + }; + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); + assertNull(((RollupJobStatus)task.getStatus()).getPosition()); + + task.start(new ActionListener() { + @Override + public void onResponse(StartRollupJobAction.Response response) { + assertTrue(response.isStarted()); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); + started.set(true); + } + + @Override + public void onFailure(Exception e) { + fail("Should not have entered onFailure"); + } + }); + ESTestCase.awaitBusy(started::get); + + task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); + assertThat(task.getStats().getNumInvocations(), equalTo(1L)); + // Allow search response to return now + latch.countDown(); + + // Wait for the final persistent status to finish + ESTestCase.awaitBusy(finished::get); + } + + @SuppressWarnings("unchecked") + public void testSaveStateChangesIDScheme() throws InterruptedException { + final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + Map headers = new HashMap<>(1); + headers.put("es-security-runas-user", "foo"); + headers.put("_xpack_security_authentication", "bar"); + RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(), headers); + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + + AtomicBoolean started = new AtomicBoolean(false); + AtomicBoolean finished = new AtomicBoolean(false); + AtomicInteger counter = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(1); + + final ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(threadContext); + doAnswer(invocationOnMock -> { + assertFalse(threadContext.getHeaders().isEmpty()); + assertThat(threadContext.getHeaders().get("es-security-runas-user"), equalTo("foo")); + assertThat(threadContext.getHeaders().get("_xpack_security_authentication"), equalTo("bar")); + + SearchResponse r = mock(SearchResponse.class); + when(r.getShardFailures()).thenReturn(ShardSearchFailure.EMPTY_ARRAY); + CompositeAggregation compositeAgg = mock(CompositeAggregation.class); + when(compositeAgg.getBuckets()).thenReturn(Collections.emptyList()); + when(compositeAgg.getName()).thenReturn(RollupField.NAME); + Aggregations aggs = new Aggregations(Collections.singletonList(compositeAgg)); + when(r.getAggregations()).thenReturn(aggs); + + // Wait before progressing + latch.await(); + + ((ActionListener)invocationOnMock.getArguments()[2]).onResponse(r); + return null; + }).when(client).execute(anyObject(), anyObject(), anyObject()); + + SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); + RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, false); + RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + status, client, schedulerEngine, pool, Collections.emptyMap()) { + @Override + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { + Integer counterValue = counter.getAndIncrement(); + if (counterValue == 0) { + assertThat(taskState, instanceOf(RollupJobStatus.class)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); + listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, + new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); + } else if (counterValue == 1) { + // When we get here, doSaveState() was just invoked so we will have + // have upgraded IDs + RollupJobStatus s = (RollupJobStatus)this.getStatus(); + assertTrue(s.isUpgradedDocumentID()); finished.set(true); } @@ -525,12 +638,12 @@ public class RollupJobTaskTests extends ESTestCase { public void testStopWhenStopped() throws InterruptedException { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(), Collections.emptyMap()); - RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null); + RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - status, client, schedulerEngine, pool, Collections.emptyMap()); + status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); CountDownLatch latch = new CountDownLatch(1); @@ -543,7 +656,7 @@ public class RollupJobTaskTests extends ESTestCase { @Override public void onFailure(Exception e) { - fail("Should not have entered onFailure"); + fail("Should not have entered onFailure"); } }); latch.await(3, TimeUnit.SECONDS); @@ -558,7 +671,7 @@ public class RollupJobTaskTests extends ESTestCase { AtomicInteger counter = new AtomicInteger(0); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - null, client, schedulerEngine, pool, Collections.emptyMap()) { + null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, ActionListener> listener) { @@ -574,7 +687,7 @@ public class RollupJobTaskTests extends ESTestCase { fail("Should not have updated persistent statuse > 3 times"); } listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, - new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); + new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); counter.incrementAndGet(); } @@ -632,7 +745,7 @@ public class RollupJobTaskTests extends ESTestCase { public void testStopWhenAborting() throws InterruptedException { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(), Collections.emptyMap()); - RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null); + RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); @@ -644,7 +757,7 @@ public class RollupJobTaskTests extends ESTestCase { // just in case, we can override markAsCompleted so it's a no-op and test how stop // handles the situation RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - status, client, schedulerEngine, pool, Collections.emptyMap()) { + status, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void markAsCompleted() { latch.countDown(); @@ -663,7 +776,7 @@ public class RollupJobTaskTests extends ESTestCase { @Override public void onFailure(Exception e) { assertThat(e.getMessage(), equalTo("Cannot stop task for Rollup Job [" - + job.getConfig().getId() + "] because state was [ABORTING]")); + + job.getConfig().getId() + "] because state was [ABORTING]")); latch.countDown(); } }); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml index d172e27b2a3..298cf27fa2f 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml @@ -70,6 +70,7 @@ setup: trigger_count: 0 status: job_state: "stopped" + upgraded_doc_id: true - do: xpack.rollup.delete_job: @@ -116,6 +117,7 @@ setup: trigger_count: 0 status: job_state: "stopped" + upgraded_doc_id: true - do: xpack.rollup.delete_job: @@ -162,6 +164,7 @@ setup: trigger_count: 0 status: job_state: "stopped" + upgraded_doc_id: true - do: xpack.rollup.start_job: diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/get_jobs.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/get_jobs.yml index fb2d9f59e34..f3fa8114ddb 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/get_jobs.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/get_jobs.yml @@ -71,6 +71,7 @@ setup: trigger_count: 0 status: job_state: "stopped" + upgraded_doc_id: true --- "Test get with no jobs": @@ -181,6 +182,7 @@ setup: trigger_count: 0 status: job_state: "stopped" + upgraded_doc_id: true - config: id: "bar" index_pattern: "bar" @@ -206,4 +208,5 @@ setup: trigger_count: 0 status: job_state: "stopped" + upgraded_doc_id: true diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml index 98ef9b32e3d..516be25be2a 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml @@ -71,6 +71,7 @@ setup: trigger_count: 0 status: job_state: "stopped" + upgraded_doc_id: true --- "Test put_job with existing name": diff --git a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java index b91092fe9e7..d7ea3bf7729 100644 --- a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java +++ b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java @@ -32,6 +32,7 @@ import org.junit.Before; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.Locale; @@ -41,6 +42,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -323,6 +325,98 @@ public class FullClusterRestartIT extends ESRestTestCase { } } + public void testRollupIDSchemeAfterRestart() throws Exception { + assumeTrue("Rollup can be tested with 6.3.0 and onwards", oldClusterVersion.onOrAfter(Version.V_6_3_0)); + if (runningAgainstOldCluster) { + + final Request indexRequest = new Request("POST", "/id-test-rollup/_doc/1"); + indexRequest.setJsonEntity("{\"timestamp\":\"2018-01-01T00:00:01\",\"value\":123}"); + client().performRequest(indexRequest); + + // create the rollup job + final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-id-test"); + createRollupJobRequest.setJsonEntity("{" + + "\"index_pattern\":\"id-test-rollup\"," + + "\"rollup_index\":\"id-test-results-rollup\"," + + "\"cron\":\"*/1 * * * * ?\"," + + "\"page_size\":100," + + "\"groups\":{" + + " \"date_histogram\":{" + + " \"field\":\"timestamp\"," + + " \"interval\":\"5m\"" + + " }," + + "\"histogram\":{" + + " \"fields\": [\"value\"]," + + " \"interval\":1" + + " }," + + "\"terms\":{" + + " \"fields\": [\"value\"]" + + " }" + + "}," + + "\"metrics\":[" + + " {\"field\":\"value\",\"metrics\":[\"min\",\"max\",\"sum\"]}" + + "]" + + "}"); + + Map createRollupJobResponse = entityAsMap(client().performRequest(createRollupJobRequest)); + assertThat(createRollupJobResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + // start the rollup job + final Request startRollupJobRequest = new Request("POST", "_xpack/rollup/job/rollup-id-test/_start"); + Map startRollupJobResponse = entityAsMap(client().performRequest(startRollupJobRequest)); + assertThat(startRollupJobResponse.get("started"), equalTo(Boolean.TRUE)); + + assertRollUpJob("rollup-id-test"); + + assertBusy(() -> { + client().performRequest(new Request("POST", "id-test-results-rollup/_refresh")); + final Request searchRequest = new Request("GET", "id-test-results-rollup/_search"); + try { + Map searchResponse = entityAsMap(client().performRequest(searchRequest)); + assertNotNull(ObjectPath.eval("hits.total", searchResponse)); + assertThat(ObjectPath.eval("hits.total", searchResponse), equalTo(1)); + assertThat(ObjectPath.eval("hits.hits.0._id", searchResponse), equalTo("3310683722")); + } catch (IOException e) { + fail(); + } + }); + + } else { + + final Request indexRequest = new Request("POST", "/id-test-rollup/_doc/2"); + indexRequest.setJsonEntity("{\"timestamp\":\"2018-01-02T00:00:01\",\"value\":345}"); + client().performRequest(indexRequest); + + assertRollUpJob("rollup-id-test"); + + assertBusy(() -> { + client().performRequest(new Request("POST", "id-test-results-rollup/_refresh")); + final Request searchRequest = new Request("GET", "id-test-results-rollup/_search"); + try { + Map searchResponse = entityAsMap(client().performRequest(searchRequest)); + assertNotNull(ObjectPath.eval("hits.total", searchResponse)); + assertThat(ObjectPath.eval("hits.total", searchResponse), equalTo(2)); + List ids = new ArrayList<>(2); + ids.add(ObjectPath.eval("hits.hits.0._id", searchResponse)); + ids.add(ObjectPath.eval("hits.hits.1._id", searchResponse)); + + // should have both old and new ID formats + assertThat(ids, containsInAnyOrder("3310683722", "rollup-id-test$ehY4NAyVSy8xxUDZrNXXIA")); + + List values = new ArrayList<>(2); + Map doc = ObjectPath.eval("hits.hits.0._source", searchResponse); + values.add((Double)doc.get("value.min.value")); + doc = ObjectPath.eval("hits.hits.1._source", searchResponse); + values.add((Double)doc.get("value.min.value")); + + assertThat(values, containsInAnyOrder(123.0, 345.0)); + } catch (IOException e) { + fail(); + } + }); + } + } + public void testSqlFailsOnIndexWithTwoTypes() throws IOException { // TODO this isn't going to trigger until we backport to 6.1 assumeTrue("It is only possible to build an index that sql doesn't like before 6.0.0", diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RollupIDUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RollupIDUpgradeIT.java new file mode 100644 index 00000000000..aae10978f14 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RollupIDUpgradeIT.java @@ -0,0 +1,293 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.upgrades; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.core.watcher.support.xcontent.ObjectPath; +import org.hamcrest.Matcher; + +import java.io.IOException; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; + + +public class RollupIDUpgradeIT extends AbstractUpgradeTestCase { + /** + * This test verifies that as a cluster is upgraded incrementally, new documents eventually switch + * over to the "new" form of ID (128 bit Murmur3 ids). + * + * Rollup IDs are essentially the hashed concatenation of keys returned by the composite aggregation, + * so the field values that are being indexed (timestamp, value, etc) directly affect the + * ID that is generated. + * + * We don't know which node will get the Rollup task to start, so we don't know when it will migrate. + * The first doc is guaranteed to be the "old" style since all nodes are un-upgraded. The second + * and third phase will have a mixed cluster, and the rollup task may or may not migrate. In those + * phases we have two options (old and new) for the document added in the phase. + * + * The last phase is guaranteed to be new as it's a fully upgraded cluster. + */ + public void testIDsUpgradeCorrectly() throws Exception { + switch (CLUSTER_TYPE) { + case OLD: + break; + case MIXED: + Request waitForYellow = new Request("GET", "/_cluster/health"); + waitForYellow.addParameter("wait_for_nodes", "3"); + waitForYellow.addParameter("wait_for_status", "yellow"); + client().performRequest(waitForYellow); + break; + case UPGRADED: + Request waitForGreen = new Request("GET", "/_cluster/health/target,rollup"); + waitForGreen.addParameter("wait_for_nodes", "3"); + waitForGreen.addParameter("wait_for_status", "green"); + // wait for long enough that we give delayed unassigned shards to stop being delayed + waitForGreen.addParameter("timeout", "70s"); + waitForGreen.addParameter("level", "shards"); + client().performRequest(waitForGreen); + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + + OffsetDateTime timestamp = Instant.parse("2018-01-01T00:00:01.000Z").atOffset(ZoneOffset.UTC); + + if (CLUSTER_TYPE == ClusterType.OLD) { + String recoverQuickly = "{\"settings\": {\"index.unassigned.node_left.delayed_timeout\": \"100ms\"}}"; + + Request createTargetIndex = new Request("PUT", "/target"); + createTargetIndex.setJsonEntity(recoverQuickly); + client().performRequest(createTargetIndex); + + final Request indexRequest = new Request("POST", "/target/_doc/1"); + indexRequest.setJsonEntity("{\"timestamp\":\"" + timestamp.toString() + "\",\"value\":123}"); + client().performRequest(indexRequest); + + // create the rollup job + final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-id-test"); + createRollupJobRequest.setJsonEntity("{" + + "\"index_pattern\":\"target\"," + + "\"rollup_index\":\"rollup\"," + + "\"cron\":\"*/1 * * * * ?\"," + + "\"page_size\":100," + + "\"groups\":{" + + " \"date_histogram\":{" + + " \"field\":\"timestamp\"," + + " \"interval\":\"5m\"" + + " }," + + "\"histogram\":{" + + " \"fields\": [\"value\"]," + + " \"interval\":1" + + " }," + + "\"terms\":{" + + " \"fields\": [\"value\"]" + + " }" + + "}," + + "\"metrics\":[" + + " {\"field\":\"value\",\"metrics\":[\"min\",\"max\",\"sum\"]}" + + "]" + + "}"); + + Map createRollupJobResponse = entityAsMap(client().performRequest(createRollupJobRequest)); + assertThat(createRollupJobResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + Request updateSettings = new Request("PUT", "/rollup/_settings"); + updateSettings.setJsonEntity(recoverQuickly); + client().performRequest(updateSettings); + + // start the rollup job + final Request startRollupJobRequest = new Request("POST", "_xpack/rollup/job/rollup-id-test/_start"); + Map startRollupJobResponse = entityAsMap(client().performRequest(startRollupJobRequest)); + assertThat(startRollupJobResponse.get("started"), equalTo(Boolean.TRUE)); + + assertRollUpJob("rollup-id-test"); + List ids = getSearchResults(1); + assertThat(ids, containsInAnyOrder("3310683722")); + + } + + if (CLUSTER_TYPE == ClusterType.MIXED && Booleans.parseBoolean(System.getProperty("tests.first_round"))) { + final Request indexRequest = new Request("POST", "/target/_doc/2"); + indexRequest.setJsonEntity("{\"timestamp\":\"" + timestamp.plusDays(1).toString() + "\",\"value\":345}"); + client().performRequest(indexRequest); + + assertRollUpJob("rollup-id-test"); + client().performRequest(new Request("POST", "rollup/_refresh")); + + List ids = getSearchResults(2); + // first doc, guaranteed old style + ids.remove("3310683722"); + + // next doc may be either style + ids.removeAll(Arrays.asList("621059582", "rollup-id-test$ehY4NAyVSy8xxUDZrNXXIA")); + assertThat(ids.toString(),ids.size(), equalTo(0)); + } + + if (CLUSTER_TYPE == ClusterType.MIXED && Booleans.parseBoolean(System.getProperty("tests.first_round")) == false) { + final Request indexRequest = new Request("POST", "/target/_doc/3"); + indexRequest.setJsonEntity("{\"timestamp\":\"" + timestamp.plusDays(2).toString() + "\",\"value\":456}"); + client().performRequest(indexRequest); + + assertRollUpJob("rollup-id-test"); + client().performRequest(new Request("POST", "rollup/_refresh")); + + List ids = getSearchResults(3); + + // first doc, guaranteed old style + ids.remove("3310683722"); + + // next two docs may be either style + ids.removeAll(Arrays.asList("621059582", "4288019978", + "rollup-id-test$ehY4NAyVSy8xxUDZrNXXIA", "rollup-id-test$60RGDSb92YI5LH4_Fnq_1g")); + assertThat(ids.toString(), ids.size(), equalTo(0)); + + } + + if (CLUSTER_TYPE == ClusterType.UPGRADED) { + final Request indexRequest = new Request("POST", "/target/_doc/4"); + indexRequest.setJsonEntity("{\"timestamp\":\"" + timestamp.plusDays(3).toString() + "\",\"value\":567}"); + client().performRequest(indexRequest); + + assertRollUpJob("rollup-id-test"); + client().performRequest(new Request("POST", "rollup/_refresh")); + + List ids = getSearchResults(4); + // first doc, guaranteed old style + ids.remove("3310683722"); + + // next two docs may be either style + ids.removeAll(Arrays.asList("621059582", "4288019978", + "rollup-id-test$ehY4NAyVSy8xxUDZrNXXIA", "rollup-id-test$60RGDSb92YI5LH4_Fnq_1g")); + + // Last is guaranteed to be new + ids.remove("rollup-id-test$LAKZftDeQwsUtdPixrkkzQ"); + assertThat(ids.toString(), ids.size(), equalTo(0)); + } + + } + + private List getSearchResults(int expectedCount) throws Exception { + final List collectedIDs = new ArrayList<>(); + assertBusy(() -> { + collectedIDs.clear(); + client().performRequest(new Request("POST", "rollup/_refresh")); + final Request searchRequest = new Request("GET", "rollup/_search"); + try { + Map searchResponse = entityAsMap(client().performRequest(searchRequest)); + assertNotNull(ObjectPath.eval("hits.total", searchResponse)); + assertThat(ObjectPath.eval("hits.total", searchResponse), equalTo(expectedCount)); + + for (int i = 0; i < expectedCount; i++) { + String id = ObjectPath.eval("hits.hits." + i + "._id", searchResponse); + collectedIDs.add(id); + Map doc = ObjectPath.eval("hits.hits." + i + "._source", searchResponse); + assertNotNull(doc); + if (id.startsWith("rollup-id-test")) { + assertThat(doc.get("_rollup.version"), equalTo(2)); + } else { + assertThat(doc.get("_rollup.version"), equalTo(1)); + } + } + } catch (IOException e) { + fail(); + } + }); + return collectedIDs; + } + + @SuppressWarnings("unchecked") + private void assertRollUpJob(final String rollupJob) throws Exception { + final Matcher expectedStates = anyOf(equalTo("indexing"), equalTo("started")); + waitForRollUpJob(rollupJob, expectedStates); + + // check that the rollup job is started using the RollUp API + final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob); + Map getRollupJobResponse = entityAsMap(client().performRequest(getRollupJobRequest)); + Map job = getJob(getRollupJobResponse, rollupJob); + if (job != null) { + assertThat(ObjectPath.eval("status.job_state", job), expectedStates); + } + + // check that the rollup job is started using the Tasks API + final Request taskRequest = new Request("GET", "_tasks"); + taskRequest.addParameter("detailed", "true"); + taskRequest.addParameter("actions", "xpack/rollup/*"); + Map taskResponse = entityAsMap(client().performRequest(taskRequest)); + Map taskResponseNodes = (Map) taskResponse.get("nodes"); + Map taskResponseNode = (Map) taskResponseNodes.values().iterator().next(); + Map taskResponseTasks = (Map) taskResponseNode.get("tasks"); + Map taskResponseStatus = (Map) taskResponseTasks.values().iterator().next(); + assertThat(ObjectPath.eval("status.job_state", taskResponseStatus), expectedStates); + + // check that the rollup job is started using the Cluster State API + final Request clusterStateRequest = new Request("GET", "_cluster/state/metadata"); + Map clusterStateResponse = entityAsMap(client().performRequest(clusterStateRequest)); + List> rollupJobTasks = ObjectPath.eval("metadata.persistent_tasks.tasks", clusterStateResponse); + + boolean hasRollupTask = false; + for (Map task : rollupJobTasks) { + if (ObjectPath.eval("id", task).equals(rollupJob)) { + hasRollupTask = true; + break; + } + } + if (hasRollupTask == false) { + fail("Expected persistent task for [" + rollupJob + "] but none found."); + } + + } + + private void waitForRollUpJob(final String rollupJob, final Matcher expectedStates) throws Exception { + assertBusy(() -> { + final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob); + Response getRollupJobResponse = client().performRequest(getRollupJobRequest); + assertThat(getRollupJobResponse.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + + Map job = getJob(getRollupJobResponse, rollupJob); + if (job != null) { + assertThat(ObjectPath.eval("status.job_state", job), expectedStates); + } + }, 30L, TimeUnit.SECONDS); + } + + private static Map getJob(Response response, String targetJobId) throws IOException { + return getJob(ESRestTestCase.entityAsMap(response), targetJobId); + } + + @SuppressWarnings("unchecked") + private static Map getJob(Map jobsMap, String targetJobId) throws IOException { + + List> jobs = + (List>) XContentMapValues.extractValue("jobs", jobsMap); + + if (jobs == null) { + return null; + } + + for (Map job : jobs) { + String jobId = (String) ((Map) job.get("config")).get("id"); + if (jobId.equals(targetJobId)) { + return job; + } + } + return null; + } +}