diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java index c719cf7723d..4253cfdca06 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java @@ -123,7 +123,7 @@ public class TransformStats implements Writeable, ToXContentObject { TransformState transformState = new TransformState(in); this.state = State.fromComponents(transformState.getTaskState(), transformState.getIndexerState()); this.reason = transformState.getReason(); - this.node = null; + this.node = transformState.getNode(); this.indexerStats = new TransformIndexerStats(in); this.checkpointingInfo = new TransformCheckpointingInfo(in); } @@ -171,8 +171,8 @@ public class TransformStats implements Writeable, ToXContentObject { checkpointingInfo.getNext().getPosition(), checkpointingInfo.getLast().getCheckpoint(), reason, - checkpointingInfo.getNext().getCheckpointProgress()).writeTo(out); - out.writeBoolean(false); + checkpointingInfo.getNext().getCheckpointProgress(), + node).writeTo(out); indexerStats.writeTo(out); checkpointingInfo.writeTo(out); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java index 093e05f4f90..387e9d115c4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java @@ -6,6 +6,9 @@ package org.elasticsearch.xpack.core.transform.transforms; +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; @@ -13,6 +16,9 @@ import org.elasticsearch.test.AbstractSerializingTestCase; import java.io.IOException; import java.util.function.Predicate; +import static org.elasticsearch.xpack.core.transform.transforms.TransformStats.State.STARTED; +import static org.hamcrest.Matchers.equalTo; + public class TransformStatsTests extends AbstractSerializingTestCase { public static TransformStats randomDataFrameTransformStats() { @@ -53,4 +59,28 @@ public class TransformStatsTests extends AbstractSerializingTestCase getRandomFieldsExcludeFilter() { return field -> !field.isEmpty(); } + + public void testBwcWith73() throws IOException { + for(int i = 0; i < NUMBER_OF_TEST_RUNS; i++) { + TransformStats stats = new TransformStats("bwc-id", + STARTED, + randomBoolean() ? null : randomAlphaOfLength(100), + randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(), + new TransformIndexerStats(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), + new TransformCheckpointingInfo( + new TransformCheckpointStats(0, null, null, 10, 100), + new TransformCheckpointStats(0, null, null, 100, 1000), + // changesLastDetectedAt aren't serialized back + 100, null)); + try (BytesStreamOutput output = new BytesStreamOutput()) { + output.setVersion(Version.V_7_3_0); + stats.writeTo(output); + try (StreamInput in = output.bytes().streamInput()) { + in.setVersion(Version.V_7_3_0); + TransformStats statsFromOld = new TransformStats(in); + assertThat(statsFromOld, equalTo(stats)); + } + } + } + } }