Merge branch '7.x' of github.com:elastic/elasticsearch into 7.x

Ed Savage 2019-05-21 19:14:17 +01:00
commit 685a206891
52 changed files with 490 additions and 305 deletions


@ -3,7 +3,7 @@
include::../Versions.asciidoc[]
include::painless-getting-started.asciidoc[]
include::painless-guide.asciidoc[]
include::painless-lang-spec.asciidoc[]


@ -54,6 +54,4 @@ specialized code may define new ways to use a Painless script.
| {xpack-ref}/transform-script.html[Elasticsearch Documentation]
|====
include::painless-contexts/painless-context-examples.asciidoc[]
include::painless-contexts/index.asciidoc[]


@ -1,3 +1,5 @@
include::painless-context-examples.asciidoc[]
include::painless-ingest-processor-context.asciidoc[]
include::painless-update-context.asciidoc[]


@ -1,11 +1,14 @@
[[painless-guide]]
== Painless Guide
_Painless_ is a simple, secure scripting language designed specifically for use
with Elasticsearch. It is the default scripting language for Elasticsearch and
can safely be used for inline and stored scripts. For a detailed description of
the Painless syntax and language features, see the
{painless}/painless-lang-spec.html[Painless Language Specification].
can safely be used for inline and stored scripts. For a jump start into
Painless, see <<painless-walkthrough, A Brief Painless Walkthrough>>. For a
detailed description of the Painless syntax and language features, see the
<<painless-lang-spec, Painless Language Specification>>.
[[painless-features]]
You can use Painless anywhere scripts can be used in Elasticsearch. Painless
You can use Painless anywhere scripts are used in Elasticsearch. Painless
provides:
* Fast performance: Painless scripts https://benchmarks.elastic.co/index.html#search_qps_scripts[
@ -18,7 +21,9 @@ complete list of available classes and methods.
* Optional typing: Variables and parameters can use explicit types or the
dynamic `def` type.
* Syntax: Extends Java's syntax to provide http://groovy-lang.org/index.html[
Groovy-style] scripting language features that make scripts easier to write.
* Syntax: Extends a subset of Java's syntax to provide additional scripting
language features.
* Optimizations: Designed specifically for Elasticsearch scripting.
include::painless-guide/index.asciidoc[]


@ -0,0 +1,7 @@
include::painless-walkthrough.asciidoc[]
include::painless-method-dispatch.asciidoc[]
include::painless-debugging.asciidoc[]
include::painless-execute-script.asciidoc[]


@ -0,0 +1,30 @@
[[modules-scripting-painless-dispatch]]
=== How Painless dispatches functions
Painless uses receiver, name, and https://en.wikipedia.org/wiki/Arity[arity]
for method dispatch. For example, `s.foo(a, b)` is resolved by first getting
the class of `s` and then looking up the method `foo` with two parameters. This
is different from Groovy, which uses the
https://en.wikipedia.org/wiki/Multiple_dispatch[runtime types] of the
parameters, and from Java, which uses the compile-time types of the parameters.
The consequence of this is that Painless doesn't support overloaded methods like
Java does, which leads to some trouble when it whitelists classes from the Java
standard library. For example, in Java and Groovy, `Matcher` has two methods:
`group(int)` and `group(String)`. Painless can't whitelist both of these methods
because they have the same name and the same number of parameters. So instead it
has `group(int)` and `namedGroup(String)`.
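As a hedged sketch (regexes must be enabled via `script.painless.regex.enabled: true`; the pattern and input are made up), the name-and-arity lookup means the two calls below resolve to different method names rather than different parameter types:

Matcher m = /(?<digits>\d+)/.matcher("value 123");
if (m.find()) {
    String byIndex = m.group(1);            // whitelisted as group(int)
    String byName = m.namedGroup("digits"); // stands in for Java's group(String)
}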
We have a few justifications for this different way of dispatching methods:
1. It makes operating on `def` types simpler and, presumably, faster. Using
receiver, name, and arity means that when Painless sees a call on a `def` object it
can dispatch the appropriate method without having to do expensive comparisons
of the types of the parameters. The same is true for invocations with `def`
typed parameters.
2. It keeps things consistent. It would be genuinely weird for Painless to
behave like Groovy if any `def` typed parameters were involved and Java
otherwise. It'd be slow for it to behave like Groovy all the time.
3. It keeps Painless maintainable. Adding Java-like or Groovy-like method
dispatch *feels* like it'd add a ton of complexity which would make maintenance
and other improvements much more difficult.


@ -1,10 +1,5 @@
[[painless-getting-started]]
== Getting Started with Painless
include::painless-description.asciidoc[]
[[painless-examples]]
=== Painless Examples
[[painless-walkthrough]]
=== A Brief Painless Walkthrough
To illustrate how Painless works, let's load some hockey stats into an Elasticsearch index:
@ -121,7 +116,7 @@ GET hockey/_search
[float]
===== Missing values
==== Missing values
`doc['field'].value` throws an exception if
the field is missing in a document.
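As a minimal sketch of the usual guard (the `goals` field is borrowed from the hockey example; illustrative only):

if (doc['goals'].size() != 0) {
    return doc['goals'].value;
}
return 0;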
@ -198,7 +193,7 @@ POST hockey/_update/1
==== Dates
Date fields are exposed as
`ReadableDateTime`, so they support methods like `getYear`, `getDayOfWeek`
`ZonedDateTime`, so they support methods like `getYear`, `getDayOfWeek`
or e.g. getting milliseconds since epoch with `getMillis`. To use these
in a script, leave out the `get` prefix and continue with lowercasing the
rest of the method name. For example, the following returns every hockey
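As a hedged sketch of that prefix-dropping convention (assuming the `born` date field from the hockey data):

doc['born'].value.year        // from getYear()
doc['born'].value.dayOfWeek   // from getDayOfWeek()
doc['born'].value.millis      // from getMillis(), milliseconds since epoch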
@ -365,38 +360,3 @@ Note: all of the `_update_by_query` examples above could really do with a
{ref}/query-dsl-script-query.html[script query] it wouldn't be as efficient
as using any other query because script queries aren't able to use the inverted
index to limit the documents that they have to check.
[[modules-scripting-painless-dispatch]]
=== How Painless dispatches functions
Painless uses receiver, name, and https://en.wikipedia.org/wiki/Arity[arity]
for method dispatch. For example, `s.foo(a, b)` is resolved by first getting
the class of `s` and then looking up the method `foo` with two parameters. This
is different from Groovy, which uses the
https://en.wikipedia.org/wiki/Multiple_dispatch[runtime types] of the
parameters, and from Java, which uses the compile-time types of the parameters.
The consequence of this is that Painless doesn't support overloaded methods like
Java does, which leads to some trouble when it whitelists classes from the Java
standard library. For example, in Java and Groovy, `Matcher` has two methods:
`group(int)` and `group(String)`. Painless can't whitelist both of these methods
because they have the same name and the same number of parameters. So instead it
has `group(int)` and `namedGroup(String)`.
We have a few justifications for this different way of dispatching methods:
1. It makes operating on `def` types simpler and, presumably, faster. Using
receiver, name, and arity means that when Painless sees a call on a `def` object it
can dispatch the appropriate method without having to do expensive comparisons
of the types of the parameters. The same is true for invocations with `def`
typed parameters.
2. It keeps things consistent. It would be genuinely weird for Painless to
behave like Groovy if any `def` typed parameters were involved and Java
otherwise. It'd be slow for it to behave like Groovy all the time.
3. It keeps Painless maintainable. Adding Java-like or Groovy-like method
dispatch *feels* like it'd add a ton of complexity which would make maintenance
and other improvements much more difficult.
include::painless-debugging.asciidoc[]
include::painless-execute-script.asciidoc[]


@ -17,38 +17,4 @@ into Java Virtual Machine (JVM) byte code and executed against a standard JVM.
This specification uses ANTLR4 grammar notation to describe the allowed syntax.
However, the actual Painless grammar is more compact than what is shown here.
include::painless-comments.asciidoc[]
include::painless-keywords.asciidoc[]
include::painless-literals.asciidoc[]
include::painless-identifiers.asciidoc[]
include::painless-variables.asciidoc[]
include::painless-types.asciidoc[]
include::painless-casting.asciidoc[]
include::painless-operators.asciidoc[]
include::painless-operators-general.asciidoc[]
include::painless-operators-numeric.asciidoc[]
include::painless-operators-boolean.asciidoc[]
include::painless-operators-reference.asciidoc[]
include::painless-operators-array.asciidoc[]
include::painless-statements.asciidoc[]
include::painless-scripts.asciidoc[]
include::painless-functions.asciidoc[]
include::painless-lambdas.asciidoc[]
include::painless-regexes.asciidoc[]
include::painless-lang-spec/index.asciidoc[]


@ -0,0 +1,35 @@
include::painless-comments.asciidoc[]
include::painless-keywords.asciidoc[]
include::painless-literals.asciidoc[]
include::painless-identifiers.asciidoc[]
include::painless-variables.asciidoc[]
include::painless-types.asciidoc[]
include::painless-casting.asciidoc[]
include::painless-operators.asciidoc[]
include::painless-operators-general.asciidoc[]
include::painless-operators-numeric.asciidoc[]
include::painless-operators-boolean.asciidoc[]
include::painless-operators-reference.asciidoc[]
include::painless-operators-array.asciidoc[]
include::painless-statements.asciidoc[]
include::painless-scripts.asciidoc[]
include::painless-functions.asciidoc[]
include::painless-lambdas.asciidoc[]
include::painless-regexes.asciidoc[]


@ -1,2 +0,0 @@
Ready to start scripting with Painless? See {painless}/painless-getting-started.html[Getting Started with Painless] in the guide to the
{painless}/painless.html[Painless Scripting Language].


@ -563,7 +563,7 @@ template for all indexes that hold data that needs pre-index processing.
[[conditionals-with-regex]]
=== Conditionals with Regular Expressions
The `if` conditional is implemented as a Painless script, which requires
{painless}//painless-examples.html#modules-scripting-painless-regex[explicit support for regular expressions].
{painless}//painless-regexes.html[explicit support for regular expressions].
`script.painless.regex.enabled: true` must be set in `elasticsearch.yml` to use regular
expressions in the `if` condition.
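For instance, a hedged sketch of such a condition (the field and pattern are assumptions, not from this commit):

ctx.message != null && ctx.message =~ /error/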


@ -1,7 +1,32 @@
[[modules-scripting-painless]]
=== Painless Scripting Language
include::../../../painless/painless-description.asciidoc[]
_Painless_ is a simple, secure scripting language designed specifically for use
with Elasticsearch. It is the default scripting language for Elasticsearch and
can safely be used for inline and stored scripts. To get started with
Painless, see the {painless}/painless-guide.html[Painless Guide]. For a
detailed description of the Painless syntax and language features, see the
{painless}/painless-lang-spec.html[Painless Language Specification].
Ready to start scripting with Painless? See {painless}/painless-getting-started.html[Getting Started with Painless] in the guide to the
[[painless-features]]
You can use Painless anywhere scripts can be used in Elasticsearch. Painless
provides:
* Fast performance: Painless scripts https://benchmarks.elastic.co/index.html#search_qps_scripts[
run several times faster] than the alternatives.
* Safety: Fine-grained whitelist with method call/field granularity. See the
{painless}/painless-api-reference.html[Painless API Reference] for a
complete list of available classes and methods.
* Optional typing: Variables and parameters can use explicit types or the
dynamic `def` type.
* Syntax: Extends a subset of Java's syntax to provide additional scripting
language features.
* Optimizations: Designed specifically for Elasticsearch scripting.
Ready to start scripting with Painless? See the
{painless}/painless-guide.html[Painless Guide] for the
{painless}/index.html[Painless Scripting Language].


@ -180,6 +180,18 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;
/**
* The primary term of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
* leases.
*/
private long persistedRetentionLeasesPrimaryTerm;
/**
* The version of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
* leases.
*/
private long persistedRetentionLeasesVersion;
/**
* Get all retention leases tracked on this shard.
*
@ -342,7 +354,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
private final Object retentionLeasePersistenceLock = new Object();
/**
* Persists the current retention leases to their dedicated state file.
* Persists the current retention leases to their dedicated state file. If this version of the retention leases has already
* been persisted then persistence is skipped.
*
* @param path the path to the directory containing the state file
* @throws WriteStateException if an exception occurs writing the state file
@ -351,10 +364,16 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
synchronized (retentionLeasePersistenceLock) {
final RetentionLeases currentRetentionLeases;
synchronized (this) {
if (retentionLeases.supersedes(persistedRetentionLeasesPrimaryTerm, persistedRetentionLeasesVersion) == false) {
logger.trace("skipping persisting retention leases [{}], already persisted", retentionLeases);
return;
}
currentRetentionLeases = retentionLeases;
}
logger.trace("persisting retention leases [{}]", currentRetentionLeases);
RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path);
persistedRetentionLeasesPrimaryTerm = currentRetentionLeases.primaryTerm();
persistedRetentionLeasesVersion = currentRetentionLeases.version();
}
}


@ -70,13 +70,27 @@ public class RetentionLeases implements ToXContentFragment, Writeable {
/**
* Checks if this retention leases collection supersedes the specified retention leases collection. A retention leases collection
* supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher
* supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher.
*
* @param that the retention leases collection to test against
* @return true if this retention leases collection supersedes the specified retention lease collection, otherwise false
*/
public boolean supersedes(final RetentionLeases that) {
return primaryTerm > that.primaryTerm || primaryTerm == that.primaryTerm && version > that.version;
boolean supersedes(final RetentionLeases that) {
return supersedes(that.primaryTerm, that.version);
}
/**
* Checks if this retention leases collection would supersede a retention leases collection with the specified primary term and version.
* A retention leases collection supersedes another retention leases collection if its primary term is higher, or if for equal primary
* terms its version is higher.
*
* @param primaryTerm the primary term
* @param version the version
* @return true if this retention leases collection would supersede a retention lease collection with the specified primary term and
* version
*/
boolean supersedes(final long primaryTerm, final long version) {
return this.primaryTerm > primaryTerm || this.primaryTerm == primaryTerm && this.version > version;
}
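// Worked examples of the ordering above: a higher primary term wins outright,
// and the version only breaks ties within an equal term (values illustrative):
//   (primaryTerm=2, version=1).supersedes(1, 9) -> true
//   (primaryTerm=1, version=9).supersedes(2, 1) -> false
//   (primaryTerm=1, version=5).supersedes(1, 4) -> true
//   (primaryTerm=1, version=5).supersedes(1, 5) -> false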
private final Map<String, RetentionLease> leases;


@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
@ -499,6 +500,49 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
}
public void testUnnecessaryPersistenceOfRetentionLeases() throws IOException {
final AllocationId allocationId = AllocationId.newInitializing();
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
primaryTerm,
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
for (int i = 0; i < length; i++) {
if (rarely() && primaryTerm < Long.MAX_VALUE) {
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
replicationTracker.setOperationPrimaryTerm(primaryTerm);
}
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replicationTracker.addRetentionLease(
Integer.toString(i), retainingSequenceNumber, "test-" + i, ActionListener.wrap(() -> {}));
}
final Path path = createTempDir();
replicationTracker.persistRetentionLeases(path);
final Tuple<RetentionLeases, Long> retentionLeasesWithGeneration =
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);
replicationTracker.persistRetentionLeases(path);
final Tuple<RetentionLeases, Long> retentionLeasesWithGenerationAfterUnnecessaryPersistence =
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);
assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v1(), equalTo(retentionLeasesWithGeneration.v1()));
assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v2(), equalTo(retentionLeasesWithGeneration.v2()));
}
/**
* Test that we correctly synchronize writing the retention lease state file in {@link ReplicationTracker#persistRetentionLeases(Path)}.
* This test can fail without the synchronization block in that method.


@ -60,7 +60,9 @@ public class RetentionLeasesTests extends ESTestCase {
final long higherPrimaryTerm = randomLongBetween(lowerPrimaryTerm + 1, Long.MAX_VALUE);
final RetentionLeases right = new RetentionLeases(higherPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList());
assertTrue(right.supersedes(left));
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
assertFalse(left.supersedes(right));
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
}
public void testSupersedesByVersion() {
@ -70,7 +72,9 @@ public class RetentionLeasesTests extends ESTestCase {
final RetentionLeases left = new RetentionLeases(primaryTerm, lowerVersion, Collections.emptyList());
final RetentionLeases right = new RetentionLeases(primaryTerm, higherVersion, Collections.emptyList());
assertTrue(right.supersedes(left));
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
assertFalse(left.supersedes(right));
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
}
public void testRetentionLeasesRejectsDuplicates() {


@ -109,15 +109,6 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
out.writeString(transformId);
}
/**
* Get the persisted stats document name from the Data Frame Transform Id.
*
* @return The id of the document where the transform stats are persisted
*/
public static String documentId(String transformId) {
return NAME + "-" + transformId;
}
@Nullable
public String getTransformId() {
return transformId;


@ -23,9 +23,9 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
public class DataFrameTransformProgress implements Writeable, ToXContentObject {
private static final ParseField TOTAL_DOCS = new ParseField("total_docs");
private static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
private static final String PERCENT_COMPLETE = "percent_complete";
public static final ParseField TOTAL_DOCS = new ParseField("total_docs");
public static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
public static final String PERCENT_COMPLETE = "percent_complete";
public static final ConstructingObjectParser<DataFrameTransformProgress, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_transform_progress",


@ -42,12 +42,12 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
@Nullable
private final String reason;
private static final ParseField TASK_STATE = new ParseField("task_state");
private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
private static final ParseField CHECKPOINT = new ParseField("checkpoint");
private static final ParseField REASON = new ParseField("reason");
private static final ParseField PROGRESS = new ParseField("progress");
public static final ParseField TASK_STATE = new ParseField("task_state");
public static final ParseField INDEXER_STATE = new ParseField("indexer_state");
public static final ParseField CURRENT_POSITION = new ParseField("current_position");
public static final ParseField CHECKPOINT = new ParseField("checkpoint");
public static final ParseField REASON = new ParseField("reason");
public static final ParseField PROGRESS = new ParseField("progress");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER = new ConstructingObjectParser<>(NAME,


@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -14,6 +15,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.indexing.IndexerState;
@ -22,7 +24,7 @@ import java.util.Objects;
public class DataFrameTransformStateAndStats implements Writeable, ToXContentObject {
private static final String NAME = "data_frame_transform_state_and_stats";
public static final String NAME = "data_frame_transform_state_and_stats";
public static final ParseField STATE_FIELD = new ParseField("state");
public static final ParseField CHECKPOINTING_INFO_FIELD = new ParseField("checkpointing");
@ -47,6 +49,10 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
(p, c) -> DataFrameTransformCheckpointingInfo.fromXContent(p), CHECKPOINTING_INFO_FIELD);
}
public static DataFrameTransformStateAndStats fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
public static DataFrameTransformStateAndStats initialStateAndStats(String id) {
return initialStateAndStats(id, new DataFrameIndexerTransformStats(id));
}
@ -58,6 +64,15 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
DataFrameTransformCheckpointingInfo.EMPTY);
}
/**
* Get the persisted state and stats document name from the Data Frame Transform Id.
*
* @param transformId the transform id
* @return The id of the document where the transform state and stats are persisted
*/
public static String documentId(String transformId) {
return NAME + "-" + transformId;
}
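// For example (hypothetical id): documentId("my-transform") returns
// "data_frame_transform_state_and_stats-my-transform", given the NAME defined above.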
public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats,
DataFrameTransformCheckpointingInfo checkpointingInfo) {
this.id = Objects.requireNonNull(id);
@ -73,6 +88,11 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
this.checkpointingInfo = new DataFrameTransformCheckpointingInfo(in);
}
@Nullable
public String getTransformId() {
return transformStats.getTransformId();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -80,6 +100,9 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
builder.field(STATE_FIELD.getPreferredName(), transformState, params);
builder.field(DataFrameField.STATS_FIELD.getPreferredName(), transformStats, params);
builder.field(CHECKPOINTING_INFO_FIELD.getPreferredName(), checkpointingInfo, params);
if (params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false)) {
builder.field(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), NAME);
}
builder.endObject();
return builder;
}


@ -16,6 +16,7 @@ import org.elasticsearch.action.search.SearchResponse;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
@ -94,16 +95,21 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
*/
public synchronized IndexerState stop() {
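// AtomicReference#updateAndGet may re-apply the update function on contention,
// so it must stay side-effect-free; presumably that is why onStop() is deferred
// until after the state transition commits (see the flag below).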
AtomicBoolean wasStartedAndSetStopped = new AtomicBoolean(false);
IndexerState currentState = state.updateAndGet(previousState -> {
if (previousState == IndexerState.INDEXING) {
return IndexerState.STOPPING;
} else if (previousState == IndexerState.STARTED) {
onStop();
wasStartedAndSetStopped.set(true);
return IndexerState.STOPPED;
} else {
return previousState;
}
});
if (wasStartedAndSetStopped.get()) {
onStop();
}
return currentState;
}


@ -292,7 +292,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
wipeIndices();
}
public void wipeDataFrameTransforms() throws IOException, InterruptedException {
public void wipeDataFrameTransforms() throws IOException {
List<Map<String, Object>> transformConfigs = getDataFrameTransforms();
for (Map<String, Object> transformConfig : transformConfigs) {
String transformId = (String) transformConfig.get("id");


@ -10,7 +10,7 @@ import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.junit.Before;
@ -72,7 +72,7 @@ public class DataFrameUsageIT extends DataFrameRestTestCase {
Request statsExistsRequest = new Request("GET",
DataFrameInternalIndex.INDEX_NAME+"/_search?q=" +
INDEX_DOC_TYPE.getPreferredName() + ":" +
DataFrameIndexerTransformStats.NAME);
DataFrameTransformStateAndStats.NAME);
// Verify that we have our two stats documents
assertBusy(() -> {
Map<String, Object> hasStatsMap = entityAsMap(client().performRequest(statsExistsRequest));
@ -100,7 +100,6 @@ public class DataFrameUsageIT extends DataFrameRestTestCase {
expectedStats.merge(statName, statistic, Integer::sum);
}
usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
usageAsMap = entityAsMap(usageResponse);
@ -109,7 +108,8 @@ public class DataFrameUsageIT extends DataFrameRestTestCase {
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms.started", usageAsMap));
assertEquals(2, XContentMapValues.extractValue("data_frame.transforms.stopped", usageAsMap));
for(String statName : PROVIDED_STATS) {
assertEquals(expectedStats.get(statName), XContentMapValues.extractValue("data_frame.stats."+statName, usageAsMap));
assertEquals("Incorrect stat " + statName,
expectedStats.get(statName), XContentMapValues.extractValue("data_frame.stats." + statName, usageAsMap));
}
}
}


@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransfo
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
@ -176,6 +177,7 @@ public class DataFrameFeatureSet implements XPackFeatureSet {
for(String statName : PROVIDED_STATS) {
Aggregation agg = searchResponse.getAggregations().get(statName);
if (agg instanceof NumericMetricsAggregation.SingleValue) {
statisticsList.add((long)((NumericMetricsAggregation.SingleValue)agg).value());
} else {
@ -197,14 +199,15 @@ public class DataFrameFeatureSet implements XPackFeatureSet {
static void getStatisticSummations(Client client, ActionListener<DataFrameIndexerTransformStats> statsListener) {
QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(),
DataFrameIndexerTransformStats.NAME)));
DataFrameTransformStateAndStats.NAME)));
SearchRequestBuilder requestBuilder = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
.setSize(0)
.setQuery(queryBuilder);
final String path = DataFrameField.STATS_FIELD.getPreferredName() + ".";
for(String statName : PROVIDED_STATS) {
requestBuilder.addAggregation(AggregationBuilders.sum(statName).field(statName));
requestBuilder.addAggregation(AggregationBuilders.sum(statName).field(path + statName));
}
ActionListener<SearchResponse> getStatisticSummationsListener = ActionListener.wrap(
@ -213,6 +216,7 @@ public class DataFrameFeatureSet implements XPackFeatureSet {
logger.error("statistics summations search returned shard failures: {}",
Arrays.toString(searchResponse.getShardFailures()));
}
statsListener.onResponse(parseSearchAggs(searchResponse));
},
failure -> {


@ -9,51 +9,29 @@ package org.elasticsearch.xpack.dataframe.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Request;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@ -69,18 +47,16 @@ public class TransportGetDataFrameTransformsStatsAction extends
private static final Logger logger = LogManager.getLogger(TransportGetDataFrameTransformsStatsAction.class);
private final Client client;
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
private final DataFrameTransformsCheckpointService transformsCheckpointService;
@Inject
public TransportGetDataFrameTransformsStatsAction(TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService, Client client,
ClusterService clusterService,
DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
DataFrameTransformsCheckpointService transformsCheckpointService) {
super(GetDataFrameTransformsStatsAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new,
Response::new, ThreadPool.Names.SAME);
this.client = client;
this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
this.transformsCheckpointService = transformsCheckpointService;
}
@ -157,32 +133,14 @@ public class TransportGetDataFrameTransformsStatsAction extends
// Small assurance that we are at least below the max. Terms search has a hard limit of 10k; we should at least be below that.
assert transformsWithoutTasks.size() <= Request.MAX_SIZE_RETURN;
ActionListener<SearchResponse> searchStatsListener = ActionListener.wrap(
searchResponse -> {
List<ElasticsearchException> nodeFailures = new ArrayList<>(response.getNodeFailures());
if (searchResponse.getShardFailures().length > 0) {
for(ShardSearchFailure shardSearchFailure : searchResponse.getShardFailures()) {
String nodeId = "";
if (shardSearchFailure.shard() != null) {
nodeId = shardSearchFailure.shard().getNodeId();
}
nodeFailures.add(new FailedNodeException(nodeId, shardSearchFailure.toString(), shardSearchFailure.getCause()));
}
logger.error("transform statistics document search returned shard failures: {}",
Arrays.toString(searchResponse.getShardFailures()));
}
ActionListener<List<DataFrameTransformStateAndStats>> searchStatsListener = ActionListener.wrap(
stats -> {
List<DataFrameTransformStateAndStats> allStateAndStats = response.getTransformsStateAndStats();
for(SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
try {
DataFrameIndexerTransformStats stats = parseFromSource(source);
allStateAndStats.add(DataFrameTransformStateAndStats.initialStateAndStats(stats.getTransformId(), stats));
transformsWithoutTasks.remove(stats.getTransformId());
} catch (IOException e) {
listener.onFailure(new ElasticsearchParseException("Could not parse data frame transform stats", e));
return;
}
}
allStateAndStats.addAll(stats);
transformsWithoutTasks.removeAll(
stats.stream().map(DataFrameTransformStateAndStats::getId).collect(Collectors.toSet()));
// Transforms that have not been started and have no state or stats.
transformsWithoutTasks.forEach(transformId ->
allStateAndStats.add(DataFrameTransformStateAndStats.initialStateAndStats(transformId)));
@ -190,7 +148,7 @@ public class TransportGetDataFrameTransformsStatsAction extends
// it can easily become arbitrarily ordered based on which transforms don't have a task or stats docs
allStateAndStats.sort(Comparator.comparing(DataFrameTransformStateAndStats::getId));
listener.onResponse(new Response(allStateAndStats, response.getTaskFailures(), nodeFailures));
listener.onResponse(new Response(allStateAndStats, response.getTaskFailures(), response.getNodeFailures()));
},
e -> {
if (e instanceof IndexNotFoundException) {
@ -201,26 +159,6 @@ public class TransportGetDataFrameTransformsStatsAction extends
}
);
QueryBuilder builder = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termsQuery(DataFrameField.ID.getPreferredName(), transformsWithoutTasks))
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameIndexerTransformStats.NAME)));
SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
.addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC)
.setQuery(builder)
.request();
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ClientHelper.DATA_FRAME_ORIGIN,
searchRequest,
searchStatsListener, client::search);
}
private static DataFrameIndexerTransformStats parseFromSource(BytesReference source) throws IOException {
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
return DataFrameIndexerTransformStats.fromXContent(parser);
}
dataFrameTransformsConfigManager.getTransformStats(transformsWithoutTasks, searchStatsListener);
}
}


@ -59,7 +59,7 @@ public class TransportStartDataFrameTransformTaskAction extends
protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
if (transformTask.getTransformId().equals(request.getId())) {
transformTask.start(listener);
transformTask.start(null, listener);
} else {
listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
+ "] does not match request's ID [" + request.getId() + "]"));


@ -17,6 +17,9 @@ import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig;
@ -50,7 +53,7 @@ public final class DataFrameInternalIndex {
public static final String RAW = "raw";
// data types
public static final String DOUBLE = "double";
public static final String FLOAT = "float";
public static final String LONG = "long";
public static final String KEYWORD = "keyword";
@ -130,7 +133,7 @@ public final class DataFrameInternalIndex {
// add the schema for transform configurations
addDataFrameTransformsConfigMappings(builder);
// add the schema for transform stats
addDataFrameTransformsStatsMappings(builder);
addDataFrameTransformStateAndStatsMappings(builder);
// end type
builder.endObject();
// end properties
@ -141,37 +144,76 @@ public final class DataFrameInternalIndex {
}
private static XContentBuilder addDataFrameTransformsStatsMappings(XContentBuilder builder) throws IOException {
private static XContentBuilder addDataFrameTransformStateAndStatsMappings(XContentBuilder builder) throws IOException {
return builder
.startObject(DataFrameIndexerTransformStats.NUM_PAGES.getPreferredName())
.field(TYPE, LONG)
.startObject(DataFrameTransformStateAndStats.STATE_FIELD.getPreferredName())
.startObject(PROPERTIES)
.startObject(DataFrameTransformState.TASK_STATE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(DataFrameTransformState.INDEXER_STATE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(DataFrameTransformState.CURRENT_POSITION.getPreferredName())
.field(ENABLED, false)
.endObject()
.startObject(DataFrameTransformState.CHECKPOINT.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameTransformState.REASON.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(DataFrameTransformState.PROGRESS.getPreferredName())
.startObject(PROPERTIES)
.startObject(DataFrameTransformProgress.TOTAL_DOCS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameTransformProgress.DOCS_REMAINING.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameTransformProgress.PERCENT_COMPLETE)
.field(TYPE, FLOAT)
.endObject()
.endObject()
.endObject()
.endObject()
.endObject()
.startObject(DataFrameIndexerTransformStats.NUM_INPUT_DOCUMENTS.getPreferredName())
.field(TYPE, LONG)
.startObject(DataFrameField.STATS_FIELD.getPreferredName())
.startObject(PROPERTIES)
.startObject(DataFrameIndexerTransformStats.NUM_PAGES.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.NUM_INPUT_DOCUMENTS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.NUM_OUTPUT_DOCUMENTS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.NUM_INVOCATIONS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.INDEX_TIME_IN_MS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.SEARCH_TIME_IN_MS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.INDEX_TOTAL.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.SEARCH_TOTAL.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.SEARCH_FAILURES.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.INDEX_FAILURES.getPreferredName())
.field(TYPE, LONG)
.endObject()
.endObject()
.endObject()
.startObject(DataFrameIndexerTransformStats.NUM_OUTPUT_DOCUMENTS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.NUM_INVOCATIONS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.INDEX_TIME_IN_MS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.SEARCH_TIME_IN_MS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.INDEX_TOTAL.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.SEARCH_TOTAL.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.SEARCH_FAILURES.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.INDEX_FAILURES.getPreferredName())
.field(TYPE, LONG)
.startObject(DataFrameTransformStateAndStats.CHECKPOINTING_INFO_FIELD.getPreferredName())
.field(ENABLED, false)
.endObject();
}
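// Hedged sketch of the mapping shape the builder above produces, using the
// ParseField names in this diff and assuming DataFrameField.STATS_FIELD maps to
// "stats" (the stats counters are abridged):
//   state.task_state: keyword, state.indexer_state: keyword,
//   state.current_position: enabled=false, state.checkpoint: long,
//   state.reason: keyword, state.progress.total_docs: long,
//   state.progress.docs_remaining: long, state.progress.percent_complete: float,
//   stats.<counter>: long, checkpointing: enabled=false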


@ -44,13 +44,14 @@ import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -274,13 +275,13 @@ public class DataFrameTransformsConfigManager {
}));
}
public void putOrUpdateTransformStats(DataFrameIndexerTransformStats stats, ActionListener<Boolean> listener) {
public void putOrUpdateTransformStats(DataFrameTransformStateAndStats stats, ActionListener<Boolean> listener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = stats.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.INDEX_NAME)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.id(DataFrameIndexerTransformStats.documentId(stats.getTransformId()))
.id(DataFrameTransformStateAndStats.documentId(stats.getTransformId()))
.source(source);
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
@ -297,8 +298,8 @@ public class DataFrameTransformsConfigManager {
}
}
public void getTransformStats(String transformId, ActionListener<DataFrameIndexerTransformStats> resultListener) {
GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameIndexerTransformStats.documentId(transformId));
public void getTransformStats(String transformId, ActionListener<DataFrameTransformStateAndStats> resultListener) {
GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformStateAndStats.documentId(transformId));
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> {
if (getResponse.isExists() == false) {
@ -310,7 +311,7 @@ public class DataFrameTransformsConfigManager {
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
resultListener.onResponse(DataFrameIndexerTransformStats.fromXContent(parser));
resultListener.onResponse(DataFrameTransformStateAndStats.fromXContent(parser));
} catch (Exception e) {
logger.error(
DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION, transformId), e);
@ -326,6 +327,46 @@ public class DataFrameTransformsConfigManager {
}));
}
public void getTransformStats(Collection<String> transformIds, ActionListener<List<DataFrameTransformStateAndStats>> listener) {
QueryBuilder builder = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termsQuery(DataFrameField.ID.getPreferredName(), transformIds))
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformStateAndStats.NAME)));
SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
.addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC)
.setQuery(builder)
.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, searchRequest,
ActionListener.<SearchResponse>wrap(
searchResponse -> {
List<DataFrameTransformStateAndStats> stats = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
stats.add(DataFrameTransformStateAndStats.fromXContent(parser));
} catch (IOException e) {
listener.onFailure(
new ElasticsearchParseException("failed to parse data frame stats from search hit", e));
return;
}
}
listener.onResponse(stats);
},
e -> {
if (e.getClass() == IndexNotFoundException.class) {
listener.onResponse(Collections.emptyList());
} else {
listener.onFailure(e);
}
}
), client::search);
}
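// Note on behavior, grounded in the code above: results come back parsed and
// sorted by id (SortOrder.ASC on the id field), and a missing internal index
// yields an empty list rather than a failure.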
private void parseTransformLenientlyFromSource(BytesReference source, String transformId,
ActionListener<DataFrameTransformConfig> transformListener) {
try (InputStream stream = source.streamInput();


@ -26,10 +26,10 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
@ -106,44 +106,47 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) {
final String transformId = params.getId();
final DataFrameTransformTask buildTask = (DataFrameTransformTask) task;
final DataFrameTransformState transformState = (DataFrameTransformState) state;
final DataFrameTransformState transformPTaskState = (DataFrameTransformState) state;
final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder =
new DataFrameTransformTask.ClientDataFrameIndexerBuilder()
new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId)
.setAuditor(auditor)
.setClient(client)
.setIndexerState(currentIndexerState(transformState))
.setInitialPosition(transformState == null ? null : transformState.getPosition())
// If the state is `null` that means this is a "first run". We can safely assume the
// task will attempt to gather the initial progress information
// if we have state, this may indicate the previous execution node crashed, so we should attempt to retrieve
// the progress from state to keep an accurate measurement of our progress
.setProgress(transformState == null ? null : transformState.getProgress())
.setIndexerState(currentIndexerState(transformPTaskState))
// If the transform persistent task state is `null` that means this is a "first run".
// If we have state then the task has relocated from another node in which case this
// state is preferred
.setInitialPosition(transformPTaskState == null ? null : transformPTaskState.getPosition())
.setProgress(transformPTaskState == null ? null : transformPTaskState.getProgress())
.setTransformsCheckpointService(dataFrameTransformsCheckpointService)
.setTransformsConfigManager(transformsConfigManager)
.setTransformId(transformId);
.setTransformsConfigManager(transformsConfigManager);
ActionListener<StartDataFrameTransformTaskAction.Response> startTaskListener = ActionListener.wrap(
response -> logger.info("Successfully completed and scheduled task in node operation"),
failure -> logger.error("Failed to start task ["+ transformId +"] in node operation", failure)
);
Long previousCheckpoint = transformPTaskState != null ? transformPTaskState.getCheckpoint() : null;
// <3> Set the previous stats (if they exist), initialize the indexer, start the task (if it is STOPPED)
// Since we don't create the task until `_start` is called, if we see that the task state is stopped, attempt to start
// Schedule execution regardless
ActionListener<DataFrameIndexerTransformStats> transformStatsActionListener = ActionListener.wrap(
stats -> {
indexerBuilder.setInitialStats(stats);
buildTask.initializeIndexer(indexerBuilder);
startTask(buildTask, startTaskListener);
ActionListener<DataFrameTransformStateAndStats> transformStatsActionListener = ActionListener.wrap(
stateAndStats -> {
indexerBuilder.setInitialStats(stateAndStats.getTransformStats());
if (transformPTaskState == null) { // prefer the persistent task state
indexerBuilder.setInitialPosition(stateAndStats.getTransformState().getPosition());
indexerBuilder.setProgress(stateAndStats.getTransformState().getProgress());
}
final Long checkpoint = previousCheckpoint != null ? previousCheckpoint : stateAndStats.getTransformState().getCheckpoint();
startTask(buildTask, indexerBuilder, checkpoint, startTaskListener);
},
error -> {
if (error instanceof ResourceNotFoundException == false) {
logger.error("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error);
}
indexerBuilder.setInitialStats(new DataFrameIndexerTransformStats(transformId));
buildTask.initializeIndexer(indexerBuilder);
startTask(buildTask, startTaskListener);
startTask(buildTask, indexerBuilder, previousCheckpoint, startTaskListener);
}
);
@ -217,13 +220,17 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
}
private void startTask(DataFrameTransformTask buildTask,
DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder,
Long previousCheckpoint,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
// If we are stopped and this is an initial run, it means we have never been
// started, so attempt to start the task
buildTask.initializeIndexer(indexerBuilder);
// TODO isInitialRun is false after relocation??
if (buildTask.getState().getTaskState().equals(DataFrameTransformTaskState.STOPPED) && buildTask.isInitialRun()) {
logger.info("Data frame transform [{}] created.", buildTask.getTransformId());
buildTask.start(listener);
buildTask.start(previousCheckpoint, listener);
} else {
logger.debug("No need to start task. Its current state is: {}", buildTask.getState().getIndexerState());
listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));


@ -29,9 +29,11 @@ import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTask
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.indexing.IndexerState;
@ -181,7 +183,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return getIndexer() != null && getIndexer().initialRun();
}
public synchronized void start(ActionListener<Response> listener) {
/**
* Start the background indexer and set the task's state to started
* @param startingCheckpoint Set the current checkpoint to this value. If null, the
* current checkpoint is not set.
* @param listener Started listener
*/
public synchronized void start(Long startingCheckpoint, ActionListener<Response> listener) {
if (getIndexer() == null) {
listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later",
getTransformId()));
@ -195,6 +203,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
stateReason.set(null);
taskState.set(DataFrameTransformTaskState.STARTED);
if (startingCheckpoint != null) {
currentCheckpoint.set(startingCheckpoint);
}
final DataFrameTransformState state = new DataFrameTransformState(
DataFrameTransformTaskState.STARTED,
@ -347,6 +358,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private Map<String, Object> initialPosition;
private DataFrameTransformProgress progress;
ClientDataFrameIndexerBuilder(String transformId) {
this.transformId = transformId;
this.initialStats = new DataFrameIndexerTransformStats(transformId);
}
ClientDataFrameIndexer build(DataFrameTransformTask parentTask) {
return new ClientDataFrameIndexer(this.transformId,
this.transformsConfigManager,
@ -538,7 +554,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
task -> {
// Only persist the stats if something has actually changed
if (previouslyPersistedStats == null || previouslyPersistedStats.equals(getStats()) == false) {
transformsConfigManager.putOrUpdateTransformStats(getStats(),
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
previouslyPersistedStats = getStats();
@ -599,7 +617,18 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
protected void onStop() {
auditor.info(transformConfig.getId(), "Indexer has stopped");
logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());
transformTask.shutdown();
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, transformTask.getState(), getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
transformTask.shutdown();
},
statsExc -> {
transformTask.shutdown();
logger.error("Updating saving stats of transform [" + transformConfig.getId() + "] failed", statsExc);
}
));
}
@Override


@ -14,12 +14,17 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointTests;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStatsTests;
import org.elasticsearch.xpack.dataframe.DataFrameSingleNodeTestCase;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
@ -217,4 +222,40 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
});
}
public void testStateAndStats() throws InterruptedException {
String transformId = "transform_test_stats_create_read_update";
DataFrameTransformStateAndStats stateAndStats =
DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats(transformId);
assertAsync(listener -> transformsConfigManager.putOrUpdateTransformStats(stateAndStats, listener), Boolean.TRUE, null, null);
assertAsync(listener -> transformsConfigManager.getTransformStats(transformId, listener), stateAndStats, null, null);
DataFrameTransformStateAndStats updated =
DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats(transformId);
assertAsync(listener -> transformsConfigManager.putOrUpdateTransformStats(updated, listener), Boolean.TRUE, null, null);
assertAsync(listener -> transformsConfigManager.getTransformStats(transformId, listener), updated, null, null);
}
public void testGetStateAndStatsMultiple() throws InterruptedException {
int numStats = randomInt(5);
List<DataFrameTransformStateAndStats> expectedStats = new ArrayList<>();
for (int i=0; i<numStats; i++) {
DataFrameTransformStateAndStats stat =
DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats(randomAlphaOfLength(6));
expectedStats.add(stat);
assertAsync(listener -> transformsConfigManager.putOrUpdateTransformStats(stat, listener), Boolean.TRUE, null, null);
}
// remove one of the put stats so we don't retrieve all
if (expectedStats.size() > 1) {
expectedStats.remove(expectedStats.size() -1);
}
List<String> ids = expectedStats.stream().map(DataFrameTransformStateAndStats::getId).collect(Collectors.toList());
// get stats will be ordered by id
expectedStats.sort(Comparator.comparing(DataFrameTransformStateAndStats::getId));
assertAsync(listener -> transformsConfigManager.getTransformStats(ids, listener), expectedStats, null, null);
}
}


@ -114,8 +114,8 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.task_state: "stopped" }
# - match: { transforms.0.state.indexer_state: "stopped" }
# - match: { transforms.0.state.task_state: "stopped" }
- do:
data_frame.start_data_frame_transform:
@ -206,47 +206,3 @@ teardown:
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-start-later"
---
"Test stop all":
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform-stop-all"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-start-later" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-stop-all"
- match: { started: true }
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-stop"
- match: { started: true }
- do:
data_frame.stop_data_frame_transform:
transform_id: "_all"
wait_for_completion: true
- match: { stopped: true }
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "*"
- match: { count: 2 }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.task_state: "stopped" }
- match: { transforms.1.state.indexer_state: "stopped" }
- match: { transforms.1.state.task_state: "stopped" }
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-stop-all"