[7.x] [ML][Data Frame] add node attr to GET _stats (#43842) (#43894)

* [ML][Data Frame] add node attr to GET _stats (#43842)

* [ML][Data Frame] add node attr to GET _stats

* addressing testing issues with node.attributes

* adjusting for backport
This commit is contained in:
Benjamin Trent 2019-07-02 19:35:37 -05:00 committed by GitHub
parent 2c97e26ce8
commit fb825a6470
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 637 additions and 29 deletions

View File

@ -43,6 +43,7 @@ public class DataFrameTransformState {
private static final ParseField CHECKPOINT = new ParseField("checkpoint");
private static final ParseField REASON = new ParseField("reason");
private static final ParseField PROGRESS = new ParseField("progress");
private static final ParseField NODE = new ParseField("node");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER =
@ -52,7 +53,8 @@ public class DataFrameTransformState {
(Map<String, Object>) args[2],
(long) args[3],
(String) args[4],
(DataFrameTransformProgress) args[5]));
(DataFrameTransformProgress) args[5],
(NodeAttributes) args[6]));
static {
PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING);
@ -61,6 +63,7 @@ public class DataFrameTransformState {
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON);
PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress::fromXContent, PROGRESS, ValueType.OBJECT);
PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE, ValueType.OBJECT);
}
public static DataFrameTransformState fromXContent(XContentParser parser) throws IOException {
@ -73,19 +76,22 @@ public class DataFrameTransformState {
private final Map<String, Object> currentPosition;
private final String reason;
private final DataFrameTransformProgress progress;
private final NodeAttributes node;
public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable Map<String, Object> position,
long checkpoint,
@Nullable String reason,
@Nullable DataFrameTransformProgress progress) {
@Nullable DataFrameTransformProgress progress,
@Nullable NodeAttributes node) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position));
this.checkpoint = checkpoint;
this.reason = reason;
this.progress = progress;
this.node = node;
}
public IndexerState getIndexerState() {
@ -115,6 +121,11 @@ public class DataFrameTransformState {
return progress;
}
@Nullable
public NodeAttributes getNode() {
return node;
}
@Override
public boolean equals(Object other) {
if (this == other) {
@ -132,12 +143,13 @@ public class DataFrameTransformState {
Objects.equals(this.currentPosition, that.currentPosition) &&
Objects.equals(this.progress, that.progress) &&
this.checkpoint == that.checkpoint &&
Objects.equals(this.node, that.node) &&
Objects.equals(this.reason, that.reason);
}
@Override
public int hashCode() {
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress);
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress, node);
}
}

View File

