[7.x][Transform] add throttling (#56007) (#56184)

add throttling to transform, throttling will slow down search requests by
delaying the execution based on a documents per second metric.

fixes #54862
This commit is contained in:
Hendrik Muhs 2020-05-05 13:09:02 +02:00 committed by GitHub
parent f569405fde
commit e177a38504
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 2055 additions and 532 deletions

View File

@ -0,0 +1,150 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transform.transforms;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class SettingsConfig implements ToXContentObject {
private static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
private static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second");
private static final int DEFAULT_MAX_PAGE_SEARCH_SIZE = -1;
private static final float DEFAULT_DOCS_PER_SECOND = -1F;
private final Integer maxPageSearchSize;
private final Float docsPerSecond;
private static final ConstructingObjectParser<SettingsConfig, Void> PARSER = new ConstructingObjectParser<>(
"settings_config",
true,
args -> new SettingsConfig((Integer) args[0], (Float) args[1])
);
static {
PARSER.declareIntOrNull(optionalConstructorArg(), DEFAULT_MAX_PAGE_SEARCH_SIZE, MAX_PAGE_SEARCH_SIZE);
PARSER.declareFloatOrNull(optionalConstructorArg(), DEFAULT_DOCS_PER_SECOND, DOCS_PER_SECOND);
}
public static SettingsConfig fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}
SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond) {
this.maxPageSearchSize = maxPageSearchSize;
this.docsPerSecond = docsPerSecond;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (maxPageSearchSize != null) {
if (maxPageSearchSize.equals(DEFAULT_MAX_PAGE_SEARCH_SIZE)) {
builder.field(MAX_PAGE_SEARCH_SIZE.getPreferredName(), (Integer) null);
} else {
builder.field(MAX_PAGE_SEARCH_SIZE.getPreferredName(), maxPageSearchSize);
}
}
if (docsPerSecond != null) {
if (docsPerSecond.equals(DEFAULT_DOCS_PER_SECOND)) {
builder.field(DOCS_PER_SECOND.getPreferredName(), (Float) null);
} else {
builder.field(DOCS_PER_SECOND.getPreferredName(), docsPerSecond);
}
}
builder.endObject();
return builder;
}
public Integer getMaxPageSearchSize() {
return maxPageSearchSize;
}
public Float getDocsPerSecond() {
return docsPerSecond;
}
@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (other == null || other.getClass() != getClass()) {
return false;
}
SettingsConfig that = (SettingsConfig) other;
return Objects.equals(maxPageSearchSize, that.maxPageSearchSize) && Objects.equals(docsPerSecond, that.docsPerSecond);
}
@Override
public int hashCode() {
return Objects.hash(maxPageSearchSize, docsPerSecond);
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private Integer maxPageSearchSize;
private Float docsPerSecond;
/**
* Sets the paging maximum paging maxPageSearchSize that transform can use when
* pulling the data from the source index.
*
* If OOM is triggered, the paging maxPageSearchSize is dynamically reduced so that the transform can continue to gather data.
*
* @param maxPageSearchSize Integer value between 10 and 10_000
* @return the {@link Builder} with the paging maxPageSearchSize set.
*/
public Builder setMaxPageSearchSize(Integer maxPageSearchSize) {
this.maxPageSearchSize = maxPageSearchSize == null ? DEFAULT_MAX_PAGE_SEARCH_SIZE : maxPageSearchSize;
return this;
}
/**
* Sets the docs per second that transform can use when pulling the data from the source index.
*
* This setting throttles transform by issuing queries less often, however processing still happens in
* batches. A value of 0 disables throttling (default).
*
* @param docsPerSecond Integer value
* @return the {@link Builder} with requestsPerSecond set.
*/
public Builder setRequestsPerSecond(Float docsPerSecond) {
this.docsPerSecond = docsPerSecond == null ? DEFAULT_DOCS_PER_SECOND : docsPerSecond;
return this;
}
public SettingsConfig build() {
return new SettingsConfig(maxPageSearchSize, docsPerSecond);
}
}
}

View File

