[Transform] refactor naming leftovers and apply code formatting (#51465) (#51470)

refactor renaming leftovers: "data frame transform" to "transforms", touching only internals (variable
names, non-public APIs, doc strings, ...) and applying code formatting (spotless). No logical changes.
This commit is contained in:
Hendrik Muhs 2020-01-27 14:04:57 +01:00 committed by GitHub
parent 2eeea21d84
commit b233e93014
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 649 additions and 484 deletions

View File

@ -12,19 +12,17 @@ import java.util.Locale;
public class TransformMessages {
public static final String REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_TIMEOUT =
"Timed out after [{0}] while waiting for transform [{1}] to stop";
public static final String REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_INTERRUPT =
"Interrupted while waiting for transform [{0}] to stop";
"Timed out after [{0}] while waiting for transform [{1}] to stop";
public static final String REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_INTERRUPT = "Interrupted while waiting for transform [{0}] to stop";
public static final String REST_PUT_TRANSFORM_EXISTS = "Transform with id [{0}] already exists";
public static final String REST_UNKNOWN_TRANSFORM = "Transform with id [{0}] could not be found";
public static final String REST_STOP_TRANSFORM_WITHOUT_CONFIG =
"Detected transforms with no config [{0}]. Use force to stop/delete them.";
public static final String REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION =
"Failed to validate configuration";
"Detected transforms with no config [{0}]. Use force to stop/delete them.";
public static final String REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION = "Failed to validate configuration";
public static final String REST_PUT_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist transform configuration";
public static final String REST_PUT_TRANSFORM_FAILED_TO_DEDUCE_DEST_MAPPINGS = "Failed to deduce dest mappings";
public static final String REST_PUT_TRANSFORM_INCONSISTENT_ID =
"Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument";
"Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument";
public static final String TRANSFORM_CONFIG_INVALID = "Transform configuration is invalid [{0}]";
public static final String REST_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";
public static final String TRANSFORM_FAILED_TO_PERSIST_STATS = "Failed to persist transform statistics for transform [{0}]";
@ -33,64 +31,49 @@ public class TransformMessages {
public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future.";
public static final String CANNOT_STOP_FAILED_TRANSFORM =
"Unable to stop transform [{0}] as it is in a failed state with reason [{1}]." +
" Use force stop to stop the transform.";
public static final String CANNOT_STOP_FAILED_TRANSFORM = "Unable to stop transform [{0}] as it is in a failed state with reason [{1}]."
+ " Use force stop to stop the transform.";
public static final String CANNOT_START_FAILED_TRANSFORM =
"Unable to start transform [{0}] as it is in a failed state with failure: [{1}]. " +
"Use force stop and then restart the transform once error is resolved.";
"Unable to start transform [{0}] as it is in a failed state with failure: [{1}]. "
+ "Use force stop and then restart the transform once error is resolved.";
public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
public static final String FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION =
"Failed to reload transform configuration for transform [{0}]";
public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION =
"Failed to load transform configuration for transform [{0}]";
public static final String FAILED_TO_PARSE_TRANSFORM_CONFIGURATION =
"Failed to parse transform configuration for transform [{0}]";
public static final String FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION = "Failed to reload transform configuration for transform [{0}]";
public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION = "Failed to load transform configuration for transform [{0}]";
public static final String FAILED_TO_PARSE_TRANSFORM_CONFIGURATION = "Failed to parse transform configuration for transform [{0}]";
public static final String FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION =
"Failed to parse transform statistics for transform [{0}]";
public static final String FAILED_TO_LOAD_TRANSFORM_CHECKPOINT =
"Failed to load transform checkpoint for transform [{0}]";
public static final String FAILED_TO_LOAD_TRANSFORM_STATE =
"Failed to load transform state for transform [{0}]";
public static final String TRANSFORM_CONFIGURATION_NO_TRANSFORM =
"Transform configuration must specify exactly 1 function";
public static final String TRANSFORM_CONFIGURATION_PIVOT_NO_GROUP_BY =
"Pivot transform configuration must specify at least 1 group_by";
"Failed to parse transform statistics for transform [{0}]";
public static final String FAILED_TO_LOAD_TRANSFORM_CHECKPOINT = "Failed to load transform checkpoint for transform [{0}]";
public static final String FAILED_TO_LOAD_TRANSFORM_STATE = "Failed to load transform state for transform [{0}]";
public static final String TRANSFORM_CONFIGURATION_NO_TRANSFORM = "Transform configuration must specify exactly 1 function";
public static final String TRANSFORM_CONFIGURATION_PIVOT_NO_GROUP_BY = "Pivot transform configuration must specify at least 1 group_by";
public static final String TRANSFORM_CONFIGURATION_PIVOT_NO_AGGREGATION =
"Pivot transform configuration must specify at least 1 aggregation";
"Pivot transform configuration must specify at least 1 aggregation";
public static final String TRANSFORM_PIVOT_FAILED_TO_CREATE_COMPOSITE_AGGREGATION =
"Failed to create composite aggregation from pivot function";
public static final String TRANSFORM_CONFIGURATION_INVALID =
"Transform configuration [{0}] has invalid elements";
"Failed to create composite aggregation from pivot function";
public static final String TRANSFORM_CONFIGURATION_INVALID = "Transform configuration [{0}] has invalid elements";
public static final String UNABLE_TO_GATHER_FIELD_MAPPINGS = "Failed to gather field mappings for index [{0}]";
public static final String TRANSFORM_UPDATE_CANNOT_CHANGE_SYNC_METHOD =
"Cannot change the current sync configuration of transform [{0}] from [{1}] to [{2}]";
public static final String LOG_TRANSFORM_CONFIGURATION_BAD_QUERY =
"Failed to parse query for transform";
public static final String LOG_TRANSFORM_CONFIGURATION_BAD_GROUP_BY =
"Failed to parse group_by for pivot transform";
public static final String LOG_TRANSFORM_CONFIGURATION_BAD_AGGREGATION =
"Failed to parse aggregation for pivot transform";
public static final String LOG_TRANSFORM_CONFIGURATION_BAD_QUERY = "Failed to parse query for transform";
public static final String LOG_TRANSFORM_CONFIGURATION_BAD_GROUP_BY = "Failed to parse group_by for pivot transform";
public static final String LOG_TRANSFORM_CONFIGURATION_BAD_AGGREGATION = "Failed to parse aggregation for pivot transform";
public static final String LOG_TRANSFORM_PIVOT_REDUCE_PAGE_SIZE =
"Insufficient memory for search, reducing number of buckets per search from [{0}] to [{1}]";
"Insufficient memory for search, reducing number of buckets per search from [{0}] to [{1}]";
public static final String LOG_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE =
"Insufficient memory for search after repeated page size reductions to [{0}], unable to continue pivot, "
"Insufficient memory for search after repeated page size reductions to [{0}], unable to continue pivot, "
+ "please simplify job or increase heap size on data nodes.";
public static final String LOG_TRANSFORM_PIVOT_SCRIPT_ERROR =
"Failed to execute script with error: [{0}], stack trace: {1}";
public static final String LOG_TRANSFORM_PIVOT_SCRIPT_ERROR = "Failed to execute script with error: [{0}], stack trace: {1}";
public static final String LOG_TRANSFORM_PIVOT_IRRECOVERABLE_BULK_INDEXING_ERROR =
"Failed to index documents into destination index due to permanent error: [{0}]";
public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS =
"Failed to parse transform checkpoints for [{0}]";
"Failed to index documents into destination index due to permanent error: [{0}]";
public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS = "Failed to parse transform checkpoints for [{0}]";
public static final String ID_TOO_LONG = "The id cannot contain more than {0} characters.";
public static final String INVALID_ID = "Invalid {0}; ''{1}'' can contain lowercase alphanumeric (a-z and 0-9), hyphens or " +
"underscores; must start and end with alphanumeric";
private TransformMessages() {
}
public static final String INVALID_ID = "Invalid {0}; ''{1}'' can contain lowercase alphanumeric (a-z and 0-9), hyphens or "
+ "underscores; must start and end with alphanumeric";
private TransformMessages() {}
/**
* Returns the message parameter

View File

@ -24,7 +24,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
/**
* Checkpoint stats data for 1 checkpoint
*
* This is the user-facing side of DataFrameTransformCheckpoint, containing only the stats to be exposed.
* This is the user-facing side of TransformCheckpoint, containing only the stats to be exposed.
*/
public class TransformCheckpointStats implements Writeable, ToXContentObject {
@ -37,15 +37,18 @@ public class TransformCheckpointStats implements Writeable, ToXContentObject {
private final long timeUpperBoundMillis;
static final ConstructingObjectParser<TransformCheckpointStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
"data_frame_transform_checkpoint_stats", true, args -> {
long checkpoint = args[0] == null ? 0L : (Long) args[0];
TransformIndexerPosition position = (TransformIndexerPosition) args[1];
TransformProgress checkpointProgress = (TransformProgress) args[2];
long timestamp = args[3] == null ? 0L : (Long) args[3];
long timeUpperBound = args[4] == null ? 0L : (Long) args[4];
"data_frame_transform_checkpoint_stats",
true,
args -> {
long checkpoint = args[0] == null ? 0L : (Long) args[0];
TransformIndexerPosition position = (TransformIndexerPosition) args[1];
TransformProgress checkpointProgress = (TransformProgress) args[2];
long timestamp = args[3] == null ? 0L : (Long) args[3];
long timeUpperBound = args[4] == null ? 0L : (Long) args[4];
return new TransformCheckpointStats(checkpoint, position, checkpointProgress, timestamp, timeUpperBound);
});
return new TransformCheckpointStats(checkpoint, position, checkpointProgress, timestamp, timeUpperBound);
}
);
static {
LENIENT_PARSER.declareLong(optionalConstructorArg(), TransformField.CHECKPOINT);
@ -55,9 +58,13 @@ public class TransformCheckpointStats implements Writeable, ToXContentObject {
LENIENT_PARSER.declareLong(optionalConstructorArg(), TransformField.TIME_UPPER_BOUND_MILLIS);
}
public TransformCheckpointStats(final long checkpoint, final TransformIndexerPosition position,
final TransformProgress checkpointProgress, final long timestampMillis,
final long timeUpperBoundMillis) {
public TransformCheckpointStats(
final long checkpoint,
final TransformIndexerPosition position,
final TransformProgress checkpointProgress,
final long timestampMillis,
final long timeUpperBoundMillis
) {
this.checkpoint = checkpoint;
this.position = position;
this.checkpointProgress = checkpointProgress;
@ -118,12 +125,18 @@ public class TransformCheckpointStats implements Writeable, ToXContentObject {
builder.field(TransformField.CHECKPOINT_PROGRESS.getPreferredName(), checkpointProgress);
}
if (timestampMillis > 0) {
builder.timeField(TransformField.TIMESTAMP_MILLIS.getPreferredName(), TransformField.TIMESTAMP.getPreferredName(),
timestampMillis);
builder.timeField(
TransformField.TIMESTAMP_MILLIS.getPreferredName(),
TransformField.TIMESTAMP.getPreferredName(),
timestampMillis
);
}
if (timeUpperBoundMillis > 0) {
builder.timeField(TransformField.TIME_UPPER_BOUND_MILLIS.getPreferredName(), TransformField.TIME_UPPER_BOUND.getPreferredName(),
timeUpperBoundMillis);
builder.timeField(
TransformField.TIME_UPPER_BOUND_MILLIS.getPreferredName(),
TransformField.TIME_UPPER_BOUND.getPreferredName(),
timeUpperBoundMillis
);
}
builder.endObject();
return builder;

View File

@ -54,23 +54,23 @@ public class TransformStats implements Writeable, ToXContentObject {
public static final ConstructingObjectParser<TransformStats, Void> PARSER = new ConstructingObjectParser<>(
NAME,
true,
a -> new TransformStats((String) a[0],
a -> new TransformStats(
(String) a[0],
(State) a[1],
(String) a[2],
(NodeAttributes) a[3],
(TransformIndexerStats) a[4],
(TransformCheckpointingInfo) a[5]));
(TransformCheckpointingInfo) a[5]
)
);
static {
PARSER.declareString(constructorArg(), TransformField.ID);
PARSER.declareField(constructorArg(), p -> TransformStats.State.fromString(p.text()), STATE_FIELD,
ObjectParser.ValueType.STRING);
PARSER.declareField(constructorArg(), p -> TransformStats.State.fromString(p.text()), STATE_FIELD, ObjectParser.ValueType.STRING);
PARSER.declareString(optionalConstructorArg(), REASON_FIELD);
PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE_FIELD, ObjectParser.ValueType.OBJECT);
PARSER.declareObject(constructorArg(), (p, c) -> TransformIndexerStats.fromXContent(p),
TransformField.STATS_FIELD);
PARSER.declareObject(constructorArg(),
(p, c) -> TransformCheckpointingInfo.fromXContent(p), CHECKPOINTING_INFO_FIELD);
PARSER.declareObject(constructorArg(), (p, c) -> TransformIndexerStats.fromXContent(p), TransformField.STATS_FIELD);
PARSER.declareObject(constructorArg(), (p, c) -> TransformCheckpointingInfo.fromXContent(p), CHECKPOINTING_INFO_FIELD);
}
public static TransformStats fromXContent(XContentParser parser) throws IOException {
@ -82,18 +82,17 @@ public class TransformStats implements Writeable, ToXContentObject {
}
public static TransformStats stoppedStats(String id, TransformIndexerStats indexerTransformStats) {
return new TransformStats(id,
State.STOPPED,
null,
null,
indexerTransformStats,
TransformCheckpointingInfo.EMPTY);
return new TransformStats(id, State.STOPPED, null, null, indexerTransformStats, TransformCheckpointingInfo.EMPTY);
}
public TransformStats(String id, State state, @Nullable String reason,
@Nullable NodeAttributes node, TransformIndexerStats stats,
TransformCheckpointingInfo checkpointingInfo) {
public TransformStats(
String id,
State state,
@Nullable String reason,
@Nullable NodeAttributes node,
TransformIndexerStats stats,
TransformCheckpointingInfo checkpointingInfo
) {
this.id = Objects.requireNonNull(id);
this.state = Objects.requireNonNull(state);
this.reason = reason;
@ -116,8 +115,8 @@ public class TransformStats implements Writeable, ToXContentObject {
this.checkpointingInfo = new TransformCheckpointingInfo(in);
} else {
// Prior to version 7.4 DataFrameTransformStats didn't exist, and we have
// to do the best we can of reading from a DataFrameTransformStoredDoc object
// Prior to version 7.4 TransformStats didn't exist, and we have
// to do the best we can of reading from a TransformStoredDoc object
// (which is called DataFrameTransformStateAndStats in 7.2/7.3)
this.id = in.readString();
TransformState transformState = new TransformState(in);
@ -161,18 +160,20 @@ public class TransformStats implements Writeable, ToXContentObject {
indexerStats.writeTo(out);
checkpointingInfo.writeTo(out);
} else {
// Prior to version 7.4 DataFrameTransformStats didn't exist, and we have
// to do the best we can of writing to a DataFrameTransformStoredDoc object
// Prior to version 7.4 TransformStats didn't exist, and we have
// to do the best we can of writing to a TransformStoredDoc object
// (which is called DataFrameTransformStateAndStats in 7.2/7.3)
out.writeString(id);
Tuple<TransformTaskState, IndexerState> stateComponents = state.toComponents();
new TransformState(stateComponents.v1(),
new TransformState(
stateComponents.v1(),
stateComponents.v2(),
checkpointingInfo.getNext().getPosition(),
checkpointingInfo.getLast().getCheckpoint(),
reason,
checkpointingInfo.getNext().getCheckpointProgress(),
node).writeTo(out);
node
).writeTo(out);
indexerStats.writeTo(out);
checkpointingInfo.writeTo(out);
}
@ -240,7 +241,12 @@ public class TransformStats implements Writeable, ToXContentObject {
public enum State implements Writeable {
STARTED, INDEXING, ABORTING, STOPPING, STOPPED, FAILED;
STARTED,
INDEXING,
ABORTING,
STOPPING,
STOPPED,
FAILED;
public static State fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));
@ -259,8 +265,8 @@ public class TransformStats implements Writeable, ToXContentObject {
} else {
// If we get here then the task state must be started, and that means we should have an indexer state
assert(taskState == TransformTaskState.STARTED);
assert(indexerState != null);
assert (taskState == TransformTaskState.STARTED);
assert (indexerState != null);
switch (indexerState) {
case STARTED:
@ -301,7 +307,7 @@ public class TransformStats implements Writeable, ToXContentObject {
return new Tuple<>(TransformTaskState.STARTED, IndexerState.STOPPING);
case STOPPED:
// This one is not deterministic, because an overall state of STOPPED could arise
// from either (STOPPED, null) or (STARTED, STOPPED). However, (STARTED, STOPPED)
// from either (STOPPED, null) or (STARTED, STOPPED). However, (STARTED, STOPPED)
// is a very short-lived state so it's reasonable to assume the other, especially
// as this method is only for mixed version cluster compatibility.
return new Tuple<>(TransformTaskState.STOPPED, null);

View File

@ -28,9 +28,9 @@ public class GetTransformActionResponseTests extends AbstractWireSerializingTran
List<TransformConfig> transforms = new ArrayList<>();
transforms.add(TransformConfigTests.randomTransformConfig());
transforms.add(TransformConfigTests.randomInvalidDataFrameTransformConfig());
transforms.add(TransformConfigTests.randomInvalidTransformConfig());
transforms.add(TransformConfigTests.randomTransformConfig());
transforms.add(TransformConfigTests.randomInvalidDataFrameTransformConfig());
transforms.add(TransformConfigTests.randomInvalidTransformConfig());
Response r = new Response(transforms, transforms.size());
XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
@ -58,13 +58,17 @@ public class GetTransformActionResponseTests extends AbstractWireSerializingTran
Map<String, Object> responseAsMap = createParser(builder).map();
@SuppressWarnings("unchecked")
List<Map<String, Object>> transformsResponse = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms",
responseAsMap);
List<Map<String, Object>> transformsResponse = (List<Map<String, Object>>) XContentMapValues.extractValue(
"transforms",
responseAsMap
);
assertEquals(transforms.size(), transformsResponse.size());
for (int i = 0; i < transforms.size(); ++i) {
assertArrayEquals(transforms.get(i).getSource().getIndex(),
((ArrayList<String>)XContentMapValues.extractValue("source.index", transformsResponse.get(i))).toArray(new String[0]));
assertArrayEquals(
transforms.get(i).getSource().getIndex(),
((ArrayList<String>) XContentMapValues.extractValue("source.index", transformsResponse.get(i))).toArray(new String[0])
);
assertEquals(null, XContentMapValues.extractValue("headers", transformsResponse.get(i)));
}
}

View File

@ -23,7 +23,7 @@ public class GetTransformStatsActionResponseTests extends AbstractWireSerializin
List<TransformStats> stats = new ArrayList<>();
int totalStats = randomInt(10);
for (int i = 0; i < totalStats; ++i) {
stats.add(TransformStatsTests.randomDataFrameTransformStats());
stats.add(TransformStatsTests.randomTransformStats());
}
int totalErrors = randomInt(10);
List<TaskOperationFailure> taskFailures = new ArrayList<>(totalErrors);

View File

@ -38,7 +38,7 @@ public class PutTransformActionRequestTests extends AbstractWireSerializingTestC
@Override
protected Request createTestInstance() {
TransformConfig config = TransformConfigTests.randomDataFrameTransformConfigWithoutHeaders(transformId);
TransformConfig config = TransformConfigTests.randomTransformConfigWithoutHeaders(transformId);
return new Request(config, randomBoolean());
}
@ -47,8 +47,9 @@ public class PutTransformActionRequestTests extends AbstractWireSerializingTestC
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
List<NamedWriteableRegistry.Entry> namedWriteables = searchModule.getNamedWriteables();
namedWriteables.add(new NamedWriteableRegistry.Entry(SyncConfig.class, TransformField.TIME_BASED_SYNC.getPreferredName(),
TimeSyncConfig::new));
namedWriteables.add(
new NamedWriteableRegistry.Entry(SyncConfig.class, TransformField.TIME_BASED_SYNC.getPreferredName(), TimeSyncConfig::new)
);
return new NamedWriteableRegistry(namedWriteables);
}
}

View File

@ -23,12 +23,14 @@ public class StopTransformActionRequestTests extends AbstractWireSerializingTest
@Override
protected Request createTestInstance() {
TimeValue timeout = randomBoolean() ? TimeValue.timeValueMinutes(randomIntBetween(1, 10)) : null;
Request request = new Request(randomAlphaOfLengthBetween(1, 10),
Request request = new Request(
randomAlphaOfLengthBetween(1, 10),
randomBoolean(),
randomBoolean(),
timeout,
randomBoolean(),
randomBoolean());
randomBoolean()
);
if (randomBoolean()) {
request.setExpandedIds(new HashSet<>(Arrays.asList(generateRandomStringArray(5, 6, false))));
}
@ -50,28 +52,38 @@ public class StopTransformActionRequestTests extends AbstractWireSerializingTest
Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10), allowNoMatch, waitForCheckpoint);
Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20), allowNoMatch, waitForCheckpoint);
assertNotEquals(r1,r2);
assertNotEquals(r1.hashCode(),r2.hashCode());
assertNotEquals(r1, r2);
assertNotEquals(r1.hashCode(), r2.hashCode());
}
public void testMatch() {
String dataFrameId = "dataframe-id";
String transformId = "transform-id";
Task dataFrameTask = new Task(1L, "persistent", "action",
TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + dataFrameId,
TaskId.EMPTY_TASK_ID, Collections.emptyMap());
Task transformTask = new Task(
1L,
"persistent",
"action",
TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transformId,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap()
);
Request request = new Request("unrelated", false, false, null, false, false);
request.setExpandedIds(new HashSet<>(Arrays.asList("foo", "bar")));
assertFalse(request.match(dataFrameTask));
assertFalse(request.match(transformTask));
Request matchingRequest = new Request(dataFrameId, false, false, null, false, false);
matchingRequest.setExpandedIds(Collections.singleton(dataFrameId));
assertTrue(matchingRequest.match(dataFrameTask));
Request matchingRequest = new Request(transformId, false, false, null, false, false);
matchingRequest.setExpandedIds(Collections.singleton(transformId));
assertTrue(matchingRequest.match(transformTask));
Task notADataFrameTask = new Task(1L, "persistent", "action",
Task notATransformTask = new Task(
1L,
"persistent",
"action",
"some other task, say monitoring",
TaskId.EMPTY_TASK_ID, Collections.emptyMap());
assertFalse(matchingRequest.match(notADataFrameTask));
TaskId.EMPTY_TASK_ID,
Collections.emptyMap()
);
assertFalse(matchingRequest.match(notATransformTask));
}
}

View File

@ -9,7 +9,7 @@ package org.elasticsearch.xpack.core.transform.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Request;
import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdateTests.randomDataFrameTransformConfigUpdate;
import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdateTests.randomTransformConfigUpdate;
public class UpdateTransformActionRequestTests extends AbstractWireSerializingTransformTestCase<Request> {
@ -20,7 +20,7 @@ public class UpdateTransformActionRequestTests extends AbstractWireSerializingTr
@Override
protected Request createTestInstance() {
return new Request(randomDataFrameTransformConfigUpdate(), randomAlphaOfLength(10), randomBoolean());
return new Request(randomTransformConfigUpdate(), randomAlphaOfLength(10), randomBoolean());
}
}

View File

@ -18,7 +18,7 @@ public class UpdateTransformsActionResponseTests extends AbstractSerializingTran
@Override
protected Response createTestInstance() {
return new Response(TransformConfigTests.randomDataFrameTransformConfigWithoutHeaders());
return new Response(TransformConfigTests.randomTransformConfigWithoutHeaders());
}
@Override

View File

@ -11,13 +11,15 @@ import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
public class TransformCheckpointStatsTests extends AbstractSerializingTransformTestCase<TransformCheckpointStats>
{
public static TransformCheckpointStats randomDataFrameTransformCheckpointStats() {
return new TransformCheckpointStats(randomLongBetween(1, 1_000_000),
public class TransformCheckpointStatsTests extends AbstractSerializingTransformTestCase<TransformCheckpointStats> {
public static TransformCheckpointStats randomTransformCheckpointStats() {
return new TransformCheckpointStats(
randomLongBetween(1, 1_000_000),
TransformIndexerPositionTests.randomTransformIndexerPosition(),
randomBoolean() ? null : TransformProgressTests.randomTransformProgress(),
randomLongBetween(1, 1_000_000), randomLongBetween(0, 1_000_000));
randomLongBetween(1, 1_000_000),
randomLongBetween(0, 1_000_000)
);
}
@Override
@ -27,7 +29,7 @@ public class TransformCheckpointStatsTests extends AbstractSerializingTransformT
@Override
protected TransformCheckpointStats createTestInstance() {
return randomDataFrameTransformCheckpointStats();
return randomTransformCheckpointStats();
}
@Override

View File

@ -49,10 +49,10 @@ public class TransformCheckpointTests extends AbstractSerializingTransformTestCa
}
public void testXContentForInternalStorage() throws IOException {
TransformCheckpoint dataFrameTransformCheckpoints = randomTransformCheckpoints();
TransformCheckpoint transformCheckpoints = randomTransformCheckpoints();
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder content = dataFrameTransformCheckpoints.toXContent(xContentBuilder, getToXContentParams());
XContentBuilder content = transformCheckpoints.toXContent(xContentBuilder, getToXContentParams());
String doc = Strings.toString(content);
assertThat(doc, matchesPattern(".*\"doc_type\"\\s*:\\s*\"data_frame_transform_checkpoint\".*"));
@ -68,51 +68,35 @@ public class TransformCheckpointTests extends AbstractSerializingTransformTestCa
otherCheckpointsByIndex.put(randomAlphaOfLengthBetween(1, 10), new long[] { 1, 2, 3 });
long timeUpperBound = randomNonNegativeLong();
TransformCheckpoint dataFrameTransformCheckpoints = new TransformCheckpoint(
id,
timestamp,
checkpoint,
checkpointsByIndex,
timeUpperBound
);
TransformCheckpoint transformCheckpoints = new TransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndex, timeUpperBound);
// same
assertTrue(dataFrameTransformCheckpoints.matches(dataFrameTransformCheckpoints));
TransformCheckpoint dataFrameTransformCheckpointsCopy = copyInstance(dataFrameTransformCheckpoints);
assertTrue(transformCheckpoints.matches(transformCheckpoints));
TransformCheckpoint transformCheckpointsCopy = copyInstance(transformCheckpoints);
// with copy
assertTrue(dataFrameTransformCheckpoints.matches(dataFrameTransformCheckpointsCopy));
assertTrue(dataFrameTransformCheckpointsCopy.matches(dataFrameTransformCheckpoints));
assertTrue(transformCheckpoints.matches(transformCheckpointsCopy));
assertTrue(transformCheckpointsCopy.matches(transformCheckpoints));
// other id
assertFalse(
dataFrameTransformCheckpoints.matches(
new TransformCheckpoint(id + "-1", timestamp, checkpoint, checkpointsByIndex, timeUpperBound)
)
transformCheckpoints.matches(new TransformCheckpoint(id + "-1", timestamp, checkpoint, checkpointsByIndex, timeUpperBound))
);
// other timestamp
assertTrue(
dataFrameTransformCheckpoints.matches(
new TransformCheckpoint(id, (timestamp / 2) + 1, checkpoint, checkpointsByIndex, timeUpperBound)
)
transformCheckpoints.matches(new TransformCheckpoint(id, (timestamp / 2) + 1, checkpoint, checkpointsByIndex, timeUpperBound))
);
// other checkpoint
assertTrue(
dataFrameTransformCheckpoints.matches(
new TransformCheckpoint(id, timestamp, (checkpoint / 2) + 1, checkpointsByIndex, timeUpperBound)
)
transformCheckpoints.matches(new TransformCheckpoint(id, timestamp, (checkpoint / 2) + 1, checkpointsByIndex, timeUpperBound))
);
// other index checkpoints
assertFalse(
dataFrameTransformCheckpoints.matches(
new TransformCheckpoint(id, timestamp, checkpoint, otherCheckpointsByIndex, timeUpperBound)
)
transformCheckpoints.matches(new TransformCheckpoint(id, timestamp, checkpoint, otherCheckpointsByIndex, timeUpperBound))
);
// other time upper bound
assertTrue(
dataFrameTransformCheckpoints.matches(
new TransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndex, (timeUpperBound / 2) + 1)
)
transformCheckpoints.matches(new TransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndex, (timeUpperBound / 2) + 1))
);
}

View File

@ -17,12 +17,13 @@ import java.time.Instant;
public class TransformCheckpointingInfoTests extends AbstractSerializingTransformTestCase<TransformCheckpointingInfo> {
public static TransformCheckpointingInfo randomDataFrameTransformCheckpointingInfo() {
public static TransformCheckpointingInfo randomTransformCheckpointingInfo() {
return new TransformCheckpointingInfo(
TransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(),
TransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(),
TransformCheckpointStatsTests.randomTransformCheckpointStats(),
TransformCheckpointStatsTests.randomTransformCheckpointStats(),
randomNonNegativeLong(),
randomBoolean() ? null : Instant.ofEpochMilli(randomLongBetween(1, 100000)));
randomBoolean() ? null : Instant.ofEpochMilli(randomLongBetween(1, 100000))
);
}
@Override
@ -32,7 +33,7 @@ public class TransformCheckpointingInfoTests extends AbstractSerializingTransfor
@Override
protected TransformCheckpointingInfo createTestInstance() {
return randomDataFrameTransformCheckpointingInfo();
return randomTransformCheckpointingInfo();
}
@Override
@ -46,7 +47,8 @@ public class TransformCheckpointingInfoTests extends AbstractSerializingTransfor
TransformCheckpointStats.EMPTY,
randomNonNegativeLong(),
// changesLastDetectedAt is not serialized to past values, so when it is pulled back in, it will be null
null);
null
);
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_7_4_0);
checkpointingInfo.writeTo(output);