@ -0,0 +1,156 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
/**
* A Pojo class containing an Elastic Node's attributes
*/
public class NodeAttributes implements ToXContentObject {
public static final ParseField ID = new ParseField("id");
public static final ParseField NAME = new ParseField("name");
public static final ParseField EPHEMERAL_ID = new ParseField("ephemeral_id");
public static final ParseField TRANSPORT_ADDRESS = new ParseField("transport_address");
public static final ParseField ATTRIBUTES = new ParseField("attributes");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<NodeAttributes, Void> PARSER =
new ConstructingObjectParser<>("node", true,
(a) -> {
int i = 0;
String id = (String) a[i++];
String name = (String) a[i++];
String ephemeralId = (String) a[i++];
String transportAddress = (String) a[i++];
Map<String, String> attributes = (Map<String, String>) a[i];
return new NodeAttributes(id, name, ephemeralId, transportAddress, attributes);
});
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME);
PARSER.declareString(ConstructingObjectParser.constructorArg(), EPHEMERAL_ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), TRANSPORT_ADDRESS);
PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> p.mapStrings(),
ATTRIBUTES,
ObjectParser.ValueType.OBJECT);
}
private final String id;
private final String name;
private final String ephemeralId;
private final String transportAddress;
private final Map<String, String> attributes;
public NodeAttributes(String id, String name, String ephemeralId, String transportAddress, Map<String, String> attributes) {
this.id = id;
this.name = name;
this.ephemeralId = ephemeralId;
this.transportAddress = transportAddress;
this.attributes = Collections.unmodifiableMap(attributes);
}
/**
* The unique identifier of the node.
*/
public String getId() {
return id;
}
/**
* The node name.
*/
public String getName() {
return name;
}
/**
* The ephemeral id of the node.
*/
public String getEphemeralId() {
return ephemeralId;
}
/**
* The host and port where transport HTTP connections are accepted.
*/
public String getTransportAddress() {
return transportAddress;
}
/**
* Additional attributes related to this node
*/
public Map<String, String> getAttributes() {
return attributes;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ID.getPreferredName(), id);
builder.field(NAME.getPreferredName(), name);
builder.field(EPHEMERAL_ID.getPreferredName(), ephemeralId);
builder.field(TRANSPORT_ADDRESS.getPreferredName(), transportAddress);
builder.field(ATTRIBUTES.getPreferredName(), attributes);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(id, name, ephemeralId, transportAddress, attributes);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
NodeAttributes that = (NodeAttributes) other;
return Objects.equals(id, that.id) &&
Objects.equals(name, that.name) &&
Objects.equals(ephemeralId, that.ephemeralId) &&
Objects.equals(transportAddress, that.transportAddress) &&
Objects.equals(attributes, that.attributes);
}
@Override
public String toString() {
return Strings.toString(this);
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.client.core.IndexerState;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
@ -37,7 +38,8 @@ public class DataFrameTransformStateTests extends ESTestCase {
DataFrameTransformStateTests::toXContent,
DataFrameTransformState::fromXContent)
.supportsUnknownFields(true)
.randomFieldsExcludeFilter(field -> field.equals("current_position"))
.randomFieldsExcludeFilter(field -> field.equals("current_position") ||
field.equals("node.attributes"))
.test();
}
@ -47,7 +49,8 @@ public class DataFrameTransformStateTests extends ESTestCase {
randomPositionMap(),
randomLongBetween(0,10),
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : DataFrameTransformProgressTests.randomInstance());
randomBoolean() ? null : DataFrameTransformProgressTests.randomInstance(),
randomBoolean() ? null : NodeAttributesTests.createRandom());
}
public static void toXContent(DataFrameTransformState state, XContentBuilder builder) throws IOException {
@ -65,6 +68,10 @@ public class DataFrameTransformStateTests extends ESTestCase {
builder.field("progress");
DataFrameTransformProgressTests.toXContent(state.getProgress(), builder);
}
if (state.getNode() != null) {
builder.field("node");
state.getNode().toXContent(builder, ToXContent.EMPTY_PARAMS);
}
builder.endObject();
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
public class NodeAttributesTests extends AbstractXContentTestCase<NodeAttributes> {
public static NodeAttributes createRandom() {
int numberOfAttributes = randomIntBetween(1, 10);
Map<String, String> attributes = new HashMap<>(numberOfAttributes);
for(int i = 0; i < numberOfAttributes; i++) {
String val = randomAlphaOfLength(10);
attributes.put("key-"+i, val);
}
return new NodeAttributes(randomAlphaOfLength(10),
randomAlphaOfLength(10),
randomAlphaOfLength(10),
randomAlphaOfLength(10),
attributes);
}
@Override
protected NodeAttributes createTestInstance() {
return createRandom();
}
@Override
protected NodeAttributes doParseInstance(XContentParser parser) throws IOException {
return NodeAttributes.PARSER.parse(parser, null);
}
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> !field.isEmpty();
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
}

View File

@ -64,7 +64,7 @@ public class DataFrameTransformStateAndStatsTests extends AbstractHlrcXContentTe
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> field.equals("state.current_position");
return field -> field.equals("state.current_position") || field.equals("state.node") || field.equals("state.node.attributes");
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgr
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.dataframe.transforms.NodeAttributes;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import java.io.IOException;
@ -40,8 +41,20 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase<D
public static DataFrameTransformState fromHlrc(org.elasticsearch.client.dataframe.transforms.DataFrameTransformState instance) {
return new DataFrameTransformState(DataFrameTransformTaskState.fromString(instance.getTaskState().value()),
IndexerState.fromString(instance.getIndexerState().value()), instance.getPosition(), instance.getCheckpoint(),
instance.getReason(), DataFrameTransformProgressTests.fromHlrc(instance.getProgress()));
IndexerState.fromString(instance.getIndexerState().value()),
instance.getPosition(),
instance.getCheckpoint(),
instance.getReason(),
DataFrameTransformProgressTests.fromHlrc(instance.getProgress()),
fromHlrc(instance.getNode()));
}
public static NodeAttributes fromHlrc(org.elasticsearch.client.dataframe.transforms.NodeAttributes attributes) {
return attributes == null ? null : new NodeAttributes(attributes.getId(),
attributes.getName(),
attributes.getEphemeralId(),
attributes.getTransportAddress(),
attributes.getAttributes());
}
@Override
@ -72,7 +85,7 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase<D
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> field.equals("current_position");
return field -> field.equals("current_position") || field.equals("node.attributes");
}
public static DataFrameTransformStateAndStats randomDataFrameTransformStateAndStats(String id) {
@ -97,6 +110,20 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase<D
return new DataFrameTransformProgress(totalDocs, remainingDocs);
}
public static NodeAttributes randomNodeAttributes() {
int numberOfAttributes = randomIntBetween(1, 10);
Map<String, String> attributes = new HashMap<>(numberOfAttributes);
for(int i = 0; i < numberOfAttributes; i++) {
String val = randomAlphaOfLength(10);
attributes.put("key-"+i, val);
}
return new NodeAttributes(randomAlphaOfLength(10),
randomAlphaOfLength(10),
randomAlphaOfLength(10),
randomAlphaOfLength(10),
attributes);
}
public static DataFrameIndexerTransformStats randomStats(String transformId) {
return new DataFrameIndexerTransformStats(transformId, randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
@ -110,7 +137,8 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase<D
randomPosition(),
randomLongBetween(0,10),
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : randomDataFrameTransformProgress());
randomBoolean() ? null : randomDataFrameTransformProgress(),
randomBoolean() ? null : randomNodeAttributes());
}
private static Map<String, Object> randomPosition() {

View File

@ -45,6 +45,7 @@ import org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.client.dataframe.transforms.DestConfig;
import org.elasticsearch.client.dataframe.transforms.NodeAttributes;
import org.elasticsearch.client.dataframe.transforms.QueryConfig;
import org.elasticsearch.client.dataframe.transforms.SourceConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.AggregationConfig;
@ -533,6 +534,8 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
stateAndStats.getTransformStats(); // <4>
DataFrameTransformProgress progress =
stateAndStats.getTransformState().getProgress(); // <5>
NodeAttributes node =
stateAndStats.getTransformState().getNode(); // <6>
// end::get-data-frame-transform-stats-response
assertEquals(IndexerState.STOPPED, indexerState);

View File

@ -53,3 +53,4 @@ include-tagged::{doc-tests-file}[{api}-response]
<4> The overall transform statistics recording the number of documents indexed etc.
<5> The progress of the current run in the transform. Supplies the number of docs left until the next checkpoint
and the total number of docs expected.
<6> The assigned node information if the task is currently assigned to a node and running.

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
@ -41,6 +42,8 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
private final Map<String, Object> currentPosition;
@Nullable
private final String reason;
@Nullable
private NodeAttributes node;
public static final ParseField TASK_STATE = new ParseField("task_state");
public static final ParseField INDEXER_STATE = new ParseField("indexer_state");
@ -48,6 +51,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
public static final ParseField CHECKPOINT = new ParseField("checkpoint");
public static final ParseField REASON = new ParseField("reason");
public static final ParseField PROGRESS = new ParseField("progress");
public static final ParseField NODE = new ParseField("node");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER = new ConstructingObjectParser<>(NAME,
@ -57,7 +61,8 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
(Map<String, Object>) args[2],
(long) args[3],
(String) args[4],
(DataFrameTransformProgress) args[5]));
(DataFrameTransformProgress) args[5],
(NodeAttributes) args[6]));
static {
PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING);
@ -66,6 +71,23 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
PARSER.declareString(optionalConstructorArg(), REASON);
PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress.PARSER::apply, PROGRESS, ValueType.OBJECT);
PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE, ValueType.OBJECT);
}
public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable Map<String, Object> position,
long checkpoint,
@Nullable String reason,
@Nullable DataFrameTransformProgress progress,
@Nullable NodeAttributes node) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position));
this.checkpoint = checkpoint;
this.reason = reason;
this.progress = progress;
this.node = node;
}
public DataFrameTransformState(DataFrameTransformTaskState taskState,
@ -74,12 +96,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
long checkpoint,
@Nullable String reason,
@Nullable DataFrameTransformProgress progress) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position));
this.checkpoint = checkpoint;
this.reason = reason;
this.progress = progress;
this(taskState, indexerState, position, checkpoint, reason, progress, null);
}
public DataFrameTransformState(StreamInput in) throws IOException {
@ -90,6 +107,11 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
checkpoint = in.readLong();
reason = in.readOptionalString();
progress = in.readOptionalWriteable(DataFrameTransformProgress::new);
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
node = in.readOptionalWriteable(NodeAttributes::new);
} else {
node = null;
}
}
public DataFrameTransformTaskState getTaskState() {
@ -125,6 +147,15 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
return reason;
}
public NodeAttributes getNode() {
return node;
}
public DataFrameTransformState setNode(NodeAttributes node) {
this.node = node;
return this;
}
public static DataFrameTransformState fromXContent(XContentParser parser) {
try {
return PARSER.parse(parser, null);
@ -148,6 +179,9 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
if (progress != null) {
builder.field(PROGRESS.getPreferredName(), progress);
}
if (node != null) {
builder.field(NODE.getPreferredName(), node);
}
builder.endObject();
return builder;
}
@ -165,6 +199,9 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
out.writeLong(checkpoint);
out.writeOptionalString(reason);
out.writeOptionalWriteable(progress);
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeOptionalWriteable(node);
}
}
@Override
@ -184,12 +221,13 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
Objects.equals(this.currentPosition, that.currentPosition) &&
this.checkpoint == that.checkpoint &&
Objects.equals(this.reason, that.reason) &&
Objects.equals(this.progress, that.progress);
Objects.equals(this.progress, that.progress) &&
Objects.equals(this.node, that.node);
}
@Override
public int hashCode() {
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress);
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress, node);
}
@Override