@ -48,6 +48,7 @@ public class TransformConfig implements ToXContentObject {
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField DESCRIPTION = new ParseField("description");
public static final ParseField SYNC = new ParseField("sync");
public static final ParseField SETTINGS = new ParseField("settings");
public static final ParseField VERSION = new ParseField("version");
public static final ParseField CREATE_TIME = new ParseField("create_time");
// types of transforms
@ -58,13 +59,15 @@ public class TransformConfig implements ToXContentObject {
private final DestConfig dest;
private final TimeValue frequency;
private final SyncConfig syncConfig;
private final SettingsConfig settings;
private final PivotConfig pivotConfig;
private final String description;
private final Version transformVersion;
private final Instant createTime;
public static final ConstructingObjectParser<TransformConfig, Void> PARSER =
new ConstructingObjectParser<>("transform", true,
public static final ConstructingObjectParser<TransformConfig, Void> PARSER = new ConstructingObjectParser<>(
"transform",
true,
(args) -> {
String id = (String) args[0];
SourceConfig source = (SourceConfig) args[1];
@ -73,30 +76,44 @@ public class TransformConfig implements ToXContentObject {
SyncConfig syncConfig = (SyncConfig) args[4];
PivotConfig pivotConfig = (PivotConfig) args[5];
String description = (String) args[6];
Instant createTime = (Instant)args[7];
String transformVersion = (String)args[8];
return new TransformConfig(id,
SettingsConfig settings = (SettingsConfig) args[7];
Instant createTime = (Instant) args[8];
String transformVersion = (String) args[9];
return new TransformConfig(
id,
source,
dest,
frequency,
syncConfig,
pivotConfig,
description,
settings,
createTime,
transformVersion);
});
transformVersion
);
}
);
static {
PARSER.declareString(constructorArg(), ID);
PARSER.declareObject(constructorArg(), (p, c) -> SourceConfig.PARSER.apply(p, null), SOURCE);
PARSER.declareObject(constructorArg(), (p, c) -> DestConfig.PARSER.apply(p, null), DEST);
PARSER.declareField(optionalConstructorArg(), p -> TimeValue.parseTimeValue(p.text(), FREQUENCY.getPreferredName()),
FREQUENCY, ObjectParser.ValueType.STRING);
PARSER.declareField(
optionalConstructorArg(),
p -> TimeValue.parseTimeValue(p.text(), FREQUENCY.getPreferredName()),
FREQUENCY,
ObjectParser.ValueType.STRING
);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), SYNC);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM);
PARSER.declareString(optionalConstructorArg(), DESCRIPTION);
PARSER.declareField(optionalConstructorArg(),
p -> TimeUtil.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()), CREATE_TIME, ObjectParser.ValueType.VALUE);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> SettingsConfig.fromXContent(p), SETTINGS);
PARSER.declareField(
optionalConstructorArg(),
p -> TimeUtil.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()),
CREATE_TIME,
ObjectParser.ValueType.VALUE
);
PARSER.declareString(optionalConstructorArg(), VERSION);
}
@ -108,7 +125,6 @@ public class TransformConfig implements ToXContentObject {
return syncConfig;
}
public static TransformConfig fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}
@ -125,18 +141,21 @@ public class TransformConfig implements ToXContentObject {
* @return A TransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
*/
public static TransformConfig forPreview(final SourceConfig source, final PivotConfig pivotConfig) {
return new TransformConfig(null, source, null, null, null, pivotConfig, null, null, null);
return new TransformConfig(null, source, null, null, null, pivotConfig, null, null, null, null);
}
TransformConfig(final String id,
TransformConfig(
final String id,
final SourceConfig source,
final DestConfig dest,
final TimeValue frequency,
final SyncConfig syncConfig,
final PivotConfig pivotConfig,
final String description,
final SettingsConfig settings,
final Instant createTime,
final String version) {
final String version
) {
this.id = id;
this.source = source;
this.dest = dest;
@ -144,6 +163,7 @@ public class TransformConfig implements ToXContentObject {
this.syncConfig = syncConfig;
this.pivotConfig = pivotConfig;
this.description = description;
this.settings = settings;
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());
this.transformVersion = version == null ? null : Version.fromString(version);
}
@ -185,6 +205,11 @@ public class TransformConfig implements ToXContentObject {
return description;
}
@Nullable
public SettingsConfig getSettings() {
return settings;
}
@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
@ -211,6 +236,9 @@ public class TransformConfig implements ToXContentObject {
if (description != null) {
builder.field(DESCRIPTION.getPreferredName(), description);
}
if (settings != null) {
builder.field(SETTINGS.getPreferredName(), settings);
}
if (createTime != null) {
builder.timeField(CREATE_TIME.getPreferredName(), CREATE_TIME.getPreferredName() + "_string", createTime.toEpochMilli());
}
@ -240,13 +268,14 @@ public class TransformConfig implements ToXContentObject {
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.syncConfig, that.syncConfig)
&& Objects.equals(this.transformVersion, that.transformVersion)
&& Objects.equals(this.settings, that.settings)
&& Objects.equals(this.createTime, that.createTime)
&& Objects.equals(this.pivotConfig, that.pivotConfig);
}
@Override
public int hashCode() {
return Objects.hash(id, source, dest, frequency, syncConfig, pivotConfig, description);
return Objects.hash(id, source, dest, frequency, syncConfig, settings, createTime, transformVersion, pivotConfig, description);
}
@Override
@ -266,6 +295,7 @@ public class TransformConfig implements ToXContentObject {
private TimeValue frequency;
private SyncConfig syncConfig;
private PivotConfig pivotConfig;
private SettingsConfig settings;
private String description;
public Builder setId(String id) {
@ -303,8 +333,13 @@ public class TransformConfig implements ToXContentObject {
return this;
}
public Builder setSettings(SettingsConfig settings) {
this.settings = settings;
return this;
}
public TransformConfig build() {
return new TransformConfig(id, source, dest, frequency, syncConfig, pivotConfig, description, null, null);
return new TransformConfig(id, source, dest, frequency, syncConfig, pivotConfig, description, settings, null, null);
}
}
}

View File

@ -39,18 +39,21 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
public class TransformConfigUpdate implements ToXContentObject {
public static final String NAME = "transform_config_update";
private static final ConstructingObjectParser<TransformConfigUpdate, String> PARSER = new ConstructingObjectParser<>(NAME,
private static final ConstructingObjectParser<TransformConfigUpdate, String> PARSER = new ConstructingObjectParser<>(
NAME,
false,
(args) -> {
SourceConfig source = (SourceConfig) args[0];
DestConfig dest = (DestConfig) args[1];
TimeValue frequency = args[2] == null ?
null :
TimeValue.parseTimeValue((String) args[2], TransformConfig.FREQUENCY.getPreferredName());
TimeValue frequency = args[2] == null
? null
: TimeValue.parseTimeValue((String) args[2], TransformConfig.FREQUENCY.getPreferredName());
SyncConfig syncConfig = (SyncConfig) args[3];
String description = (String) args[4];
return new TransformConfigUpdate(source, dest, frequency, syncConfig, description);
});
SettingsConfig settings = (SettingsConfig) args[5];
return new TransformConfigUpdate(source, dest, frequency, syncConfig, description, settings);
}
);
static {
PARSER.declareObject(optionalConstructorArg(), (p, c) -> SourceConfig.PARSER.apply(p, null), TransformConfig.SOURCE);
@ -58,6 +61,7 @@ public class TransformConfigUpdate implements ToXContentObject {
PARSER.declareString(optionalConstructorArg(), TransformConfig.FREQUENCY);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), TransformConfig.SYNC);
PARSER.declareString(optionalConstructorArg(), TransformConfig.DESCRIPTION);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> SettingsConfig.fromXContent(p), TransformConfig.SETTINGS);
}
private static SyncConfig parseSyncConfig(XContentParser parser) throws IOException {
@ -73,17 +77,22 @@ public class TransformConfigUpdate implements ToXContentObject {
private final TimeValue frequency;
private final SyncConfig syncConfig;
private final String description;
private final SettingsConfig settings;
public TransformConfigUpdate(final SourceConfig source,
public TransformConfigUpdate(
final SourceConfig source,
final DestConfig dest,
final TimeValue frequency,
final SyncConfig syncConfig,
final String description) {
final String description,
final SettingsConfig settings
) {
this.source = source;
this.dest = dest;
this.frequency = frequency;
this.syncConfig = syncConfig;
this.description = description;
this.settings = settings;
}
public SourceConfig getSource() {
@ -107,6 +116,11 @@ public class TransformConfigUpdate implements ToXContentObject {
return description;
}
@Nullable
public SettingsConfig getSettings() {
return settings;
}
@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
@ -127,6 +141,10 @@ public class TransformConfigUpdate implements ToXContentObject {
if (description != null) {
builder.field(TransformConfig.DESCRIPTION.getPreferredName(), description);
}
if (settings != null) {
builder.field(TransformConfig.SETTINGS.getPreferredName(), settings);
}
builder.endObject();
return builder;
}
@ -147,12 +165,13 @@ public class TransformConfigUpdate implements ToXContentObject {
&& Objects.equals(this.dest, that.dest)
&& Objects.equals(this.frequency, that.frequency)
&& Objects.equals(this.syncConfig, that.syncConfig)
&& Objects.equals(this.description, that.description);
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.settings, that.settings);
}
@Override
public int hashCode() {
return Objects.hash(source, dest, frequency, syncConfig, description);
return Objects.hash(source, dest, frequency, syncConfig, description, settings);
}
@Override
@ -175,6 +194,7 @@ public class TransformConfigUpdate implements ToXContentObject {
private TimeValue frequency;
private SyncConfig syncConfig;
private String description;
private SettingsConfig settings;
public Builder setSource(SourceConfig source) {
this.source = source;
@ -201,8 +221,13 @@ public class TransformConfigUpdate implements ToXContentObject {
return this;
}
public Builder setSettings(SettingsConfig settings) {
this.settings = settings;
return this;
}
public TransformConfigUpdate build() {
return new TransformConfigUpdate(source, dest, frequency, syncConfig, description);
return new TransformConfigUpdate(source, dest, frequency, syncConfig, description, settings);
}
}
}

View File

@ -45,8 +45,11 @@ public class PivotConfig implements ToXContentObject {
private final AggregationConfig aggregationConfig;
private final Integer maxPageSearchSize;
private static final ConstructingObjectParser<PivotConfig, Void> PARSER = new ConstructingObjectParser<>("pivot_config", true,
args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1], (Integer) args[2]));
private static final ConstructingObjectParser<PivotConfig, Void> PARSER = new ConstructingObjectParser<>(
"pivot_config",
true,
args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1], (Integer) args[2])
);
static {
PARSER.declareObject(constructorArg(), (p, c) -> (GroupConfig.fromXContent(p)), GROUP_BY);
@ -84,6 +87,7 @@ public class PivotConfig implements ToXContentObject {
return groups;
}
@Deprecated
public Integer getMaxPageSearchSize() {
return maxPageSearchSize;
}
@ -154,10 +158,11 @@ public class PivotConfig implements ToXContentObject {
* pulling the data from the source index.
*
* If OOM is triggered, the paging maxPageSearchSize is dynamically reduced so that the transform can continue to gather data.
*
* Deprecated, use {@link org.elasticsearch.client.transform.transforms.SettingsConfig.Builder#setMaxPageSearchSize} instead
* @param maxPageSearchSize Integer value between 10 and 10_000
* @return the {@link Builder} with the paging maxPageSearchSize set.
*/
@Deprecated
public Builder setMaxPageSearchSize(Integer maxPageSearchSize) {
this.maxPageSearchSize = maxPageSearchSize;
return this;

View File

@ -45,6 +45,7 @@ import org.elasticsearch.client.transform.UpdateTransformResponse;
import org.elasticsearch.client.transform.transforms.DestConfig;
import org.elasticsearch.client.transform.transforms.NodeAttributes;
import org.elasticsearch.client.transform.transforms.QueryConfig;
import org.elasticsearch.client.transform.transforms.SettingsConfig;
import org.elasticsearch.client.transform.transforms.SourceConfig;
import org.elasticsearch.client.transform.transforms.TimeSyncConfig;
import org.elasticsearch.client.transform.transforms.TransformConfig;
@ -80,13 +81,12 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
@After
public void cleanUpTransforms() throws Exception {
for (String transformId : transformsToClean) {
highLevelClient().transform().stopTransform(
new StopTransformRequest(transformId, true, TimeValue.timeValueSeconds(20), false), RequestOptions.DEFAULT);
highLevelClient().transform()
.stopTransform(new StopTransformRequest(transformId, true, TimeValue.timeValueSeconds(20), false), RequestOptions.DEFAULT);
}
for (String transformId : transformsToClean) {
highLevelClient().transform().deleteTransform(
new DeleteTransformRequest(transformId), RequestOptions.DEFAULT);
highLevelClient().transform().deleteTransform(new DeleteTransformRequest(transformId), RequestOptions.DEFAULT);
}
transformsToClean = new ArrayList<>();
@ -151,9 +151,13 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
PivotConfig pivotConfig = PivotConfig.builder()
.setGroups(groupConfig) // <1>
.setAggregationConfig(aggConfig) // <2>
.setMaxPageSearchSize(1000) // <3>
.build();
// end::put-transform-pivot-config
// tag::put-transform-settings-config
SettingsConfig settings = SettingsConfig.builder()
.setMaxPageSearchSize(1000) // <1>
.build();
// end::put-transform-settings-config
// tag::put-transform-config
TransformConfig transformConfig = TransformConfig
.builder()
@ -163,6 +167,7 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
.setFrequency(TimeValue.timeValueSeconds(15)) // <4>
.setPivotConfig(pivotConfig) // <5>
.setDescription("This is my test transform") // <6>
.setSettings(settings) // <7>
.build();
// end::put-transform-config
@ -225,8 +230,7 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
RestHighLevelClient client = highLevelClient();
QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer",
TermsGroupSource.builder().setField("user_id").build()).build();
GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer", TermsGroupSource.builder().setField("user_id").build()).build();
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
@ -279,8 +283,7 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
assertThat(updatedConfig.getDescription(), equalTo("This is my updated transform"));
}
{
UpdateTransformRequest request = new UpdateTransformRequest(update,
"my-transform-to-update");
UpdateTransformRequest request = new UpdateTransformRequest(update, "my-transform-to-update");
// tag::update-transform-execute-listener
ActionListener<UpdateTransformResponse> listener =
@ -316,8 +319,7 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
RestHighLevelClient client = highLevelClient();
QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer",
TermsGroupSource.builder().setField("user_id").build()).build();
GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer", TermsGroupSource.builder().setField("user_id").build()).build();
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
@ -436,8 +438,7 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
RestHighLevelClient client = highLevelClient();
GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer",
TermsGroupSource.builder().setField("user_id").build()).build();
GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer", TermsGroupSource.builder().setField("user_id").build()).build();
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
@ -445,19 +446,13 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
TransformConfig transformConfig1 = TransformConfig.builder()
.setId("mega-transform")
.setSource(SourceConfig.builder()
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setSource(SourceConfig.builder().setIndex("source-data").setQuery(new MatchAllQueryBuilder()).build())
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.build();
TransformConfig transformConfig2 = TransformConfig.builder()
.setId("mega-transform2")
.setSource(SourceConfig.builder()
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setSource(SourceConfig.builder().setIndex("source-data").setQuery(new MatchAllQueryBuilder()).build())
.setDest(DestConfig.builder().setIndex("pivot-dest2").build())
.setPivotConfig(pivotConfig)
.build();
@ -517,8 +512,7 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
RestHighLevelClient client = highLevelClient();
QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer",
TermsGroupSource.builder().setField("user_id").build()).build();
GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer", TermsGroupSource.builder().setField("user_id").build()).build();
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
@ -581,8 +575,7 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
RestHighLevelClient client = highLevelClient();
GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer",
TermsGroupSource.builder().setField("user_id").build()).build();
GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer", TermsGroupSource.builder().setField("user_id").build()).build();
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
@ -591,10 +584,7 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
String id = "statisitcal-transform";
TransformConfig transformConfig = TransformConfig.builder()
.setId(id)
.setSource(SourceConfig.builder()
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setSource(SourceConfig.builder().setIndex("source-data").setQuery(new MatchAllQueryBuilder()).build())
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.build();
@ -668,24 +658,18 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
}
}
public void testGetDataFrameTransform() throws IOException, InterruptedException {
createIndex("source-data");
GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer",
TermsGroupSource.builder().setField("user_id").build()).build();
GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer", TermsGroupSource.builder().setField("user_id").build()).build();
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
PivotConfig pivotConfig = PivotConfig.builder().setGroups(groupConfig).setAggregationConfig(aggConfig).build();
TransformConfig putTransformConfig = TransformConfig.builder()
.setId("mega-transform")
.setSource(SourceConfig.builder()
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setSource(SourceConfig.builder().setIndex("source-data").setQuery(new MatchAllQueryBuilder()).build())
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.build();

View File

@ -0,0 +1,119 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transform.transforms;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.xpack.core.watcher.watch.Payload.XContent;
import java.io.IOException;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class SettingsConfigTests extends AbstractXContentTestCase<SettingsConfig> {
public static SettingsConfig randomSettingsConfig() {
return new SettingsConfig(randomBoolean() ? null : randomIntBetween(10, 10_000), randomBoolean() ? null : randomFloat());
}
@Override
protected SettingsConfig createTestInstance() {
return randomSettingsConfig();
}
@Override
protected SettingsConfig doParseInstance(XContentParser parser) throws IOException {
return SettingsConfig.fromXContent(parser);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
public void testExplicitNullOnWriteParser() throws IOException {
// test that an explicit null is handled differently than not set
SettingsConfig config = fromString("{\"max_page_search_size\" : null}");
assertThat(config.getMaxPageSearchSize(), equalTo(-1));
Map<String, Object> settingsAsMap = xContentToMap(config);
assertNull(settingsAsMap.getOrDefault("max_page_search_size", "not_set"));
assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set"));
SettingsConfig emptyConfig = fromString("{}");
assertNull(emptyConfig.getMaxPageSearchSize());
settingsAsMap = xContentToMap(emptyConfig);
assertTrue(settingsAsMap.isEmpty());
config = fromString("{\"docs_per_second\" : null}");
assertThat(config.getDocsPerSecond(), equalTo(-1F));
settingsAsMap = xContentToMap(config);
assertThat(settingsAsMap.getOrDefault("max_page_search_size", "not_set"), equalTo("not_set"));
assertNull(settingsAsMap.getOrDefault("docs_per_second", "not_set"));
}
public void testExplicitNullOnWriteBuilder() throws IOException {
// test that an explicit null is handled differently than not set
SettingsConfig config = new SettingsConfig.Builder().setMaxPageSearchSize(null).build();
assertThat(config.getMaxPageSearchSize(), equalTo(-1));
Map<String, Object> settingsAsMap = xContentToMap(config);
assertNull(settingsAsMap.getOrDefault("max_page_search_size", "not_set"));
assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set"));
SettingsConfig emptyConfig = new SettingsConfig.Builder().build();
assertNull(emptyConfig.getMaxPageSearchSize());
settingsAsMap = xContentToMap(emptyConfig);
assertTrue(settingsAsMap.isEmpty());
config = new SettingsConfig.Builder().setRequestsPerSecond(null).build();
assertThat(config.getDocsPerSecond(), equalTo(-1F));
settingsAsMap = xContentToMap(config);
assertThat(settingsAsMap.getOrDefault("max_page_search_size", "not_set"), equalTo("not_set"));
assertNull(settingsAsMap.getOrDefault("docs_per_second", "not_set"));
}
private Map<String, Object> xContentToMap(ToXContent xcontent) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
xcontent.toXContent(builder, XContent.EMPTY_PARAMS);
XContentParser parser = XContentType.JSON.xContent()
.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput());
return parser.map();
}
private SettingsConfig fromString(String source) throws IOException {
try (XContentParser parser = createParser(JsonXContent.jsonXContent, source)) {
return SettingsConfig.fromXContent(parser);
}
}
}

View File

@ -41,15 +41,18 @@ import static org.elasticsearch.client.transform.transforms.SourceConfigTests.ra
public class TransformConfigTests extends AbstractXContentTestCase<TransformConfig> {
public static TransformConfig randomTransformConfig() {
return new TransformConfig(randomAlphaOfLengthBetween(1, 10),
return new TransformConfig(
randomAlphaOfLengthBetween(1, 10),
randomSourceConfig(),
randomDestConfig(),
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1000, 1000000)),
randomBoolean() ? null : randomSyncConfig(),
PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 100),
SettingsConfigTests.randomSettingsConfig(),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.CURRENT.toString());
randomBoolean() ? null : Version.CURRENT.toString()
);
}
public static SyncConfig randomSyncConfig() {

View File

@ -32,6 +32,7 @@ import java.util.Collections;
import java.util.List;
import static org.elasticsearch.client.transform.transforms.DestConfigTests.randomDestConfig;
import static org.elasticsearch.client.transform.transforms.SettingsConfigTests.randomSettingsConfig;
import static org.elasticsearch.client.transform.transforms.SourceConfigTests.randomSourceConfig;
public class TransformConfigUpdateTests extends AbstractXContentTestCase<TransformConfigUpdate> {
@ -42,7 +43,9 @@ public class TransformConfigUpdateTests extends AbstractXContentTestCase<Transfo
randomBoolean() ? null : randomDestConfig(),
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
randomBoolean() ? null : randomSyncConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
randomBoolean() ? null : randomSettingsConfig()
);
}
public static SyncConfig randomSyncConfig() {

View File

@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transform.transforms.hlrc;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.client.transform.transforms.SettingsConfig;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
public class SettingsConfigTests extends AbstractResponseTestCase<
org.elasticsearch.xpack.core.transform.transforms.SettingsConfig,
SettingsConfig> {
public static org.elasticsearch.xpack.core.transform.transforms.SettingsConfig randomSettingsConfig() {
return new org.elasticsearch.xpack.core.transform.transforms.SettingsConfig(
randomBoolean() ? null : randomIntBetween(10, 10_000),
randomBoolean() ? null : randomFloat()
);
}
public static void assertHlrcEquals(
org.elasticsearch.xpack.core.transform.transforms.SettingsConfig serverTestInstance,
SettingsConfig clientInstance
) {
assertEquals(serverTestInstance.getMaxPageSearchSize(), clientInstance.getMaxPageSearchSize());
assertEquals(serverTestInstance.getDocsPerSecond(), clientInstance.getDocsPerSecond());
}
@Override
protected org.elasticsearch.xpack.core.transform.transforms.SettingsConfig createServerTestInstance(XContentType xContentType) {
return randomSettingsConfig();
}
@Override
protected SettingsConfig doParseToClientInstance(XContentParser parser) throws IOException {
return SettingsConfig.fromXContent(parser);
}
@Override
protected void assertInstances(
org.elasticsearch.xpack.core.transform.transforms.SettingsConfig serverTestInstance,
SettingsConfig clientInstance
) {
assertHlrcEquals(serverTestInstance, clientInstance);
}
}

View File

@ -84,9 +84,6 @@ include-tagged::{doc-tests-file}[{api}-pivot-config]
--------------------------------------------------
<1> The `GroupConfig` to use in the pivot
<2> The aggregations to use
<3> The maximum paging size for the {transform} when pulling data
from the source. The size dynamically adjusts as the {transform}
is running to recover from and prevent OOM issues.
===== GroupConfig
The grouping terms. Defines the group by and destination fields
@ -115,6 +112,18 @@ include-tagged::{doc-tests-file}[{api}-agg-config]
--------------------------------------------------
<1> Aggregate the average star rating
===== SettingsConfig
Defines settings.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-settings-config]
--------------------------------------------------
<1> The maximum paging size for the {transform} when pulling data
from the source. The size dynamically adjusts as the {transform}
is running to recover from and prevent OOM issues.
include::../execution.asciidoc[]
[id="{upid}-{api}-response"]