View File

@ -35,16 +35,17 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
private String transformId;
private boolean runWithHeaders;
public static TransformConfig randomDataFrameTransformConfigWithoutHeaders() {
return randomDataFrameTransformConfigWithoutHeaders(randomAlphaOfLengthBetween(1, 10));
public static TransformConfig randomTransformConfigWithoutHeaders() {
return randomTransformConfigWithoutHeaders(randomAlphaOfLengthBetween(1, 10));
}
public static TransformConfig randomTransformConfig() {
return randomTransformConfig(randomAlphaOfLengthBetween(1, 10));
}
public static TransformConfig randomDataFrameTransformConfigWithoutHeaders(String id) {
return new TransformConfig(id,
public static TransformConfig randomTransformConfigWithoutHeaders(String id) {
return new TransformConfig(
id,
randomSourceConfig(),
randomDestConfig(),
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
@ -53,11 +54,13 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
null,
null);
null
);
}
public static TransformConfig randomTransformConfig(String id) {
return new TransformConfig(id,
return new TransformConfig(
id,
randomSourceConfig(),
randomDestConfig(),
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
@ -66,18 +69,33 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.CURRENT.toString());
randomBoolean() ? null : Version.CURRENT.toString()
);
}
public static TransformConfig randomInvalidDataFrameTransformConfig() {
public static TransformConfig randomInvalidTransformConfig() {
if (randomBoolean()) {
return new TransformConfig(randomAlphaOfLengthBetween(1, 10), randomInvalidSourceConfig(), randomDestConfig(),
null, randomBoolean() ? randomSyncConfig() : null, randomHeaders(), PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
return new TransformConfig(
randomAlphaOfLengthBetween(1, 10),
randomInvalidSourceConfig(),
randomDestConfig(),
null,
randomBoolean() ? randomSyncConfig() : null,
randomHeaders(),
PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)
);
} // else
return new TransformConfig(randomAlphaOfLengthBetween(1, 10), randomSourceConfig(), randomDestConfig(),
null, randomBoolean() ? randomSyncConfig() : null, randomHeaders(), PivotConfigTests.randomInvalidPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
return new TransformConfig(
randomAlphaOfLengthBetween(1, 10),
randomSourceConfig(),
randomDestConfig(),
null,
randomBoolean() ? randomSyncConfig() : null,
randomHeaders(),
PivotConfigTests.randomInvalidPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)
);
}
public static SyncConfig randomSyncConfig() {
@ -101,7 +119,7 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
@Override
protected TransformConfig createTestInstance() {
return runWithHeaders ? randomTransformConfig(transformId) : randomDataFrameTransformConfigWithoutHeaders(transformId);
return runWithHeaders ? randomTransformConfig(transformId) : randomTransformConfigWithoutHeaders(transformId);
}
@Override
@ -123,56 +141,6 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
public void testDefaultMatchAll() throws IOException {
String pivotTransform = "{"
+ " \"source\" : {\"index\":\"src\"},"
+ " \"dest\" : {\"index\": \"dest\"},"
+ " \"pivot\" : {"
+ " \"group_by\": {"
+ " \"id\": {"
+ " \"terms\": {"
+ " \"field\": \"id\""
+ "} } },"
+ " \"aggs\": {"
+ " \"avg\": {"
+ " \"avg\": {"
+ " \"field\": \"points\""
+ "} } } } }";
TransformConfig dataFrameTransformConfig = createDataFrameTransformConfigFromString(pivotTransform, "test_match_all");
assertNotNull(dataFrameTransformConfig.getSource().getQueryConfig());
assertTrue(dataFrameTransformConfig.getSource().getQueryConfig().isValid());
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder content = dataFrameTransformConfig.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
String pivotTransformWithIdAndDefaults = Strings.toString(content);
assertThat(pivotTransformWithIdAndDefaults, matchesPattern(".*\"match_all\"\\s*:\\s*\\{\\}.*"));
}
}
public void testPreventHeaderInjection() {
String pivotTransform = "{"
+ " \"headers\" : {\"key\" : \"value\" },"
+ " \"source\" : {\"index\":\"src\"},"
+ " \"dest\" : {\"index\": \"dest\"},"
+ " \"pivot\" : {"
+ " \"group_by\": {"
+ " \"id\": {"
+ " \"terms\": {"
+ " \"field\": \"id\""
+ "} } },"
+ " \"aggs\": {"
+ " \"avg\": {"
+ " \"avg\": {"
+ " \"field\": \"points\""
+ "} } } } }";
expectThrows(IllegalArgumentException.class,
() -> createDataFrameTransformConfigFromString(pivotTransform, "test_header_injection"));
}
public void testPreventCreateTimeInjection() {
String pivotTransform = "{"
+ " \"create_time\" : " + Instant.now().toEpochMilli() + " },"
+ " \"source\" : {\"index\":\"src\"},"
+ " \"dest\" : {\"index\": \"dest\"},"
+ " \"pivot\" : {"
@ -187,8 +155,58 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
+ " \"field\": \"points\""
+ "} } } } }";
expectThrows(IllegalArgumentException.class,
() -> createDataFrameTransformConfigFromString(pivotTransform, "test_createTime_injection"));
TransformConfig transformConfig = createTransformConfigFromString(pivotTransform, "test_match_all");
assertNotNull(transformConfig.getSource().getQueryConfig());
assertTrue(transformConfig.getSource().getQueryConfig().isValid());
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder content = transformConfig.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
String pivotTransformWithIdAndDefaults = Strings.toString(content);
assertThat(pivotTransformWithIdAndDefaults, matchesPattern(".*\"match_all\"\\s*:\\s*\\{\\}.*"));
}
}
public void testPreventHeaderInjection() {
String pivotTransform = "{"
+ " \"headers\" : {\"key\" : \"value\" },"
+ " \"source\" : {\"index\":\"src\"},"
+ " \"dest\" : {\"index\": \"dest\"},"
+ " \"pivot\" : {"
+ " \"group_by\": {"
+ " \"id\": {"
+ " \"terms\": {"
+ " \"field\": \"id\""
+ "} } },"
+ " \"aggs\": {"
+ " \"avg\": {"
+ " \"avg\": {"
+ " \"field\": \"points\""
+ "} } } } }";
expectThrows(IllegalArgumentException.class, () -> createTransformConfigFromString(pivotTransform, "test_header_injection"));
}
public void testPreventCreateTimeInjection() {
String pivotTransform = "{"
+ " \"create_time\" : "
+ Instant.now().toEpochMilli()
+ " },"
+ " \"source\" : {\"index\":\"src\"},"
+ " \"dest\" : {\"index\": \"dest\"},"
+ " \"pivot\" : {"
+ " \"group_by\": {"
+ " \"id\": {"
+ " \"terms\": {"
+ " \"field\": \"id\""
+ "} } },"
+ " \"aggs\": {"
+ " \"avg\": {"
+ " \"avg\": {"
+ " \"field\": \"points\""
+ "} } } } }";
expectThrows(IllegalArgumentException.class, () -> createTransformConfigFromString(pivotTransform, "test_createTime_injection"));
}
public void testPreventVersionInjection() {
@ -208,22 +226,21 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
+ " \"field\": \"points\""
+ "} } } } }";
expectThrows(IllegalArgumentException.class,
() -> createDataFrameTransformConfigFromString(pivotTransform, "test_createTime_injection"));
expectThrows(IllegalArgumentException.class, () -> createTransformConfigFromString(pivotTransform, "test_createTime_injection"));
}
public void testXContentForInternalStorage() throws IOException {
TransformConfig dataFrameTransformConfig = randomTransformConfig();
TransformConfig transformConfig = randomTransformConfig();
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder content = dataFrameTransformConfig.toXContent(xContentBuilder, getToXContentParams());
XContentBuilder content = transformConfig.toXContent(xContentBuilder, getToXContentParams());
String doc = Strings.toString(content);
assertThat(doc, matchesPattern(".*\"doc_type\"\\s*:\\s*\"data_frame_transform_config\".*"));
}
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder content = dataFrameTransformConfig.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
XContentBuilder content = transformConfig.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
String doc = Strings.toString(content);
assertFalse(doc.contains("doc_type"));
@ -231,46 +248,68 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
}
public void testMaxLengthDescription() {
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new TransformConfig("id",
randomSourceConfig(), randomDestConfig(), null, null, null, PivotConfigTests.randomPivotConfig(), randomAlphaOfLength(1001)));
IllegalArgumentException exception = expectThrows(
IllegalArgumentException.class,
() -> new TransformConfig(
"id",
randomSourceConfig(),
randomDestConfig(),
null,
null,
null,
PivotConfigTests.randomPivotConfig(),
randomAlphaOfLength(1001)
)
);
assertThat(exception.getMessage(), equalTo("[description] must be less than 1000 characters in length."));
String description = randomAlphaOfLength(1000);
TransformConfig config = new TransformConfig("id",
randomSourceConfig(), randomDestConfig(), null, null, null, PivotConfigTests.randomPivotConfig(), description);
TransformConfig config = new TransformConfig(
"id",
randomSourceConfig(),
randomDestConfig(),
null,
null,
null,
PivotConfigTests.randomPivotConfig(),
description
);
assertThat(description, equalTo(config.getDescription()));
}
public void testSetIdInBody() throws IOException {
String pivotTransform = "{"
+ " \"id\" : \"body_id\","
+ " \"source\" : {\"index\":\"src\"},"
+ " \"dest\" : {\"index\": \"dest\"},"
+ " \"pivot\" : {"
+ " \"group_by\": {"
+ " \"id\": {"
+ " \"terms\": {"
+ " \"field\": \"id\""
+ "} } },"
+ " \"aggs\": {"
+ " \"avg\": {"
+ " \"avg\": {"
+ " \"field\": \"points\""
+ "} } } } }";
+ " \"id\" : \"body_id\","
+ " \"source\" : {\"index\":\"src\"},"
+ " \"dest\" : {\"index\": \"dest\"},"
+ " \"pivot\" : {"
+ " \"group_by\": {"
+ " \"id\": {"
+ " \"terms\": {"
+ " \"field\": \"id\""
+ "} } },"
+ " \"aggs\": {"
+ " \"avg\": {"
+ " \"avg\": {"
+ " \"field\": \"points\""
+ "} } } } }";
TransformConfig dataFrameTransformConfig = createDataFrameTransformConfigFromString(pivotTransform, "body_id");
assertEquals("body_id", dataFrameTransformConfig.getId());
TransformConfig transformConfig = createTransformConfigFromString(pivotTransform, "body_id");
assertEquals("body_id", transformConfig.getId());
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> createDataFrameTransformConfigFromString(pivotTransform, "other_id"));
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> createTransformConfigFromString(pivotTransform, "other_id")
);
assertEquals("Inconsistent id; 'body_id' specified in the body differs from 'other_id' specified as a URL argument",
ex.getCause().getMessage());
assertEquals(
"Inconsistent id; 'body_id' specified in the body differs from 'other_id' specified as a URL argument",
ex.getCause().getMessage()
);
}
private TransformConfig createDataFrameTransformConfigFromString(String json, String id) throws IOException {
final XContentParser parser = XContentType.JSON.xContent().createParser(xContentRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);
private TransformConfig createTransformConfigFromString(String json, String id) throws IOException {
final XContentParser parser = XContentType.JSON.xContent()
.createParser(xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);
return TransformConfig.fromXContent(parser, id, false);
}
}

