[Transform] improve checkpoint reporting (#50369)

fixes empty checkpoints, re-factors checkpoint info creation (moves builder) and always reports
last change detection

relates #43201
relates #50018
This commit is contained in:
Hendrik Muhs 2019-12-20 08:28:16 +01:00
parent de14092ad2
commit 7c10e9b0e7
10 changed files with 326 additions and 210 deletions

View File

@ -62,8 +62,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject {
private final long timeUpperBoundMillis;
private static ConstructingObjectParser<TransformCheckpoint, Void> createParser(boolean lenient) {
ConstructingObjectParser<TransformCheckpoint, Void> parser = new ConstructingObjectParser<>(NAME,
lenient, args -> {
ConstructingObjectParser<TransformCheckpoint, Void> parser = new ConstructingObjectParser<>(NAME, lenient, args -> {
String id = (String) args[0];
long timestamp = (Long) args[1];
long checkpoint = (Long) args[2];
@ -108,8 +107,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject {
return parser;
}
public TransformCheckpoint(String transformId, long timestamp, long checkpoint, Map<String, long[]> checkpoints,
Long timeUpperBound) {
public TransformCheckpoint(String transformId, long timestamp, long checkpoint, Map<String, long[]> checkpoints, Long timeUpperBound) {
this.transformId = Objects.requireNonNull(transformId);
this.timestampMillis = timestamp;
this.checkpoint = checkpoint;
@ -126,7 +124,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject {
}
public boolean isEmpty() {
return indicesCheckpoints.isEmpty();
return this.equals(EMPTY);
}
/**
@ -212,8 +210,10 @@ public class TransformCheckpoint implements Writeable, ToXContentObject {
final TransformCheckpoint that = (TransformCheckpoint) other;
// compare the timestamp, id, checkpoint and than call matches for the rest
return this.timestampMillis == that.timestampMillis && this.checkpoint == that.checkpoint
&& this.timeUpperBoundMillis == that.timeUpperBoundMillis && matches(that);
return this.timestampMillis == that.timestampMillis
&& this.checkpoint == that.checkpoint
&& this.timeUpperBoundMillis == that.timeUpperBoundMillis
&& matches(that);
}
/**
@ -315,8 +315,11 @@ public class TransformCheckpoint implements Writeable, ToXContentObject {
if (e.getValue() instanceof long[]) {
checkpoints.put(e.getKey(), (long[]) e.getValue());
} else {
throw new ElasticsearchParseException("expecting the checkpoints for [{}] to be a long[], but found [{}] instead",
e.getKey(), e.getValue().getClass());
throw new ElasticsearchParseException(
"expecting the checkpoints for [{}] to be a long[], but found [{}] instead",
e.getKey(),
e.getValue().getClass()
);
}
}
return checkpoints;

View File

@ -31,11 +31,110 @@ import java.util.Objects;
*/
public class TransformCheckpointingInfo implements Writeable, ToXContentObject {
/**
* Builder for collecting checkpointing information for the purpose of _stats
*/
public static class TransformCheckpointingInfoBuilder {
private TransformIndexerPosition nextCheckpointPosition;
private TransformProgress nextCheckpointProgress;
private TransformCheckpoint lastCheckpoint;
private TransformCheckpoint nextCheckpoint;
private TransformCheckpoint sourceCheckpoint;
private Instant changesLastDetectedAt;
private long operationsBehind;
public TransformCheckpointingInfoBuilder() {}
public TransformCheckpointingInfo build() {
if (lastCheckpoint == null) {
lastCheckpoint = TransformCheckpoint.EMPTY;
}
if (nextCheckpoint == null) {
nextCheckpoint = TransformCheckpoint.EMPTY;
}
if (sourceCheckpoint == null) {
sourceCheckpoint = TransformCheckpoint.EMPTY;
}
// checkpointstats requires a non-negative checkpoint number
long lastCheckpointNumber = lastCheckpoint.getCheckpoint() > 0 ? lastCheckpoint.getCheckpoint() : 0;
long nextCheckpointNumber = nextCheckpoint.getCheckpoint() > 0 ? nextCheckpoint.getCheckpoint() : 0;
return new TransformCheckpointingInfo(
new TransformCheckpointStats(
lastCheckpointNumber,
null,
null,
lastCheckpoint.getTimestamp(),
lastCheckpoint.getTimeUpperBound()
),
new TransformCheckpointStats(
nextCheckpointNumber,
nextCheckpointPosition,
nextCheckpointProgress,
nextCheckpoint.getTimestamp(),
nextCheckpoint.getTimeUpperBound()
),
operationsBehind,
changesLastDetectedAt
);
}
public TransformCheckpointingInfoBuilder setLastCheckpoint(TransformCheckpoint lastCheckpoint) {
this.lastCheckpoint = lastCheckpoint;
return this;
}
public TransformCheckpoint getLastCheckpoint() {
return lastCheckpoint;
}
public TransformCheckpointingInfoBuilder setNextCheckpoint(TransformCheckpoint nextCheckpoint) {
this.nextCheckpoint = nextCheckpoint;
return this;
}
public TransformCheckpoint getNextCheckpoint() {
return nextCheckpoint;
}
public TransformCheckpointingInfoBuilder setSourceCheckpoint(TransformCheckpoint sourceCheckpoint) {
this.sourceCheckpoint = sourceCheckpoint;
return this;
}
public TransformCheckpoint getSourceCheckpoint() {
return sourceCheckpoint;
}
public TransformCheckpointingInfoBuilder setNextCheckpointProgress(TransformProgress nextCheckpointProgress) {
this.nextCheckpointProgress = nextCheckpointProgress;
return this;
}
public TransformCheckpointingInfoBuilder setNextCheckpointPosition(TransformIndexerPosition nextCheckpointPosition) {
this.nextCheckpointPosition = nextCheckpointPosition;
return this;
}
public TransformCheckpointingInfoBuilder setChangesLastDetectedAt(Instant changesLastDetectedAt) {
this.changesLastDetectedAt = changesLastDetectedAt;
return this;
}
public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehind) {
this.operationsBehind = operationsBehind;
return this;
}
}
public static final TransformCheckpointingInfo EMPTY = new TransformCheckpointingInfo(
TransformCheckpointStats.EMPTY,
TransformCheckpointStats.EMPTY,
0L,
null);
null
);
public static final ParseField LAST_CHECKPOINT = new ParseField("last");
public static final ParseField NEXT_CHECKPOINT = new ParseField("next");
@ -44,10 +143,9 @@ public class TransformCheckpointingInfo implements Writeable, ToXContentObject {
private final TransformCheckpointStats last;
private final TransformCheckpointStats next;
private final long operationsBehind;
private Instant changesLastDetectedAt;
private final Instant changesLastDetectedAt;
private static final ConstructingObjectParser<TransformCheckpointingInfo, Void> LENIENT_PARSER =
new ConstructingObjectParser<>(
private static final ConstructingObjectParser<TransformCheckpointingInfo, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
"data_frame_transform_checkpointing_info",
true,
a -> {
@ -57,19 +155,29 @@ public class TransformCheckpointingInfo implements Writeable, ToXContentObject {
a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0],
a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1],
behind,
changesLastDetectedAt);
});
changesLastDetectedAt
);
}
);
static {
LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
TransformCheckpointStats.LENIENT_PARSER::apply, LAST_CHECKPOINT);
LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
TransformCheckpointStats.LENIENT_PARSER::apply, NEXT_CHECKPOINT);
LENIENT_PARSER.declareObject(
ConstructingObjectParser.optionalConstructorArg(),
TransformCheckpointStats.LENIENT_PARSER::apply,
LAST_CHECKPOINT
);
LENIENT_PARSER.declareObject(
ConstructingObjectParser.optionalConstructorArg(),
TransformCheckpointStats.LENIENT_PARSER::apply,
NEXT_CHECKPOINT
);
LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND);
LENIENT_PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
LENIENT_PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
p -> TimeUtils.parseTimeFieldToInstant(p, CHANGES_LAST_DETECTED_AT.getPreferredName()),
CHANGES_LAST_DETECTED_AT,
ObjectParser.ValueType.VALUE);
ObjectParser.ValueType.VALUE
);
}
/**
@ -81,28 +189,26 @@ public class TransformCheckpointingInfo implements Writeable, ToXContentObject {
* @param operationsBehind counter of operations the current checkpoint is behind source
* @param changesLastDetectedAt the last time the source indices were checked for changes
*/
public TransformCheckpointingInfo(TransformCheckpointStats last,
public TransformCheckpointingInfo(
TransformCheckpointStats last,
TransformCheckpointStats next,
long operationsBehind,
Instant changesLastDetectedAt) {
Instant changesLastDetectedAt
) {
this.last = Objects.requireNonNull(last);
this.next = Objects.requireNonNull(next);
this.operationsBehind = operationsBehind;
this.changesLastDetectedAt = changesLastDetectedAt == null ? null : Instant.ofEpochMilli(changesLastDetectedAt.toEpochMilli());
}
public TransformCheckpointingInfo(TransformCheckpointStats last,
TransformCheckpointStats next,
long operationsBehind) {
this(last, next, operationsBehind, null);
}
public TransformCheckpointingInfo(StreamInput in) throws IOException {
last = new TransformCheckpointStats(in);
next = new TransformCheckpointStats(in);
operationsBehind = in.readLong();
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
changesLastDetectedAt = in.readOptionalInstant();
} else {
changesLastDetectedAt = null;
}
}
@ -122,11 +228,6 @@ public class TransformCheckpointingInfo implements Writeable, ToXContentObject {
return changesLastDetectedAt;
}
public TransformCheckpointingInfo setChangesLastDetectedAt(Instant changesLastDetectedAt) {
this.changesLastDetectedAt = Instant.ofEpochMilli(Objects.requireNonNull(changesLastDetectedAt).toEpochMilli());
return this;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -134,11 +235,15 @@ public class TransformCheckpointingInfo implements Writeable, ToXContentObject {
if (next.getCheckpoint() > 0) {
builder.field(NEXT_CHECKPOINT.getPreferredName(), next);
}
if (operationsBehind > 0) {
builder.field(OPERATIONS_BEHIND.getPreferredName(), operationsBehind);
}
if (changesLastDetectedAt != null) {
builder.timeField(CHANGES_LAST_DETECTED_AT.getPreferredName(),
builder.timeField(
CHANGES_LAST_DETECTED_AT.getPreferredName(),
CHANGES_LAST_DETECTED_AT.getPreferredName() + "_string",
changesLastDetectedAt.toEpochMilli());
changesLastDetectedAt.toEpochMilli()
);
}
builder.endObject();
return builder;
@ -175,10 +280,10 @@ public class TransformCheckpointingInfo implements Writeable, ToXContentObject {
TransformCheckpointingInfo that = (TransformCheckpointingInfo) other;
return Objects.equals(this.last, that.last) &&
Objects.equals(this.next, that.next) &&
this.operationsBehind == that.operationsBehind &&
Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt);
return Objects.equals(this.last, that.last)
&& Objects.equals(this.next, that.next)
&& this.operationsBehind == that.operationsBehind
&& Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt);
}
@Override

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@ -23,8 +24,13 @@ import static org.elasticsearch.test.TestMatchers.matchesPattern;
public class TransformCheckpointTests extends AbstractSerializingTransformTestCase<TransformCheckpoint> {
public static TransformCheckpoint randomTransformCheckpoints() {
return new TransformCheckpoint(randomAlphaOfLengthBetween(1, 10), randomNonNegativeLong(), randomNonNegativeLong(),
randomCheckpointsByIndex(), randomNonNegativeLong());
return new TransformCheckpoint(
randomAlphaOfLengthBetween(1, 10),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomCheckpointsByIndex(),
randomNonNegativeLong()
);
}
@Override
@ -62,8 +68,13 @@ public class TransformCheckpointTests extends AbstractSerializingTransformTestCa
otherCheckpointsByIndex.put(randomAlphaOfLengthBetween(1, 10), new long[] { 1, 2, 3 });
long timeUpperBound = randomNonNegativeLong();
TransformCheckpoint dataFrameTransformCheckpoints = new TransformCheckpoint(id, timestamp, checkpoint,
checkpointsByIndex, timeUpperBound);
TransformCheckpoint dataFrameTransformCheckpoints = new TransformCheckpoint(
id,
timestamp,
checkpoint,
checkpointsByIndex,
timeUpperBound
);
// same
assertTrue(dataFrameTransformCheckpoints.matches(dataFrameTransformCheckpoints));
@ -74,20 +85,40 @@ public class TransformCheckpointTests extends AbstractSerializingTransformTestCa
assertTrue(dataFrameTransformCheckpointsCopy.matches(dataFrameTransformCheckpoints));
// other id
assertFalse(dataFrameTransformCheckpoints
.matches(new TransformCheckpoint(id + "-1", timestamp, checkpoint, checkpointsByIndex, timeUpperBound)));
assertFalse(
dataFrameTransformCheckpoints.matches(
new TransformCheckpoint(id + "-1", timestamp, checkpoint, checkpointsByIndex, timeUpperBound)
)
);
// other timestamp
assertTrue(dataFrameTransformCheckpoints
.matches(new TransformCheckpoint(id, (timestamp / 2) + 1, checkpoint, checkpointsByIndex, timeUpperBound)));
assertTrue(
dataFrameTransformCheckpoints.matches(
new TransformCheckpoint(id, (timestamp / 2) + 1, checkpoint, checkpointsByIndex, timeUpperBound)
)
);
// other checkpoint
assertTrue(dataFrameTransformCheckpoints
.matches(new TransformCheckpoint(id, timestamp, (checkpoint / 2) + 1, checkpointsByIndex, timeUpperBound)));
assertTrue(
dataFrameTransformCheckpoints.matches(
new TransformCheckpoint(id, timestamp, (checkpoint / 2) + 1, checkpointsByIndex, timeUpperBound)
)
);
// other index checkpoints
assertFalse(dataFrameTransformCheckpoints
.matches(new TransformCheckpoint(id, timestamp, checkpoint, otherCheckpointsByIndex, timeUpperBound)));
assertFalse(
dataFrameTransformCheckpoints.matches(
new TransformCheckpoint(id, timestamp, checkpoint, otherCheckpointsByIndex, timeUpperBound)
)
);
// other time upper bound
assertTrue(dataFrameTransformCheckpoints
.matches(new TransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndex, (timeUpperBound / 2) + 1)));
assertTrue(
dataFrameTransformCheckpoints.matches(
new TransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndex, (timeUpperBound / 2) + 1)
)
);
}
public void testEmpty() {
assertTrue(TransformCheckpoint.EMPTY.isEmpty());
assertFalse(new TransformCheckpoint("some_id", 0L, -1, Collections.emptyMap(), 0L).isEmpty());
}
public void testGetBehind() {
@ -119,14 +150,16 @@ public class TransformCheckpointTests extends AbstractSerializingTransformTestCa
long checkpoint = randomLongBetween(10, 100);
TransformCheckpoint checkpointOld = new TransformCheckpoint(
id, timestamp, checkpoint, checkpointsByIndexOld, 0L);
TransformCheckpoint checkpointTransientNew = new TransformCheckpoint(
id, timestamp, -1L, checkpointsByIndexNew, 0L);
TransformCheckpoint checkpointNew = new TransformCheckpoint(
id, timestamp, checkpoint + 1, checkpointsByIndexNew, 0L);
TransformCheckpoint checkpointOld = new TransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndexOld, 0L);
TransformCheckpoint checkpointTransientNew = new TransformCheckpoint(id, timestamp, -1L, checkpointsByIndexNew, 0L);
TransformCheckpoint checkpointNew = new TransformCheckpoint(id, timestamp, checkpoint + 1, checkpointsByIndexNew, 0L);
TransformCheckpoint checkpointOlderButNewerShardsCheckpoint = new TransformCheckpoint(
id, timestamp, checkpoint - 1, checkpointsByIndexNew, 0L);
id,
timestamp,
checkpoint - 1,
checkpointsByIndexNew,
0L
);
assertEquals(indices * shards * 10L, TransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
assertEquals(indices * shards * 10L, TransformCheckpoint.getBehind(checkpointOld, checkpointNew));
@ -140,8 +173,10 @@ public class TransformCheckpointTests extends AbstractSerializingTransformTestCa
assertEquals(0L, TransformCheckpoint.getBehind(checkpointNew, checkpointTransientNew));
// transient new vs new: illegal
Exception e = expectThrows(IllegalArgumentException.class,
() -> TransformCheckpoint.getBehind(checkpointTransientNew, checkpointNew));
Exception e = expectThrows(
IllegalArgumentException.class,
() -> TransformCheckpoint.getBehind(checkpointTransientNew, checkpointNew)
);
assertEquals("can not compare transient against a non transient checkpoint", e.getMessage());
// new vs old: illegal
@ -155,8 +190,10 @@ public class TransformCheckpointTests extends AbstractSerializingTransformTestCa
// remove something from old, so newer has 1 index more than old: should be equivalent to old index existing but empty
checkpointsByIndexOld.remove(checkpointsByIndexOld.firstKey());
long behind = TransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew);
assertTrue("Expected behind (" + behind + ") => sum of shard checkpoint differences (" + indices * shards * 10L + ")",
behind >= indices * shards * 10L);
assertTrue(
"Expected behind (" + behind + ") => sum of shard checkpoint differences (" + indices * shards * 10L + ")",
behind >= indices * shards * 10L
);
// remove same key: old and new should have equal indices again
checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey());