View File

@ -0,0 +1,171 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
/**
* A Pojo class containing an Elastic Node's attributes
*/
public class NodeAttributes implements ToXContentObject, Writeable {
public static final ParseField ID = new ParseField("id");
public static final ParseField NAME = new ParseField("name");
public static final ParseField EPHEMERAL_ID = new ParseField("ephemeral_id");
public static final ParseField TRANSPORT_ADDRESS = new ParseField("transport_address");
public static final ParseField ATTRIBUTES = new ParseField("attributes");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<NodeAttributes, Void> PARSER =
new ConstructingObjectParser<>("node", true,
(a) -> {
int i = 0;
String id = (String) a[i++];
String name = (String) a[i++];
String ephemeralId = (String) a[i++];
String transportAddress = (String) a[i++];
Map<String, String> attributes = (Map<String, String>) a[i];
return new NodeAttributes(id, name, ephemeralId, transportAddress, attributes);
});
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME);
PARSER.declareString(ConstructingObjectParser.constructorArg(), EPHEMERAL_ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), TRANSPORT_ADDRESS);
PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> p.mapStrings(),
ATTRIBUTES,
ObjectParser.ValueType.OBJECT);
}
private final String id;
private final String name;
private final String ephemeralId;
private final String transportAddress;
private final Map<String, String> attributes;
public static NodeAttributes fromDiscoveryNode(DiscoveryNode node) {
return new NodeAttributes(node.getId(),
node.getName(),
node.getEphemeralId(),
node.getAddress().toString(),
// TODO add data_frame attributes when/if they are added
Collections.emptyMap());
}
public NodeAttributes(String id, String name, String ephemeralId, String transportAddress, Map<String, String> attributes) {
this.id = ExceptionsHelper.requireNonNull(id, ID.getPreferredName());
this.name = ExceptionsHelper.requireNonNull(name, NAME.getPreferredName());
this.ephemeralId = ExceptionsHelper.requireNonNull(ephemeralId, EPHEMERAL_ID.getPreferredName());
this.transportAddress = ExceptionsHelper.requireNonNull(transportAddress, TRANSPORT_ADDRESS.getPreferredName());
this.attributes = Collections.unmodifiableMap(ExceptionsHelper.requireNonNull(attributes, ATTRIBUTES.getPreferredName()));
}
public NodeAttributes(StreamInput in) throws IOException {
this.id = in.readString();
this.name = in.readString();
this.ephemeralId = in.readString();
this.transportAddress = in.readString();
this.attributes = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
}
/**
* The unique identifier of the node.
*/
public String getId() {
return id;
}
/**
* The node name.
*/
public String getName() {
return name;
}
/**
* The ephemeral id of the node.
*/
public String getEphemeralId() {
return ephemeralId;
}
/**
* The host and port where transport HTTP connections are accepted.
*/
public String getTransportAddress() {
return transportAddress;
}
public Map<String, String> getAttributes() {
return attributes;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ID.getPreferredName(), id);
builder.field(NAME.getPreferredName(), name);
builder.field(EPHEMERAL_ID.getPreferredName(), ephemeralId);
builder.field(TRANSPORT_ADDRESS.getPreferredName(), transportAddress);
builder.field(ATTRIBUTES.getPreferredName(), attributes);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(id, name, ephemeralId, transportAddress, attributes);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
NodeAttributes that = (NodeAttributes) other;
return Objects.equals(id, that.id) &&
Objects.equals(name, that.name) &&
Objects.equals(ephemeralId, that.ephemeralId) &&
Objects.equals(transportAddress, that.transportAddress) &&
Objects.equals(attributes, that.attributes);
}
@Override
public String toString() {
return Strings.toString(this);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
out.writeString(name);
out.writeString(ephemeralId);
out.writeString(transportAddress);
out.writeMap(attributes, StreamOutput::writeString, StreamOutput::writeString);
}
}