View File

@ -149,6 +149,7 @@ The API returns the following results:
}
},
"description" : "Maximum priced ecommerce data by customer_id in Asia",
"settings" : { },
"version" : "7.5.0",
"create_time" : 1576094542936
}

View File

@ -236,6 +236,7 @@ When the {transform} is updated, you receive the updated configuration:
"delay": "120s"
}
},
"settings": { },
"version": "7.5.0",
"create_time": 1518808660505
}

View File

@ -183,6 +183,14 @@ public abstract class AbstractObjectParser<Value, Context> {
declareField(consumer, p -> p.floatValue(), field, ValueType.FLOAT);
}
/**
* Declare a float field that parses explicit {@code null}s in the json to a default value.
*/
public void declareFloatOrNull(BiConsumer<Value, Float> consumer, float nullValue, ParseField field) {
declareField(consumer, p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? nullValue : p.floatValue(),
field, ValueType.FLOAT_OR_NULL);
}
public void declareDouble(BiConsumer<Value, Double> consumer, ParseField field) {
// Using a method reference here angers some compilers
declareField(consumer, p -> p.doubleValue(), field, ValueType.DOUBLE);

View File

@ -30,9 +30,11 @@ public final class TransformField {
public static final ParseField VERSION = new ParseField("version");
public static final ParseField CREATE_TIME = new ParseField("create_time");
public static final ParseField DESTINATION = new ParseField("dest");
public static final ParseField SETTINGS = new ParseField("settings");
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField FORCE = new ParseField("force");
public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
public static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second");
public static final ParseField FIELD = new ParseField("field");
public static final ParseField SYNC = new ParseField("sync");
public static final ParseField TIME_BASED_SYNC = new ParseField("time");
@ -86,6 +88,5 @@ public final class TransformField {
// internal document id
public static String DOCUMENT_ID_FIELD = "_id";
private TransformField() {
}
private TransformField() {}
}

View File

@ -6,10 +6,11 @@
package org.elasticsearch.xpack.core.transform.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
@ -18,10 +19,12 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.action.compat.UpdateTransformActionPre78;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
@ -38,11 +41,12 @@ public class UpdateTransformAction extends ActionType<UpdateTransformAction.Resp
super(NAME, Response::new);
}
public static class Request extends AcknowledgedRequest<Request> {
public static class Request extends BaseTasksRequest<Request> {
private final TransformConfigUpdate update;
private final String id;
private final boolean deferValidation;
private TransformConfig config;
public Request(TransformConfigUpdate update, String id, boolean deferValidation) {
this.update = update;
@ -50,11 +54,23 @@ public class UpdateTransformAction extends ActionType<UpdateTransformAction.Resp
this.deferValidation = deferValidation;
}
public Request(StreamInput in) throws IOException {
// use fromStreamWithBWC, this can be changed back to public after BWC is not required anymore
private Request(StreamInput in) throws IOException {
super(in);
this.update = new TransformConfigUpdate(in);
this.id = in.readString();
this.deferValidation = in.readBoolean();
if (in.readBoolean()) {
this.config = new TransformConfig(in);
}
}
public static Request fromStreamWithBWC(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_8_0)) {
return new Request(in);
}
UpdateTransformActionPre78.Request r = new UpdateTransformActionPre78.Request(in);
return new Request(r.getUpdate(), r.getId(), r.isDeferValidation());
}
public static Request fromXContent(final XContentParser parser, final String id, final boolean deferValidation) {
@ -104,17 +120,37 @@ public class UpdateTransformAction extends ActionType<UpdateTransformAction.Resp
return update;
}
public TransformConfig getConfig() {
return config;
}
public void setConfig(TransformConfig config) {
this.config = config;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_8_0)) {
super.writeTo(out);
this.update.writeTo(out);
update.writeTo(out);
out.writeString(id);
out.writeBoolean(deferValidation);
if (config == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
config.writeTo(out);
}
return;
}
UpdateTransformActionPre78.Request r = new UpdateTransformActionPre78.Request(update, id, deferValidation);
r.writeTo(out);
}
@Override
public int hashCode() {
return Objects.hash(update, id, deferValidation);
return Objects.hash(update, id, deferValidation, config);
}
@Override
@ -126,30 +162,56 @@ public class UpdateTransformAction extends ActionType<UpdateTransformAction.Resp
return false;
}
Request other = (Request) obj;
return Objects.equals(update, other.update) && this.deferValidation == other.deferValidation && this.id.equals(other.id);
return Objects.equals(update, other.update)
&& this.deferValidation == other.deferValidation
&& this.id.equals(other.id)
&& Objects.equals(config, other.config);
}
}
public static class Response extends ActionResponse implements ToXContentObject {
public static class Response extends BaseTasksResponse implements ToXContentObject {
private final TransformConfig config;
public Response(TransformConfig config) {
// ignore failures
super(Collections.emptyList(), Collections.emptyList());
this.config = config;
}
public Response(StreamInput in) throws IOException {
// use fromStreamWithBWC, this can be changed back to public after BWC is not required anymore
private Response(StreamInput in) throws IOException {
super(in);
this.config = new TransformConfig(in);
}
public static Response fromStreamWithBWC(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_8_0)) {
return new Response(in);
}
UpdateTransformActionPre78.Response r = new UpdateTransformActionPre78.Response(in);
return new Response(r.getConfig());
}
public TransformConfig getConfig() {
return config;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
this.config.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_8_0)) {
super.writeTo(out);
config.writeTo(out);
return;
}
UpdateTransformActionPre78.Response r = new UpdateTransformActionPre78.Response(config);
r.writeTo(out);
}
@Override
public int hashCode() {
return config.hashCode();
return Objects.hash(super.hashCode(), config);
}
@Override
@ -161,12 +223,14 @@ public class UpdateTransformAction extends ActionType<UpdateTransformAction.Resp
return false;
}
Response other = (Response) obj;
return Objects.equals(config, other.config);
return Objects.equals(config, other.config) && super.equals(obj);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
super.toXContentCommon(builder, params);
return config.toXContent(builder, params);
}
}
}

View File

@ -16,7 +16,7 @@ public class UpdateTransformActionDeprecated extends ActionType<UpdateTransformA
public static final String NAME = "cluster:admin/data_frame/update";
private UpdateTransformActionDeprecated() {
super(NAME, Response::new);
super(NAME, Response::fromStreamWithBWC);
}
}

View File

