diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java index 8431abc886d..da5c4e934a3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java @@ -62,20 +62,19 @@ public class TransformCheckpoint implements Writeable, ToXContentObject { private final long timeUpperBoundMillis; private static ConstructingObjectParser createParser(boolean lenient) { - ConstructingObjectParser parser = new ConstructingObjectParser<>(NAME, - lenient, args -> { - String id = (String) args[0]; - long timestamp = (Long) args[1]; - long checkpoint = (Long) args[2]; + ConstructingObjectParser parser = new ConstructingObjectParser<>(NAME, lenient, args -> { + String id = (String) args[0]; + long timestamp = (Long) args[1]; + long checkpoint = (Long) args[2]; - @SuppressWarnings("unchecked") - Map checkpoints = (Map) args[3]; + @SuppressWarnings("unchecked") + Map checkpoints = (Map) args[3]; - Long timeUpperBound = (Long) args[4]; + Long timeUpperBound = (Long) args[4]; - // ignored, only for internal storage: String docType = (String) args[5]; - return new TransformCheckpoint(id, timestamp, checkpoint, checkpoints, timeUpperBound); - }); + // ignored, only for internal storage: String docType = (String) args[5]; + return new TransformCheckpoint(id, timestamp, checkpoint, checkpoints, timeUpperBound); + }); parser.declareString(constructorArg(), TransformField.ID); @@ -83,7 +82,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject { parser.declareLong(constructorArg(), TransformField.TIMESTAMP_MILLIS); parser.declareLong(constructorArg(), CHECKPOINT); - parser.declareObject(constructorArg(), (p,c) -> { + parser.declareObject(constructorArg(), (p, c) -> { Map checkPointsByIndexName = new TreeMap<>(); XContentParser.Token token = null; while ((token = p.nextToken()) != XContentParser.Token.END_OBJECT) { @@ -108,8 +107,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject { return parser; } - public TransformCheckpoint(String transformId, long timestamp, long checkpoint, Map checkpoints, - Long timeUpperBound) { + public TransformCheckpoint(String transformId, long timestamp, long checkpoint, Map checkpoints, Long timeUpperBound) { this.transformId = Objects.requireNonNull(transformId); this.timestampMillis = timestamp; this.checkpoint = checkpoint; @@ -126,7 +124,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject { } public boolean isEmpty() { - return indicesCheckpoints.isEmpty(); + return this.equals(EMPTY); } /** @@ -212,8 +210,10 @@ public class TransformCheckpoint implements Writeable, ToXContentObject { final TransformCheckpoint that = (TransformCheckpoint) other; // compare the timestamp, id, checkpoint and than call matches for the rest - return this.timestampMillis == that.timestampMillis && this.checkpoint == that.checkpoint - && this.timeUpperBoundMillis == that.timeUpperBoundMillis && matches(that); + return this.timestampMillis == that.timestampMillis + && this.checkpoint == that.checkpoint + && this.timeUpperBoundMillis == that.timeUpperBoundMillis + && matches(that); } /** @@ -224,7 +224,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject { * @param that other checkpoint * @return true if checkpoints match */ - public boolean matches (TransformCheckpoint that) { + public boolean matches(TransformCheckpoint that) { if (this == that) { return true; } @@ -258,7 +258,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject { return NAME + "-" + transformId + "-" + checkpoint; } - public static boolean isNullOrEmpty (TransformCheckpoint checkpoint) { + public static boolean isNullOrEmpty(TransformCheckpoint checkpoint) { return checkpoint == null || checkpoint.isEmpty(); } @@ -315,8 +315,11 @@ public class TransformCheckpoint implements Writeable, ToXContentObject { if (e.getValue() instanceof long[]) { checkpoints.put(e.getKey(), (long[]) e.getValue()); } else { - throw new ElasticsearchParseException("expecting the checkpoints for [{}] to be a long[], but found [{}] instead", - e.getKey(), e.getValue().getClass()); + throw new ElasticsearchParseException( + "expecting the checkpoints for [{}] to be a long[], but found [{}] instead", + e.getKey(), + e.getValue().getClass() + ); } } return checkpoints; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java index b0f665b7b80..8a2f04e1e82 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java @@ -31,11 +31,110 @@ import java.util.Objects; */ public class TransformCheckpointingInfo implements Writeable, ToXContentObject { + /** + * Builder for collecting checkpointing information for the purpose of _stats + */ + public static class TransformCheckpointingInfoBuilder { + private TransformIndexerPosition nextCheckpointPosition; + private TransformProgress nextCheckpointProgress; + private TransformCheckpoint lastCheckpoint; + private TransformCheckpoint nextCheckpoint; + private TransformCheckpoint sourceCheckpoint; + private Instant changesLastDetectedAt; + private long operationsBehind; + + public TransformCheckpointingInfoBuilder() {} + + public TransformCheckpointingInfo build() { + if (lastCheckpoint == null) { + lastCheckpoint = TransformCheckpoint.EMPTY; + } + if (nextCheckpoint == null) { + nextCheckpoint = TransformCheckpoint.EMPTY; + } + if (sourceCheckpoint == null) { + sourceCheckpoint = TransformCheckpoint.EMPTY; + } + + // checkpointstats requires a non-negative checkpoint number + long lastCheckpointNumber = lastCheckpoint.getCheckpoint() > 0 ? lastCheckpoint.getCheckpoint() : 0; + long nextCheckpointNumber = nextCheckpoint.getCheckpoint() > 0 ? nextCheckpoint.getCheckpoint() : 0; + + return new TransformCheckpointingInfo( + new TransformCheckpointStats( + lastCheckpointNumber, + null, + null, + lastCheckpoint.getTimestamp(), + lastCheckpoint.getTimeUpperBound() + ), + new TransformCheckpointStats( + nextCheckpointNumber, + nextCheckpointPosition, + nextCheckpointProgress, + nextCheckpoint.getTimestamp(), + nextCheckpoint.getTimeUpperBound() + ), + operationsBehind, + changesLastDetectedAt + ); + } + + public TransformCheckpointingInfoBuilder setLastCheckpoint(TransformCheckpoint lastCheckpoint) { + this.lastCheckpoint = lastCheckpoint; + return this; + } + + public TransformCheckpoint getLastCheckpoint() { + return lastCheckpoint; + } + + public TransformCheckpointingInfoBuilder setNextCheckpoint(TransformCheckpoint nextCheckpoint) { + this.nextCheckpoint = nextCheckpoint; + return this; + } + + public TransformCheckpoint getNextCheckpoint() { + return nextCheckpoint; + } + + public TransformCheckpointingInfoBuilder setSourceCheckpoint(TransformCheckpoint sourceCheckpoint) { + this.sourceCheckpoint = sourceCheckpoint; + return this; + } + + public TransformCheckpoint getSourceCheckpoint() { + return sourceCheckpoint; + } + + public TransformCheckpointingInfoBuilder setNextCheckpointProgress(TransformProgress nextCheckpointProgress) { + this.nextCheckpointProgress = nextCheckpointProgress; + return this; + } + + public TransformCheckpointingInfoBuilder setNextCheckpointPosition(TransformIndexerPosition nextCheckpointPosition) { + this.nextCheckpointPosition = nextCheckpointPosition; + return this; + } + + public TransformCheckpointingInfoBuilder setChangesLastDetectedAt(Instant changesLastDetectedAt) { + this.changesLastDetectedAt = changesLastDetectedAt; + return this; + } + + public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehind) { + this.operationsBehind = operationsBehind; + return this; + } + + } + public static final TransformCheckpointingInfo EMPTY = new TransformCheckpointingInfo( TransformCheckpointStats.EMPTY, TransformCheckpointStats.EMPTY, 0L, - null); + null + ); public static final ParseField LAST_CHECKPOINT = new ParseField("last"); public static final ParseField NEXT_CHECKPOINT = new ParseField("next"); @@ -44,32 +143,41 @@ public class TransformCheckpointingInfo implements Writeable, ToXContentObject { private final TransformCheckpointStats last; private final TransformCheckpointStats next; private final long operationsBehind; - private Instant changesLastDetectedAt; + private final Instant changesLastDetectedAt; - private static final ConstructingObjectParser LENIENT_PARSER = - new ConstructingObjectParser<>( - "data_frame_transform_checkpointing_info", - true, - a -> { - long behind = a[2] == null ? 0L : (Long) a[2]; - Instant changesLastDetectedAt = (Instant)a[3]; - return new TransformCheckpointingInfo( - a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0], - a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1], - behind, - changesLastDetectedAt); - }); + private static final ConstructingObjectParser LENIENT_PARSER = new ConstructingObjectParser<>( + "data_frame_transform_checkpointing_info", + true, + a -> { + long behind = a[2] == null ? 0L : (Long) a[2]; + Instant changesLastDetectedAt = (Instant) a[3]; + return new TransformCheckpointingInfo( + a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0], + a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1], + behind, + changesLastDetectedAt + ); + } + ); static { - LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), - TransformCheckpointStats.LENIENT_PARSER::apply, LAST_CHECKPOINT); - LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), - TransformCheckpointStats.LENIENT_PARSER::apply, NEXT_CHECKPOINT); + LENIENT_PARSER.declareObject( + ConstructingObjectParser.optionalConstructorArg(), + TransformCheckpointStats.LENIENT_PARSER::apply, + LAST_CHECKPOINT + ); + LENIENT_PARSER.declareObject( + ConstructingObjectParser.optionalConstructorArg(), + TransformCheckpointStats.LENIENT_PARSER::apply, + NEXT_CHECKPOINT + ); LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND); - LENIENT_PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), + LENIENT_PARSER.declareField( + ConstructingObjectParser.optionalConstructorArg(), p -> TimeUtils.parseTimeFieldToInstant(p, CHANGES_LAST_DETECTED_AT.getPreferredName()), CHANGES_LAST_DETECTED_AT, - ObjectParser.ValueType.VALUE); + ObjectParser.ValueType.VALUE + ); } /** @@ -81,28 +189,26 @@ public class TransformCheckpointingInfo implements Writeable, ToXContentObject { * @param operationsBehind counter of operations the current checkpoint is behind source * @param changesLastDetectedAt the last time the source indices were checked for changes */ - public TransformCheckpointingInfo(TransformCheckpointStats last, - TransformCheckpointStats next, - long operationsBehind, - Instant changesLastDetectedAt) { + public TransformCheckpointingInfo( + TransformCheckpointStats last, + TransformCheckpointStats next, + long operationsBehind, + Instant changesLastDetectedAt + ) { this.last = Objects.requireNonNull(last); this.next = Objects.requireNonNull(next); this.operationsBehind = operationsBehind; this.changesLastDetectedAt = changesLastDetectedAt == null ? null : Instant.ofEpochMilli(changesLastDetectedAt.toEpochMilli()); } - public TransformCheckpointingInfo(TransformCheckpointStats last, - TransformCheckpointStats next, - long operationsBehind) { - this(last, next, operationsBehind, null); - } - public TransformCheckpointingInfo(StreamInput in) throws IOException { last = new TransformCheckpointStats(in); next = new TransformCheckpointStats(in); operationsBehind = in.readLong(); if (in.getVersion().onOrAfter(Version.V_7_4_0)) { changesLastDetectedAt = in.readOptionalInstant(); + } else { + changesLastDetectedAt = null; } } @@ -122,11 +228,6 @@ public class TransformCheckpointingInfo implements Writeable, ToXContentObject { return changesLastDetectedAt; } - public TransformCheckpointingInfo setChangesLastDetectedAt(Instant changesLastDetectedAt) { - this.changesLastDetectedAt = Instant.ofEpochMilli(Objects.requireNonNull(changesLastDetectedAt).toEpochMilli()); - return this; - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -134,11 +235,15 @@ public class TransformCheckpointingInfo implements Writeable, ToXContentObject { if (next.getCheckpoint() > 0) { builder.field(NEXT_CHECKPOINT.getPreferredName(), next); } - builder.field(OPERATIONS_BEHIND.getPreferredName(), operationsBehind); + if (operationsBehind > 0) { + builder.field(OPERATIONS_BEHIND.getPreferredName(), operationsBehind); + } if (changesLastDetectedAt != null) { - builder.timeField(CHANGES_LAST_DETECTED_AT.getPreferredName(), + builder.timeField( + CHANGES_LAST_DETECTED_AT.getPreferredName(), CHANGES_LAST_DETECTED_AT.getPreferredName() + "_string", - changesLastDetectedAt.toEpochMilli()); + changesLastDetectedAt.toEpochMilli() + ); } builder.endObject(); return builder; @@ -175,10 +280,10 @@ public class TransformCheckpointingInfo implements Writeable, ToXContentObject { TransformCheckpointingInfo that = (TransformCheckpointingInfo) other; - return Objects.equals(this.last, that.last) && - Objects.equals(this.next, that.next) && - this.operationsBehind == that.operationsBehind && - Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt); + return Objects.equals(this.last, that.last) + && Objects.equals(this.next, that.next) + && this.operationsBehind == that.operationsBehind + && Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointTests.java index f70f6c68e01..86955d12cdb 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -23,8 +24,13 @@ import static org.elasticsearch.test.TestMatchers.matchesPattern; public class TransformCheckpointTests extends AbstractSerializingTransformTestCase { public static TransformCheckpoint randomTransformCheckpoints() { - return new TransformCheckpoint(randomAlphaOfLengthBetween(1, 10), randomNonNegativeLong(), randomNonNegativeLong(), - randomCheckpointsByIndex(), randomNonNegativeLong()); + return new TransformCheckpoint( + randomAlphaOfLengthBetween(1, 10), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomCheckpointsByIndex(), + randomNonNegativeLong() + ); } @Override @@ -62,8 +68,13 @@ public class TransformCheckpointTests extends AbstractSerializingTransformTestCa otherCheckpointsByIndex.put(randomAlphaOfLengthBetween(1, 10), new long[] { 1, 2, 3 }); long timeUpperBound = randomNonNegativeLong(); - TransformCheckpoint dataFrameTransformCheckpoints = new TransformCheckpoint(id, timestamp, checkpoint, - checkpointsByIndex, timeUpperBound); + TransformCheckpoint dataFrameTransformCheckpoints = new TransformCheckpoint( + id, + timestamp, + checkpoint, + checkpointsByIndex, + timeUpperBound + ); // same assertTrue(dataFrameTransformCheckpoints.matches(dataFrameTransformCheckpoints)); @@ -74,20 +85,40 @@ public class TransformCheckpointTests extends AbstractSerializingTransformTestCa assertTrue(dataFrameTransformCheckpointsCopy.matches(dataFrameTransformCheckpoints)); // other id - assertFalse(dataFrameTransformCheckpoints - .matches(new TransformCheckpoint(id + "-1", timestamp, checkpoint, checkpointsByIndex, timeUpperBound))); + assertFalse( + dataFrameTransformCheckpoints.matches( + new TransformCheckpoint(id + "-1", timestamp, checkpoint, checkpointsByIndex, timeUpperBound) + ) + ); // other timestamp - assertTrue(dataFrameTransformCheckpoints - .matches(new TransformCheckpoint(id, (timestamp / 2) + 1, checkpoint, checkpointsByIndex, timeUpperBound))); + assertTrue( + dataFrameTransformCheckpoints.matches( + new TransformCheckpoint(id, (timestamp / 2) + 1, checkpoint, checkpointsByIndex, timeUpperBound) + ) + ); // other checkpoint - assertTrue(dataFrameTransformCheckpoints - .matches(new TransformCheckpoint(id, timestamp, (checkpoint / 2) + 1, checkpointsByIndex, timeUpperBound))); + assertTrue( + dataFrameTransformCheckpoints.matches( + new TransformCheckpoint(id, timestamp, (checkpoint / 2) + 1, checkpointsByIndex, timeUpperBound) + ) + ); // other index checkpoints - assertFalse(dataFrameTransformCheckpoints - .matches(new TransformCheckpoint(id, timestamp, checkpoint, otherCheckpointsByIndex, timeUpperBound))); + assertFalse( + dataFrameTransformCheckpoints.matches( + new TransformCheckpoint(id, timestamp, checkpoint, otherCheckpointsByIndex, timeUpperBound) + ) + ); // other time upper bound - assertTrue(dataFrameTransformCheckpoints - .matches(new TransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndex, (timeUpperBound / 2) + 1))); + assertTrue( + dataFrameTransformCheckpoints.matches( + new TransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndex, (timeUpperBound / 2) + 1) + ) + ); + } + + public void testEmpty() { + assertTrue(TransformCheckpoint.EMPTY.isEmpty()); + assertFalse(new TransformCheckpoint("some_id", 0L, -1, Collections.emptyMap(), 0L).isEmpty()); } public void testGetBehind() { @@ -119,14 +150,16 @@ public class TransformCheckpointTests extends AbstractSerializingTransformTestCa long checkpoint = randomLongBetween(10, 100); - TransformCheckpoint checkpointOld = new TransformCheckpoint( - id, timestamp, checkpoint, checkpointsByIndexOld, 0L); - TransformCheckpoint checkpointTransientNew = new TransformCheckpoint( - id, timestamp, -1L, checkpointsByIndexNew, 0L); - TransformCheckpoint checkpointNew = new TransformCheckpoint( - id, timestamp, checkpoint + 1, checkpointsByIndexNew, 0L); + TransformCheckpoint checkpointOld = new TransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndexOld, 0L); + TransformCheckpoint checkpointTransientNew = new TransformCheckpoint(id, timestamp, -1L, checkpointsByIndexNew, 0L); + TransformCheckpoint checkpointNew = new TransformCheckpoint(id, timestamp, checkpoint + 1, checkpointsByIndexNew, 0L); TransformCheckpoint checkpointOlderButNewerShardsCheckpoint = new TransformCheckpoint( - id, timestamp, checkpoint - 1, checkpointsByIndexNew, 0L); + id, + timestamp, + checkpoint - 1, + checkpointsByIndexNew, + 0L + ); assertEquals(indices * shards * 10L, TransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew)); assertEquals(indices * shards * 10L, TransformCheckpoint.getBehind(checkpointOld, checkpointNew)); @@ -140,8 +173,10 @@ public class TransformCheckpointTests extends AbstractSerializingTransformTestCa assertEquals(0L, TransformCheckpoint.getBehind(checkpointNew, checkpointTransientNew)); // transient new vs new: illegal - Exception e = expectThrows(IllegalArgumentException.class, - () -> TransformCheckpoint.getBehind(checkpointTransientNew, checkpointNew)); + Exception e = expectThrows( + IllegalArgumentException.class, + () -> TransformCheckpoint.getBehind(checkpointTransientNew, checkpointNew) + ); assertEquals("can not compare transient against a non transient checkpoint", e.getMessage()); // new vs old: illegal @@ -155,8 +190,10 @@ public class TransformCheckpointTests extends AbstractSerializingTransformTestCa // remove something from old, so newer has 1 index more than old: should be equivalent to old index existing but empty checkpointsByIndexOld.remove(checkpointsByIndexOld.firstKey()); long behind = TransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew); - assertTrue("Expected behind (" + behind + ") => sum of shard checkpoint differences (" + indices * shards * 10L + ")", - behind >= indices * shards * 10L); + assertTrue( + "Expected behind (" + behind + ") => sum of shard checkpoint differences (" + indices * shards * 10L + ")", + behind >= indices * shards * 10L + ); // remove same key: old and new should have equal indices again checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey()); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java index 0ee3e66e739..363392e4ad6 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java @@ -207,7 +207,6 @@ public class TransportGetTransformStatsAction extends TransportTasksAction transformsWithoutTasks = new HashSet<>(request.getExpandedIds()); transformsWithoutTasks.removeAll(response.getTransformsStats().stream().map(TransformStats::getId).collect(Collectors.toList())); @@ -252,7 +251,7 @@ public class TransportGetTransformStatsAction extends TransportTasksAction { + ActionListener.wrap(infoBuilder -> listener.onResponse(infoBuilder.build()), e -> { logger.warn("Failed to retrieve checkpointing info for transform [" + transform.getId() + "]", e); listener.onResponse(TransformCheckpointingInfo.EMPTY); }) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java index 583904044e8..930ecf152af 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java @@ -233,7 +233,8 @@ public class TransportPreviewTransformAction extends builder.startObject(); builder.field("docs", results); builder.endObject(); - SimulatePipelineRequest pipelineRequest = new SimulatePipelineRequest(BytesReference.bytes(builder), XContentType.JSON); + SimulatePipelineRequest pipelineRequest = + new SimulatePipelineRequest(BytesReference.bytes(builder), XContentType.JSON); pipelineRequest.setId(pipeline); ClientHelper.executeAsyncWithOrigin( client, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/CheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/CheckpointProvider.java index 5fca07cef2b..d8b084cbd86 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/CheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/CheckpointProvider.java @@ -7,9 +7,9 @@ package org.elasticsearch.xpack.transform.checkpoint; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; -import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; +import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder; +import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; /** @@ -44,11 +44,13 @@ public interface CheckpointProvider { * @param nextCheckpointProgress progress for the next checkpoint * @param listener listener to retrieve the result */ - void getCheckpointingInfo(TransformCheckpoint lastCheckpoint, - TransformCheckpoint nextCheckpoint, - TransformIndexerPosition nextCheckpointPosition, - TransformProgress nextCheckpointProgress, - ActionListener listener); + void getCheckpointingInfo( + TransformCheckpoint lastCheckpoint, + TransformCheckpoint nextCheckpoint, + TransformIndexerPosition nextCheckpointPosition, + TransformProgress nextCheckpointProgress, + ActionListener listener + ); /** * Get checkpoint statistics for a stopped data frame @@ -60,8 +62,10 @@ public interface CheckpointProvider { * @param nextCheckpointProgress progress for the next checkpoint * @param listener listener to retrieve the result */ - void getCheckpointingInfo(long lastCheckpointNumber, - TransformIndexerPosition nextCheckpointPosition, - TransformProgress nextCheckpointProgress, - ActionListener listener); + void getCheckpointingInfo( + long lastCheckpointNumber, + TransformIndexerPosition nextCheckpointPosition, + TransformProgress nextCheckpointProgress, + ActionListener listener + ); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index 8abb6ba159a..a7bda6886fd 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -21,14 +21,15 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; -import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; +import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; +import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -41,78 +42,6 @@ public class DefaultCheckpointProvider implements CheckpointProvider { // threshold when to audit concrete index names, above this threshold we only report the number of changes private static final int AUDIT_CONCRETED_SOURCE_INDEX_CHANGES = 10; - /** - * Builder for collecting checkpointing information for the purpose of _stats - */ - private static class TransformCheckpointingInfoBuilder { - private TransformIndexerPosition nextCheckpointPosition; - private TransformProgress nextCheckpointProgress; - private TransformCheckpoint lastCheckpoint; - private TransformCheckpoint nextCheckpoint; - private TransformCheckpoint sourceCheckpoint; - - TransformCheckpointingInfoBuilder() {} - - TransformCheckpointingInfo build() { - if (lastCheckpoint == null) { - lastCheckpoint = TransformCheckpoint.EMPTY; - } - if (nextCheckpoint == null) { - nextCheckpoint = TransformCheckpoint.EMPTY; - } - if (sourceCheckpoint == null) { - sourceCheckpoint = TransformCheckpoint.EMPTY; - } - - // checkpointstats requires a non-negative checkpoint number - long lastCheckpointNumber = lastCheckpoint.getCheckpoint() > 0 ? lastCheckpoint.getCheckpoint() : 0; - long nextCheckpointNumber = nextCheckpoint.getCheckpoint() > 0 ? nextCheckpoint.getCheckpoint() : 0; - - return new TransformCheckpointingInfo( - new TransformCheckpointStats( - lastCheckpointNumber, - null, - null, - lastCheckpoint.getTimestamp(), - lastCheckpoint.getTimeUpperBound() - ), - new TransformCheckpointStats( - nextCheckpointNumber, - nextCheckpointPosition, - nextCheckpointProgress, - nextCheckpoint.getTimestamp(), - nextCheckpoint.getTimeUpperBound() - ), - TransformCheckpoint.getBehind(lastCheckpoint, sourceCheckpoint) - ); - } - - public TransformCheckpointingInfoBuilder setLastCheckpoint(TransformCheckpoint lastCheckpoint) { - this.lastCheckpoint = lastCheckpoint; - return this; - } - - public TransformCheckpointingInfoBuilder setNextCheckpoint(TransformCheckpoint nextCheckpoint) { - this.nextCheckpoint = nextCheckpoint; - return this; - } - - public TransformCheckpointingInfoBuilder setSourceCheckpoint(TransformCheckpoint sourceCheckpoint) { - this.sourceCheckpoint = sourceCheckpoint; - return this; - } - - public TransformCheckpointingInfoBuilder setNextCheckpointProgress(TransformProgress nextCheckpointProgress) { - this.nextCheckpointProgress = nextCheckpointProgress; - return this; - } - - public TransformCheckpointingInfoBuilder setNextCheckpointPosition(TransformIndexerPosition nextCheckpointPosition) { - this.nextCheckpointPosition = nextCheckpointPosition; - return this; - } - } - private static final Logger logger = LogManager.getLogger(DefaultCheckpointProvider.class); protected final Client client; @@ -250,10 +179,10 @@ public class DefaultCheckpointProvider implements CheckpointProvider { TransformCheckpoint nextCheckpoint, TransformIndexerPosition nextCheckpointPosition, TransformProgress nextCheckpointProgress, - ActionListener listener + ActionListener listener ) { - - TransformCheckpointingInfoBuilder checkpointingInfoBuilder = new TransformCheckpointingInfoBuilder(); + TransformCheckpointingInfo.TransformCheckpointingInfoBuilder checkpointingInfoBuilder = + new TransformCheckpointingInfo.TransformCheckpointingInfoBuilder(); checkpointingInfoBuilder.setLastCheckpoint(lastCheckpoint) .setNextCheckpoint(nextCheckpoint) @@ -263,10 +192,10 @@ public class DefaultCheckpointProvider implements CheckpointProvider { long timestamp = System.currentTimeMillis(); getIndexCheckpoints(ActionListener.wrap(checkpointsByIndex -> { - checkpointingInfoBuilder.setSourceCheckpoint( - new TransformCheckpoint(transformConfig.getId(), timestamp, -1L, checkpointsByIndex, 0L) - ); - listener.onResponse(checkpointingInfoBuilder.build()); + TransformCheckpoint sourceCheckpoint = new TransformCheckpoint(transformConfig.getId(), timestamp, -1L, checkpointsByIndex, 0L); + checkpointingInfoBuilder.setSourceCheckpoint(sourceCheckpoint); + checkpointingInfoBuilder.setOperationsBehind(TransformCheckpoint.getBehind(lastCheckpoint, sourceCheckpoint)); + listener.onResponse(checkpointingInfoBuilder); }, listener::onFailure)); } @@ -275,21 +204,24 @@ public class DefaultCheckpointProvider implements CheckpointProvider { long lastCheckpointNumber, TransformIndexerPosition nextCheckpointPosition, TransformProgress nextCheckpointProgress, - ActionListener listener + ActionListener listener ) { - TransformCheckpointingInfoBuilder checkpointingInfoBuilder = new TransformCheckpointingInfoBuilder(); + TransformCheckpointingInfo.TransformCheckpointingInfoBuilder checkpointingInfoBuilder = + new TransformCheckpointingInfo.TransformCheckpointingInfoBuilder(); checkpointingInfoBuilder.setNextCheckpointPosition(nextCheckpointPosition).setNextCheckpointProgress(nextCheckpointProgress); - + checkpointingInfoBuilder.setLastCheckpoint(TransformCheckpoint.EMPTY); long timestamp = System.currentTimeMillis(); // <3> got the source checkpoint, notify the user ActionListener> checkpointsByIndexListener = ActionListener.wrap(checkpointsByIndex -> { - checkpointingInfoBuilder.setSourceCheckpoint( - new TransformCheckpoint(transformConfig.getId(), timestamp, -1L, checkpointsByIndex, 0L) + TransformCheckpoint sourceCheckpoint = new TransformCheckpoint(transformConfig.getId(), timestamp, -1L, checkpointsByIndex, 0L); + checkpointingInfoBuilder.setSourceCheckpoint(sourceCheckpoint); + checkpointingInfoBuilder.setOperationsBehind( + TransformCheckpoint.getBehind(checkpointingInfoBuilder.getLastCheckpoint(), sourceCheckpoint) ); - listener.onResponse(checkpointingInfoBuilder.build()); + listener.onResponse(checkpointingInfoBuilder); }, e -> { logger.debug( (Supplier) () -> new ParameterizedMessage( @@ -320,7 +252,8 @@ public class DefaultCheckpointProvider implements CheckpointProvider { // <1> got last checkpoint, get the next checkpoint ActionListener lastCheckpointListener = ActionListener.wrap(lastCheckpointObj -> { - checkpointingInfoBuilder.lastCheckpoint = lastCheckpointObj; + checkpointingInfoBuilder.setChangesLastDetectedAt(Instant.ofEpochMilli(lastCheckpointObj.getTimestamp())); + checkpointingInfoBuilder.setLastCheckpoint(lastCheckpointObj); transformConfigManager.getTransformCheckpoint(transformConfig.getId(), lastCheckpointNumber + 1, nextCheckpointListener); }, e -> { logger.debug( diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java index b72ab2e2be8..64faf414625 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java @@ -11,7 +11,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; -import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; +import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; @@ -66,7 +66,7 @@ public class TransformCheckpointService { final long lastCheckpointNumber, final TransformIndexerPosition nextCheckpointPosition, final TransformProgress nextCheckpointProgress, - final ActionListener listener + final ActionListener listener ) { // we need to retrieve the config first before we can defer the rest to the corresponding provider diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index 48658e7b5ed..d977d6b99f9 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; +import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformState; @@ -159,9 +160,22 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE TransformCheckpointService transformsCheckpointService, ActionListener listener ) { + ActionListener checkPointInfoListener = ActionListener.wrap(infoBuilder -> { + if (context.getChangesLastDetectedAt() != null) { + infoBuilder.setChangesLastDetectedAt(context.getChangesLastDetectedAt()); + } + listener.onResponse(infoBuilder.build()); + }, listener::onFailure); + ClientTransformIndexer indexer = getIndexer(); if (indexer == null) { - transformsCheckpointService.getCheckpointingInfo(transform.getId(), context.getCheckpoint(), initialPosition, null, listener); + transformsCheckpointService.getCheckpointingInfo( + transform.getId(), + context.getCheckpoint(), + initialPosition, + null, + checkPointInfoListener + ); return; } indexer.getCheckpointProvider() @@ -170,13 +184,7 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE indexer.getNextCheckpoint(), indexer.getPosition(), indexer.getProgress(), - ActionListener.wrap(info -> { - if (context.getChangesLastDetectedAt() == null) { - listener.onResponse(info); - } else { - listener.onResponse(info.setChangesLastDetectedAt(context.getChangesLastDetectedAt())); - } - }, listener::onFailure) + checkPointInfoListener ); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java index 5d36b9b164d..7af774f4a6f 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java @@ -39,12 +39,13 @@ import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.warmer.WarmerStats; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.test.client.NoOpClient; -import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; -import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPositionTests; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; +import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests; +import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; +import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPositionTests; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.TransformProgressTests; import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase; @@ -54,6 +55,7 @@ import org.junit.AfterClass; import org.junit.Before; import java.nio.file.Path; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -73,7 +75,7 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest private static MockClientForCheckpointing mockClientForCheckpointing = null; private IndexBasedTransformConfigManager transformsConfigManager; - private TransformCheckpointService transformsCheckpointService; + private TransformCheckpointService transformCheckpointService; private class MockClientForCheckpointing extends NoOpClient { @@ -136,7 +138,7 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest // use a mock for the checkpoint service TransformAuditor mockAuditor = mock(TransformAuditor.class); - transformsCheckpointService = new TransformCheckpointService(mockClientForCheckpointing, transformsConfigManager, mockAuditor); + transformCheckpointService = new TransformCheckpointService(mockClientForCheckpointing, transformsConfigManager, mockAuditor); } @AfterClass @@ -255,11 +257,12 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest TransformCheckpointingInfo checkpointInfo = new TransformCheckpointingInfo( new TransformCheckpointStats(1, null, null, timestamp, 0L), new TransformCheckpointStats(2, position, progress, timestamp + 100L, 0L), - 30L + 30L, + Instant.ofEpochMilli(timestamp) ); assertAsync( - listener -> transformsCheckpointService.getCheckpointingInfo(transformId, 1, position, progress, listener), + listener -> getCheckpoint(transformCheckpointService, transformId, 1, position, progress, listener), checkpointInfo, null, null @@ -269,10 +272,11 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest checkpointInfo = new TransformCheckpointingInfo( new TransformCheckpointStats(1, null, null, timestamp, 0L), new TransformCheckpointStats(2, position, progress, timestamp + 100L, 0L), - 63L + 63L, + Instant.ofEpochMilli(timestamp) ); assertAsync( - listener -> transformsCheckpointService.getCheckpointingInfo(transformId, 1, position, progress, listener), + listener -> getCheckpoint(transformCheckpointService, transformId, 1, position, progress, listener), checkpointInfo, null, null @@ -283,10 +287,11 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest checkpointInfo = new TransformCheckpointingInfo( new TransformCheckpointStats(1, null, null, timestamp, 0L), new TransformCheckpointStats(2, position, progress, timestamp + 100L, 0L), - 0L + 0L, + Instant.ofEpochMilli(timestamp) ); assertAsync( - listener -> transformsCheckpointService.getCheckpointingInfo(transformId, 1, position, progress, listener), + listener -> getCheckpoint(transformCheckpointService, transformId, 1, position, progress, listener), checkpointInfo, null, null @@ -343,4 +348,25 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest return shardStats.toArray(new ShardStats[0]); } + private static void getCheckpoint( + TransformCheckpointService transformCheckpointService, + String transformId, + long lastCheckpointNumber, + TransformIndexerPosition nextCheckpointPosition, + TransformProgress nextCheckpointProgress, + ActionListener listener + ) { + ActionListener checkPointInfoListener = ActionListener.wrap( + infoBuilder -> { listener.onResponse(infoBuilder.build()); }, + listener::onFailure + ); + transformCheckpointService.getCheckpointingInfo( + transformId, + lastCheckpointNumber, + nextCheckpointPosition, + nextCheckpointProgress, + checkPointInfoListener + ); + } + }