View File

@ -17,6 +17,7 @@ import java.util.Map;
import java.util.function.Predicate;
import static org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgressTests.randomDataFrameTransformProgress;
import static org.elasticsearch.xpack.core.dataframe.transforms.NodeAttributeTests.randomNodeAttributes;
public class DataFrameTransformStateTests extends AbstractSerializingTestCase<DataFrameTransformState> {
@ -26,7 +27,8 @@ public class DataFrameTransformStateTests extends AbstractSerializingTestCase<Da
randomPosition(),
randomLongBetween(0,10),
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : randomDataFrameTransformProgress());
randomBoolean() ? null : randomDataFrameTransformProgress(),
randomBoolean() ? null : randomNodeAttributes());
}
@Override

View File

@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.function.Predicate;
public class NodeAttributeTests extends AbstractSerializingTestCase<NodeAttributes> {
public static NodeAttributes randomNodeAttributes() {
return new NodeAttributes(randomAlphaOfLength(10),
randomAlphaOfLength(10),
randomAlphaOfLength(10),
randomAlphaOfLength(10),
randomBoolean() ? Collections.emptyMap() : Collections.singletonMap(randomAlphaOfLength(10), randomAlphaOfLength(10)));
}
@Override
protected NodeAttributes doParseInstance(XContentParser parser) throws IOException {
return NodeAttributes.PARSER.apply(parser, null);
}
@Override
protected NodeAttributes createTestInstance() {
return randomNodeAttributes();
}
@Override
protected Reader<NodeAttributes> instanceReader() {
return NodeAttributes::new;
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> field.equals("attributes");
}
}