@ -0,0 +1,144 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.transform.action.compat;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
import java.io.IOException;
import java.util.Objects;
/**
* In 7.8 update transform has been changed from ordinary request/response objects to tasks request/response.
* These classes are helpers to translate the old serialization format.
*/
public class UpdateTransformActionPre78 {
public static class Request extends AcknowledgedRequest<Request> {
private final TransformConfigUpdate update;
private final String id;
private final boolean deferValidation;
public Request(TransformConfigUpdate update, String id, boolean deferValidation) {
this.update = update;
this.id = id;
this.deferValidation = deferValidation;
}
public Request(StreamInput in) throws IOException {
super(in);
assert in.getVersion().before(Version.V_7_8_0);
this.update = new TransformConfigUpdate(in);
this.id = in.readString();
this.deferValidation = in.readBoolean();
}
public static Request fromXContent(final XContentParser parser, final String id, final boolean deferValidation) {
return new Request(TransformConfigUpdate.fromXContent(parser), id, deferValidation);
}
public String getId() {
return id;
}
public boolean isDeferValidation() {
return deferValidation;
}
public TransformConfigUpdate getUpdate() {
return update;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
assert out.getVersion().before(Version.V_7_8_0);
super.writeTo(out);
this.update.writeTo(out);
out.writeString(id);
out.writeBoolean(deferValidation);
}
@Override
public int hashCode() {
return Objects.hash(update, id, deferValidation);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(update, other.update) && this.deferValidation == other.deferValidation && this.id.equals(other.id);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
}
public static class Response extends ActionResponse implements ToXContentObject {
private final TransformConfig config;
public Response(TransformConfig config) {
this.config = config;
}
public Response(StreamInput in) throws IOException {
assert in.getVersion().before(Version.V_7_8_0);
this.config = new TransformConfig(in);
}
public TransformConfig getConfig() {
return config;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
assert out.getVersion().before(Version.V_7_8_0);
this.config.writeTo(out);
}
@Override
public int hashCode() {
return config.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Response other = (Response) obj;
return Objects.equals(config, other.config);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return config.toXContent(builder, params);
}
}
}

View File

@ -0,0 +1,186 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.transform.transforms;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.transform.TransformField;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class SettingsConfig implements Writeable, ToXContentObject {
public static final ConstructingObjectParser<SettingsConfig, Void> STRICT_PARSER = createParser(false);
public static final ConstructingObjectParser<SettingsConfig, Void> LENIENT_PARSER = createParser(true);
private static final int DEFAULT_MAX_PAGE_SEARCH_SIZE = -1;
private static final float DEFAULT_DOCS_PER_SECOND = -1F;
private static ConstructingObjectParser<SettingsConfig, Void> createParser(boolean lenient) {
ConstructingObjectParser<SettingsConfig, Void> parser = new ConstructingObjectParser<>(
"transform_config_settings",
lenient,
args -> new SettingsConfig((Integer) args[0], (Float) args[1])
);
parser.declareIntOrNull(optionalConstructorArg(), DEFAULT_MAX_PAGE_SEARCH_SIZE, TransformField.MAX_PAGE_SEARCH_SIZE);
parser.declareFloatOrNull(optionalConstructorArg(), DEFAULT_DOCS_PER_SECOND, TransformField.DOCS_PER_SECOND);
return parser;
}
private final Integer maxPageSearchSize;
private final Float docsPerSecond;
public SettingsConfig() {
this(null, null);
}
public SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond) {
this.maxPageSearchSize = maxPageSearchSize;
this.docsPerSecond = docsPerSecond;
}
public SettingsConfig(final StreamInput in) throws IOException {
this.maxPageSearchSize = in.readOptionalInt();
this.docsPerSecond = in.readOptionalFloat();
}
public Integer getMaxPageSearchSize() {
return maxPageSearchSize;
}
public Float getDocsPerSecond() {
return docsPerSecond;
}
public boolean isValid() {
return true;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalInt(maxPageSearchSize);
out.writeOptionalFloat(docsPerSecond);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
// do not write default values
if (maxPageSearchSize != null && (maxPageSearchSize.equals(DEFAULT_MAX_PAGE_SEARCH_SIZE) == false)) {
builder.field(TransformField.MAX_PAGE_SEARCH_SIZE.getPreferredName(), maxPageSearchSize);
}
if (docsPerSecond != null && (docsPerSecond.equals(DEFAULT_DOCS_PER_SECOND) == false)) {
builder.field(TransformField.DOCS_PER_SECOND.getPreferredName(), docsPerSecond);
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (other == null || other.getClass() != getClass()) {
return false;
}
SettingsConfig that = (SettingsConfig) other;
return Objects.equals(maxPageSearchSize, that.maxPageSearchSize) && Objects.equals(docsPerSecond, that.docsPerSecond);
}
@Override
public int hashCode() {
return Objects.hash(maxPageSearchSize, docsPerSecond);
}
public static SettingsConfig fromXContent(final XContentParser parser, boolean lenient) throws IOException {
return lenient ? LENIENT_PARSER.apply(parser, null) : STRICT_PARSER.apply(parser, null);
}
public static class Builder {
private Integer maxPageSearchSize;
private Float docsPerSecond;
/**
* Default builder
*/
public Builder() {
}
/**
* Builder starting from existing settings as base, for the purpose of partially updating settings.
*
* @param base base settings
*/
public Builder(SettingsConfig base) {
this.maxPageSearchSize = base.maxPageSearchSize;
this.docsPerSecond = base.docsPerSecond;
}
/**
* Sets the paging maximum paging maxPageSearchSize that transform can use when
* pulling the data from the source index.
*
* If OOM is triggered, the paging maxPageSearchSize is dynamically reduced so that the transform can continue to gather data.
*
* @param maxPageSearchSize Integer value between 10 and 10_000
* @return the {@link Builder} with the paging maxPageSearchSize set.
*/
public Builder setMaxPageSearchSize(Integer maxPageSearchSize) {
this.maxPageSearchSize = maxPageSearchSize == null ? DEFAULT_MAX_PAGE_SEARCH_SIZE : maxPageSearchSize;
return this;
}
/**
* Sets the docs per second that transform can use when pulling the data from the source index.
*
* This setting throttles transform by issuing queries less often, however processing still happens in
* batches. A value of 0 disables throttling (default).
*
* @param docsPerSecond Integer value
* @return the {@link Builder} with requestsPerSecond set.
*/
public Builder setRequestsPerSecond(Float docsPerSecond) {
this.docsPerSecond = docsPerSecond == null ? DEFAULT_DOCS_PER_SECOND : docsPerSecond;
return this;
}
/**
* Update settings according to given settings config.
*
* @param update update settings
* @return the {@link Builder} with applied updates.
*/
public Builder update(SettingsConfig update) {
// if explicit {@code null}s have been set in the update, we do not want to carry the default, but get rid
// of the setting
if (update.getDocsPerSecond() != null) {
this.docsPerSecond = update.getDocsPerSecond().equals(DEFAULT_DOCS_PER_SECOND) ? null : update.getDocsPerSecond();
}
if (update.getMaxPageSearchSize() != null) {
this.maxPageSearchSize = update.getMaxPageSearchSize().equals(DEFAULT_MAX_PAGE_SEARCH_SIZE)
? null
: update.getMaxPageSearchSize();
}
return this;
}
public SettingsConfig build() {
return new SettingsConfig(maxPageSearchSize, docsPerSecond);
}
}
}

View File

@ -56,6 +56,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
private final DestConfig dest;
private final TimeValue frequency;
private final SyncConfig syncConfig;
private final SettingsConfig settings;
private final String description;
// headers store the user context from the creating user, which allows us to run the transform as this user
// the header only contains name, groups and other context but no authorization keys
@ -72,8 +73,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
}
private static ConstructingObjectParser<TransformConfig, String> createParser(boolean lenient) {
ConstructingObjectParser<TransformConfig, String> parser = new ConstructingObjectParser<>(NAME, lenient,
(args, optionalId) -> {
ConstructingObjectParser<TransformConfig, String> parser = new ConstructingObjectParser<>(NAME, lenient, (args, optionalId) -> {
String id = (String) args[0];
// if the id has been specified in the body and the path, they must match
@ -81,14 +81,16 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
id = optionalId;
} else if (optionalId != null && id.equals(optionalId) == false) {
throw new IllegalArgumentException(
TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_INCONSISTENT_ID, id, optionalId));
TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_INCONSISTENT_ID, id, optionalId)
);
}
SourceConfig source = (SourceConfig) args[1];
DestConfig dest = (DestConfig) args[2];
TimeValue frequency =
args[3] == null ? null : TimeValue.parseTimeValue((String) args[3], TransformField.FREQUENCY.getPreferredName());
TimeValue frequency = args[3] == null
? null
: TimeValue.parseTimeValue((String) args[3], TransformField.FREQUENCY.getPreferredName());
SyncConfig syncConfig = (SyncConfig) args[4];
// ignored, only for internal storage: String docType = (String) args[5];
@ -96,8 +98,8 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
// on strict parsing do not allow injection of headers, transform version, or create time
if (lenient == false) {
validateStrictParsingParams(args[6], HEADERS.getPreferredName());
validateStrictParsingParams(args[9], TransformField.CREATE_TIME.getPreferredName());
validateStrictParsingParams(args[10], TransformField.VERSION.getPreferredName());
validateStrictParsingParams(args[10], TransformField.CREATE_TIME.getPreferredName());
validateStrictParsingParams(args[11], TransformField.VERSION.getPreferredName());
}
@SuppressWarnings("unchecked")
@ -105,7 +107,9 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
PivotConfig pivotConfig = (PivotConfig) args[7];
String description = (String) args[8];
return new TransformConfig(id,
SettingsConfig settings = (SettingsConfig) args[9];
return new TransformConfig(
id,
source,
dest,
frequency,
@ -113,25 +117,28 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
headers,
pivotConfig,
description,
(Instant)args[9],
(String)args[10]);
settings,
(Instant) args[10],
(String) args[11]
);
});
parser.declareString(optionalConstructorArg(), TransformField.ID);
parser.declareObject(constructorArg(), (p, c) -> SourceConfig.fromXContent(p, lenient), TransformField.SOURCE);
parser.declareObject(constructorArg(), (p, c) -> DestConfig.fromXContent(p, lenient), TransformField.DESTINATION);
parser.declareString(optionalConstructorArg(), TransformField.FREQUENCY);
parser.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p, lenient), TransformField.SYNC);
parser.declareString(optionalConstructorArg(), TransformField.INDEX_DOC_TYPE);
parser.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), HEADERS);
parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), PIVOT_TRANSFORM);
parser.declareString(optionalConstructorArg(), TransformField.DESCRIPTION);
parser.declareField(optionalConstructorArg(),
p -> TimeUtils.parseTimeFieldToInstant(p, TransformField.CREATE_TIME.getPreferredName()), TransformField.CREATE_TIME,
ObjectParser.ValueType.VALUE);
parser.declareObject(optionalConstructorArg(), (p, c) -> SettingsConfig.fromXContent(p, lenient), TransformField.SETTINGS);
parser.declareField(
optionalConstructorArg(),
p -> TimeUtils.parseTimeFieldToInstant(p, TransformField.CREATE_TIME.getPreferredName()),
TransformField.CREATE_TIME,
ObjectParser.ValueType.VALUE
);
parser.declareString(optionalConstructorArg(), TransformField.VERSION);
return parser;
}
@ -148,7 +155,8 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
return NAME + "-" + transformId;
}
TransformConfig(final String id,
TransformConfig(
final String id,
final SourceConfig source,
final DestConfig dest,
final TimeValue frequency,
@ -156,8 +164,10 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
final Map<String, String> headers,
final PivotConfig pivotConfig,
final String description,
final SettingsConfig settings,
final Instant createTime,
final String version){
final String version
) {
this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
this.source = ExceptionsHelper.requireNonNull(source, TransformField.SOURCE.getPreferredName());
this.dest = ExceptionsHelper.requireNonNull(dest, TransformField.DESTINATION.getPreferredName());
@ -166,6 +176,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
this.setHeaders(headers == null ? Collections.emptyMap() : headers);
this.pivotConfig = pivotConfig;
this.description = description;
this.settings = settings == null ? new SettingsConfig() : settings;
// at least one function must be defined
if (this.pivotConfig == null) {
@ -178,15 +189,18 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
this.transformVersion = version == null ? null : Version.fromString(version);
}
public TransformConfig(final String id,
public TransformConfig(
final String id,
final SourceConfig source,
final DestConfig dest,
final TimeValue frequency,
final SyncConfig syncConfig,
final Map<String, String> headers,
final PivotConfig pivotConfig,
final String description) {
this(id, source, dest, frequency, syncConfig, headers, pivotConfig, description, null, null);
final String description,
final SettingsConfig settings
) {
this(id, source, dest, frequency, syncConfig, headers, pivotConfig, description, settings, null, null);
}
public TransformConfig(final StreamInput in) throws IOException {
@ -210,6 +224,11 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
createTime = null;
transformVersion = null;
}
if (in.getVersion().onOrAfter(Version.V_7_8_0)) {
settings = new SettingsConfig(in);
} else {
settings = new SettingsConfig();
}
}
public String getId() {
@ -269,6 +288,10 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
return description;
}
public SettingsConfig getSettings() {
return settings;
}
public boolean isValid() {
if (pivotConfig != null && pivotConfig.isValid() == false) {
return false;
@ -278,7 +301,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
return false;
}
return source.isValid() && dest.isValid();
return settings.isValid() && source.isValid() && dest.isValid();
}
@Override
@ -302,6 +325,9 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
out.writeBoolean(false);
}
}
if (out.getVersion().onOrAfter(Version.V_7_8_0)) {
settings.writeTo(out);
}
}
@Override
@ -330,12 +356,16 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
if (description != null) {
builder.field(TransformField.DESCRIPTION.getPreferredName(), description);
}
builder.field(TransformField.SETTINGS.getPreferredName(), settings);
if (transformVersion != null) {
builder.field(TransformField.VERSION.getPreferredName(), transformVersion);
}
if (createTime != null) {
builder.timeField(TransformField.CREATE_TIME.getPreferredName(), TransformField.CREATE_TIME.getPreferredName() + "_string",
createTime.toEpochMilli());
builder.timeField(
TransformField.CREATE_TIME.getPreferredName(),
TransformField.CREATE_TIME.getPreferredName() + "_string",
createTime.toEpochMilli()
);
}
builder.endObject();
return builder;
@ -361,13 +391,26 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
&& Objects.equals(this.headers, that.headers)
&& Objects.equals(this.pivotConfig, that.pivotConfig)
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.settings, that.settings)
&& Objects.equals(this.createTime, that.createTime)
&& Objects.equals(this.transformVersion, that.transformVersion);
}
@Override
public int hashCode() {
return Objects.hash(id, source, dest, frequency, syncConfig, headers, pivotConfig, description, createTime, transformVersion);
return Objects.hash(
id,
source,
dest,
frequency,
syncConfig,
headers,
pivotConfig,
description,
settings,
createTime,
transformVersion
);
}
@Override
@ -375,8 +418,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
return Strings.toString(this, true, true);
}
public static TransformConfig fromXContent(final XContentParser parser, @Nullable final String optionalTransformId,
boolean lenient) {
public static TransformConfig fromXContent(final XContentParser parser, @Nullable final String optionalTransformId, boolean lenient) {
return lenient ? LENIENT_PARSER.apply(parser, optionalTransformId) : STRICT_PARSER.apply(parser, optionalTransformId);
}
@ -392,6 +434,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
private Version transformVersion;
private Instant createTime;
private PivotConfig pivotConfig;
private SettingsConfig settings;
public Builder() {}
@ -405,6 +448,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
this.transformVersion = config.transformVersion;
this.createTime = config.createTime;
this.pivotConfig = config.pivotConfig;
this.settings = config.settings;
}
public Builder setId(String id) {
@ -437,6 +481,11 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
return this;
}
public Builder setSettings(SettingsConfig settings) {
this.settings = settings;
return this;
}
public Builder setHeaders(Map<String, String> headers) {
this.headers = headers;
return this;
@ -453,7 +502,8 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
}
public TransformConfig build() {
return new TransformConfig(id,
return new TransformConfig(
id,
source,
dest,
frequency,
@ -461,8 +511,10 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
headers,
pivotConfig,
description,
settings,
createTime,
transformVersion == null ? null : transformVersion.toString());
transformVersion == null ? null : transformVersion.toString()
);
}
@Override
@ -485,13 +537,26 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
&& Objects.equals(this.headers, that.headers)
&& Objects.equals(this.pivotConfig, that.pivotConfig)
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.settings, that.settings)
&& Objects.equals(this.createTime, that.createTime)
&& Objects.equals(this.transformVersion, that.transformVersion);
}
@Override
public int hashCode() {
return Objects.hash(id, source, dest, frequency, syncConfig, headers, pivotConfig, description, createTime, transformVersion);
return Objects.hash(
id,
source,
dest,
frequency,
syncConfig,
headers,
pivotConfig,
description,
settings,
createTime,
transformVersion
);
}
}
}

View File