View File

@ -28,13 +28,14 @@ import static org.hamcrest.Matchers.equalTo;
public class TransformConfigUpdateTests extends AbstractSerializingTransformTestCase<TransformConfigUpdate> {
public static TransformConfigUpdate randomDataFrameTransformConfigUpdate() {
public static TransformConfigUpdate randomTransformConfigUpdate() {
return new TransformConfigUpdate(
randomBoolean() ? null : randomSourceConfig(),
randomBoolean() ? null : randomDestConfig(),
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
randomBoolean() ? null : randomSyncConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)
);
}
public static SyncConfig randomSyncConfig() {
@ -48,7 +49,7 @@ public class TransformConfigUpdateTests extends AbstractSerializingTransformTest
@Override
protected TransformConfigUpdate createTestInstance() {
return randomDataFrameTransformConfigUpdate();
return randomTransformConfigUpdate();
}
@Override
@ -61,24 +62,29 @@ public class TransformConfigUpdateTests extends AbstractSerializingTransformTest
TransformConfig config = randomTransformConfig();
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null);
assertTrue("null update is not noop", update.isNoop(config));
update = new TransformConfigUpdate(config.getSource(),
update = new TransformConfigUpdate(
config.getSource(),
config.getDestination(),
config.getFrequency(),
config.getSyncConfig(),
config.getDescription());
config.getDescription()
);
assertTrue("equal update is not noop", update.isNoop(config));
update = new TransformConfigUpdate(config.getSource(),
update = new TransformConfigUpdate(
config.getSource(),
config.getDestination(),
config.getFrequency(),
config.getSyncConfig(),
"this is a new description");
"this is a new description"
);
assertFalse("true update is noop", update.isNoop(config));
}
}
public void testApply() {
TransformConfig config = new TransformConfig("time-transform",
TransformConfig config = new TransformConfig(
"time-transform",
randomSourceConfig(),
randomDestConfig(),
TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
@ -87,7 +93,8 @@ public class TransformConfigUpdateTests extends AbstractSerializingTransformTest
PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.V_7_2_0.toString());
randomBoolean() ? null : Version.V_7_2_0.toString()
);
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null);
assertThat(config, equalTo(update.apply(config)));
@ -112,7 +119,8 @@ public class TransformConfigUpdateTests extends AbstractSerializingTransformTest
}
public void testApplyWithSyncChange() {
TransformConfig batchConfig = new TransformConfig("batch-transform",
TransformConfig batchConfig = new TransformConfig(
"batch-transform",
randomSourceConfig(),
randomDestConfig(),
TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
@ -121,19 +129,19 @@ public class TransformConfigUpdateTests extends AbstractSerializingTransformTest
PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.CURRENT.toString());
randomBoolean() ? null : Version.CURRENT.toString()
);
TransformConfigUpdate update = new TransformConfigUpdate(null,
null,
null,
TimeSyncConfigTests.randomTimeSyncConfig(),
null);
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, TimeSyncConfigTests.randomTimeSyncConfig(), null);
ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class, () -> update.apply(batchConfig));
assertThat(ex.getMessage(),
equalTo("Cannot change the current sync configuration of transform [batch-transform] from [null] to [time]"));
assertThat(
ex.getMessage(),
equalTo("Cannot change the current sync configuration of transform [batch-transform] from [null] to [time]")
);
TransformConfig timeSyncedConfig = new TransformConfig("time-transform",
TransformConfig timeSyncedConfig = new TransformConfig(
"time-transform",
randomSourceConfig(),
randomDestConfig(),
TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
@ -142,16 +150,15 @@ public class TransformConfigUpdateTests extends AbstractSerializingTransformTest
PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.CURRENT.toString());
randomBoolean() ? null : Version.CURRENT.toString()
);
TransformConfigUpdate fooSyncUpdate = new TransformConfigUpdate(null,
null,
null,
new FooSync(),
null);
TransformConfigUpdate fooSyncUpdate = new TransformConfigUpdate(null, null, null, new FooSync(), null);
ex = expectThrows(ElasticsearchStatusException.class, () -> fooSyncUpdate.apply(timeSyncedConfig));
assertThat(ex.getMessage(),
equalTo("Cannot change the current sync configuration of transform [time-transform] from [time] to [foo]"));
assertThat(
ex.getMessage(),
equalTo("Cannot change the current sync configuration of transform [time-transform] from [time] to [foo]")
);
}
@ -178,8 +185,7 @@ public class TransformConfigUpdateTests extends AbstractSerializingTransformTest
}
@Override
public void writeTo(StreamOutput out) throws IOException {
}
public void writeTo(StreamOutput out) throws IOException {}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {

View File

@ -22,15 +22,17 @@ import static org.elasticsearch.xpack.core.transform.transforms.NodeAttributeTes
public class TransformStateTests extends AbstractSerializingTestCase<TransformState> {
public static TransformState randomDataFrameTransformState() {
return new TransformState(randomFrom(TransformTaskState.values()),
public static TransformState randomTransformState() {
return new TransformState(
randomFrom(TransformTaskState.values()),
randomFrom(IndexerState.values()),
TransformIndexerPositionTests.randomTransformIndexerPosition(),
randomLongBetween(0,10),
randomLongBetween(0, 10),
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : randomTransformProgress(),
randomBoolean() ? null : randomNodeAttributes(),
randomBoolean());
randomBoolean()
);
}
@Override
@ -40,7 +42,7 @@ public class TransformStateTests extends AbstractSerializingTestCase<TransformSt
@Override
protected TransformState createTestInstance() {
return randomDataFrameTransformState();
return randomTransformState();
}
@Override
@ -59,14 +61,16 @@ public class TransformStateTests extends AbstractSerializingTestCase<TransformSt
}
public void testBackwardsSerialization() throws IOException {
TransformState state = new TransformState(randomFrom(TransformTaskState.values()),
TransformState state = new TransformState(
randomFrom(TransformTaskState.values()),
randomFrom(IndexerState.values()),
TransformIndexerPositionTests.randomTransformIndexerPosition(),
randomLongBetween(0,10),
randomLongBetween(0, 10),
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : randomTransformProgress(),
randomBoolean() ? null : randomNodeAttributes(),
false); // Will be false after BWC deserialization
false
); // Will be false after BWC deserialization
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_7_5_0);
state.writeTo(output);

View File

@ -21,13 +21,15 @@ import static org.hamcrest.Matchers.equalTo;
public class TransformStatsTests extends AbstractSerializingTestCase<TransformStats> {
public static TransformStats randomDataFrameTransformStats() {
return new TransformStats(randomAlphaOfLength(10),
public static TransformStats randomTransformStats() {
return new TransformStats(
randomAlphaOfLength(10),
randomFrom(TransformStats.State.values()),
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
TransformIndexerStatsTests.randomStats(),
TransformCheckpointingInfoTests.randomDataFrameTransformCheckpointingInfo());
TransformCheckpointingInfoTests.randomTransformCheckpointingInfo()
);
}
@Override
@ -37,7 +39,7 @@ public class TransformStatsTests extends AbstractSerializingTestCase<TransformSt
@Override
protected TransformStats createTestInstance() {
return randomDataFrameTransformStats();
return randomTransformStats();
}
@Override
@ -61,8 +63,9 @@ public class TransformStatsTests extends AbstractSerializingTestCase<TransformSt
}
public void testBwcWith73() throws IOException {
for(int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
TransformStats stats = new TransformStats("bwc-id",
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
TransformStats stats = new TransformStats(
"bwc-id",
STARTED,
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
@ -71,7 +74,10 @@ public class TransformStatsTests extends AbstractSerializingTestCase<TransformSt
new TransformCheckpointStats(0, null, null, 10, 100),
new TransformCheckpointStats(0, null, null, 100, 1000),
// changesLastDetectedAt aren't serialized back
100, null));
100,
null
)
);
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_7_3_0);
stats.writeTo(output);

View File

@ -17,15 +17,14 @@ import java.util.Collections;
public class TransformStoredDocTests extends AbstractSerializingTransformTestCase<TransformStoredDoc> {
protected static ToXContent.Params TO_XCONTENT_PARAMS = new ToXContent.MapParams(
Collections.singletonMap(TransformField.FOR_INTERNAL_STORAGE, "true"));
Collections.singletonMap(TransformField.FOR_INTERNAL_STORAGE, "true")
);
public static TransformStoredDoc randomTransformStoredDoc(String id) {
return new TransformStoredDoc(id,
TransformStateTests.randomDataFrameTransformState(),
TransformIndexerStatsTests.randomStats());
return new TransformStoredDoc(id, TransformStateTests.randomTransformState(), TransformIndexerStatsTests.randomStats());
}
public static TransformStoredDoc randomDataFrameTransformStoredDoc() {
public static TransformStoredDoc randomTransformStoredDoc() {
return randomTransformStoredDoc(randomAlphaOfLengthBetween(1, 10));
}
@ -36,14 +35,14 @@ public class TransformStoredDocTests extends AbstractSerializingTransformTestCas
@Override
// Setting params for internal storage so that we can check XContent equivalence as
// DataFrameIndexerTransformStats does not write the ID to the XContentObject unless it is for internal storage
// TransformIndexerTransformStats does not write the ID to the XContentObject unless it is for internal storage
protected ToXContent.Params getToXContentParams() {
return TO_XCONTENT_PARAMS;
}
@Override
protected TransformStoredDoc createTestInstance() {
return randomDataFrameTransformStoredDoc();
return randomTransformStoredDoc();
}
@Override

View File

@ -29,8 +29,10 @@ public class TransformPivotRestIT extends TransformRestTestCase {
private static final String TEST_USER_NAME = "transform_admin_plus_data";
private static final String DATA_ACCESS_ROLE = "test_data_access";
private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS =
basicAuthHeaderValue(TEST_USER_NAME, TEST_PASSWORD_SECURE_STRING);
private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS = basicAuthHeaderValue(
TEST_USER_NAME,
TEST_PASSWORD_SECURE_STRING
);
private static boolean indicesCreated = false;
@ -103,17 +105,20 @@ public class TransformPivotRestIT extends TransformRestTestCase {
String pipelineId = "my-pivot-pipeline";
int pipelineValue = 42;
Request pipelineRequest = new Request("PUT", "/_ingest/pipeline/" + pipelineId);
pipelineRequest.setJsonEntity("{\n" +
" \"description\" : \"my pivot pipeline\",\n" +
" \"processors\" : [\n" +
" {\n" +
" \"set\" : {\n" +
" \"field\": \"pipeline_field\",\n" +
" \"value\": " + pipelineValue +
" }\n" +
" }\n" +
" ]\n" +
"}");
pipelineRequest.setJsonEntity(
"{\n"
+ " \"description\" : \"my pivot pipeline\",\n"
+ " \"processors\" : [\n"
+ " {\n"
+ " \"set\" : {\n"
+ " \"field\": \"pipeline_field\",\n"
+ " \"value\": "
+ pipelineValue
+ " }\n"
+ " }\n"
+ " ]\n"
+ "}"
);
client().performRequest(pipelineRequest);
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
@ -141,11 +146,18 @@ public class TransformPivotRestIT extends TransformRestTestCase {
String transformId = "simple_bucket_selector_pivot";
String transformIndex = "bucket_selector_idx";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+ " \"dest\": {\"index\":\"" + transformIndex + "\"},"
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},"
+ " \"frequency\": \"1s\","
+ " \"pivot\": {"
+ " \"group_by\": {"
@ -186,11 +198,18 @@ public class TransformPivotRestIT extends TransformRestTestCase {
String transformId = "simple_continuous_pivot";
String transformIndex = "pivot_reviews_continuous";
setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\":\"" + indexName + "\"},"
+ " \"dest\": {\"index\":\"" + transformIndex + "\"},"
+ " \"source\": {\"index\":\""
+ indexName
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},"
+ " \"frequency\": \"1s\","
+ " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}},"
+ " \"pivot\": {"
@ -280,17 +299,14 @@ public class TransformPivotRestIT extends TransformRestTestCase {
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_20", 3.769230769);
Map<String, Object> user26searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_26");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", user26searchResult));
double actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", user26searchResult))
.get(0);
double actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", user26searchResult)).get(0);
assertThat(actual, greaterThan(3.92));
Map<String, Object> user42searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_42");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", user42searchResult));
actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", user42searchResult))
.get(0);
actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", user42searchResult)).get(0);
assertThat(actual, greaterThan(0.0));
assertThat(actual, lessThan(5.0));
}
@ -300,12 +316,19 @@ public class TransformPivotRestIT extends TransformRestTestCase {
String transformIndex = "pivot_reviews_via_histogram";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+ " \"dest\": {\"index\":\"" + transformIndex + "\"},";
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},";
config += " \"pivot\": {"
+ " \"group_by\": {"
@ -338,12 +361,19 @@ public class TransformPivotRestIT extends TransformRestTestCase {
String transformIndex = "bigger_pivot_reviews";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+ " \"dest\": {\"index\":\"" + transformIndex + "\"},";
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},";
config += " \"pivot\": {"
+ " \"group_by\": {"
@ -413,12 +443,19 @@ public class TransformPivotRestIT extends TransformRestTestCase {
String transformIndex = "pivot_reviews_via_date_histogram";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+ " \"dest\": {\"index\":\"" + transformIndex + "\"},";
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},";
config += " \"pivot\": {"
+ " \"group_by\": {"
@ -449,11 +486,13 @@ public class TransformPivotRestIT extends TransformRestTestCase {
@SuppressWarnings("unchecked")
public void testPreviewTransform() throws Exception {
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
final Request createPreviewRequest = createRequestWithAuth("POST", getTransformEndpoint() + "_preview",
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
final Request createPreviewRequest = createRequestWithAuth(
"POST",
getTransformEndpoint() + "_preview",
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"} ,";
String config = "{" + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"} ,";
config += " \"pivot\": {"
+ " \"group_by\": {"
@ -487,24 +526,31 @@ public class TransformPivotRestIT extends TransformRestTestCase {
String pipelineId = "my-preview-pivot-pipeline";
int pipelineValue = 42;
Request pipelineRequest = new Request("PUT", "/_ingest/pipeline/" + pipelineId);
pipelineRequest.setJsonEntity("{\n" +
" \"description\" : \"my pivot preview pipeline\",\n" +
" \"processors\" : [\n" +
" {\n" +
" \"set\" : {\n" +
" \"field\": \"pipeline_field\",\n" +
" \"value\": " + pipelineValue +
" }\n" +
" }\n" +
" ]\n" +
"}");
pipelineRequest.setJsonEntity(
"{\n"
+ " \"description\" : \"my pivot preview pipeline\",\n"
+ " \"processors\" : [\n"
+ " {\n"
+ " \"set\" : {\n"
+ " \"field\": \"pipeline_field\",\n"
+ " \"value\": "
+ pipelineValue
+ " }\n"
+ " }\n"
+ " ]\n"
+ "}"
);
client().performRequest(pipelineRequest);
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
final Request createPreviewRequest = createRequestWithAuth("POST", getTransformEndpoint() + "_preview", null);
String config = "{ \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"} ,"
+ "\"dest\": {\"pipeline\": \"" + pipelineId + "\"},"
String config = "{ \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"} ,"
+ "\"dest\": {\"pipeline\": \""
+ pipelineId
+ "\"},"
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"user.id\": {\"terms\": { \"field\": \"user_id\" }},"
@ -518,7 +564,7 @@ public class TransformPivotRestIT extends TransformRestTestCase {
createPreviewRequest.setJsonEntity(config);
Map<String, Object> previewTransformResponse = entityAsMap(client().performRequest(createPreviewRequest));
List<Map<String, Object>> preview = (List<Map<String, Object>>)previewTransformResponse.get("preview");
List<Map<String, Object>> preview = (List<Map<String, Object>>) previewTransformResponse.get("preview");
// preview is limited to 100
assertThat(preview.size(), equalTo(100));
Set<String> expectedTopLevelFields = new HashSet<>(Arrays.asList("user", "by_day", "pipeline_field"));
@ -527,7 +573,7 @@ public class TransformPivotRestIT extends TransformRestTestCase {
Set<String> keys = p.keySet();
assertThat(keys, equalTo(expectedTopLevelFields));
assertThat(p.get("pipeline_field"), equalTo(pipelineValue));
Map<String, Object> nestedObj = (Map<String, Object>)p.get("user");
Map<String, Object> nestedObj = (Map<String, Object>) p.get("user");
keys = nestedObj.keySet();
assertThat(keys, equalTo(expectedNestedFields));
});
@ -538,28 +584,35 @@ public class TransformPivotRestIT extends TransformRestTestCase {
String transformIndex = "pivot_reviews_via_date_histogram_with_max_time";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\": \"" + REVIEWS_INDEX_NAME + "\"},"
+ " \"dest\": {\"index\":\"" + transformIndex + "\"},";
+ " \"source\": {\"index\": \""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},";
config +=" \"pivot\": { \n" +
" \"group_by\": {\n" +
" \"by_day\": {\"date_histogram\": {\n" +
" \"fixed_interval\": \"1d\",\"field\":\"timestamp\"\n" +
" }}\n" +
" },\n" +
" \n" +
" \"aggs\" :{\n" +
" \"avg_rating\": {\n" +
" \"avg\": {\"field\": \"stars\"}\n" +
" },\n" +
" \"timestamp\": {\n" +
" \"max\": {\"field\": \"timestamp\"}\n" +
" }\n" +
" }}"
config += " \"pivot\": { \n"
+ " \"group_by\": {\n"
+ " \"by_day\": {\"date_histogram\": {\n"
+ " \"fixed_interval\": \"1d\",\"field\":\"timestamp\"\n"
+ " }}\n"
+ " },\n"
+ " \n"
+ " \"aggs\" :{\n"
+ " \"avg_rating\": {\n"
+ " \"avg\": {\"field\": \"stars\"}\n"
+ " },\n"
+ " \"timestamp\": {\n"
+ " \"max\": {\"field\": \"timestamp\"}\n"
+ " }\n"
+ " }}"
+ "}";
createTransformRequest.setJsonEntity(config);
@ -585,12 +638,19 @@ public class TransformPivotRestIT extends TransformRestTestCase {
String transformIndex = "scripted_metric_pivot_reviews";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+ " \"dest\": {\"index\":\"" + transformIndex + "\"},";
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},";
config += " \"pivot\": {"
+ " \"group_by\": {"
@ -638,12 +698,19 @@ public class TransformPivotRestIT extends TransformRestTestCase {
String transformIndex = "bucket_script_pivot_reviews";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+ " \"dest\": {\"index\":\"" + transformIndex + "\"},";
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},";
config += " \"pivot\": {"
+ " \"group_by\": {"
@ -690,12 +757,19 @@ public class TransformPivotRestIT extends TransformRestTestCase {
String transformIndex = "geo_bounds_pivot_reviews";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+ " \"dest\": {\"index\":\"" + transformIndex + "\"},";
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},";
config += " \"pivot\": {"
+ " \"group_by\": {"
@ -729,11 +803,12 @@ public class TransformPivotRestIT extends TransformRestTestCase {
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
assertEquals(3.878048780, actual.doubleValue(), 0.000001);
Map<String, Object> actualObj = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue("hits.hits._source.boundary",
searchResult))
.get(0);
Map<String, Object> actualObj = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue(
"hits.hits._source.boundary",
searchResult
)).get(0);
assertThat(actualObj.get("type"), equalTo("point"));
List<Double> coordinates = (List<Double>)actualObj.get("coordinates");
List<Double> coordinates = (List<Double>) actualObj.get("coordinates");
assertEquals((4 + 10), coordinates.get(1), 0.000001);
assertEquals((4 + 15), coordinates.get(0), 0.000001);
}
@ -743,12 +818,19 @@ public class TransformPivotRestIT extends TransformRestTestCase {
String transformIndex = "geo_centroid_pivot_reviews";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+ " \"dest\": {\"index\":\"" + transformIndex + "\"},";
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},";
config += " \"pivot\": {"
+ " \"group_by\": {"
@ -793,12 +875,19 @@ public class TransformPivotRestIT extends TransformRestTestCase {
String transformIndex = "weighted_avg_pivot_reviews";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+ " \"dest\": {\"index\":\"" + transformIndex + "\"},";
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},";
config += " \"pivot\": {"
+ " \"group_by\": {"
@ -831,12 +920,19 @@ public class TransformPivotRestIT extends TransformRestTestCase {
String transformId = "test_with_many_buckets";
String transformIndex = transformId + "-idx";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+ " \"dest\": {\"index\":\"" + transformIndex + "\"},"
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},"
+ " \"pivot\": {"
+ " \"max_page_search_size\": 10,"
+ " \"group_by\": {"
@ -861,26 +957,34 @@ public class TransformPivotRestIT extends TransformRestTestCase {
assertTrue(indexExists(transformIndex));
Map<String, Object> stats = getAsMap(getTransformEndpoint() + transformId + "/_stats");
assertEquals(101, ((List<?>)XContentMapValues.extractValue("transforms.stats.pages_processed", stats)).get(0));
assertEquals(101, ((List<?>) XContentMapValues.extractValue("transforms.stats.pages_processed", stats)).get(0));
}
public void testContinuousStopWaitForCheckpoint() throws Exception {
Request updateLoggingLevels = new Request("PUT", "/_cluster/settings");
updateLoggingLevels.setJsonEntity(
"{\"transient\": {" +
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," +
"\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}");
"{\"transient\": {"
+ "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\","
+ "\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}"
);
client().performRequest(updateLoggingLevels);
String indexName = "continuous_reviews_wait_for_checkpoint";
createReviewsIndex(indexName);
String transformId = "simple_continuous_pivot_wait_for_checkpoint";
String dataFrameIndex = "pivot_reviews_continuous_wait_for_checkpoint";
setupDataAccessRole(DATA_ACCESS_ROLE, indexName, dataFrameIndex);
final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
String transformIndex = "pivot_reviews_continuous_wait_for_checkpoint";
setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\":\"" + indexName + "\"},"
+ " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"
+ " \"source\": {\"index\":\""
+ indexName
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},"
+ " \"frequency\": \"1s\","
+ " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}},"
+ " \"pivot\": {"
@ -895,15 +999,15 @@ public class TransformPivotRestIT extends TransformRestTestCase {
+ " \"field\": \"stars\""
+ " } } } }"
+ "}";
createDataframeTransformRequest.setJsonEntity(config);
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
startAndWaitForContinuousTransform(transformId, dataFrameIndex, null);
assertTrue(indexExists(dataFrameIndex));
startAndWaitForContinuousTransform(transformId, transformIndex, null);
assertTrue(indexExists(transformIndex));
assertBusy(() -> {
try {
stopTransform(transformId,false, true);
stopTransform(transformId, false, true);
} catch (ResponseException e) {
// We get a conflict sometimes depending on WHEN we try to write the state, should eventually pass though
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(200));
@ -911,11 +1015,11 @@ public class TransformPivotRestIT extends TransformRestTestCase {
});
// get and check some users
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_5", 3.72);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_20", 3.769230769);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918);
deleteIndex(indexName);
}
}