View File

@ -207,7 +207,6 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans
listener.onResponse(response);
return;
}
Set<String> transformsWithoutTasks = new HashSet<>(request.getExpandedIds());
transformsWithoutTasks.removeAll(response.getTransformsStats().stream().map(TransformStats::getId).collect(Collectors.toList()));
@ -252,7 +251,7 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans
transform.getTransformState().getCheckpoint(),
transform.getTransformState().getPosition(),
transform.getTransformState().getProgress(),
ActionListener.wrap(listener::onResponse, e -> {
ActionListener.wrap(infoBuilder -> listener.onResponse(infoBuilder.build()), e -> {
logger.warn("Failed to retrieve checkpointing info for transform [" + transform.getId() + "]", e);
listener.onResponse(TransformCheckpointingInfo.EMPTY);
})

View File

@ -233,7 +233,8 @@ public class TransportPreviewTransformAction extends
builder.startObject();
builder.field("docs", results);
builder.endObject();
SimulatePipelineRequest pipelineRequest = new SimulatePipelineRequest(BytesReference.bytes(builder), XContentType.JSON);
SimulatePipelineRequest pipelineRequest =
new SimulatePipelineRequest(BytesReference.bytes(builder), XContentType.JSON);
pipelineRequest.setId(pipeline);
ClientHelper.executeAsyncWithOrigin(
client,

View File

@ -7,9 +7,9 @@
package org.elasticsearch.xpack.transform.checkpoint;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
/**
@ -44,11 +44,13 @@ public interface CheckpointProvider {
* @param nextCheckpointProgress progress for the next checkpoint
* @param listener listener to retrieve the result
*/
void getCheckpointingInfo(TransformCheckpoint lastCheckpoint,
void getCheckpointingInfo(
TransformCheckpoint lastCheckpoint,
TransformCheckpoint nextCheckpoint,
TransformIndexerPosition nextCheckpointPosition,
TransformProgress nextCheckpointProgress,
ActionListener<TransformCheckpointingInfo> listener);
ActionListener<TransformCheckpointingInfoBuilder> listener
);
/**
* Get checkpoint statistics for a stopped data frame
@ -60,8 +62,10 @@ public interface CheckpointProvider {
* @param nextCheckpointProgress progress for the next checkpoint
* @param listener listener to retrieve the result
*/
void getCheckpointingInfo(long lastCheckpointNumber,
void getCheckpointingInfo(
long lastCheckpointNumber,
TransformIndexerPosition nextCheckpointPosition,
TransformProgress nextCheckpointProgress,
ActionListener<TransformCheckpointingInfo> listener);
ActionListener<TransformCheckpointingInfoBuilder> listener
);
}

View File

@ -21,14 +21,15 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@ -41,78 +42,6 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
// threshold when to audit concrete index names, above this threshold we only report the number of changes
private static final int AUDIT_CONCRETED_SOURCE_INDEX_CHANGES = 10;
/**
* Builder for collecting checkpointing information for the purpose of _stats
*/
private static class TransformCheckpointingInfoBuilder {
private TransformIndexerPosition nextCheckpointPosition;
private TransformProgress nextCheckpointProgress;
private TransformCheckpoint lastCheckpoint;
private TransformCheckpoint nextCheckpoint;
private TransformCheckpoint sourceCheckpoint;
TransformCheckpointingInfoBuilder() {}
TransformCheckpointingInfo build() {
if (lastCheckpoint == null) {
lastCheckpoint = TransformCheckpoint.EMPTY;
}
if (nextCheckpoint == null) {
nextCheckpoint = TransformCheckpoint.EMPTY;
}
if (sourceCheckpoint == null) {
sourceCheckpoint = TransformCheckpoint.EMPTY;
}
// checkpointstats requires a non-negative checkpoint number
long lastCheckpointNumber = lastCheckpoint.getCheckpoint() > 0 ? lastCheckpoint.getCheckpoint() : 0;
long nextCheckpointNumber = nextCheckpoint.getCheckpoint() > 0 ? nextCheckpoint.getCheckpoint() : 0;
return new TransformCheckpointingInfo(
new TransformCheckpointStats(
lastCheckpointNumber,
null,
null,
lastCheckpoint.getTimestamp(),
lastCheckpoint.getTimeUpperBound()
),
new TransformCheckpointStats(
nextCheckpointNumber,
nextCheckpointPosition,
nextCheckpointProgress,
nextCheckpoint.getTimestamp(),
nextCheckpoint.getTimeUpperBound()
),
TransformCheckpoint.getBehind(lastCheckpoint, sourceCheckpoint)
);
}
public TransformCheckpointingInfoBuilder setLastCheckpoint(TransformCheckpoint lastCheckpoint) {
this.lastCheckpoint = lastCheckpoint;
return this;
}
public TransformCheckpointingInfoBuilder setNextCheckpoint(TransformCheckpoint nextCheckpoint) {
this.nextCheckpoint = nextCheckpoint;
return this;
}
public TransformCheckpointingInfoBuilder setSourceCheckpoint(TransformCheckpoint sourceCheckpoint) {
this.sourceCheckpoint = sourceCheckpoint;
return this;
}
public TransformCheckpointingInfoBuilder setNextCheckpointProgress(TransformProgress nextCheckpointProgress) {
this.nextCheckpointProgress = nextCheckpointProgress;
return this;
}
public TransformCheckpointingInfoBuilder setNextCheckpointPosition(TransformIndexerPosition nextCheckpointPosition) {
this.nextCheckpointPosition = nextCheckpointPosition;
return this;
}
}
private static final Logger logger = LogManager.getLogger(DefaultCheckpointProvider.class);
protected final Client client;
@ -250,10 +179,10 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
TransformCheckpoint nextCheckpoint,
TransformIndexerPosition nextCheckpointPosition,
TransformProgress nextCheckpointProgress,
ActionListener<TransformCheckpointingInfo> listener
ActionListener<TransformCheckpointingInfoBuilder> listener
) {
TransformCheckpointingInfoBuilder checkpointingInfoBuilder = new TransformCheckpointingInfoBuilder();
TransformCheckpointingInfo.TransformCheckpointingInfoBuilder checkpointingInfoBuilder =
new TransformCheckpointingInfo.TransformCheckpointingInfoBuilder();
checkpointingInfoBuilder.setLastCheckpoint(lastCheckpoint)
.setNextCheckpoint(nextCheckpoint)
@ -263,10 +192,10 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
long timestamp = System.currentTimeMillis();
getIndexCheckpoints(ActionListener.wrap(checkpointsByIndex -> {
checkpointingInfoBuilder.setSourceCheckpoint(
new TransformCheckpoint(transformConfig.getId(), timestamp, -1L, checkpointsByIndex, 0L)
);
listener.onResponse(checkpointingInfoBuilder.build());
TransformCheckpoint sourceCheckpoint = new TransformCheckpoint(transformConfig.getId(), timestamp, -1L, checkpointsByIndex, 0L);
checkpointingInfoBuilder.setSourceCheckpoint(sourceCheckpoint);
checkpointingInfoBuilder.setOperationsBehind(TransformCheckpoint.getBehind(lastCheckpoint, sourceCheckpoint));
listener.onResponse(checkpointingInfoBuilder);
}, listener::onFailure));
}
@ -275,21 +204,24 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
long lastCheckpointNumber,
TransformIndexerPosition nextCheckpointPosition,
TransformProgress nextCheckpointProgress,
ActionListener<TransformCheckpointingInfo> listener
ActionListener<TransformCheckpointingInfoBuilder> listener
) {
TransformCheckpointingInfoBuilder checkpointingInfoBuilder = new TransformCheckpointingInfoBuilder();
TransformCheckpointingInfo.TransformCheckpointingInfoBuilder checkpointingInfoBuilder =
new TransformCheckpointingInfo.TransformCheckpointingInfoBuilder();
checkpointingInfoBuilder.setNextCheckpointPosition(nextCheckpointPosition).setNextCheckpointProgress(nextCheckpointProgress);
checkpointingInfoBuilder.setLastCheckpoint(TransformCheckpoint.EMPTY);
long timestamp = System.currentTimeMillis();
// <3> got the source checkpoint, notify the user
ActionListener<Map<String, long[]>> checkpointsByIndexListener = ActionListener.wrap(checkpointsByIndex -> {
checkpointingInfoBuilder.setSourceCheckpoint(
new TransformCheckpoint(transformConfig.getId(), timestamp, -1L, checkpointsByIndex, 0L)
TransformCheckpoint sourceCheckpoint = new TransformCheckpoint(transformConfig.getId(), timestamp, -1L, checkpointsByIndex, 0L);
checkpointingInfoBuilder.setSourceCheckpoint(sourceCheckpoint);
checkpointingInfoBuilder.setOperationsBehind(
TransformCheckpoint.getBehind(checkpointingInfoBuilder.getLastCheckpoint(), sourceCheckpoint)
);
listener.onResponse(checkpointingInfoBuilder.build());
listener.onResponse(checkpointingInfoBuilder);
}, e -> {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
@ -320,7 +252,8 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
// <1> got last checkpoint, get the next checkpoint
ActionListener<TransformCheckpoint> lastCheckpointListener = ActionListener.wrap(lastCheckpointObj -> {
checkpointingInfoBuilder.lastCheckpoint = lastCheckpointObj;
checkpointingInfoBuilder.setChangesLastDetectedAt(Instant.ofEpochMilli(lastCheckpointObj.getTimestamp()));
checkpointingInfoBuilder.setLastCheckpoint(lastCheckpointObj);
transformConfigManager.getTransformCheckpoint(transformConfig.getId(), lastCheckpointNumber + 1, nextCheckpointListener);
}, e -> {
logger.debug(

View File

@ -11,7 +11,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
@ -66,7 +66,7 @@ public class TransformCheckpointService {
final long lastCheckpointNumber,
final TransformIndexerPosition nextCheckpointPosition,
final TransformProgress nextCheckpointProgress,
final ActionListener<TransformCheckpointingInfo> listener
final ActionListener<TransformCheckpointingInfoBuilder> listener
) {
// we need to retrieve the config first before we can defer the rest to the corresponding provider

View File

@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
@ -159,9 +160,22 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
TransformCheckpointService transformsCheckpointService,
ActionListener<TransformCheckpointingInfo> listener
) {
ActionListener<TransformCheckpointingInfoBuilder> checkPointInfoListener = ActionListener.wrap(infoBuilder -> {
if (context.getChangesLastDetectedAt() != null) {
infoBuilder.setChangesLastDetectedAt(context.getChangesLastDetectedAt());
}
listener.onResponse(infoBuilder.build());
}, listener::onFailure);
ClientTransformIndexer indexer = getIndexer();
if (indexer == null) {
transformsCheckpointService.getCheckpointingInfo(transform.getId(), context.getCheckpoint(), initialPosition, null, listener);
transformsCheckpointService.getCheckpointingInfo(
transform.getId(),
context.getCheckpoint(),
initialPosition,
null,
checkPointInfoListener
);
return;
}
indexer.getCheckpointProvider()
@ -170,13 +184,7 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
indexer.getNextCheckpoint(),
indexer.getPosition(),
indexer.getProgress(),
ActionListener.wrap(info -> {
if (context.getChangesLastDetectedAt() == null) {
listener.onResponse(info);
} else {
listener.onResponse(info.setChangesLastDetectedAt(context.getChangesLastDetectedAt()));
}
}, listener::onFailure)
checkPointInfoListener
);
}

View File

@ -39,12 +39,13 @@ import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPositionTests;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPositionTests;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgressTests;
import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase;
@ -54,6 +55,7 @@ import org.junit.AfterClass;
import org.junit.Before;
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@ -73,7 +75,7 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest
private static MockClientForCheckpointing mockClientForCheckpointing = null;
private IndexBasedTransformConfigManager transformsConfigManager;
private TransformCheckpointService transformsCheckpointService;
private TransformCheckpointService transformCheckpointService;
private class MockClientForCheckpointing extends NoOpClient {
@ -136,7 +138,7 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest
// use a mock for the checkpoint service
TransformAuditor mockAuditor = mock(TransformAuditor.class);
transformsCheckpointService = new TransformCheckpointService(mockClientForCheckpointing, transformsConfigManager, mockAuditor);
transformCheckpointService = new TransformCheckpointService(mockClientForCheckpointing, transformsConfigManager, mockAuditor);
}
@AfterClass
@ -255,11 +257,12 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest
TransformCheckpointingInfo checkpointInfo = new TransformCheckpointingInfo(
new TransformCheckpointStats(1, null, null, timestamp, 0L),
new TransformCheckpointStats(2, position, progress, timestamp + 100L, 0L),
30L
30L,
Instant.ofEpochMilli(timestamp)
);
assertAsync(
listener -> transformsCheckpointService.getCheckpointingInfo(transformId, 1, position, progress, listener),
listener -> getCheckpoint(transformCheckpointService, transformId, 1, position, progress, listener),
checkpointInfo,
null,
null
@ -269,10 +272,11 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest
checkpointInfo = new TransformCheckpointingInfo(
new TransformCheckpointStats(1, null, null, timestamp, 0L),
new TransformCheckpointStats(2, position, progress, timestamp + 100L, 0L),
63L
63L,
Instant.ofEpochMilli(timestamp)
);
assertAsync(
listener -> transformsCheckpointService.getCheckpointingInfo(transformId, 1, position, progress, listener),
listener -> getCheckpoint(transformCheckpointService, transformId, 1, position, progress, listener),
checkpointInfo,
null,
null
@ -283,10 +287,11 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest
checkpointInfo = new TransformCheckpointingInfo(
new TransformCheckpointStats(1, null, null, timestamp, 0L),
new TransformCheckpointStats(2, position, progress, timestamp + 100L, 0L),
0L
0L,
Instant.ofEpochMilli(timestamp)
);
assertAsync(
listener -> transformsCheckpointService.getCheckpointingInfo(transformId, 1, position, progress, listener),
listener -> getCheckpoint(transformCheckpointService, transformId, 1, position, progress, listener),
checkpointInfo,
null,
null
@ -343,4 +348,25 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest
return shardStats.toArray(new ShardStats[0]);
}
private static void getCheckpoint(
TransformCheckpointService transformCheckpointService,
String transformId,
long lastCheckpointNumber,
TransformIndexerPosition nextCheckpointPosition,
TransformProgress nextCheckpointProgress,
ActionListener<TransformCheckpointingInfo> listener
) {
ActionListener<TransformCheckpointingInfoBuilder> checkPointInfoListener = ActionListener.wrap(
infoBuilder -> { listener.onResponse(infoBuilder.build()); },
listener::onFailure
);
transformCheckpointService.getCheckpointingInfo(
transformId,
lastCheckpointNumber,
nextCheckpointPosition,
nextCheckpointProgress,
checkPointInfoListener
);
}
}