@ -45,7 +45,8 @@ public class TransformConfigUpdate implements Writeable {
: TimeValue.parseTimeValue((String) args[2], TransformField.FREQUENCY.getPreferredName());
SyncConfig syncConfig = (SyncConfig) args[3];
String description = (String) args[4];
return new TransformConfigUpdate(source, dest, frequency, syncConfig, description);
SettingsConfig settings = (SettingsConfig) args[5];
return new TransformConfigUpdate(source, dest, frequency, syncConfig, description, settings);
}
);
@ -55,6 +56,7 @@ public class TransformConfigUpdate implements Writeable {
PARSER.declareString(optionalConstructorArg(), TransformField.FREQUENCY);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), TransformField.SYNC);
PARSER.declareString(optionalConstructorArg(), TransformField.DESCRIPTION);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> SettingsConfig.fromXContent(p, false), TransformField.SETTINGS);
}
private static SyncConfig parseSyncConfig(XContentParser parser) throws IOException {
@ -70,6 +72,7 @@ public class TransformConfigUpdate implements Writeable {
private final TimeValue frequency;
private final SyncConfig syncConfig;
private final String description;
private final SettingsConfig settings;
private Map<String, String> headers;
public TransformConfigUpdate(
@ -77,7 +80,8 @@ public class TransformConfigUpdate implements Writeable {
final DestConfig dest,
final TimeValue frequency,
final SyncConfig syncConfig,
final String description
final String description,
final SettingsConfig settings
) {
this.source = source;
this.dest = dest;
@ -87,6 +91,7 @@ public class TransformConfigUpdate implements Writeable {
if (this.description != null && this.description.length() > MAX_DESCRIPTION_LENGTH) {
throw new IllegalArgumentException("[description] must be less than 1000 characters in length.");
}
this.settings = settings;
}
public TransformConfigUpdate(final StreamInput in) throws IOException {
@ -98,6 +103,12 @@ public class TransformConfigUpdate implements Writeable {
if (in.readBoolean()) {
setHeaders(in.readMap(StreamInput::readString, StreamInput::readString));
}
if (in.getVersion().onOrAfter(Version.V_7_8_0)) {
settings = in.readOptionalWriteable(SettingsConfig::new);
} else {
settings = null;
}
}
public SourceConfig getSource() {
@ -121,6 +132,11 @@ public class TransformConfigUpdate implements Writeable {
return description;
}
@Nullable
public SettingsConfig getSettings() {
return settings;
}
public Map<String, String> getHeaders() {
return headers;
}
@ -142,6 +158,9 @@ public class TransformConfigUpdate implements Writeable {
} else {
out.writeBoolean(false);
}
if (out.getVersion().onOrAfter(Version.V_7_8_0)) {
out.writeOptionalWriteable(settings);
}
}
@Override
@ -161,12 +180,13 @@ public class TransformConfigUpdate implements Writeable {
&& Objects.equals(this.frequency, that.frequency)
&& Objects.equals(this.syncConfig, that.syncConfig)
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.settings, that.settings)
&& Objects.equals(this.headers, that.headers);
}
@Override
public int hashCode() {
return Objects.hash(source, dest, frequency, syncConfig, description, headers);
return Objects.hash(source, dest, frequency, syncConfig, description, settings, headers);
}
public static TransformConfigUpdate fromXContent(final XContentParser parser) {
@ -179,9 +199,14 @@ public class TransformConfigUpdate implements Writeable {
&& isNullOrEqual(frequency, config.getFrequency())
&& isNullOrEqual(syncConfig, config.getSyncConfig())
&& isNullOrEqual(description, config.getDescription())
&& isNullOrEqual(settings, config.getSettings())
&& isNullOrEqual(headers, config.getHeaders());
}
public boolean changesSettings(TransformConfig config) {
return isNullOrEqual(settings, config.getSettings()) == false;
}
private boolean isNullOrEqual(Object lft, Object rgt) {
return lft == null || lft.equals(rgt);
}
@ -221,6 +246,12 @@ public class TransformConfigUpdate implements Writeable {
if (headers != null) {
builder.setHeaders(headers);
}
if (settings != null) {
// settings are partially updateable, that means we only overwrite changed settings but keep others
SettingsConfig.Builder settingsBuilder = new SettingsConfig.Builder(config.getSettings());
settingsBuilder.update(settings);
builder.setSettings(settingsBuilder.build());
}
builder.setVersion(Version.CURRENT);
return builder.build();
}

View File

@ -6,10 +6,12 @@
package org.elasticsearch.xpack.core.transform.transforms.pivot;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -34,6 +36,8 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
public class PivotConfig implements Writeable, ToXContentObject {
private static final String NAME = "data_frame_transform_pivot";
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(PivotConfig.class));
private final GroupConfig groups;
private final AggregationConfig aggregationConfig;
private final Integer maxPageSearchSize;
@ -78,6 +82,13 @@ public class PivotConfig implements Writeable, ToXContentObject {
this.groups = ExceptionsHelper.requireNonNull(groups, TransformField.GROUP_BY.getPreferredName());
this.aggregationConfig = ExceptionsHelper.requireNonNull(aggregationConfig, TransformField.AGGREGATIONS.getPreferredName());
this.maxPageSearchSize = maxPageSearchSize;
if (maxPageSearchSize != null) {
deprecationLogger.deprecatedAndMaybeLog(
TransformField.MAX_PAGE_SEARCH_SIZE.getPreferredName(),
"[max_page_search_size] is deprecated inside pivot please use settings instead"
);
}
}
public PivotConfig(StreamInput in) throws IOException {

View File

@ -6,7 +6,11 @@
package org.elasticsearch.xpack.core.transform.action;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -19,12 +23,12 @@ import org.elasticsearch.xpack.core.transform.transforms.SyncConfig;
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
import org.junit.Before;
import java.io.IOException;
import java.util.List;
import static java.util.Collections.emptyList;
public abstract class AbstractSerializingTransformTestCase<T extends ToXContent & Writeable>
extends AbstractSerializingTestCase<T> {
public abstract class AbstractSerializingTransformTestCase<T extends ToXContent & Writeable> extends AbstractSerializingTestCase<T> {
private NamedWriteableRegistry namedWriteableRegistry;
private NamedXContentRegistry namedXContentRegistry;
@ -34,8 +38,9 @@ public abstract class AbstractSerializingTransformTestCase<T extends ToXContent
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
List<NamedWriteableRegistry.Entry> namedWriteables = searchModule.getNamedWriteables();
namedWriteables.add(new NamedWriteableRegistry.Entry(SyncConfig.class, TransformField.TIME_BASED_SYNC.getPreferredName(),
TimeSyncConfig::new));
namedWriteables.add(
new NamedWriteableRegistry.Entry(SyncConfig.class, TransformField.TIME_BASED_SYNC.getPreferredName(), TimeSyncConfig::new)
);
List<NamedXContentRegistry.Entry> namedXContents = searchModule.getNamedXContents();
namedXContents.addAll(new TransformNamedXContentProvider().getNamedXContentParsers());
@ -54,4 +59,22 @@ public abstract class AbstractSerializingTransformTestCase<T extends ToXContent
return namedXContentRegistry;
}
protected <X extends Writeable, Y extends Writeable> Y writeAndReadBWCObject(
X original,
NamedWriteableRegistry namedWriteableRegistry,
Writeable.Writer<X> writer,
Writeable.Reader<Y> reader,
Version version
) throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(version);
original.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), getNamedWriteableRegistry())) {
in.setVersion(version);
return reader.read(in);
}
}
}
}

View File

@ -6,7 +6,11 @@
package org.elasticsearch.xpack.core.transform.action;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -18,6 +22,7 @@ import org.elasticsearch.xpack.core.transform.transforms.SyncConfig;
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
import org.junit.Before;
import java.io.IOException;
import java.util.List;
import static java.util.Collections.emptyList;
@ -34,8 +39,9 @@ public abstract class AbstractWireSerializingTransformTestCase<T extends Writeab
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
List<NamedWriteableRegistry.Entry> namedWriteables = searchModule.getNamedWriteables();
namedWriteables.add(new NamedWriteableRegistry.Entry(SyncConfig.class, TransformField.TIME_BASED_SYNC.getPreferredName(),
TimeSyncConfig::new));
namedWriteables.add(
new NamedWriteableRegistry.Entry(SyncConfig.class, TransformField.TIME_BASED_SYNC.getPreferredName(), TimeSyncConfig::new)
);
List<NamedXContentRegistry.Entry> namedXContents = searchModule.getNamedXContents();
namedXContents.addAll(new TransformNamedXContentProvider().getNamedXContentParsers());
@ -53,4 +59,22 @@ public abstract class AbstractWireSerializingTransformTestCase<T extends Writeab
protected NamedXContentRegistry xContentRegistry() {
return namedXContentRegistry;
}
protected <X extends Writeable, Y extends Writeable> Y writeAndReadBWCObject(
X original,
NamedWriteableRegistry namedWriteableRegistry,
Writeable.Writer<X> writer,
Writeable.Reader<Y> reader,
Version version
) throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(version);
original.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), getNamedWriteableRegistry())) {
in.setVersion(version);
return reader.read(in);
}
}
}
}

View File

@ -12,9 +12,9 @@ import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.Request;
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests;
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests;
import java.io.IOException;
@ -48,25 +48,33 @@ public class PreviewTransformActionRequestTests extends AbstractSerializingTrans
randomBoolean() ? TransformConfigTests.randomSyncConfig() : null,
null,
PivotConfigTests.randomPivotConfig(),
null);
null,
null
);
return new Request(config);
}
public void testParsingOverwritesIdAndDestFields() throws IOException {
// id & dest fields will be set by the parser
BytesArray json = new BytesArray(
"{ " +
"\"source\":{" +
" \"index\":\"foo\", " +
" \"query\": {\"match_all\": {}}}," +
"\"pivot\": {" +
"\"group_by\": {\"destination-field2\": {\"terms\": {\"field\": \"term-field\"}}}," +
"\"aggs\": {\"avg_response\": {\"avg\": {\"field\": \"responsetime\"}}}" +
"}" +
"}");
"{ "
+ "\"source\":{"
+ " \"index\":\"foo\", "
+ " \"query\": {\"match_all\": {}}},"
+ "\"pivot\": {"
+ "\"group_by\": {\"destination-field2\": {\"terms\": {\"field\": \"term-field\"}}},"
+ "\"aggs\": {\"avg_response\": {\"avg\": {\"field\": \"responsetime\"}}}"
+ "}"
+ "}"
);
try (XContentParser parser = JsonXContent.jsonXContent
.createParser(xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json.streamInput())) {
try (
XContentParser parser = JsonXContent.jsonXContent.createParser(
xContentRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
json.streamInput()
)
) {
Request request = Request.fromXContent(parser);
assertEquals("transform-preview", request.getConfig().getId());

View File

@ -6,8 +6,13 @@
package org.elasticsearch.xpack.core.transform.action;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Request;
import org.elasticsearch.xpack.core.transform.action.compat.UpdateTransformActionPre78;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests;
import java.io.IOException;
import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdateTests.randomTransformConfigUpdate;
@ -15,12 +20,50 @@ public class UpdateTransformActionRequestTests extends AbstractWireSerializingTr
@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
return Request::fromStreamWithBWC;
}
@Override
protected Request createTestInstance() {
return new Request(randomTransformConfigUpdate(), randomAlphaOfLength(10), randomBoolean());
Request request = new Request(randomTransformConfigUpdate(), randomAlphaOfLength(10), randomBoolean());
if (randomBoolean()) {
request.setConfig(TransformConfigTests.randomTransformConfig());
}
return request;
}
public void testBWCPre78() throws IOException {
Request newRequest = createTestInstance();
UpdateTransformActionPre78.Request oldRequest = writeAndReadBWCObject(
newRequest,
getNamedWriteableRegistry(),
(out, value) -> value.writeTo(out),
UpdateTransformActionPre78.Request::new,
Version.V_7_7_0
);
assertEquals(newRequest.getId(), oldRequest.getId());
assertEquals(newRequest.getUpdate().getDestination(), oldRequest.getUpdate().getDestination());
assertEquals(newRequest.getUpdate().getFrequency(), oldRequest.getUpdate().getFrequency());
assertEquals(newRequest.getUpdate().getSource(), oldRequest.getUpdate().getSource());
assertEquals(newRequest.getUpdate().getSyncConfig(), oldRequest.getUpdate().getSyncConfig());
assertEquals(newRequest.isDeferValidation(), oldRequest.isDeferValidation());
Request newRequestFromOld = writeAndReadBWCObject(
oldRequest,
getNamedWriteableRegistry(),
(out, value) -> value.writeTo(out),
Request::fromStreamWithBWC,
Version.V_7_7_0
);
assertEquals(newRequest.getId(), newRequestFromOld.getId());
assertEquals(newRequest.getUpdate().getDestination(), newRequestFromOld.getUpdate().getDestination());
assertEquals(newRequest.getUpdate().getFrequency(), newRequestFromOld.getUpdate().getFrequency());
assertEquals(newRequest.getUpdate().getSource(), newRequestFromOld.getUpdate().getSource());
assertEquals(newRequest.getUpdate().getSyncConfig(), newRequestFromOld.getUpdate().getSyncConfig());
assertEquals(newRequest.isDeferValidation(), newRequestFromOld.isDeferValidation());
}
}

View File

@ -6,9 +6,11 @@
package org.elasticsearch.xpack.core.transform.action;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Response;
import org.elasticsearch.xpack.core.transform.action.compat.UpdateTransformActionPre78;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests;
@ -23,11 +25,50 @@ public class UpdateTransformsActionResponseTests extends AbstractSerializingTran
@Override
protected Reader<Response> instanceReader() {
return Response::new;
return Response::fromStreamWithBWC;
}
@Override
protected Response doParseInstance(XContentParser parser) throws IOException {
return new Response(TransformConfig.fromXContent(parser, null, false));
}
public void testBWCPre78() throws IOException {
Response newResponse = createTestInstance();
UpdateTransformActionPre78.Response oldResponse = writeAndReadBWCObject(
newResponse,
getNamedWriteableRegistry(),
(out, value) -> value.writeTo(out),
UpdateTransformActionPre78.Response::new,
Version.V_7_7_0
);
assertEquals(newResponse.getConfig().getDescription(), oldResponse.getConfig().getDescription());
assertEquals(newResponse.getConfig().getId(), oldResponse.getConfig().getId());
assertEquals(newResponse.getConfig().getCreateTime(), oldResponse.getConfig().getCreateTime());
assertEquals(newResponse.getConfig().getDestination(), oldResponse.getConfig().getDestination());
assertEquals(newResponse.getConfig().getFrequency(), oldResponse.getConfig().getFrequency());
assertEquals(newResponse.getConfig().getPivotConfig(), oldResponse.getConfig().getPivotConfig());
assertEquals(newResponse.getConfig().getSource(), oldResponse.getConfig().getSource());
assertEquals(newResponse.getConfig().getSyncConfig(), oldResponse.getConfig().getSyncConfig());
assertEquals(newResponse.getConfig().getVersion(), oldResponse.getConfig().getVersion());
//
Response newRequestFromOld = writeAndReadBWCObject(
oldResponse,
getNamedWriteableRegistry(),
(out, value) -> value.writeTo(out),
Response::fromStreamWithBWC,
Version.V_7_7_0
);
assertEquals(newResponse.getConfig().getDescription(), newRequestFromOld.getConfig().getDescription());
assertEquals(newResponse.getConfig().getId(), newRequestFromOld.getConfig().getId());
assertEquals(newResponse.getConfig().getCreateTime(), newRequestFromOld.getConfig().getCreateTime());
assertEquals(newResponse.getConfig().getDestination(), newRequestFromOld.getConfig().getDestination());
assertEquals(newResponse.getConfig().getFrequency(), newRequestFromOld.getConfig().getFrequency());
assertEquals(newResponse.getConfig().getPivotConfig(), newRequestFromOld.getConfig().getPivotConfig());
assertEquals(newResponse.getConfig().getSource(), newRequestFromOld.getConfig().getSource());
assertEquals(newResponse.getConfig().getSyncConfig(), newRequestFromOld.getConfig().getSyncConfig());
assertEquals(newResponse.getConfig().getVersion(), newRequestFromOld.getConfig().getVersion());
}
}