View File

@ -170,20 +170,20 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
createReviewsIndex(indexName, 1000);
}
protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query) throws IOException {
createPivotReviewsTransform(transformId, dataFrameIndex, query, null);
protected void createPivotReviewsTransform(String transformId, String transformIndex, String query) throws IOException {
createPivotReviewsTransform(transformId, transformIndex, query, null);
}
protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String pipeline)
protected void createPivotReviewsTransform(String transformId, String transformIndex, String query, String pipeline)
throws IOException {
createPivotReviewsTransform(transformId, dataFrameIndex, query, pipeline, null);
createPivotReviewsTransform(transformId, transformIndex, query, pipeline, null);
}
protected void createContinuousPivotReviewsTransform(String transformId, String dataFrameIndex, String authHeader) throws IOException {
protected void createContinuousPivotReviewsTransform(String transformId, String transformIndex, String authHeader) throws IOException {
final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);
String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"}," + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
String config = "{ \"dest\": {\"index\":\"" + transformIndex + "\"}," + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
// Set frequency high for testing
+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\"}},"
+ " \"frequency\": \"1s\","
@ -200,22 +200,22 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
+ " } } } }"
+ "}";
createDataframeTransformRequest.setJsonEntity(config);
createTransformRequest.setJsonEntity(config);
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}
protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String pipeline, String authHeader)
protected void createPivotReviewsTransform(String transformId, String transformIndex, String query, String pipeline, String authHeader)
throws IOException {
final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);
String config = "{";
if (pipeline != null) {
config += " \"dest\": {\"index\":\"" + dataFrameIndex + "\", \"pipeline\":\"" + pipeline + "\"},";
config += " \"dest\": {\"index\":\"" + transformIndex + "\", \"pipeline\":\"" + pipeline + "\"},";
} else {
config += " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},";
config += " \"dest\": {\"index\":\"" + transformIndex + "\"},";
}
if (query != null) {
@ -238,10 +238,10 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
+ "\"frequency\":\"1s\""
+ "}";
createDataframeTransformRequest.setJsonEntity(config);
createTransformRequest.setJsonEntity(config);
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}
protected void startTransform(String transformId) throws IOException {
@ -271,38 +271,38 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}
protected void startAndWaitForTransform(String transformId, String dataFrameIndex) throws Exception {
startAndWaitForTransform(transformId, dataFrameIndex, null);
protected void startAndWaitForTransform(String transformId, String transformIndex) throws Exception {
startAndWaitForTransform(transformId, transformIndex, null);
}
protected void startAndWaitForTransform(String transformId, String dataFrameIndex, String authHeader) throws Exception {
startAndWaitForTransform(transformId, dataFrameIndex, authHeader, new String[0]);
protected void startAndWaitForTransform(String transformId, String transformIndex, String authHeader) throws Exception {
startAndWaitForTransform(transformId, transformIndex, authHeader, new String[0]);
}
protected void startAndWaitForTransform(String transformId, String dataFrameIndex, String authHeader, String... warnings)
protected void startAndWaitForTransform(String transformId, String transformIndex, String authHeader, String... warnings)
throws Exception {
// start the transform
startTransform(transformId, authHeader, warnings);
assertTrue(indexExists(dataFrameIndex));
// wait until the dataframe has been created and all data is available
waitForDataFrameCheckpoint(transformId);
assertTrue(indexExists(transformIndex));
// wait until the transform has been created and all data is available
waitForTransformCheckpoint(transformId);
waitForDataFrameStopped(transformId);
refreshIndex(dataFrameIndex);
waitForTransformStopped(transformId);
refreshIndex(transformIndex);
}
protected void startAndWaitForContinuousTransform(String transformId, String dataFrameIndex, String authHeader) throws Exception {
startAndWaitForContinuousTransform(transformId, dataFrameIndex, authHeader, 1L);
protected void startAndWaitForContinuousTransform(String transformId, String transformIndex, String authHeader) throws Exception {
startAndWaitForContinuousTransform(transformId, transformIndex, authHeader, 1L);
}
protected void startAndWaitForContinuousTransform(String transformId, String dataFrameIndex, String authHeader, long checkpoint)
protected void startAndWaitForContinuousTransform(String transformId, String transformIndex, String authHeader, long checkpoint)
throws Exception {
// start the transform
startTransform(transformId, authHeader, new String[0]);
assertTrue(indexExists(dataFrameIndex));
// wait until the dataframe has been created and all data is available
assertTrue(indexExists(transformIndex));
// wait until the transform has been created and all data is available
waitForTransformCheckpoint(transformId, checkpoint);
refreshIndex(dataFrameIndex);
refreshIndex(transformIndex);
}
protected Request createRequestWithAuth(final String method, final String endpoint, final String authHeader) {
@ -317,16 +317,16 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
return request;
}
void waitForDataFrameStopped(String transformId) throws Exception {
// Polls the transform's reported state until it is "stopped", failing the
// test if that state is not reached within 15 seconds.
void waitForTransformStopped(String transformId) throws Exception {
    assertBusy(() -> assertEquals("stopped", getTransformState(transformId)), 15, TimeUnit.SECONDS);
}
void waitForDataFrameCheckpoint(String transformId) throws Exception {
// Convenience overload: waits for the transform to complete its first
// checkpoint (checkpoint number 1) by delegating to the two-arg variant.
void waitForTransformCheckpoint(String transformId) throws Exception {
waitForTransformCheckpoint(transformId, 1L);
}
void waitForTransformCheckpoint(String transformId, long checkpoint) throws Exception {
assertBusy(() -> assertEquals(checkpoint, getDataFrameCheckpoint(transformId)), 30, TimeUnit.SECONDS);
assertBusy(() -> assertEquals(checkpoint, getTransformCheckpoint(transformId)), 30, TimeUnit.SECONDS);
}
void refreshIndex(String index) throws IOException {
@ -412,7 +412,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
assertEquals(0, XContentMapValues.extractValue("hits.total.value", searchResult));
} catch (ResponseException e) {
// 404 here just means we had no data frame transforms, true for some tests
// 404 here just means we had no transforms, true for some tests
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
throw e;
}
@ -423,7 +423,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
waitForPendingTasks(adminClient(), taskName -> taskName.startsWith(TransformField.TASK_NAME) == false);
}
static int getDataFrameCheckpoint(String transformId) throws IOException {
static int getTransformCheckpoint(String transformId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", getTransformEndpoint() + transformId + "/_stats"));
Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);

View File

@ -103,9 +103,9 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
public void testStartFailedTransform() throws Exception {
String transformId = "test-force-start-failed-transform";
createReviewsIndex(REVIEWS_INDEX_NAME, 10);
String dataFrameIndex = "failure_pivot_reviews";
createDestinationIndexWithBadMapping(dataFrameIndex);
createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null);
String transformIndex = "failure_pivot_reviews";
createDestinationIndexWithBadMapping(transformIndex);
createContinuousPivotReviewsTransform(transformId, transformIndex, null);
failureTransforms.add(transformId);
startTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);

View File

@ -276,12 +276,12 @@ public class Transform extends Plugin implements ActionPlugin, PersistentTaskPlu
TransformInternalIndex.getIndexTemplateMetaData()
);
} catch (IOException e) {
logger.error("Error creating data frame index template", e);
logger.error("Error creating transform index template", e);
}
try {
templates.put(TransformInternalIndexConstants.AUDIT_INDEX, TransformInternalIndex.getAuditIndexTemplateMetaData());
} catch (IOException e) {
logger.warn("Error creating data frame audit index", e);
logger.warn("Error creating transform audit index", e);
}
return templates;
};

View File

@ -34,7 +34,7 @@ public interface CheckpointProvider {
void sourceHasChanged(TransformCheckpoint lastCheckpoint, ActionListener<Boolean> listener);
/**
* Get checkpoint statistics for a running data frame
* Get checkpoint statistics for a running transform
*
* For running transforms most information is available in-memory.
*
@ -53,7 +53,7 @@ public interface CheckpointProvider {
);
/**
* Get checkpoint statistics for a stopped data frame
* Get checkpoint statistics for a stopped transform
*
* For stopped transforms we need to do lookups in the internal index.
*

View File

@ -69,7 +69,7 @@ public class TransformTaskTests extends ESTestCase {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class));
TransformConfig transformConfig = TransformConfigTests.randomDataFrameTransformConfigWithoutHeaders();
TransformConfig transformConfig = TransformConfigTests.randomTransformConfigWithoutHeaders();
TransformAuditor auditor = new MockTransformAuditor();
TransformConfigManager transformsConfigManager = new InMemoryTransformConfigManager();
TransformCheckpointService transformsCheckpointService = new TransformCheckpointService(
@ -157,7 +157,7 @@ public class TransformTaskTests extends ESTestCase {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class));
TransformConfig transformConfig = TransformConfigTests.randomDataFrameTransformConfigWithoutHeaders();
TransformConfig transformConfig = TransformConfigTests.randomTransformConfigWithoutHeaders();
TransformAuditor auditor = new MockTransformAuditor();
TransformState transformState = new TransformState(