View File

@ -19,6 +19,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -29,6 +30,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.dataframe.transforms.NodeAttributes;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
@ -110,15 +112,23 @@ public class TransportGetDataFrameTransformsStatsAction extends
request.isAllowNoMatch(),
ActionListener.wrap(hitsAndIds -> {
request.setExpandedIds(hitsAndIds.v2());
request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), clusterService.state()));
final ClusterState state = clusterService.state();
request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), state));
super.doExecute(task, request, ActionListener.wrap(
response -> collectStatsForTransformsWithoutTasks(request, response, ActionListener.wrap(
finalResponse -> finalListener.onResponse(new Response(finalResponse.getTransformsStateAndStats(),
hitsAndIds.v1(),
finalResponse.getTaskFailures(),
finalResponse.getNodeFailures())),
finalListener::onFailure
)),
response -> {
PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasksInProgress != null) {
// Mutates underlying state object with the assigned node attributes
response.getTransformsStateAndStats().forEach(dtsas -> setNodeAttributes(dtsas, tasksInProgress, state));
}
collectStatsForTransformsWithoutTasks(request, response, ActionListener.wrap(
finalResponse -> finalListener.onResponse(new Response(finalResponse.getTransformsStateAndStats(),
hitsAndIds.v1(),
finalResponse.getTaskFailures(),
finalResponse.getNodeFailures())),
finalListener::onFailure
));
},
finalListener::onFailure
));
},
@ -133,6 +143,17 @@ public class TransportGetDataFrameTransformsStatsAction extends
));
}
private static void setNodeAttributes(DataFrameTransformStateAndStats dataFrameTransformStateAndStats,
PersistentTasksCustomMetaData persistentTasksCustomMetaData,
ClusterState state) {
PersistentTasksCustomMetaData.PersistentTask<?> pTask =
persistentTasksCustomMetaData.getTask(dataFrameTransformStateAndStats.getTransformId());
if (pTask != null) {
dataFrameTransformStateAndStats.getTransformState()
.setNode(NodeAttributes.fromDiscoveryNode(state.nodes().get(pTask.getExecutorNode())));
}
}
private void collectStatsForTransformsWithoutTasks(Request request,
Response response,
ActionListener<Response> listener) {

View File

@ -209,3 +209,56 @@ teardown:
- match: { transforms.0.stats.search_time_in_ms: 0 }
- match: { transforms.0.stats.search_total: 0 }
- match: { transforms.0.stats.search_failures: 0 }
---
"Test get continuous transform stats":
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform-stats-continuous"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline-stats-continuous" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"sync": { "time": { "field": "time", "delay": "1m" } }
}
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-stats-continuous"
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "airline-transform-stats-continuous"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-stats-continuous" }
- match: { transforms.0.state.indexer_state: "/started|indexing|stopped/" }
# Since this is continuous, there is no worry of it automatically stopping
- match: { transforms.0.state.task_state: "started" }
- lte: { transforms.0.state.checkpoint: 1 }
# Since this is continuous, and _start does not return until it is assigned
# we should see a node assignment
- is_true: transforms.0.state.node
- is_true: transforms.0.state.node.id
- is_true: transforms.0.state.node.name
- is_true: transforms.0.state.node.ephemeral_id
- is_true: transforms.0.state.node.transport_address
- lte: { transforms.0.stats.pages_processed: 1 }
- match: { transforms.0.stats.documents_processed: 0 }
- match: { transforms.0.stats.documents_indexed: 0 }
- lte: { transforms.0.stats.trigger_count: 1 }
- match: { transforms.0.stats.index_time_in_ms: 0 }
- match: { transforms.0.stats.index_total: 0 }
- match: { transforms.0.stats.index_failures: 0 }
- gte: { transforms.0.stats.search_time_in_ms: 0 }
- lte: { transforms.0.stats.search_total: 1 }
- match: { transforms.0.stats.search_failures: 0 }
- do:
data_frame.stop_data_frame_transform:
transform_id: "airline-transform-stats-continuous"
wait_for_completion: true
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-stats-continuous"