View File

@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.transform.transforms;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.watcher.watch.Payload.XContent;
import org.junit.Before;
import java.io.IOException;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class SettingsConfigTests extends AbstractSerializingTransformTestCase<SettingsConfig> {
private boolean lenient;
public static SettingsConfig randomSettingsConfig() {
return new SettingsConfig(randomBoolean() ? null : randomIntBetween(10, 10_000), randomBoolean() ? null : randomFloat());
}
public static SettingsConfig randomNonEmptySettingsConfig() {
return new SettingsConfig(randomIntBetween(10, 10_000), randomFloat());
}
@Before
public void setRandomFeatures() {
lenient = randomBoolean();
}
@Override
protected SettingsConfig doParseInstance(XContentParser parser) throws IOException {
return SettingsConfig.fromXContent(parser, lenient);
}
@Override
protected boolean supportsUnknownFields() {
return lenient;
}
@Override
protected SettingsConfig createTestInstance() {
return randomSettingsConfig();
}
@Override
protected Reader<SettingsConfig> instanceReader() {
return SettingsConfig::new;
}
public void testExplicitNullParsing() throws IOException {
// explicit null
assertThat(fromString("{\"max_page_search_size\" : null}").getMaxPageSearchSize(), equalTo(-1));
// not set
assertNull(fromString("{}").getMaxPageSearchSize());
assertThat(fromString("{\"docs_per_second\" : null}").getDocsPerSecond(), equalTo(-1F));
assertNull(fromString("{}").getDocsPerSecond());
}
public void testUpdateUsingBuilder() throws IOException {
SettingsConfig config = fromString("{\"max_page_search_size\" : 10000, \"docs_per_second\" :42}");
SettingsConfig.Builder builder = new SettingsConfig.Builder(config);
builder.update(fromString("{\"max_page_search_size\" : 100}"));
assertThat(builder.build().getMaxPageSearchSize(), equalTo(100));
assertThat(builder.build().getDocsPerSecond(), equalTo(42F));
builder.update(fromString("{\"max_page_search_size\" : null}"));
assertNull(builder.build().getMaxPageSearchSize());
assertThat(builder.build().getDocsPerSecond(), equalTo(42F));
builder.update(fromString("{\"max_page_search_size\" : 77, \"docs_per_second\" :null}"));
assertThat(builder.build().getMaxPageSearchSize(), equalTo(77));
assertNull(builder.build().getDocsPerSecond());
}
public void testOmmitDefaultsOnWriteParser() throws IOException {
// test that an explicit null is handled differently than not set
SettingsConfig config = fromString("{\"max_page_search_size\" : null}");
assertThat(config.getMaxPageSearchSize(), equalTo(-1));
Map<String, Object> settingsAsMap = xContentToMap(config);
assertTrue(settingsAsMap.isEmpty());
SettingsConfig emptyConfig = fromString("{}");
assertNull(emptyConfig.getMaxPageSearchSize());
settingsAsMap = xContentToMap(emptyConfig);
assertTrue(settingsAsMap.isEmpty());
config = fromString("{\"docs_per_second\" : null}");
assertThat(config.getDocsPerSecond(), equalTo(-1F));
settingsAsMap = xContentToMap(config);
assertTrue(settingsAsMap.isEmpty());
}
public void testOmmitDefaultsOnWriteBuilder() throws IOException {
// test that an explicit null is handled differently than not set
SettingsConfig config = new SettingsConfig.Builder().setMaxPageSearchSize(null).build();
assertThat(config.getMaxPageSearchSize(), equalTo(-1));
Map<String, Object> settingsAsMap = xContentToMap(config);
assertTrue(settingsAsMap.isEmpty());
SettingsConfig emptyConfig = new SettingsConfig.Builder().build();
assertNull(emptyConfig.getMaxPageSearchSize());
settingsAsMap = xContentToMap(emptyConfig);
assertTrue(settingsAsMap.isEmpty());
config = new SettingsConfig.Builder().setRequestsPerSecond(null).build();
assertThat(config.getDocsPerSecond(), equalTo(-1F));
settingsAsMap = xContentToMap(config);
assertTrue(settingsAsMap.isEmpty());
}
private Map<String, Object> xContentToMap(ToXContent xcontent) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
xcontent.toXContent(builder, XContent.EMPTY_PARAMS);
XContentParser parser = XContentType.JSON.xContent()
.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput());
return parser.map();
}
private SettingsConfig fromString(String source) throws IOException {
try (XContentParser parser = createParser(JsonXContent.jsonXContent, source)) {
return SettingsConfig.fromXContent(parser, false);
}
}
}

View File

@ -53,6 +53,7 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
null,
PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
SettingsConfigTests.randomSettingsConfig(),
null,
null
);
@ -68,6 +69,7 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
randomHeaders(),
PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
randomBoolean() ? null : SettingsConfigTests.randomSettingsConfig(),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.CURRENT.toString()
);
@ -83,7 +85,8 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
randomBoolean() ? randomSyncConfig() : null,
randomHeaders(),
PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
null
);
} // else
return new TransformConfig(
@ -94,7 +97,8 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
randomBoolean() ? randomSyncConfig() : null,
randomHeaders(),
PivotConfigTests.randomInvalidPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
null
);
}
@ -258,7 +262,8 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
null,
null,
PivotConfigTests.randomPivotConfig(),
randomAlphaOfLength(1001)
randomAlphaOfLength(1001),
null
)
);
assertThat(exception.getMessage(), equalTo("[description] must be less than 1000 characters in length."));
@ -271,7 +276,8 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
null,
null,
PivotConfigTests.randomPivotConfig(),
description
description,
null
);
assertThat(description, equalTo(config.getDescription()));
}

View File

@ -36,7 +36,8 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
randomBoolean() ? null : randomDestConfig(),
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
randomBoolean() ? null : randomSyncConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
randomBoolean() ? null : SettingsConfigTests.randomSettingsConfig()
);
}
@ -57,14 +58,15 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
public void testIsNoop() {
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
TransformConfig config = randomTransformConfig();
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null);
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null);
assertTrue("null update is not noop", update.isNoop(config));
update = new TransformConfigUpdate(
config.getSource(),
config.getDestination(),
config.getFrequency(),
config.getSyncConfig(),
config.getDescription()
config.getDescription(),
config.getSettings()
);
assertTrue("equal update is not noop", update.isNoop(config));
@ -73,7 +75,8 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
config.getDestination(),
config.getFrequency(),
config.getSyncConfig(),
"this is a new description"
"this is a new description",
config.getSettings()
);
assertFalse("true update is noop", update.isNoop(config));
}
@ -89,10 +92,11 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
Collections.singletonMap("key", "value"),
PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
SettingsConfigTests.randomNonEmptySettingsConfig(),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.V_7_2_0.toString()
);
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null);
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null);
assertThat(config, equalTo(update.apply(config)));
SourceConfig sourceConfig = new SourceConfig("the_new_index");
@ -100,7 +104,8 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
TimeValue frequency = TimeValue.timeValueSeconds(10);
SyncConfig syncConfig = new TimeSyncConfig("time_field", TimeValue.timeValueSeconds(30));
String newDescription = "new description";
update = new TransformConfigUpdate(sourceConfig, destConfig, frequency, syncConfig, newDescription);
SettingsConfig settings = new SettingsConfig(4_000, 4_000.400F);
update = new TransformConfigUpdate(sourceConfig, destConfig, frequency, syncConfig, newDescription, settings);
Map<String, String> headers = Collections.singletonMap("foo", "bar");
update.setHeaders(headers);
@ -111,10 +116,51 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
assertThat(updatedConfig.getFrequency(), equalTo(frequency));
assertThat(updatedConfig.getSyncConfig(), equalTo(syncConfig));
assertThat(updatedConfig.getDescription(), equalTo(newDescription));
assertThat(updatedConfig.getSettings(), equalTo(settings));
assertThat(updatedConfig.getHeaders(), equalTo(headers));
assertThat(updatedConfig.getVersion(), equalTo(Version.CURRENT));
}
public void testApplySettings() {
TransformConfig config = new TransformConfig(
"time-transform",
randomSourceConfig(),
randomDestConfig(),
TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
TimeSyncConfigTests.randomTimeSyncConfig(),
Collections.singletonMap("key", "value"),
PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
SettingsConfigTests.randomNonEmptySettingsConfig(),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.V_7_2_0.toString()
);
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(4_000, null));
TransformConfig updatedConfig = update.apply(config);
// for settings we allow partial updates, so changing 1 setting should not overwrite the other
// the parser handles explicit nulls, tested in @link{SettingsConfigTests}
assertThat(updatedConfig.getSettings().getMaxPageSearchSize(), equalTo(4_000));
assertThat(updatedConfig.getSettings().getDocsPerSecond(), equalTo(config.getSettings().getDocsPerSecond()));
update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(null, 43.244F));
updatedConfig = update.apply(updatedConfig);
assertThat(updatedConfig.getSettings().getMaxPageSearchSize(), equalTo(4_000));
assertThat(updatedConfig.getSettings().getDocsPerSecond(), equalTo(43.244F));
// now reset to default using the magic -1
update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(-1, null));
updatedConfig = update.apply(updatedConfig);
assertNull(updatedConfig.getSettings().getMaxPageSearchSize());
assertThat(updatedConfig.getSettings().getDocsPerSecond(), equalTo(43.244F));
update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(-1, -1F));
updatedConfig = update.apply(updatedConfig);
assertNull(updatedConfig.getSettings().getMaxPageSearchSize());
assertNull(updatedConfig.getSettings().getDocsPerSecond());
}
public void testApplyWithSyncChange() {
TransformConfig batchConfig = new TransformConfig(
"batch-transform",
@ -125,11 +171,12 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
null,
PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
SettingsConfigTests.randomNonEmptySettingsConfig(),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.CURRENT.toString()
);
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, TimeSyncConfigTests.randomTimeSyncConfig(), null);
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, TimeSyncConfigTests.randomTimeSyncConfig(), null, null);
ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class, () -> update.apply(batchConfig));
assertThat(
@ -146,11 +193,12 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
null,
PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
SettingsConfigTests.randomNonEmptySettingsConfig(),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.CURRENT.toString()
);
TransformConfigUpdate fooSyncUpdate = new TransformConfigUpdate(null, null, null, new FooSync(), null);
TransformConfigUpdate fooSyncUpdate = new TransformConfigUpdate(null, null, null, new FooSync(), null, null);
ex = expectThrows(ElasticsearchStatusException.class, () -> fooSyncUpdate.apply(timeSyncedConfig));
assertThat(
ex.getMessage(),
@ -187,6 +235,9 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
if (update.getDescription() != null) {
builder.field(TransformField.DESCRIPTION.getPreferredName(), update.getDescription());
}
if (update.getSettings() != null) {
builder.field(TransformField.SETTINGS.getPreferredName(), update.getSettings());
}
builder.endObject();
}

View File

@ -23,11 +23,19 @@ import static org.hamcrest.Matchers.empty;
public class PivotConfigTests extends AbstractSerializingTransformTestCase<PivotConfig> {
public static PivotConfig randomPivotConfigWithDeprecatedFields() {
return new PivotConfig(
GroupConfigTests.randomGroupConfig(),
AggregationConfigTests.randomAggregationConfig(),
randomIntBetween(10, 10_000) // deprecated
);
}
public static PivotConfig randomPivotConfig() {
return new PivotConfig(
GroupConfigTests.randomGroupConfig(),
AggregationConfigTests.randomAggregationConfig(),
randomBoolean() ? null : randomIntBetween(10, 10_000)
null // deprecated
);
}
@ -35,7 +43,7 @@ public class PivotConfigTests extends AbstractSerializingTransformTestCase<Pivot
return new PivotConfig(
GroupConfigTests.randomGroupConfig(),
AggregationConfigTests.randomInvalidAggregationConfig(),
randomBoolean() ? null : randomIntBetween(10, 10_000)
null // deprecated
);
}
@ -219,6 +227,11 @@ public class PivotConfigTests extends AbstractSerializingTransformTestCase<Pivot
);
}
public void testDeprecation() {
PivotConfig pivotConfig = randomPivotConfigWithDeprecatedFields();
assertWarnings("[max_page_search_size] is deprecated inside pivot please use settings instead");
}
private static String dotJoin(String... fields) {
return Strings.arrayToDelimitedString(fields, ".");
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.transform.transforms.DestConfig;
import org.elasticsearch.client.transform.transforms.SettingsConfig;
import org.elasticsearch.client.transform.transforms.TimeSyncConfig;
import org.elasticsearch.client.transform.transforms.TransformConfig;
import org.elasticsearch.client.transform.transforms.TransformConfigUpdate;
@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.oneOf;
public class TransformIT extends TransformIntegTestCase {
@ -100,6 +102,7 @@ public class TransformIT extends TransformIntegTestCase {
aggs,
"reviews-by-user-business-day",
QueryBuilders.matchAllQuery(),
null,
indexName
).setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))).build();
@ -145,7 +148,7 @@ public class TransformIT extends TransformIntegTestCase {
String id = "transform-to-update";
String dest = "reviews-by-user-business-day-to-update";
TransformConfig config = createTransformConfigBuilder(id, groups, aggs, dest, QueryBuilders.matchAllQuery(), indexName)
TransformConfig config = createTransformConfigBuilder(id, groups, aggs, dest, QueryBuilders.matchAllQuery(), null, indexName)
.setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
.build();
@ -171,7 +174,7 @@ public class TransformIT extends TransformIntegTestCase {
.setDest(DestConfig.builder().setIndex(dest).setPipeline(pipelineId).build())
.build();
RestHighLevelClient hlrc = new TestRestHighLevelClient();
try (RestHighLevelClient hlrc = new TestRestHighLevelClient()) {
final XContentBuilder pipelineBuilder = jsonBuilder().startObject()
.startArray("processors")
.startObject()
@ -210,7 +213,7 @@ public class TransformIT extends TransformIntegTestCase {
assertThat(searchResponse.getHits().getTotalHits().value, greaterThan(0L));
hlrc.indices().refresh(new RefreshRequest(dest), RequestOptions.DEFAULT);
}, 30, TimeUnit.SECONDS);
}
stopTransform(config.getId());
deleteTransform(config.getId());
}
@ -235,6 +238,7 @@ public class TransformIT extends TransformIntegTestCase {
aggs,
"reviews-by-user-business-day",
QueryBuilders.matchAllQuery(),
null,
indexName
).setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))).build();
@ -258,6 +262,80 @@ public class TransformIT extends TransformIntegTestCase {
deleteTransform(config.getId());
}
public void testContinuousTransformRethrottle() throws Exception {
String indexName = "continuous-crud-reviews-throttled";
createReviewsIndex(indexName, 1000);
Map<String, SingleGroupSource> groups = new HashMap<>();
groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null));
groups.put("by-user", TermsGroupSource.builder().setField("user_id").build());
groups.put("by-business", TermsGroupSource.builder().setField("business_id").build());
AggregatorFactories.Builder aggs = AggregatorFactories.builder()
.addAggregator(AggregationBuilders.avg("review_score").field("stars"))
.addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
TransformConfig config = createTransformConfigBuilder(
"transform-crud",
groups,
aggs,
"reviews-by-user-business-day",
QueryBuilders.matchAllQuery(),
// set requests per second and page size low enough to fail the test if update does not succeed
SettingsConfig.builder().setRequestsPerSecond(1F).setMaxPageSearchSize(10),
indexName
).setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))).build();
assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged());
assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged());
assertBusy(() -> {
TransformStats stateAndStats = getTransformStats(config.getId()).getTransformsStats().get(0);
assertThat(stateAndStats.getState(), equalTo(TransformStats.State.INDEXING));
});
TransformConfigUpdate update = TransformConfigUpdate.builder()
// test randomly: with explicit settings and reset to default
.setSettings(
SettingsConfig.builder()
.setRequestsPerSecond(randomBoolean() ? 1000F : null)
.setMaxPageSearchSize(randomBoolean() ? 1000 : null)
.build()
)
.build();
updateConfig(config.getId(), update);
waitUntilCheckpoint(config.getId(), 1L);
assertThat(getTransformStats(config.getId()).getTransformsStats().get(0).getState(), equalTo(TransformStats.State.STARTED));
long docsIndexed = getTransformStats(config.getId()).getTransformsStats().get(0).getIndexerStats().getDocumentsIndexed();
long pagesProcessed = getTransformStats(config.getId()).getTransformsStats().get(0).getIndexerStats().getPagesProcessed();
TransformConfig storedConfig = getTransform(config.getId()).getTransformConfigurations().get(0);
assertThat(storedConfig.getVersion(), equalTo(Version.CURRENT));
Instant now = Instant.now();
assertTrue("[create_time] is not before current time", storedConfig.getCreateTime().isBefore(now));
// index some more docs
long timeStamp = Instant.now().toEpochMilli() - 1_000;
long user = 42;
indexMoreDocs(timeStamp, user, indexName);
waitUntilCheckpoint(config.getId(), 2L);
// Assert that we wrote the new docs
assertThat(
getTransformStats(config.getId()).getTransformsStats().get(0).getIndexerStats().getDocumentsIndexed(),
greaterThan(docsIndexed)
);
// Assert less than 500 pages processed, so update worked
assertThat(pagesProcessed, lessThan(1000L));
stopTransform(config.getId());
deleteTransform(config.getId());
}
private void indexMoreDocs(long timestamp, long userId, String index) throws Exception {
BulkRequest bulk = new BulkRequest(index);
for (int i = 0; i < 25; i++) {

View File

@ -32,6 +32,7 @@ import org.elasticsearch.client.transform.StopTransformResponse;
import org.elasticsearch.client.transform.UpdateTransformRequest;
import org.elasticsearch.client.transform.transforms.DestConfig;
import org.elasticsearch.client.transform.transforms.QueryConfig;
import org.elasticsearch.client.transform.transforms.SettingsConfig;
import org.elasticsearch.client.transform.transforms.SourceConfig;
import org.elasticsearch.client.transform.transforms.TransformConfig;
import org.elasticsearch.client.transform.transforms.TransformConfigUpdate;
@ -88,7 +89,7 @@ abstract class TransformIntegTestCase extends ESRestTestCase {
}
private void logAudits() throws IOException {
RestHighLevelClient restClient = new TestRestHighLevelClient();
try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
// using '*' to make this lenient and do not fail if the audit index does not exist
SearchRequest searchRequest = new SearchRequest(".transform-notifications-*");
@ -111,6 +112,7 @@ abstract class TransformIntegTestCase extends ESRestTestCase {
);
}
}
}
protected void cleanUpTransforms() throws IOException {
for (TransformConfig config : transformConfigs.values()) {
@ -126,46 +128,53 @@ abstract class TransformIntegTestCase extends ESRestTestCase {
protected StopTransformResponse stopTransform(String id, boolean waitForCompletion, TimeValue timeout, boolean waitForCheckpoint)
throws IOException {
RestHighLevelClient restClient = new TestRestHighLevelClient();
try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
return restClient.transform()
.stopTransform(new StopTransformRequest(id, waitForCompletion, timeout, waitForCheckpoint), RequestOptions.DEFAULT);
}
}
protected StartTransformResponse startTransform(String id, RequestOptions options) throws IOException {
RestHighLevelClient restClient = new TestRestHighLevelClient();
try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
return restClient.transform().startTransform(new StartTransformRequest(id), options);
}
}
protected AcknowledgedResponse deleteTransform(String id) throws IOException {
RestHighLevelClient restClient = new TestRestHighLevelClient();
try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
AcknowledgedResponse response = restClient.transform().deleteTransform(new DeleteTransformRequest(id), RequestOptions.DEFAULT);
if (response.isAcknowledged()) {
transformConfigs.remove(id);
}
return response;
}
}
protected AcknowledgedResponse putTransform(TransformConfig config, RequestOptions options) throws IOException {
if (transformConfigs.keySet().contains(config.getId())) {
throw new IllegalArgumentException("transform [" + config.getId() + "] is already registered");
}
RestHighLevelClient restClient = new TestRestHighLevelClient();
try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
AcknowledgedResponse response = restClient.transform().putTransform(new PutTransformRequest(config), options);
if (response.isAcknowledged()) {
transformConfigs.put(config.getId(), config);
}
return response;
}
}
protected GetTransformStatsResponse getTransformStats(String id) throws IOException {
RestHighLevelClient restClient = new TestRestHighLevelClient();
try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
return restClient.transform().getTransformStats(new GetTransformStatsRequest(id), RequestOptions.DEFAULT);
}
}
protected GetTransformResponse getTransform(String id) throws IOException {
RestHighLevelClient restClient = new TestRestHighLevelClient();
try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
return restClient.transform().getTransform(new GetTransformRequest(id), RequestOptions.DEFAULT);
}
}
protected void waitUntilCheckpoint(String id, long checkpoint) throws Exception {
waitUntilCheckpoint(id, checkpoint, TimeValue.timeValueSeconds(30));
@ -243,7 +252,7 @@ abstract class TransformIntegTestCase extends ESRestTestCase {
String destinationIndex,
String... sourceIndices
) throws Exception {
return createTransformConfig(id, groups, aggregations, destinationIndex, QueryBuilders.matchAllQuery(), sourceIndices);
return createTransformConfig(id, groups, aggregations, destinationIndex, QueryBuilders.matchAllQuery(), null, sourceIndices);
}
protected TransformConfig.Builder createTransformConfigBuilder(
@ -252,6 +261,7 @@ abstract class TransformIntegTestCase extends ESRestTestCase {
AggregatorFactories.Builder aggregations,
String destinationIndex,
QueryBuilder queryBuilder,
SettingsConfig.Builder settingsBuilder,
String... sourceIndices
) throws Exception {
return TransformConfig.builder()
@ -260,6 +270,7 @@ abstract class TransformIntegTestCase extends ESRestTestCase {
.setDest(DestConfig.builder().setIndex(destinationIndex).build())
.setFrequency(TimeValue.timeValueSeconds(10))
.setPivotConfig(createPivotConfig(groups, aggregations))
.setSettings(settingsBuilder != null ? settingsBuilder.build() : null)
.setDescription("Test transform config id: " + id);
}
@ -269,24 +280,28 @@ abstract class TransformIntegTestCase extends ESRestTestCase {
AggregatorFactories.Builder aggregations,
String destinationIndex,
QueryBuilder queryBuilder,
SettingsConfig.Builder settingsBuilder,
String... sourceIndices
) throws Exception {
return createTransformConfigBuilder(id, groups, aggregations, destinationIndex, queryBuilder, sourceIndices).build();
return createTransformConfigBuilder(id, groups, aggregations, destinationIndex, queryBuilder, settingsBuilder, sourceIndices)
.build();
}
protected void bulkIndexDocs(BulkRequest request) throws Exception {
RestHighLevelClient restClient = new TestRestHighLevelClient();
try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
BulkResponse response = restClient.bulk(request, RequestOptions.DEFAULT);
assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
}
}
protected void updateConfig(String id, TransformConfigUpdate update) throws Exception {
RestHighLevelClient restClient = new TestRestHighLevelClient();
try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
restClient.transform().updateTransform(new UpdateTransformRequest(update, id), RequestOptions.DEFAULT);
}
}
protected void createReviewsIndex(String indexName, int numDocs) throws Exception {
RestHighLevelClient restClient = new TestRestHighLevelClient();
try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
// create mapping
try (XContentBuilder builder = jsonBuilder()) {
@ -356,6 +371,7 @@ abstract class TransformIntegTestCase extends ESRestTestCase {
assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
restClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
}
}
protected Map<String, Object> toLazy(ToXContent parsedObject) throws Exception {
BytesReference bytes = XContentHelper.toXContent(parsedObject, XContentType.JSON, false);
@ -376,8 +392,8 @@ abstract class TransformIntegTestCase extends ESRestTestCase {
listTasksRequest.setWaitForCompletion(true);
listTasksRequest.setDetailed(true);
listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10));
RestHighLevelClient restClient = new TestRestHighLevelClient();
try {
try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
restClient.tasks().list(listTasksRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new AssertionError("Failed to wait for pending tasks to complete", e);

View File

@ -994,7 +994,6 @@ public class TransformPivotRestIT extends TransformRestTestCase {
+ transformIndex
+ "\"},"
+ " \"pivot\": {"
+ " \"max_page_search_size\": 10,"
+ " \"group_by\": {"
+ " \"user.id\": {\"terms\": { \"field\": \"user_id\" }},"
+ " \"business.id\": {\"terms\": { \"field\": \"business_id\" }},"
@ -1007,7 +1006,10 @@ public class TransformPivotRestIT extends TransformRestTestCase {
+ " \"user.avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } }"
+ " } } } },"
+ " \"settings\": {"
+ " \"max_page_search_size\": 10"
+ " }"
+ "}";
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));

View File

@ -137,6 +137,7 @@ public class TransformProgressIT extends ESRestTestCase {
null,
null,
pivotConfig,
null,
null
);
@ -155,7 +156,7 @@ public class TransformProgressIT extends ESRestTestCase {
QueryConfig queryConfig = new QueryConfig(Collections.emptyMap(), QueryBuilders.termQuery("user_id", "user_26"));
pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null);
sourceConfig = new SourceConfig(new String[] { REVIEWS_INDEX_NAME }, queryConfig);
config = new TransformConfig("get_progress_transform", sourceConfig, destConfig, null, null, null, pivotConfig, null);
config = new TransformConfig("get_progress_transform", sourceConfig, destConfig, null, null, null, pivotConfig, null, null);
response = restClient.search(
TransformProgressGatherer.getSearchRequest(config, config.getSource().getQueryConfig().getQuery()),
@ -172,7 +173,7 @@ public class TransformProgressIT extends ESRestTestCase {
Collections.singletonMap("every_50", new HistogramGroupSource("missing_field", null, 50.0))
);
pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null);
config = new TransformConfig("get_progress_transform", sourceConfig, destConfig, null, null, null, pivotConfig, null);
config = new TransformConfig("get_progress_transform", sourceConfig, destConfig, null, null, null, pivotConfig, null, null);
response = restClient.search(
TransformProgressGatherer.getSearchRequest(config, config.getSource().getQueryConfig().getQuery()),

View File

@ -9,20 +9,23 @@ package org.elasticsearch.xpack.transform.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.license.License;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
@ -30,6 +33,7 @@ import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.node.Node;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
@ -50,15 +54,17 @@ import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Respo
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.persistence.TransformIndex;
import org.elasticsearch.xpack.transform.transforms.TransformTask;
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
import org.elasticsearch.xpack.transform.utils.SourceDestValidations;
import java.io.IOException;
import java.time.Clock;
import java.util.List;
import java.util.Map;
@ -66,7 +72,7 @@ import java.util.stream.Collectors;
import static org.elasticsearch.xpack.transform.action.TransportPutTransformAction.buildPrivilegeCheck;
public class TransportUpdateTransformAction extends TransportMasterNodeAction<Request, Response> {
public class TransportUpdateTransformAction extends TransportTasksAction<TransformTask, Request, Response, Response> {
private static final Logger logger = LogManager.getLogger(TransportUpdateTransformAction.class);
private final XPackLicenseState licenseState;
@ -75,6 +81,8 @@ public class TransportUpdateTransformAction extends TransportMasterNodeAction<Re
private final SecurityContext securityContext;
private final TransformAuditor auditor;
private final SourceDestValidator sourceDestValidator;
private final ThreadPool threadPool;
private final IndexNameExpressionResolver indexNameExpressionResolver;
@Inject
public TransportUpdateTransformAction(
@ -114,7 +122,17 @@ public class TransportUpdateTransformAction extends TransportMasterNodeAction<Re
TransformServices transformServices,
Client client
) {
super(name, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
super(
name,
clusterService,
transportService,
actionFilters,
Request::fromStreamWithBWC,
Response::fromStreamWithBWC,
Response::fromStreamWithBWC,
ThreadPool.Names.SAME
);
this.licenseState = licenseState;
this.client = client;
this.transformConfigManager = transformServices.getConfigManager();
@ -131,28 +149,36 @@ public class TransportUpdateTransformAction extends TransportMasterNodeAction<Re
clusterService.getNodeName(),
License.OperationMode.BASIC.description()
);
this.threadPool = threadPool;
this.indexNameExpressionResolver = indexNameExpressionResolver;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response read(StreamInput in) throws IOException {
return new Response(in);
}
@Override
protected void masterOperation(Request request, ClusterState clusterState, ActionListener<Response> listener) {
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
if (!licenseState.isAllowed(XPackLicenseState.Feature.TRANSFORM)) {
listener.onFailure(LicenseUtils.newComplianceException(XPackField.TRANSFORM));
return;
}
final ClusterState clusterState = clusterService.state();
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.isLocalNodeElectedMaster() == false) {
// Delegates update transform to elected master node so it becomes the coordinating node.
if (nodes.getMasterNode() == null) {
listener.onFailure(new MasterNotDiscoveredException());
} else {
transportService.sendRequest(
nodes.getMasterNode(),
actionName,
request,
new ActionListenerResponseHandler<>(listener, Response::fromStreamWithBWC)
);
}
return;
}
// set headers to run transform as calling user
Map<String, String> filteredHeaders = threadPool.getThreadContext()
.getHeaders()
@ -174,6 +200,33 @@ public class TransportUpdateTransformAction extends TransportMasterNodeAction<Re
return;
}
TransformConfig updatedConfig = update.apply(config);
final ActionListener<Response> updateListener;
if (update.changesSettings(config)) {
PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState);
PersistentTasksCustomMetadata.PersistentTask<?> transformTask = tasksMetadata.getTask(request.getId());
// to send a request to apply new settings at runtime, several requirements must be met:
// - transform must be running, meaning a task exists
// - transform is not failed (stopped transforms do not have a task)
// - the node where transform is executed on is at least 7.8.0 in order to understand the request
if (transformTask != null
&& transformTask.getState() instanceof TransformState
&& ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED
&& clusterState.nodes().get(transformTask.getExecutorNode()).getVersion().onOrAfter(Version.V_7_8_0)
) {
request.setNodes(transformTask.getExecutorNode());
updateListener = ActionListener.wrap(updateResponse -> {
request.setConfig(updateResponse.getConfig());
super.doExecute(task, request, listener);
}, listener::onFailure);
} else {
updateListener = listener;
}
} else {
updateListener = listener;
}
sourceDestValidator.validate(
clusterState,
updatedConfig.getSource().getIndex(),
@ -181,7 +234,7 @@ public class TransportUpdateTransformAction extends TransportMasterNodeAction<Re
request.isDeferValidation() ? SourceDestValidations.NON_DEFERABLE_VALIDATIONS : SourceDestValidations.ALL_VALIDATIONS,
ActionListener.wrap(
validationResponse -> {
checkPriviledgesAndUpdateTransform(request, clusterState, updatedConfig, configAndVersion.v2(), listener);
checkPriviledgesAndUpdateTransform(request, clusterState, updatedConfig, configAndVersion.v2(), updateListener);
},
listener::onFailure
)
@ -191,8 +244,22 @@ public class TransportUpdateTransformAction extends TransportMasterNodeAction<Re
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
protected void taskOperation(Request request, TransformTask transformTask, ActionListener<Response> listener) {
// apply the settings
transformTask.applyNewSettings(request.getConfig().getSettings());
listener.onResponse(new Response(request.getConfig()));
}
@Override
protected Response newResponse(
Request request,
List<Response> tasks,
List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions
) {
// there should be only 1 response, todo: check
return tasks.get(0);
}
private void handlePrivsResponse(
@ -371,4 +438,5 @@ public class TransportUpdateTransformAction extends TransportMasterNodeAction<Re
pivot.deduceMappings(client, config.getSource(), deduceMappingsListener);
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.indexing.IterationResult;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
@ -50,6 +51,7 @@ import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -84,6 +86,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
protected final TransformConfigManager transformsConfigManager;
private final CheckpointProvider checkpointProvider;
private final TransformProgressGatherer progressGatherer;
private volatile float docsPerSecond = -1;
protected final TransformAuditor auditor;
protected final TransformContext context;
@ -97,7 +100,8 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
private final Map<String, String> fieldMappings;
private Pivot pivot;
private int pageSize = 0;
private volatile Integer initialConfiguredPageSize;
private volatile int pageSize = 0;
private long logEvery = 1;
private long logCount = 0;
private volatile TransformCheckpoint lastCheckpoint;
@ -144,6 +148,10 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
// give runState a default
this.runState = RunState.APPLY_BUCKET_RESULTS;
if (transformConfig.getSettings() != null && transformConfig.getSettings().getDocsPerSecond() != null) {
docsPerSecond = transformConfig.getSettings().getDocsPerSecond();
}
}
public int getPageSize() {
@ -155,6 +163,11 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
return transformConfig.getId();
}
@Override
protected float getMaxDocsPerSecond() {
return docsPerSecond;
}
public TransformConfig getConfig() {
return transformConfig;
}
@ -229,7 +242,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
// if we haven't set the page size yet, if it is set we might have reduced it after running into an out of memory
if (pageSize == 0) {
pageSize = pivot.getInitialPageSize();
configurePageSize(getConfig().getSettings().getMaxPageSearchSize());
}
runState = determineRunStateAtStart();
@ -440,6 +453,22 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
return super.maybeTriggerAsyncJob(now);
}
/**
* Handle new settings at runtime, this is triggered by a call to _transform/id/_update
*
* @param newSettings The new settings that should be applied
*/
public void applyNewSettings(SettingsConfig newSettings) {
auditor.info(transformConfig.getId(), "Transform settings have been updated.");
logger.info("[{}] transform settings have been updated.", transformConfig.getId());
docsPerSecond = newSettings.getDocsPerSecond() != null ? newSettings.getDocsPerSecond() : -1;
if (Objects.equals(newSettings.getMaxPageSearchSize(), initialConfiguredPageSize) == false) {
configurePageSize(newSettings.getMaxPageSearchSize());
}
rethrottle();
}
@Override
protected void onFailure(Exception exc) {
// the failure handler must not throw an exception due to internal problems
@ -885,6 +914,17 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
return RunState.IDENTIFY_CHANGES;
}
private void configurePageSize(Integer newPageSize) {
initialConfiguredPageSize = newPageSize;
// if the user explicitly set a page size, take it from the config, otherwise let the function decide
if (initialConfiguredPageSize != null && initialConfiguredPageSize > 0) {
pageSize = initialConfiguredPageSize;
} else {
pageSize = pivot.getInitialPageSize();
}
}
/**
* Thrown when the transform configuration disappeared permanently.
* (not if reloading failed due to an intermittent problem)

View File

@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
@ -369,6 +370,10 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
}
}
public synchronized void applyNewSettings(SettingsConfig newSettings) {
getIndexer().applyNewSettings(newSettings);
}
@Override
protected void init(
PersistentTasksService persistentTasksService,

View File

@ -30,14 +30,12 @@ import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.indexing.IterationResult;
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfigTests;
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
@ -60,6 +58,7 @@ import java.util.function.Function;
import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig;
import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig;
import static org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests.randomPivotConfig;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.empty;
@ -236,8 +235,9 @@ public class TransformIndexerTests extends ESTestCase {
null,
null,
null,
new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)
randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
new SettingsConfig(pageSize, null)
);
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final long initialPageSize = pageSize == null ? Pivot.DEFAULT_INITIAL_PAGE_SIZE : pageSize;
@ -303,8 +303,9 @@ public class TransformIndexerTests extends ESTestCase {
null,
null,
null,
new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)
randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
new SettingsConfig(pageSize, null)
);
SearchResponse searchResponse = new SearchResponse(
new InternalSearchResponse(
@ -361,8 +362,9 @@ public class TransformIndexerTests extends ESTestCase {
null,
null,
null,
new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)
randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
new SettingsConfig(pageSize, null)
);
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {

View File

@ -109,6 +109,8 @@ public class PivotTests extends ESTestCase {
pivot = new Pivot(new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), null));
assertThat(pivot.getInitialPageSize(), equalTo(Pivot.DEFAULT_INITIAL_PAGE_SIZE));
assertWarnings("[max_page_search_size] is deprecated inside pivot please use settings instead");
}
public void testSearchFailure() throws Exception {

View File

@ -214,7 +214,15 @@
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot
# assert on state in the mixed cluster as it could be state at the top level or state.task_state
#- match: { transforms.0.state: "stopped" }
- do:
allowed_warnings:
- '[_data_frame/transforms/] is deprecated, use [_transform/] in the future.'
data_frame_transform_deprecated.update_transform:
transform_id: "mixed-simple-continuous-transform"
body: >
{
"description": "Simple continuous transform"
}
---
"Test GET, start, and stop old cluster batch transforms":
- skip:
@ -321,7 +329,15 @@
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot
# assert on state in the mixed cluster as it could be state at the top level or state.task_state
#- match: { transforms.0.state: "stopped" }
- do:
allowed_warnings:
- '[_data_frame/transforms/] is deprecated, use [_transform/] in the future.'
data_frame_transform_deprecated.update_transform:
transform_id: "old-complex-transform"
body: >
{
"description": "old complex transform"
}
---
"Test GET, stop, start, old continuous transforms":
- skip:
@ -380,3 +396,12 @@
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot
# assert on state in the mixed cluster as it could be state at the top level or state.task_state
#- match: { transforms.0.state: "stopped" }
- do:
allowed_warnings:
- '[_data_frame/transforms/] is deprecated, use [_transform/] in the future.'
data_frame_transform_deprecated.update_transform:
transform_id: "old-simple-continuous-transform"
body: >
{
"description": "old simple continuous transform"
}

View File

@ -241,6 +241,16 @@ setup:
- match: { transforms.0.id: "mixed-simple-continuous-transform" }
- match: { transforms.0.state: "/started|indexing/" }
- do:
transform.update_transform:
transform_id: "mixed-simple-continuous-transform"
body: >
{
"settings": {
"max_page_search_size": 1000
}
}
- do:
transform.stop_transform:
transform_id: "mixed-simple-continuous-transform"
@ -254,6 +264,13 @@ setup:
- match: { transforms.0.id: "mixed-simple-continuous-transform" }
- match: { transforms.0.state: "stopped" }
- do:
transform.get_transform:
transform_id: "mixed-simple-continuous-transform"
- match: { count: 1 }
- match: { transforms.0.id: "mixed-simple-continuous-transform" }
- match: { transforms.0.settings.max_page_search_size: 1000 }
- do:
transform.delete_transform:
transform_id: "mixed-simple-continuous-transform"