Enable spotless for enrich gradle project in 7 dot x branch. (#48976)

Backport of #48908

The enrich project doesn't have much history as all the other gradle projects,
so it makes sense to enable spotless for this gradle project.
This commit is contained in:
Martijn van Groningen 2019-11-12 13:22:34 +01:00 committed by GitHub
parent 9baea80853
commit 18d5d73305
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
45 changed files with 2072 additions and 1215 deletions

View File

@ -205,7 +205,7 @@
<setting id="org.eclipse.jdt.core.formatter.alignment_for_resources_in_try" value="48"/>
<setting id="org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.parentheses_positions_in_try_clause" value="separate_lines_if_wrapped"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation" value="48"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation" value="80"/>
<setting id="org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.keep_code_block_on_one_line" value="one_line_if_empty"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized" value="do not insert"/>

View File

@ -106,7 +106,7 @@ subprojects {
// is greater than the number of unformatted projects, this can be
// switched to an exclude list, and eventualy removed completely.
def projectPathsToFormat = [
// ':build-tools'
':x-pack:plugin:enrich'
]
if (projectPathsToFormat.contains(project.path)) {

View File

@ -35,15 +35,31 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
protected final String matchField;
protected final int maxMatches;
protected AbstractEnrichProcessor(String tag, Client client, String policyName, String field, String targetField,
boolean ignoreMissing, boolean overrideEnabled, String matchField, int maxMatches) {
protected AbstractEnrichProcessor(
String tag,
Client client,
String policyName,
String field,
String targetField,
boolean ignoreMissing,
boolean overrideEnabled,
String matchField,
int maxMatches
) {
this(tag, createSearchRunner(client), policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
}
protected AbstractEnrichProcessor(String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName, String field, String targetField, boolean ignoreMissing, boolean overrideEnabled,
String matchField, int maxMatches) {
protected AbstractEnrichProcessor(
String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String field,
String targetField,
boolean ignoreMissing,
boolean overrideEnabled,
String matchField,
int maxMatches
) {
super(tag);
this.policyName = policyName;
this.searchRunner = searchRunner;
@ -155,13 +171,11 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
return (req, handler) -> {
client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(
resp -> {
handler.accept(resp, null);
},
e -> {
handler.accept(null, e);
}));
client.execute(
EnrichCoordinatorProxyAction.INSTANCE,
req,
ActionListener.wrap(resp -> { handler.accept(resp, null); }, e -> { handler.accept(null, e); })
);
};
}
}

View File

@ -109,8 +109,10 @@ public final class EnrichMetadata extends AbstractNamedDiffable<MetaData.Custom>
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
EnrichMetadata that = (EnrichMetadata) o;
return policies.equals(that.policies);
}

View File

@ -70,33 +70,57 @@ import static org.elasticsearch.xpack.core.XPackSettings.ENRICH_ENABLED_SETTING;
public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
static final Setting<Integer> ENRICH_FETCH_SIZE_SETTING =
Setting.intSetting("enrich.fetch_size", 10000, 1, 1000000, Setting.Property.NodeScope);
static final Setting<Integer> ENRICH_FETCH_SIZE_SETTING = Setting.intSetting(
"enrich.fetch_size",
10000,
1,
1000000,
Setting.Property.NodeScope
);
static final Setting<Integer> ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS =
Setting.intSetting("enrich.max_concurrent_policy_executions", 50, 1, Setting.Property.NodeScope);
static final Setting<Integer> ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS = Setting.intSetting(
"enrich.max_concurrent_policy_executions",
50,
1,
Setting.Property.NodeScope
);
static final Setting<TimeValue> ENRICH_CLEANUP_PERIOD =
Setting.timeSetting("enrich.cleanup_period", new TimeValue(15, TimeUnit.MINUTES), Setting.Property.NodeScope);
static final Setting<TimeValue> ENRICH_CLEANUP_PERIOD = Setting.timeSetting(
"enrich.cleanup_period",
new TimeValue(15, TimeUnit.MINUTES),
Setting.Property.NodeScope
);
public static final Setting<Integer> COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS =
Setting.intSetting("enrich.coordinator_proxy.max_concurrent_requests", 8, 1, 10000, Setting.Property.NodeScope);
public static final Setting<Integer> COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS = Setting.intSetting(
"enrich.coordinator_proxy.max_concurrent_requests",
8,
1,
10000,
Setting.Property.NodeScope
);
public static final Setting<Integer> COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST =
Setting.intSetting("enrich.coordinator_proxy.max_lookups_per_request", 128, 1, 10000, Setting.Property.NodeScope);
public static final Setting<Integer> COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST = Setting.intSetting(
"enrich.coordinator_proxy.max_lookups_per_request",
128,
1,
10000,
Setting.Property.NodeScope
);
static final Setting<Integer> ENRICH_MAX_FORCE_MERGE_ATTEMPTS =
Setting.intSetting("enrich.max_force_merge_attempts", 3, 1, 10, Setting.Property.NodeScope);
static final Setting<Integer> ENRICH_MAX_FORCE_MERGE_ATTEMPTS = Setting.intSetting(
"enrich.max_force_merge_attempts",
3,
1,
10,
Setting.Property.NodeScope
);
private static final String QUEUE_CAPACITY_SETTING_NAME = "enrich.coordinator_proxy.queue_capacity";
public static final Setting<Integer> COORDINATOR_PROXY_QUEUE_CAPACITY = new Setting<>(QUEUE_CAPACITY_SETTING_NAME,
settings -> {
int maxConcurrentRequests = COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.get(settings);
int maxLookupsPerRequest = COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.get(settings);
return String.valueOf(maxConcurrentRequests * maxLookupsPerRequest);
},
val -> Setting.parseInt(val, 1, Integer.MAX_VALUE, QUEUE_CAPACITY_SETTING_NAME),
Setting.Property.NodeScope);
public static final Setting<Integer> COORDINATOR_PROXY_QUEUE_CAPACITY = new Setting<>(QUEUE_CAPACITY_SETTING_NAME, settings -> {
int maxConcurrentRequests = COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.get(settings);
int maxLookupsPerRequest = COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.get(settings);
return String.valueOf(maxConcurrentRequests * maxLookupsPerRequest);
}, val -> Setting.parseInt(val, 1, Integer.MAX_VALUE, QUEUE_CAPACITY_SETTING_NAME), Setting.Property.NodeScope);
private final Settings settings;
private final Boolean enabled;
@ -119,7 +143,9 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
return Collections.singletonMap(EnrichProcessorFactory.TYPE, factory);
}
protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); }
protected XPackLicenseState getLicenseState() {
return XPackPlugin.getSharedLicenseState();
}
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
if (enabled == false) {
@ -138,10 +164,15 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
);
}
public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster) {
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
if (enabled == false) {
return emptyList();
}
@ -156,17 +187,29 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
}
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry
) {
if (enabled == false || transportClientMode) {
return emptyList();
}
EnrichPolicyLocks enrichPolicyLocks = new EnrichPolicyLocks();
EnrichPolicyMaintenanceService enrichPolicyMaintenanceService = new EnrichPolicyMaintenanceService(settings, client,
clusterService, threadPool, enrichPolicyLocks);
EnrichPolicyMaintenanceService enrichPolicyMaintenanceService = new EnrichPolicyMaintenanceService(
settings,
client,
clusterService,
threadPool,
enrichPolicyLocks
);
enrichPolicyMaintenanceService.initialize();
return Arrays.asList(
enrichPolicyLocks,
@ -188,8 +231,11 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Arrays.asList(
new NamedWriteableRegistry.Entry(MetaData.Custom.class, EnrichMetadata.TYPE, EnrichMetadata::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, EnrichMetadata.TYPE,
in -> EnrichMetadata.readDiffFrom(MetaData.Custom.class, EnrichMetadata.TYPE, in))
new NamedWriteableRegistry.Entry(
NamedDiff.class,
EnrichMetadata.TYPE,
in -> EnrichMetadata.readDiffFrom(MetaData.Custom.class, EnrichMetadata.TYPE, in)
)
);
}

View File

@ -43,14 +43,16 @@ public class EnrichPolicyExecutor {
private final int maxForceMergeAttempts;
private final Semaphore policyExecutionPermits;
public EnrichPolicyExecutor(Settings settings,
ClusterService clusterService,
Client client,
TaskManager taskManager,
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
EnrichPolicyLocks policyLocks,
LongSupplier nowSupplier) {
public EnrichPolicyExecutor(
Settings settings,
ClusterService clusterService,
Client client,
TaskManager taskManager,
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
EnrichPolicyLocks policyLocks,
LongSupplier nowSupplier
) {
this.clusterService = clusterService;
this.client = client;
this.taskManager = taskManager;
@ -69,8 +71,14 @@ public class EnrichPolicyExecutor {
if (policyExecutionPermits.tryAcquire() == false) {
// Release policy lock, and throw a different exception
policyLocks.releasePolicy(policyName);
throw new EsRejectedExecutionException("Policy execution failed. Policy execution for [" + policyName + "] would exceed " +
"maximum concurrent policy executions [" + maximumConcurrentPolicyExecutions + "]");
throw new EsRejectedExecutionException(
"Policy execution failed. Policy execution for ["
+ policyName
+ "] would exceed "
+ "maximum concurrent policy executions ["
+ maximumConcurrentPolicyExecutions
+ "]"
);
}
}
@ -88,8 +96,12 @@ public class EnrichPolicyExecutor {
private final BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse;
private final BiConsumer<Task, Exception> onFailure;
PolicyCompletionListener(String policyName, ExecuteEnrichPolicyTask task,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse, BiConsumer<Task, Exception> onFailure) {
PolicyCompletionListener(
String policyName,
ExecuteEnrichPolicyTask task,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse,
BiConsumer<Task, Exception> onFailure
) {
this.policyName = policyName;
this.task = task;
this.onResponse = onResponse;
@ -120,10 +132,24 @@ public class EnrichPolicyExecutor {
}
}
protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener) {
return new EnrichPolicyRunner(policyName, policy, task, listener, clusterService, client, indexNameExpressionResolver, nowSupplier,
fetchSize, maxForceMergeAttempts);
protected Runnable createPolicyRunner(
String policyName,
EnrichPolicy policy,
ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
return new EnrichPolicyRunner(
policyName,
policy,
task,
listener,
clusterService,
client,
indexNameExpressionResolver,
nowSupplier,
fetchSize,
maxForceMergeAttempts
);
}
private EnrichPolicy getPolicy(ExecuteEnrichPolicyAction.Request request) {
@ -143,18 +169,28 @@ public class EnrichPolicyExecutor {
return runPolicy(request, getPolicy(request), listener);
}
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,
ActionListener<ExecuteEnrichPolicyStatus> listener) {
public Task runPolicy(
ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
return runPolicy(request, policy, (t, r) -> listener.onResponse(r), (t, e) -> listener.onFailure(e));
}
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,
TaskListener<ExecuteEnrichPolicyStatus> listener) {
public Task runPolicy(
ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
TaskListener<ExecuteEnrichPolicyStatus> listener
) {
return runPolicy(request, policy, listener::onResponse, listener::onFailure);
}
private Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse, BiConsumer<Task, Exception> onFailure) {
private Task runPolicy(
ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse,
BiConsumer<Task, Exception> onFailure
) {
tryLockingPolicy(request.getName());
try {
return runPolicyTask(request, policy, onResponse, onFailure);
@ -165,8 +201,12 @@ public class EnrichPolicyExecutor {
}
}
private Task runPolicyTask(final ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse, BiConsumer<Task, Exception> onFailure) {
private Task runPolicyTask(
final ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse,
BiConsumer<Task, Exception> onFailure
) {
Task asyncTask = taskManager.register("enrich", TASK_ACTION, new TaskAwareRequest() {
@Override
public void setParentTask(TaskId taskId) {

View File

@ -70,8 +70,9 @@ public class EnrichPolicyLocks {
Semaphore runLock = policyLocks.computeIfAbsent(policyName, (name) -> new Semaphore(1));
boolean acquired = runLock.tryAcquire();
if (acquired == false) {
throw new EsRejectedExecutionException("Could not obtain lock because policy execution for [" + policyName +
"] is already in progress.");
throw new EsRejectedExecutionException(
"Could not obtain lock because policy execution for [" + policyName + "] is already in progress."
);
}
policyRunCounter.incrementAndGet();
} finally {
@ -105,8 +106,7 @@ public class EnrichPolicyLocks {
*/
boolean isSameState(EnrichPolicyExecutionState previousState) {
EnrichPolicyExecutionState currentState = captureExecutionState();
return currentState.anyPolicyInFlight == previousState.anyPolicyInFlight &&
currentState.executions == previousState.executions;
return currentState.anyPolicyInFlight == previousState.anyPolicyInFlight && currentState.executions == previousState.executions;
}
/**

View File

@ -54,8 +54,13 @@ public class EnrichPolicyMaintenanceService implements LocalNodeMasterListener {
private volatile Scheduler.Cancellable cancellable;
private final Semaphore maintenanceLock = new Semaphore(1);
EnrichPolicyMaintenanceService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool,
EnrichPolicyLocks enrichPolicyLocks) {
EnrichPolicyMaintenanceService(
Settings settings,
Client client,
ClusterService clusterService,
ThreadPool threadPool,
EnrichPolicyLocks enrichPolicyLocks
) {
this.settings = settings;
this.client = new OriginSettingClient(client, ENRICH_ORIGIN);
this.clusterService = clusterService;
@ -135,8 +140,7 @@ public class EnrichPolicyMaintenanceService implements LocalNodeMasterListener {
void cleanUpEnrichIndices() {
final Map<String, EnrichPolicy> policies = EnrichStore.getPolicies(clusterService.state());
GetIndexRequest indices = new GetIndexRequest()
.indices(EnrichPolicy.ENRICH_INDEX_NAME_BASE + "*")
GetIndexRequest indices = new GetIndexRequest().indices(EnrichPolicy.ENRICH_INDEX_NAME_BASE + "*")
.indicesOptions(IndicesOptions.lenientExpand());
// Check that no enrich policies are being executed
final EnrichPolicyLocks.EnrichPolicyExecutionState executionState = enrichPolicyLocks.captureExecutionState();
@ -189,24 +193,24 @@ public class EnrichPolicyMaintenanceService implements LocalNodeMasterListener {
logger.debug("Enrich index [{}] is not marked as a live index since it has no alias information", indexName);
return true;
}
boolean hasAlias = aliasMetadata
.stream()
.anyMatch((aliasMetaData -> aliasMetaData.getAlias().equals(aliasName)));
boolean hasAlias = aliasMetadata.stream().anyMatch((aliasMetaData -> aliasMetaData.getAlias().equals(aliasName)));
// Index is not currently published to the enrich alias. Should be marked for removal.
if (hasAlias == false) {
logger.debug("Enrich index [{}] is not marked as a live index since it lacks the alias [{}]", indexName, aliasName);
return true;
}
logger.debug("Enrich index [{}] was spared since it is associated with the valid policy [{}] and references alias [{}]",
indexName, policyName, aliasName);
logger.debug(
"Enrich index [{}] was spared since it is associated with the valid policy [{}] and references alias [{}]",
indexName,
policyName,
aliasName
);
return false;
}
private void deleteIndices(String[] removeIndices) {
if (removeIndices.length != 0) {
DeleteIndexRequest deleteIndices = new DeleteIndexRequest()
.indices(removeIndices)
.indicesOptions(IGNORE_UNAVAILABLE);
DeleteIndexRequest deleteIndices = new DeleteIndexRequest().indices(removeIndices).indicesOptions(IGNORE_UNAVAILABLE);
client.admin().indices().delete(deleteIndices, new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
@ -216,8 +220,10 @@ public class EnrichPolicyMaintenanceService implements LocalNodeMasterListener {
@Override
public void onFailure(Exception e) {
logger.error(() -> "Enrich maintenance task could not delete abandoned enrich indices [" +
Arrays.toString(removeIndices) + "]", e);
logger.error(
() -> "Enrich maintenance task could not delete abandoned enrich indices [" + Arrays.toString(removeIndices) + "]",
e
);
concludeMaintenance();
}
});

View File

@ -72,20 +72,32 @@ public class EnrichPolicyReindexPipeline {
private static XContentBuilder currentEnrichPipelineDefinition(XContentType xContentType) {
try {
return XContentBuilder.builder(xContentType.xContent())
.startObject()
.field("description", "This pipeline sanitizes documents that will be stored in enrich indices for ingest lookup " +
"purposes. It is an internal pipeline and should not be modified.")
.field("version", ENRICH_PIPELINE_LAST_UPDATED_VERSION)
.startArray("processors")
.startObject()
// remove the id from the document so that documents from multiple indices will always be unique.
.startObject("remove")
.field("field", "_id")
.endObject()
.endObject()
.endArray()
.endObject();
XContentBuilder builder = XContentBuilder.builder(xContentType.xContent());
builder.startObject();
{
builder.field(
"description",
"This pipeline sanitizes documents that will be stored in enrich indices for ingest lookup "
+ "purposes. It is an internal pipeline and should not be modified."
);
builder.field("version", ENRICH_PIPELINE_LAST_UPDATED_VERSION);
builder.startArray("processors");
{
builder.startObject();
{
// remove the id from the document so that documents from multiple indices will always be unique.
builder.startObject("remove");
{
builder.field("field", "_id");
}
builder.endObject();
}
builder.endObject();
}
builder.endArray();
}
builder.endObject();
return builder;
} catch (final IOException e) {
throw new UncheckedIOException("Failed to create pipeline for enrich document sanitization", e);
}

View File

@ -80,10 +80,18 @@ public class EnrichPolicyRunner implements Runnable {
private final int fetchSize;
private final int maxForceMergeAttempts;
EnrichPolicyRunner(String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener, ClusterService clusterService, Client client,
IndexNameExpressionResolver indexNameExpressionResolver, LongSupplier nowSupplier, int fetchSize,
int maxForceMergeAttempts) {
EnrichPolicyRunner(
String policyName,
EnrichPolicy policy,
ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener,
ClusterService clusterService,
Client client,
IndexNameExpressionResolver indexNameExpressionResolver,
LongSupplier nowSupplier,
int fetchSize,
int maxForceMergeAttempts
) {
this.policyName = policyName;
this.policy = policy;
this.task = task;
@ -124,7 +132,10 @@ public class EnrichPolicyRunner implements Runnable {
if (indexMapping.keys().size() == 0) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed. No mapping available on source [{}] included in [{}]",
policyName, sourceIndexName, policy.getIndices());
policyName,
sourceIndexName,
policy.getIndices()
);
}
assert indexMapping.keys().size() == 1 : "Expecting only one type per index";
MappingMetaData typeMapping = indexMapping.iterator().next().value;
@ -140,15 +151,20 @@ public class EnrichPolicyRunner implements Runnable {
}
}
static void validateMappings(final String policyName,
final EnrichPolicy policy,
final String sourceIndex,
final Map<String, Object> mapping) {
static void validateMappings(
final String policyName,
final EnrichPolicy policy,
final String sourceIndex,
final Map<String, Object> mapping
) {
// First ensure mapping is set
if (mapping.get("properties") == null) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed. Could not read mapping for source [{}] included by pattern [{}]",
policyName, sourceIndex, policy.getIndices());
policyName,
sourceIndex,
policy.getIndices()
);
}
// Validate the key and values
try {
@ -159,12 +175,15 @@ public class EnrichPolicyRunner implements Runnable {
} catch (ElasticsearchException e) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed while validating field mappings for index [{}]",
e, policyName, sourceIndex);
e,
policyName,
sourceIndex
);
}
}
private static void validateField(Map<?, ?> properties, String fieldName, boolean fieldRequired) {
assert Strings.isEmpty(fieldName) == false: "Field name cannot be null or empty";
assert Strings.isEmpty(fieldName) == false : "Field name cannot be null or empty";
String[] fieldParts = fieldName.split("\\.");
StringBuilder parent = new StringBuilder();
Map<?, ?> currentField = properties;
@ -206,7 +225,7 @@ public class EnrichPolicyRunner implements Runnable {
}
}
if (onRoot) {
onRoot = false;
onRoot = false;
} else {
parent.append(".");
}
@ -230,24 +249,35 @@ public class EnrichPolicyRunner implements Runnable {
// Enable _source on enrich index. Explicitly mark key mapping type.
try {
XContentBuilder builder = JsonXContent.contentBuilder();
builder = builder.startObject()
.startObject(MapperService.SINGLE_MAPPING_NAME)
.field("dynamic", false)
.startObject("_source")
.field("enabled", true)
.endObject()
.startObject("properties")
.startObject(policy.getMatchField());
builder = matchFieldMapping.apply(builder).endObject().endObject()
.startObject("_meta")
.field(ENRICH_README_FIELD_NAME, ENRICH_INDEX_README_TEXT)
.field(ENRICH_POLICY_NAME_FIELD_NAME, policyName)
.field(ENRICH_MATCH_FIELD_NAME, policy.getMatchField())
.field(ENRICH_POLICY_TYPE_FIELD_NAME, policy.getType())
.endObject()
.endObject()
.endObject();
builder.startObject();
{
builder.startObject(MapperService.SINGLE_MAPPING_NAME);
{
builder.field("dynamic", false);
builder.startObject("_source");
{
builder.field("enabled", true);
}
builder.endObject();
builder.startObject("properties");
{
builder.startObject(policy.getMatchField());
matchFieldMapping.apply(builder);
builder.endObject();
}
builder.endObject();
builder.startObject("_meta");
{
builder.field(ENRICH_README_FIELD_NAME, ENRICH_INDEX_README_TEXT);
builder.field(ENRICH_POLICY_NAME_FIELD_NAME, policyName);
builder.field(ENRICH_MATCH_FIELD_NAME, policy.getMatchField());
builder.field(ENRICH_POLICY_TYPE_FIELD_NAME, policy.getType());
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
return builder;
} catch (IOException ioe) {
throw new UncheckedIOException("Could not render enrich mapping", ioe);
@ -312,8 +342,7 @@ public class EnrichPolicyRunner implements Runnable {
if (policy.getQuery() != null) {
searchSourceBuilder.query(QueryBuilders.wrapperQuery(policy.getQuery().getQuery()));
}
ReindexRequest reindexRequest = new ReindexRequest()
.setDestIndex(destinationIndexName)
ReindexRequest reindexRequest = new ReindexRequest().setDestIndex(destinationIndexName)
.setSourceIndices(policy.getIndices().toArray(new String[0]));
reindexRequest.getSearchRequest().source(searchSourceBuilder);
reindexRequest.getDestination().source(new BytesArray(new byte[0]), XContentType.SMILE);
@ -328,8 +357,12 @@ public class EnrichPolicyRunner implements Runnable {
} else if (bulkByScrollResponse.getSearchFailures().size() > 0) {
listener.onFailure(new ElasticsearchException("Encountered search failures during reindex process"));
} else {
logger.info("Policy [{}]: Transferred [{}] documents to enrich index [{}]", policyName,
bulkByScrollResponse.getCreated(), destinationIndexName);
logger.info(
"Policy [{}]: Transferred [{}] documents to enrich index [{}]",
policyName,
bulkByScrollResponse.getCreated(),
destinationIndexName
);
forceMergeEnrichIndex(destinationIndexName, 1);
}
}
@ -342,20 +375,26 @@ public class EnrichPolicyRunner implements Runnable {
}
private void forceMergeEnrichIndex(final String destinationIndexName, final int attempt) {
logger.debug("Policy [{}]: Force merging newly created enrich index [{}] (Attempt {}/{})", policyName, destinationIndexName,
attempt, maxForceMergeAttempts);
client.admin().indices().forceMerge(new ForceMergeRequest(destinationIndexName).maxNumSegments(1),
new ActionListener<ForceMergeResponse>() {
@Override
public void onResponse(ForceMergeResponse forceMergeResponse) {
refreshEnrichIndex(destinationIndexName, attempt);
}
logger.debug(
"Policy [{}]: Force merging newly created enrich index [{}] (Attempt {}/{})",
policyName,
destinationIndexName,
attempt,
maxForceMergeAttempts
);
client.admin()
.indices()
.forceMerge(new ForceMergeRequest(destinationIndexName).maxNumSegments(1), new ActionListener<ForceMergeResponse>() {
@Override
public void onResponse(ForceMergeResponse forceMergeResponse) {
refreshEnrichIndex(destinationIndexName, attempt);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
private void refreshEnrichIndex(final String destinationIndexName, final int attempt) {
@ -379,8 +418,10 @@ public class EnrichPolicyRunner implements Runnable {
public void onResponse(IndicesSegmentResponse indicesSegmentResponse) {
IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(destinationIndexName);
if (indexSegments == null) {
throw new ElasticsearchException("Could not locate segment information for newly created index [{}]",
destinationIndexName);
throw new ElasticsearchException(
"Could not locate segment information for newly created index [{}]",
destinationIndexName
);
}
Map<Integer, IndexShardSegments> indexShards = indexSegments.getShards();
assert indexShards.size() == 1 : "Expected enrich index to contain only one shard";
@ -390,12 +431,22 @@ public class EnrichPolicyRunner implements Runnable {
if (primarySegments.getSegments().size() > 1) {
int nextAttempt = attempt + 1;
if (nextAttempt > maxForceMergeAttempts) {
listener.onFailure(new ElasticsearchException(
"Force merging index [{}] attempted [{}] times but did not result in one segment.",
destinationIndexName, attempt, maxForceMergeAttempts));
listener.onFailure(
new ElasticsearchException(
"Force merging index [{}] attempted [{}] times but did not result in one segment.",
destinationIndexName,
attempt,
maxForceMergeAttempts
)
);
} else {
logger.debug("Policy [{}]: Force merge result contains more than one segment [{}], retrying (attempt {}/{})",
policyName, primarySegments.getSegments().size(), nextAttempt, maxForceMergeAttempts);
logger.debug(
"Policy [{}]: Force merge result contains more than one segment [{}], retrying (attempt {}/{})",
policyName,
primarySegments.getSegments().size(),
nextAttempt,
maxForceMergeAttempts
);
forceMergeEnrichIndex(destinationIndexName, nextAttempt);
}
} else {
@ -413,11 +464,8 @@ public class EnrichPolicyRunner implements Runnable {
private void setIndexReadOnly(final String destinationIndexName) {
logger.debug("Policy [{}]: Setting new enrich index [{}] to be read only", policyName, destinationIndexName);
UpdateSettingsRequest request = new UpdateSettingsRequest(destinationIndexName)
.setPreserveExisting(true)
.settings(Settings.builder()
.put("index.auto_expand_replicas", "0-all")
.put("index.blocks.write", "true"));
UpdateSettingsRequest request = new UpdateSettingsRequest(destinationIndexName).setPreserveExisting(true)
.settings(Settings.builder().put("index.auto_expand_replicas", "0-all").put("index.blocks.write", "true"));
client.admin().indices().updateSettings(request, new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
@ -451,8 +499,9 @@ public class EnrichPolicyRunner implements Runnable {
logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase);
GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase);
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), aliasRequest);
ImmutableOpenMap<String, List<AliasMetaData>> aliases =
clusterService.state().metaData().findAliases(aliasRequest, concreteIndices);
ImmutableOpenMap<String, List<AliasMetaData>> aliases = clusterService.state()
.metaData()
.findAliases(aliasRequest, concreteIndices);
IndicesAliasesRequest aliasToggleRequest = new IndicesAliasesRequest();
String[] indices = aliases.keys().toArray(String.class);
if (indices.length > 0) {

View File

@ -44,8 +44,10 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
Map<String, Object> mappingAsMap = imd.mapping().sourceAsMap();
String policyType =
(String) XContentMapValues.extractValue("_meta." + EnrichPolicyRunner.ENRICH_POLICY_TYPE_FIELD_NAME, mappingAsMap);
String policyType = (String) XContentMapValues.extractValue(
"_meta." + EnrichPolicyRunner.ENRICH_POLICY_TYPE_FIELD_NAME,
mappingAsMap
);
String matchField = (String) XContentMapValues.extractValue("_meta." + EnrichPolicyRunner.ENRICH_MATCH_FIELD_NAME, mappingAsMap);
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
@ -58,13 +60,32 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
switch (policyType) {
case EnrichPolicy.MATCH_TYPE:
return new MatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField,
maxMatches);
return new MatchProcessor(
tag,
client,
policyName,
field,
targetField,
overrideEnabled,
ignoreMissing,
matchField,
maxMatches
);
case EnrichPolicy.GEO_MATCH_TYPE:
String relationStr = ConfigurationUtils.readStringProperty(TYPE, tag, config, "shape_relation", "intersects");
ShapeRelation shapeRelation = ShapeRelation.getRelationByName(relationStr);
return new GeoMatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField,
maxMatches, shapeRelation);
return new GeoMatchProcessor(
tag,
client,
policyName,
field,
targetField,
overrideEnabled,
ignoreMissing,
matchField,
maxMatches,
shapeRelation
);
default:
throw new IllegalArgumentException("unsupported policy type [" + policyType + "]");
}

View File

@ -44,11 +44,13 @@ public final class EnrichStore {
* @param policy The policy to store
* @param handler The handler that gets invoked if policy has been stored or a failure has occurred.
*/
public static void putPolicy(final String name,
final EnrichPolicy policy,
final ClusterService clusterService,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Consumer<Exception> handler) {
public static void putPolicy(
final String name,
final EnrichPolicy policy,
final ClusterService clusterService,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Consumer<Exception> handler
) {
assert clusterService.localNode().isMasterNode();
if (Strings.isNullOrEmpty(name)) {
@ -59,15 +61,21 @@ public final class EnrichStore {
}
// The policy name is used to create the enrich index name and
// therefor a policy name has the same restrictions as an index name
MetaDataCreateIndexService.validateIndexOrAliasName(name,
(policyName, error) -> new IllegalArgumentException("Invalid policy name [" + policyName + "], " + error));
MetaDataCreateIndexService.validateIndexOrAliasName(
name,
(policyName, error) -> new IllegalArgumentException("Invalid policy name [" + policyName + "], " + error)
);
if (name.toLowerCase(Locale.ROOT).equals(name) == false) {
throw new IllegalArgumentException("Invalid policy name [" + name + "], must be lowercase");
}
Set<String> supportedPolicyTypes = new HashSet<>(Arrays.asList(EnrichPolicy.SUPPORTED_POLICY_TYPES));
if (supportedPolicyTypes.contains(policy.getType()) == false) {
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() +
"], supported types are " + Arrays.toString(EnrichPolicy.SUPPORTED_POLICY_TYPES));
throw new IllegalArgumentException(
"unsupported policy type ["
+ policy.getType()
+ "], supported types are "
+ Arrays.toString(EnrichPolicy.SUPPORTED_POLICY_TYPES)
);
}
final EnrichPolicy finalPolicy;
@ -86,8 +94,11 @@ public final class EnrichStore {
updateClusterState(clusterService, handler, current -> {
for (String indexExpression : finalPolicy.getIndices()) {
// indices field in policy can contain wildcards, aliases etc.
String[] concreteIndices =
indexNameExpressionResolver.concreteIndexNames(current, IndicesOptions.strictExpandOpen(), indexExpression);
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(
current,
IndicesOptions.strictExpandOpen(),
indexExpression
);
for (String concreteIndex : concreteIndices) {
IndexMetaData imd = current.getMetaData().index(concreteIndex);
assert imd != null;
@ -166,9 +177,11 @@ public final class EnrichStore {
return policies;
}
private static void updateClusterState(ClusterService clusterService,
Consumer<Exception> handler,
Function<ClusterState, Map<String, EnrichPolicy>> function) {
private static void updateClusterState(
ClusterService clusterService,
Consumer<Exception> handler,
Function<ClusterState, Map<String, EnrichPolicy>> function
) {
clusterService.submitStateUpdateTask("update-enrich-metadata", new ClusterStateUpdateTask() {
@Override
@ -177,9 +190,7 @@ public final class EnrichStore {
MetaData metaData = MetaData.builder(currentState.metaData())
.putCustom(EnrichMetadata.TYPE, new EnrichMetadata(policies))
.build();
return ClusterState.builder(currentState)
.metaData(metaData)
.build();
return ClusterState.builder(currentState).metaData(metaData).build();
}
@Override

View File

@ -15,8 +15,7 @@ class ExecuteEnrichPolicyTask extends Task {
private volatile ExecuteEnrichPolicyStatus status;
ExecuteEnrichPolicyTask(long id, String type, String action, String description, TaskId parentTask,
Map<String, String> headers) {
ExecuteEnrichPolicyTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
super(id, type, action, description, parentTask, headers);
}

View File

@ -25,30 +25,35 @@ public final class GeoMatchProcessor extends AbstractEnrichProcessor {
private ShapeRelation shapeRelation;
GeoMatchProcessor(String tag,
Client client,
String policyName,
String field,
String targetField,
boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
int maxMatches,
ShapeRelation shapeRelation) {
GeoMatchProcessor(
String tag,
Client client,
String policyName,
String field,
String targetField,
boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
int maxMatches,
ShapeRelation shapeRelation
) {
super(tag, client, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
this.shapeRelation = shapeRelation;
}
/** used in tests **/
GeoMatchProcessor(String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String field,
String targetField,
boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
int maxMatches, ShapeRelation shapeRelation) {
GeoMatchProcessor(
String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String field,
String targetField,
boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
int maxMatches,
ShapeRelation shapeRelation
) {
super(tag, searchRunner, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
this.shapeRelation = shapeRelation;
}

View File

@ -17,28 +17,32 @@ import java.util.function.BiConsumer;
public class MatchProcessor extends AbstractEnrichProcessor {
MatchProcessor(String tag,
Client client,
String policyName,
String field,
String targetField,
boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
int maxMatches) {
MatchProcessor(
String tag,
Client client,
String policyName,
String field,
String targetField,
boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
int maxMatches
) {
super(tag, client, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
}
/** used in tests **/
MatchProcessor(String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String field,
String targetField,
boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
int maxMatches) {
MatchProcessor(
String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String field,
String targetField,
boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
int maxMatches
) {
super(tag, searchRunner, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
}

View File

@ -87,8 +87,12 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
);
}
Coordinator(BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> lookupFunction,
int maxLookupsPerRequest, int maxNumberOfConcurrentRequests, int queueCapacity) {
Coordinator(
BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> lookupFunction,
int maxLookupsPerRequest,
int maxNumberOfConcurrentRequests,
int queueCapacity
) {
this.lookupFunction = lookupFunction;
this.maxLookupsPerRequest = maxLookupsPerRequest;
this.maxNumberOfConcurrentRequests = maxNumberOfConcurrentRequests;
@ -110,13 +114,17 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
}
CoordinatorStats getStats(String nodeId) {
return new CoordinatorStats(nodeId, queue.size(), remoteRequestsCurrent.get(), remoteRequestsTotal,
executedSearchesTotal.get());
return new CoordinatorStats(
nodeId,
queue.size(),
remoteRequestsCurrent.get(),
remoteRequestsTotal,
executedSearchesTotal.get()
);
}
synchronized void coordinateLookups() {
while (queue.isEmpty() == false &&
remoteRequestsCurrent.get() < maxNumberOfConcurrentRequests) {
while (queue.isEmpty() == false && remoteRequestsCurrent.get() < maxNumberOfConcurrentRequests) {
final List<Slot> slots = new ArrayList<>();
queue.drainTo(slots, maxLookupsPerRequest);
@ -125,9 +133,7 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
remoteRequestsCurrent.incrementAndGet();
remoteRequestsTotal++;
lookupFunction.accept(multiSearchRequest, (response, e) -> {
handleResponse(slots, response, e);
});
lookupFunction.accept(multiSearchRequest, (response, e) -> { handleResponse(slots, response, e); });
}
}
@ -173,8 +179,10 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
int slot = 0;
final Map<String, List<Tuple<Integer, SearchRequest>>> itemsPerIndex = new HashMap<>();
for (SearchRequest searchRequest : request.requests()) {
List<Tuple<Integer, SearchRequest>> items =
itemsPerIndex.computeIfAbsent(searchRequest.indices()[0], k -> new ArrayList<>());
List<Tuple<Integer, SearchRequest>> items = itemsPerIndex.computeIfAbsent(
searchRequest.indices()[0],
k -> new ArrayList<>()
);
items.add(new Tuple<>(slot, searchRequest));
slot++;
}
@ -184,20 +192,17 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
for (Map.Entry<String, List<Tuple<Integer, SearchRequest>>> entry : itemsPerIndex.entrySet()) {
final String enrichIndexName = entry.getKey();
final List<Tuple<Integer, SearchRequest>> enrichIndexRequestsAndSlots = entry.getValue();
ActionListener<MultiSearchResponse> listener = ActionListener.wrap(
response -> {
shardResponses.put(enrichIndexName, new Tuple<>(response, null));
if (counter.incrementAndGet() == itemsPerIndex.size()) {
consumer.accept(reduce(request.requests().size(), itemsPerIndex, shardResponses), null);
}
},
e -> {
shardResponses.put(enrichIndexName, new Tuple<>(null, e));
if (counter.incrementAndGet() == itemsPerIndex.size()) {
consumer.accept(reduce(request.requests().size(), itemsPerIndex, shardResponses), null);
}
ActionListener<MultiSearchResponse> listener = ActionListener.wrap(response -> {
shardResponses.put(enrichIndexName, new Tuple<>(response, null));
if (counter.incrementAndGet() == itemsPerIndex.size()) {
consumer.accept(reduce(request.requests().size(), itemsPerIndex, shardResponses), null);
}
);
}, e -> {
shardResponses.put(enrichIndexName, new Tuple<>(null, e));
if (counter.incrementAndGet() == itemsPerIndex.size()) {
consumer.accept(reduce(request.requests().size(), itemsPerIndex, shardResponses), null);
}
});
MultiSearchRequest mrequest = new MultiSearchRequest();
enrichIndexRequestsAndSlots.stream().map(Tuple::v2).forEach(mrequest::add);
@ -206,9 +211,11 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
};
}
static MultiSearchResponse reduce(int numRequest,
Map<String, List<Tuple<Integer, SearchRequest>>> itemsPerIndex,
Map<String, Tuple<MultiSearchResponse, Exception>> shardResponses) {
static MultiSearchResponse reduce(
int numRequest,
Map<String, List<Tuple<Integer, SearchRequest>>> itemsPerIndex,
Map<String, Tuple<MultiSearchResponse, Exception>> shardResponses
) {
MultiSearchResponse.Item[] items = new MultiSearchResponse.Item[numRequest];
for (Map.Entry<String, Tuple<MultiSearchResponse, Exception>> rspEntry : shardResponses.entrySet()) {
List<Tuple<Integer, SearchRequest>> reqSlots = itemsPerIndex.get(rspEntry.getKey());

View File

@ -114,10 +114,24 @@ public class EnrichCoordinatorStatsAction extends ActionType<EnrichCoordinatorSt
private final EnrichCoordinatorProxyAction.Coordinator coordinator;
@Inject
public TransportAction(ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, EnrichCoordinatorProxyAction.Coordinator coordinator) {
super(NAME, threadPool, clusterService, transportService, actionFilters, Request::new, NodeRequest::new,
ThreadPool.Names.SAME, NodeResponse.class);
public TransportAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
EnrichCoordinatorProxyAction.Coordinator coordinator
) {
super(
NAME,
threadPool,
clusterService,
transportService,
actionFilters,
Request::new,
NodeRequest::new,
ThreadPool.Names.SAME,
NodeResponse.class
);
this.coordinator = coordinator;
}

View File

@ -104,7 +104,8 @@ public class EnrichShardMultiSearchAction extends ActionType<MultiSearchResponse
public Request(MultiSearchRequest multiSearchRequest) {
super(multiSearchRequest.requests().get(0).indices()[0]);
this.multiSearchRequest = multiSearchRequest;
assert multiSearchRequest.requests().stream()
assert multiSearchRequest.requests()
.stream()
.map(SearchRequest::indices)
.flatMap(Arrays::stream)
.distinct()
@ -121,8 +122,10 @@ public class EnrichShardMultiSearchAction extends ActionType<MultiSearchResponse
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = validateNonNullIndex();
if (index.startsWith(EnrichPolicy.ENRICH_INDEX_NAME_BASE) == false) {
validationException = ValidateActions.addValidationError("index [" + index + "] is not an enrich index",
validationException);
validationException = ValidateActions.addValidationError(
"index [" + index + "] is not an enrich index",
validationException
);
}
return validationException;
}
@ -147,8 +150,9 @@ public class EnrichShardMultiSearchAction extends ActionType<MultiSearchResponse
copy.from(0);
copy.size(10);
copy.fetchSource(null);
assert EMPTY_SOURCE.equals(copy) : "search request [" + Strings.toString(copy) +
"] is using features that is not supported";
assert EMPTY_SOURCE.equals(copy) : "search request ["
+ Strings.toString(copy)
+ "] is using features that is not supported";
}
return true;
}
@ -168,7 +172,8 @@ public class EnrichShardMultiSearchAction extends ActionType<MultiSearchResponse
private static final SearchSourceBuilder EMPTY_SOURCE = new SearchSourceBuilder()
// can't set -1 to indicate not specified
.from(0).size(10);
.from(0)
.size(10);
}
public static class TransportAction extends TransportSingleShardAction<Request, MultiSearchResponse> {
@ -176,11 +181,24 @@ public class EnrichShardMultiSearchAction extends ActionType<MultiSearchResponse
private final IndicesService indicesService;
@Inject
public TransportAction(ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService) {
super(NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, ThreadPool.Names.SEARCH);
public TransportAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService
) {
super(
NAME,
threadPool,
clusterService,
transportService,
actionFilters,
indexNameExpressionResolver,
Request::new,
ThreadPool.Names.SEARCH
);
this.indicesService = indicesService;
}
@ -203,8 +221,8 @@ public class EnrichShardMultiSearchAction extends ActionType<MultiSearchResponse
throw new IllegalStateException("index [" + index + "] should have 1 shard, but has " + numShards + " shards");
}
GroupShardsIterator<ShardIterator> result =
clusterService.operationRouting().searchShards(state, new String[] {index}, null, Preference.LOCAL.type());
GroupShardsIterator<ShardIterator> result = clusterService.operationRouting()
.searchShards(state, new String[] { index }, null, Preference.LOCAL.type());
return result.get(0);
}
@ -214,8 +232,12 @@ public class EnrichShardMultiSearchAction extends ActionType<MultiSearchResponse
final IndexShard indexShard = indicesService.getShardOrNull(shardId);
try (Engine.Searcher searcher = indexShard.acquireSearcher("enrich_msearch")) {
final FieldsVisitor visitor = new FieldsVisitor(true);
final QueryShardContext context = indexService.newQueryShardContext(shardId.id(),
searcher, () -> {throw new UnsupportedOperationException();}, null);
final QueryShardContext context = indexService.newQueryShardContext(
shardId.id(),
searcher,
() -> { throw new UnsupportedOperationException(); },
null
);
final MapperService mapperService = context.getMapperService();
final Text typeText = mapperService.documentMapper().typeText();
@ -259,10 +281,18 @@ public class EnrichShardMultiSearchAction extends ActionType<MultiSearchResponse
Set<String> includes = new HashSet<>(Arrays.asList(fetchSourceContext.includes()));
Set<String> excludes = new HashSet<>(Arrays.asList(fetchSourceContext.excludes()));
XContentBuilder builder =
new XContentBuilder(XContentType.SMILE.xContent(), new BytesStreamOutput(source.length()), includes, excludes);
XContentParser sourceParser = XContentHelper.createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, source, XContentType.SMILE);
XContentBuilder builder = new XContentBuilder(
XContentType.SMILE.xContent(),
new BytesStreamOutput(source.length()),
includes,
excludes
);
XContentParser sourceParser = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
source,
XContentType.SMILE
);
builder.copyCurrentStructure(sourceParser);
return BytesReference.bytes(builder);
}
@ -271,7 +301,13 @@ public class EnrichShardMultiSearchAction extends ActionType<MultiSearchResponse
SearchHits searchHits = new SearchHits(hits, topDocs.totalHits, 0);
return new SearchResponse(
new InternalSearchResponse(searchHits, null, null, null, false, null, 0),
null, 1, 1, 0, 1L, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY
null,
1,
1,
0,
1L,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
);
}

View File

@ -45,18 +45,26 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
// where a user creates and deletes a policy before running execute
private static final IndicesOptions LENIENT_OPTIONS = IndicesOptions.fromOptions(true, true, true, true);
@Inject
public TransportDeleteEnrichPolicyAction(TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client,
EnrichPolicyLocks enrichPolicyLocks,
IngestService ingestService) {
super(DeleteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
DeleteEnrichPolicyAction.Request::new, indexNameExpressionResolver);
public TransportDeleteEnrichPolicyAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client,
EnrichPolicyLocks enrichPolicyLocks,
IngestService ingestService
) {
super(
DeleteEnrichPolicyAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
DeleteEnrichPolicyAction.Request::new,
indexNameExpressionResolver
);
this.client = client;
this.enrichPolicyLocks = enrichPolicyLocks;
this.ingestService = ingestService;
@ -77,8 +85,11 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
}
@Override
protected void masterOperation(DeleteEnrichPolicyAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
protected void masterOperation(
DeleteEnrichPolicyAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state); // ensure the policy exists first
if (policy == null) {
throw new ResourceNotFoundException("policy [{}] not found", request.getName());
@ -90,8 +101,10 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
List<String> pipelinesWithProcessors = new ArrayList<>();
for (PipelineConfiguration pipelineConfiguration : pipelines) {
List<AbstractEnrichProcessor> enrichProcessors =
ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class);
List<AbstractEnrichProcessor> enrichProcessors = ingestService.getProcessorsInPipeline(
pipelineConfiguration.getId(),
AbstractEnrichProcessor.class
);
for (AbstractEnrichProcessor processor : enrichProcessors) {
if (processor.getPolicyName().equals(request.getName())) {
pipelinesWithProcessors.add(pipelineConfiguration.getId());
@ -100,8 +113,12 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
}
if (pipelinesWithProcessors.isEmpty() == false) {
throw new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors);
throw new ElasticsearchStatusException(
"Could not delete policy [{}] because a pipeline is referencing it {}",
RestStatus.CONFLICT,
request.getName(),
pipelinesWithProcessors
);
}
} catch (Exception e) {
enrichPolicyLocks.releasePolicy(request.getName());
@ -109,35 +126,33 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
return;
}
deleteIndicesAndPolicy(request.getName(), ActionListener.wrap(
(response) -> {
enrichPolicyLocks.releasePolicy(request.getName());
listener.onResponse(response);
},
(exc) -> {
enrichPolicyLocks.releasePolicy(request.getName());
listener.onFailure(exc);
}
));
deleteIndicesAndPolicy(request.getName(), ActionListener.wrap((response) -> {
enrichPolicyLocks.releasePolicy(request.getName());
listener.onResponse(response);
}, (exc) -> {
enrichPolicyLocks.releasePolicy(request.getName());
listener.onFailure(exc);
}));
}
private void deleteIndicesAndPolicy(String name, ActionListener<AcknowledgedResponse> listener) {
// delete all enrich indices for this policy
DeleteIndexRequest deleteRequest = new DeleteIndexRequest()
.indices(EnrichPolicy.getBaseName(name) + "-*")
DeleteIndexRequest deleteRequest = new DeleteIndexRequest().indices(EnrichPolicy.getBaseName(name) + "-*")
.indicesOptions(LENIENT_OPTIONS);
client.admin().indices().delete(deleteRequest, ActionListener.wrap(
(response) -> {
if (response.isAcknowledged() == false) {
listener.onFailure(new ElasticsearchStatusException("Could not fetch indices to delete during policy delete of [{}]",
RestStatus.INTERNAL_SERVER_ERROR, name));
} else {
deletePolicy(name, listener);
}
},
(error) -> listener.onFailure(error)
));
client.admin().indices().delete(deleteRequest, ActionListener.wrap((response) -> {
if (response.isAcknowledged() == false) {
listener.onFailure(
new ElasticsearchStatusException(
"Could not fetch indices to delete during policy delete of [{}]",
RestStatus.INTERNAL_SERVER_ERROR,
name
)
);
} else {
deletePolicy(name, listener);
}
}, (error) -> listener.onFailure(error)));
}
private void deletePolicy(String name, ActionListener<AcknowledgedResponse> listener) {

View File

@ -34,11 +34,23 @@ public class TransportEnrichStatsAction extends TransportMasterNodeAction<Enrich
private final Client client;
@Inject
public TransportEnrichStatsAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Client client) {
super(EnrichStatsAction.NAME, transportService, clusterService, threadPool, actionFilters,
EnrichStatsAction.Request::new, indexNameExpressionResolver);
public TransportEnrichStatsAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client
) {
super(
EnrichStatsAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
EnrichStatsAction.Request::new,
indexNameExpressionResolver
);
this.client = client;
}
@ -53,40 +65,42 @@ public class TransportEnrichStatsAction extends TransportMasterNodeAction<Enrich
}
@Override
protected void masterOperation(EnrichStatsAction.Request request,
ClusterState state,
ActionListener<EnrichStatsAction.Response> listener) throws Exception {
protected void masterOperation(
EnrichStatsAction.Request request,
ClusterState state,
ActionListener<EnrichStatsAction.Response> listener
) throws Exception {
EnrichCoordinatorStatsAction.Request statsRequest = new EnrichCoordinatorStatsAction.Request();
ActionListener<EnrichCoordinatorStatsAction.Response> statsListener = ActionListener.wrap(
response -> {
if (response.hasFailures()) {
// Report failures even if some node level requests succeed:
Exception failure = null;
for (FailedNodeException nodeFailure : response.failures()) {
if (failure == null) {
failure = nodeFailure;
} else {
failure.addSuppressed(nodeFailure);
}
ActionListener<EnrichCoordinatorStatsAction.Response> statsListener = ActionListener.wrap(response -> {
if (response.hasFailures()) {
// Report failures even if some node level requests succeed:
Exception failure = null;
for (FailedNodeException nodeFailure : response.failures()) {
if (failure == null) {
failure = nodeFailure;
} else {
failure.addSuppressed(nodeFailure);
}
listener.onFailure(failure);
return;
}
listener.onFailure(failure);
return;
}
List<CoordinatorStats> coordinatorStats = response.getNodes().stream()
.map(EnrichCoordinatorStatsAction.NodeResponse::getCoordinatorStats)
.sorted(Comparator.comparing(CoordinatorStats::getNodeId))
.collect(Collectors.toList());
List<ExecutingPolicy> policyExecutionTasks = taskManager.getTasks().values().stream()
.filter(t -> t.getAction().equals(EnrichPolicyExecutor.TASK_ACTION))
.map(t -> t.taskInfo(clusterService.localNode().getId(), true))
.map(t -> new ExecutingPolicy(t.getDescription(), t))
.sorted(Comparator.comparing(ExecutingPolicy::getName))
.collect(Collectors.toList());
listener.onResponse(new EnrichStatsAction.Response(policyExecutionTasks, coordinatorStats));
},
listener::onFailure
);
List<CoordinatorStats> coordinatorStats = response.getNodes()
.stream()
.map(EnrichCoordinatorStatsAction.NodeResponse::getCoordinatorStats)
.sorted(Comparator.comparing(CoordinatorStats::getNodeId))
.collect(Collectors.toList());
List<ExecutingPolicy> policyExecutionTasks = taskManager.getTasks()
.values()
.stream()
.filter(t -> t.getAction().equals(EnrichPolicyExecutor.TASK_ACTION))
.map(t -> t.taskInfo(clusterService.localNode().getId(), true))
.map(t -> new ExecutingPolicy(t.getDescription(), t))
.sorted(Comparator.comparing(ExecutingPolicy::getName))
.collect(Collectors.toList());
listener.onResponse(new EnrichStatsAction.Response(policyExecutionTasks, coordinatorStats));
}, listener::onFailure);
client.execute(EnrichCoordinatorStatsAction.INSTANCE, statsRequest, statsListener);
}

View File

@ -29,24 +29,41 @@ import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
import java.io.IOException;
public class TransportExecuteEnrichPolicyAction
extends TransportMasterNodeAction<ExecuteEnrichPolicyAction.Request, ExecuteEnrichPolicyAction.Response> {
public class TransportExecuteEnrichPolicyAction extends
TransportMasterNodeAction<ExecuteEnrichPolicyAction.Request, ExecuteEnrichPolicyAction.Response> {
private final EnrichPolicyExecutor executor;
@Inject
public TransportExecuteEnrichPolicyAction(Settings settings,
Client client,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
EnrichPolicyLocks enrichPolicyLocks) {
super(ExecuteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
ExecuteEnrichPolicyAction.Request::new, indexNameExpressionResolver);
this.executor = new EnrichPolicyExecutor(settings, clusterService, client, transportService.getTaskManager(), threadPool,
new IndexNameExpressionResolver(), enrichPolicyLocks, System::currentTimeMillis);
public TransportExecuteEnrichPolicyAction(
Settings settings,
Client client,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
EnrichPolicyLocks enrichPolicyLocks
) {
super(
ExecuteEnrichPolicyAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
ExecuteEnrichPolicyAction.Request::new,
indexNameExpressionResolver
);
this.executor = new EnrichPolicyExecutor(
settings,
clusterService,
client,
transportService.getTaskManager(),
threadPool,
new IndexNameExpressionResolver(),
enrichPolicyLocks,
System::currentTimeMillis
);
}
@Override
@ -60,8 +77,11 @@ public class TransportExecuteEnrichPolicyAction
}
@Override
protected void masterOperation(ExecuteEnrichPolicyAction.Request request, ClusterState state,
ActionListener<ExecuteEnrichPolicyAction.Response> listener) {
protected void masterOperation(
ExecuteEnrichPolicyAction.Request request,
ClusterState state,
ActionListener<ExecuteEnrichPolicyAction.Response> listener
) {
if (state.getNodes().getIngestNodes().isEmpty()) {
// if we don't fail here then reindex will fail with a more complicated error.
// (EnrichPolicyRunner uses a pipeline with reindex)

View File

@ -25,17 +25,26 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class TransportGetEnrichPolicyAction extends TransportMasterNodeReadAction<GetEnrichPolicyAction.Request,
GetEnrichPolicyAction.Response> {
public class TransportGetEnrichPolicyAction extends
TransportMasterNodeReadAction<GetEnrichPolicyAction.Request, GetEnrichPolicyAction.Response> {
@Inject
public TransportGetEnrichPolicyAction(TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(GetEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
GetEnrichPolicyAction.Request::new, indexNameExpressionResolver);
public TransportGetEnrichPolicyAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
GetEnrichPolicyAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
GetEnrichPolicyAction.Request::new,
indexNameExpressionResolver
);
}
@Override
@ -53,15 +62,17 @@ public class TransportGetEnrichPolicyAction extends TransportMasterNodeReadActio
}
@Override
protected void masterOperation(GetEnrichPolicyAction.Request request,
ClusterState state,
ActionListener<GetEnrichPolicyAction.Response> listener) throws Exception {
protected void masterOperation(
GetEnrichPolicyAction.Request request,
ClusterState state,
ActionListener<GetEnrichPolicyAction.Response> listener
) throws Exception {
Map<String, EnrichPolicy> policies;
if (request.getNames() == null || request.getNames().isEmpty()) {
policies = EnrichStore.getPolicies(state);
} else {
policies = new HashMap<>();
for (String name: request.getNames()) {
for (String name : request.getNames()) {
if (name.isEmpty() == false) {
EnrichPolicy policy = EnrichStore.getPolicy(name, state);
if (policy != null) {

View File

@ -41,15 +41,29 @@ public class TransportPutEnrichPolicyAction extends TransportMasterNodeAction<Pu
private final Client client;
@Inject
public TransportPutEnrichPolicyAction(Settings settings, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool, Client client,
XPackLicenseState licenseState, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(PutEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
PutEnrichPolicyAction.Request::new, indexNameExpressionResolver);
public TransportPutEnrichPolicyAction(
Settings settings,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
Client client,
XPackLicenseState licenseState,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
PutEnrichPolicyAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
PutEnrichPolicyAction.Request::new,
indexNameExpressionResolver
);
this.licenseState = licenseState;
this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ?
new SecurityContext(settings, threadPool.getThreadContext()) : null;
this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings)
? new SecurityContext(settings, threadPool.getThreadContext())
: null;
this.client = client;
}
@ -68,8 +82,11 @@ public class TransportPutEnrichPolicyAction extends TransportMasterNodeAction<Pu
}
@Override
protected void masterOperation(PutEnrichPolicyAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
protected void masterOperation(
PutEnrichPolicyAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) {
if (licenseState.isAuthAllowed()) {
RoleDescriptor.IndicesPrivileges privileges = RoleDescriptor.IndicesPrivileges.builder()
@ -85,23 +102,26 @@ public class TransportPutEnrichPolicyAction extends TransportMasterNodeAction<Pu
privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
privRequest.indexPrivileges(privileges);
ActionListener<HasPrivilegesResponse> wrappedListener = ActionListener.wrap(
r -> {
if (r.isCompleteMatch()) {
putPolicy(request, listener);
} else {
listener.onFailure(Exceptions.authorizationError("unable to store policy because no indices match with the " +
"specified index patterns {}", request.getPolicy().getIndices(), username));
}
},
listener::onFailure);
ActionListener<HasPrivilegesResponse> wrappedListener = ActionListener.wrap(r -> {
if (r.isCompleteMatch()) {
putPolicy(request, listener);
} else {
listener.onFailure(
Exceptions.authorizationError(
"unable to store policy because no indices match with the " + "specified index patterns {}",
request.getPolicy().getIndices(),
username
)
);
}
}, listener::onFailure);
client.execute(HasPrivilegesAction.INSTANCE, privRequest, wrappedListener);
} else {
putPolicy(request, listener);
}
}
private void putPolicy(PutEnrichPolicyAction.Request request, ActionListener<AcknowledgedResponse> listener ) {
private void putPolicy(PutEnrichPolicyAction.Request request, ActionListener<AcknowledgedResponse> listener) {
EnrichStore.putPolicy(request.getName(), request.getPolicy(), clusterService, indexNameExpressionResolver, e -> {
if (e == null) {
listener.onResponse(new AcknowledgedResponse(true));

View File

@ -26,8 +26,8 @@ public abstract class AbstractEnrichTestCase extends ESSingleNodeTestCase {
return Collections.singletonList(LocalStateEnrich.class);
}
protected AtomicReference<Exception> saveEnrichPolicy(String name, EnrichPolicy policy,
ClusterService clusterService) throws InterruptedException {
protected AtomicReference<Exception> saveEnrichPolicy(String name, EnrichPolicy policy, ClusterService clusterService)
throws InterruptedException {
if (policy != null) {
createSourceIndices(policy);
}
@ -50,7 +50,7 @@ public abstract class AbstractEnrichTestCase extends ESSingleNodeTestCase {
latch.countDown();
});
latch.await();
if (error.get() != null){
if (error.get() != null) {
throw error.get();
}
}

View File

@ -67,15 +67,25 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
List<String> keys = createSourceMatchIndex(numDocs, maxMatches);
String policyName = "my-policy";
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
Arrays.asList(SOURCE_INDEX_NAME), MATCH_FIELD, Arrays.asList(DECORATE_FIELDS));
EnrichPolicy enrichPolicy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Arrays.asList(SOURCE_INDEX_NAME),
MATCH_FIELD,
Arrays.asList(DECORATE_FIELDS)
);
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
String pipelineName = "my-pipeline";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
"\", \"field\": \"" + MATCH_FIELD + "\", \"target_field\": \"users\", \"max_matches\": " + maxMatches + "}}]}";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\""
+ policyName
+ "\", \"field\": \""
+ MATCH_FIELD
+ "\", \"target_field\": \"users\", \"max_matches\": "
+ maxMatches
+ "}}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();
@ -111,8 +121,8 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
}
}
EnrichStatsAction.Response statsResponse =
client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet();
EnrichStatsAction.Response statsResponse = client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request())
.actionGet();
assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1));
String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId();
assertThat(statsResponse.getCoordinatorStats().get(0).getNodeId(), equalTo(localNodeId));
@ -126,27 +136,41 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
// create enrich index
{
IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME);
indexRequest.source(mapOf(matchField, "POLYGON((" +
"-122.08592534065245 37.38501746624134," +
"-122.08193421363829 37.38501746624134," +
"-122.08193421363829 37.3879329075567," +
"-122.08592534065245 37.3879329075567," +
"-122.08592534065245 37.38501746624134))",
"zipcode", "94040"));
indexRequest.source(
mapOf(
matchField,
"POLYGON(("
+ "-122.08592534065245 37.38501746624134,"
+ "-122.08193421363829 37.38501746624134,"
+ "-122.08193421363829 37.3879329075567,"
+ "-122.08592534065245 37.3879329075567,"
+ "-122.08592534065245 37.38501746624134))",
"zipcode",
"94040"
)
);
client().index(indexRequest).actionGet();
client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet();
}
String policyName = "my-policy";
EnrichPolicy enrichPolicy =
new EnrichPolicy(EnrichPolicy.GEO_MATCH_TYPE, null, Arrays.asList(SOURCE_INDEX_NAME), matchField, Arrays.asList(enrichField));
EnrichPolicy enrichPolicy = new EnrichPolicy(
EnrichPolicy.GEO_MATCH_TYPE,
null,
Arrays.asList(SOURCE_INDEX_NAME),
matchField,
Arrays.asList(enrichField)
);
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
String pipelineName = "my-pipeline";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
"\", \"field\": \"" + matchField + "\", \"target_field\": \"enriched\", \"max_matches\": 1 }}]}";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\""
+ policyName
+ "\", \"field\": \""
+ matchField
+ "\", \"target_field\": \"enriched\", \"max_matches\": 1 }}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();
@ -169,8 +193,8 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
assertThat(entries.containsKey(matchField), is(true));
assertThat(entries.get(enrichField), equalTo("94040"));
EnrichStatsAction.Response statsResponse =
client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet();
EnrichStatsAction.Response statsResponse = client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request())
.actionGet();
assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1));
String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId();
assertThat(statsResponse.getCoordinatorStats().get(0).getNodeId(), equalTo(localNodeId));
@ -188,15 +212,21 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
client().index(indexRequest).actionGet();
client().admin().indices().refresh(new RefreshRequest("source-" + i)).actionGet();
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
Collections.singletonList("source-" + i), "key", Collections.singletonList("value"));
EnrichPolicy enrichPolicy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Collections.singletonList("source-" + i),
"key",
Collections.singletonList("value")
);
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
String pipelineName = "pipeline" + i;
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
"\", \"field\": \"key\", \"target_field\": \"target\"}}]}";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\""
+ policyName
+ "\", \"field\": \"key\", \"target_field\": \"target\"}}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();
}
@ -231,26 +261,35 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
client().admin().indices().refresh(new RefreshRequest(sourceIndexName)).actionGet();
}
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndexName), "key",
Collections.singletonList("value"));
EnrichPolicy enrichPolicy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Collections.singletonList(sourceIndexName),
"key",
Collections.singletonList("value")
);
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
ExecuteEnrichPolicyAction.Response executeResponse = client()
.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName).setWaitForCompletion(false))
.actionGet();
ExecuteEnrichPolicyAction.Response executeResponse = client().execute(
ExecuteEnrichPolicyAction.INSTANCE,
new ExecuteEnrichPolicyAction.Request(policyName).setWaitForCompletion(false)
).actionGet();
assertThat(executeResponse.getStatus(), is(nullValue()));
assertThat(executeResponse.getTaskId(), is(not(nullValue())));
GetTaskRequest getPolicyTaskRequest = new GetTaskRequest().setTaskId(executeResponse.getTaskId()).setWaitForCompletion(true);
assertBusy(() -> {
GetTaskResponse taskResponse = client().execute(GetTaskAction.INSTANCE, getPolicyTaskRequest).actionGet();
assertThat(((ExecuteEnrichPolicyStatus) taskResponse.getTask().getTask().getStatus()).getPhase(),
is(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE));
assertThat(
((ExecuteEnrichPolicyStatus) taskResponse.getTask().getTask().getStatus()).getPhase(),
is(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE)
);
});
String pipelineName = "test-pipeline";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
"\", \"field\": \"key\", \"target_field\": \"target\"}}]}";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\""
+ policyName
+ "\", \"field\": \"key\", \"target_field\": \"target\"}}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();
@ -284,8 +323,9 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
for (int doc = 0; doc < numDocsPerKey; doc++) {
IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME);
indexRequest.source(mapOf(MATCH_FIELD, key, DECORATE_FIELDS[0], key + "0",
DECORATE_FIELDS[1], key + "1", DECORATE_FIELDS[2], key + "2"));
indexRequest.source(
mapOf(MATCH_FIELD, key, DECORATE_FIELDS[0], key + "0", DECORATE_FIELDS[1], key + "1", DECORATE_FIELDS[2], key + "2")
);
client().index(indexRequest).actionGet();
}
}

View File

@ -34,9 +34,7 @@ public class EnrichDisabledIT extends ESSingleNodeTestCase {
@Override
protected Settings nodeSettings() {
return Settings.builder()
.put(XPackSettings.ENRICH_ENABLED_SETTING.getKey(), false)
.build();
return Settings.builder().put(XPackSettings.ENRICH_ENABLED_SETTING.getKey(), false).build();
}
@Override

View File

@ -56,7 +56,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
private static final String PIPELINE_NAME = "my-pipeline";
static final String SOURCE_INDEX_NAME = "users";
static final String MATCH_FIELD = "email";
static final String[] DECORATE_FIELDS = new String[]{"address", "city", "country"};
static final String[] DECORATE_FIELDS = new String[] { "address", "city", "country" };
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
@ -81,15 +81,21 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
for (int i = 0; i < numPolicies; i++) {
String policyName = POLICY_NAME + i;
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
Arrays.asList(SOURCE_INDEX_NAME), MATCH_FIELD, Arrays.asList(DECORATE_FIELDS));
EnrichPolicy enrichPolicy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Arrays.asList(SOURCE_INDEX_NAME),
MATCH_FIELD,
Arrays.asList(DECORATE_FIELDS)
);
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
EnrichPolicy.NamedPolicy result =
client().execute(GetEnrichPolicyAction.INSTANCE,
new GetEnrichPolicyAction.Request(new String[]{policyName})).actionGet().getPolicies().get(0);
EnrichPolicy.NamedPolicy result = client().execute(
GetEnrichPolicyAction.INSTANCE,
new GetEnrichPolicyAction.Request(new String[] { policyName })
).actionGet().getPolicies().get(0);
assertThat(result, equalTo(new EnrichPolicy.NamedPolicy(policyName, enrichPolicy)));
String enrichIndexPrefix = EnrichPolicy.getBaseName(policyName) + "*";
refresh(enrichIndexPrefix);
@ -98,8 +104,8 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) numDocsInSourceIndex));
}
GetEnrichPolicyAction.Response response =
client().execute(GetEnrichPolicyAction.INSTANCE, new GetEnrichPolicyAction.Request()).actionGet();
GetEnrichPolicyAction.Response response = client().execute(GetEnrichPolicyAction.INSTANCE, new GetEnrichPolicyAction.Request())
.actionGet();
assertThat(response.getPolicies().size(), equalTo(numPolicies));
for (int i = 0; i < numPolicies; i++) {
@ -175,14 +181,11 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
}
}
EnrichStatsAction.Response statsResponse =
client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet();
EnrichStatsAction.Response statsResponse = client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request())
.actionGet();
assertThat(statsResponse.getCoordinatorStats().size(), equalTo(internalCluster().size()));
String nodeId = internalCluster().getInstance(ClusterService.class, coordinatingNode).localNode().getId();
CoordinatorStats stats = statsResponse.getCoordinatorStats().stream()
.filter(s -> s.getNodeId().equals(nodeId))
.findAny()
.get();
CoordinatorStats stats = statsResponse.getCoordinatorStats().stream().filter(s -> s.getNodeId().equals(nodeId)).findAny().get();
assertThat(stats.getNodeId(), equalTo(nodeId));
assertThat(stats.getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
assertThat(stats.getExecutedSearchesTotal(), equalTo((long) numDocs));
@ -199,8 +202,18 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME);
indexRequest.create(true);
indexRequest.id(key);
indexRequest.source(mapOf(MATCH_FIELD, key, DECORATE_FIELDS[0], randomAlphaOfLength(4),
DECORATE_FIELDS[1], randomAlphaOfLength(4), DECORATE_FIELDS[2], randomAlphaOfLength(4)));
indexRequest.source(
mapOf(
MATCH_FIELD,
key,
DECORATE_FIELDS[0],
randomAlphaOfLength(4),
DECORATE_FIELDS[1],
randomAlphaOfLength(4),
DECORATE_FIELDS[2],
randomAlphaOfLength(4)
)
);
client().index(indexRequest).actionGet();
}
client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet();
@ -208,16 +221,24 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
}
private static void createAndExecutePolicy() {
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
Arrays.asList(SOURCE_INDEX_NAME), MATCH_FIELD, Arrays.asList(DECORATE_FIELDS));
EnrichPolicy enrichPolicy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Arrays.asList(SOURCE_INDEX_NAME),
MATCH_FIELD,
Arrays.asList(DECORATE_FIELDS)
);
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(POLICY_NAME)).actionGet();
}
private static void createPipeline() {
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + POLICY_NAME +
"\", \"field\": \"" + MATCH_FIELD + "\", \"target_field\": \"user\"}}]}";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\""
+ POLICY_NAME
+ "\", \"field\": \""
+ MATCH_FIELD
+ "\", \"target_field\": \"user\"}}]}";
PutPipelineRequest request = new PutPipelineRequest(PIPELINE_NAME, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(request).actionGet();
}

View File

@ -36,10 +36,10 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
private static TaskManager testTaskManager;
private static final ActionListener<ExecuteEnrichPolicyStatus> noOpListener = new ActionListener<ExecuteEnrichPolicyStatus>() {
@Override
public void onResponse(ExecuteEnrichPolicyStatus ignored) { }
public void onResponse(ExecuteEnrichPolicyStatus ignored) {}
@Override
public void onFailure(Exception e) { }
public void onFailure(Exception e) {}
};
@BeforeClass
@ -61,8 +61,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
private final ExecuteEnrichPolicyTask task;
private final ActionListener<ExecuteEnrichPolicyStatus> listener;
BlockingTestPolicyRunner(CountDownLatch latch, ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener) {
BlockingTestPolicyRunner(CountDownLatch latch, ExecuteEnrichPolicyTask task, ActionListener<ExecuteEnrichPolicyStatus> listener) {
this.latch = latch;
this.task = task;
this.listener = listener;
@ -88,14 +87,29 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
*/
private static class EnrichPolicyTestExecutor extends EnrichPolicyExecutor {
EnrichPolicyTestExecutor(Settings settings, ClusterService clusterService, Client client, TaskManager taskManager,
ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver,
LongSupplier nowSupplier) {
super(settings, clusterService, client, taskManager, threadPool, indexNameExpressionResolver, new EnrichPolicyLocks(),
nowSupplier);
EnrichPolicyTestExecutor(
Settings settings,
ClusterService clusterService,
Client client,
TaskManager taskManager,
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
LongSupplier nowSupplier
) {
super(
settings,
clusterService,
client,
taskManager,
threadPool,
indexNameExpressionResolver,
new EnrichPolicyLocks(),
nowSupplier
);
}
private CountDownLatch currentLatch;
CountDownLatch testRunPolicy(String policyName, EnrichPolicy policy, ActionListener<ExecuteEnrichPolicyStatus> listener) {
currentLatch = new CountDownLatch(1);
ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(policyName);
@ -104,8 +118,12 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
}
@Override
protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener) {
protected Runnable createPolicyRunner(
String policyName,
EnrichPolicy policy,
ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
if (currentLatch == null) {
throw new IllegalStateException("Use the testRunPolicy method on this test instance");
}
@ -115,39 +133,62 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
public void testNonConcurrentPolicyExecution() throws InterruptedException {
String testPolicyName = "test_policy";
EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("some_index"), "keyfield",
Collections.singletonList("valuefield"));
final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(Settings.EMPTY, null, null, testTaskManager,
testThreadPool, new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong);
EnrichPolicy testPolicy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Collections.singletonList("some_index"),
"keyfield",
Collections.singletonList("valuefield")
);
final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(
Settings.EMPTY,
null,
null,
testTaskManager,
testThreadPool,
new IndexNameExpressionResolver(),
ESTestCase::randomNonNegativeLong
);
// Launch a fake policy run that will block until firstTaskBlock is counted down.
final CountDownLatch firstTaskComplete = new CountDownLatch(1);
final CountDownLatch firstTaskBlock = testExecutor.testRunPolicy(testPolicyName, testPolicy,
new LatchedActionListener<>(noOpListener, firstTaskComplete));
final CountDownLatch firstTaskBlock = testExecutor.testRunPolicy(
testPolicyName,
testPolicy,
new LatchedActionListener<>(noOpListener, firstTaskComplete)
);
// Launch a second fake run that should fail immediately because the lock is obtained.
EsRejectedExecutionException expected = expectThrows(EsRejectedExecutionException.class,
"Expected exception but nothing was thrown", () -> {
CountDownLatch countDownLatch = testExecutor.testRunPolicy(testPolicyName, testPolicy, noOpListener);
// Should throw exception on the previous statement, but if it doesn't, be a
// good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions
countDownLatch.countDown();
firstTaskBlock.countDown();
firstTaskComplete.await();
});
EsRejectedExecutionException expected = expectThrows(
EsRejectedExecutionException.class,
"Expected exception but nothing was thrown",
() -> {
CountDownLatch countDownLatch = testExecutor.testRunPolicy(testPolicyName, testPolicy, noOpListener);
// Should throw exception on the previous statement, but if it doesn't, be a
// good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions
countDownLatch.countDown();
firstTaskBlock.countDown();
firstTaskComplete.await();
}
);
// Conclude the first mock run
firstTaskBlock.countDown();
firstTaskComplete.await();
// Validate exception from second run
assertThat(expected.getMessage(), containsString("Could not obtain lock because policy execution for [" + testPolicyName +
"] is already in progress."));
assertThat(
expected.getMessage(),
containsString("Could not obtain lock because policy execution for [" + testPolicyName + "] is already in progress.")
);
// Ensure that the lock from the previous run has been cleared
CountDownLatch secondTaskComplete = new CountDownLatch(1);
CountDownLatch secondTaskBlock = testExecutor.testRunPolicy(testPolicyName, testPolicy,
new LatchedActionListener<>(noOpListener, secondTaskComplete));
CountDownLatch secondTaskBlock = testExecutor.testRunPolicy(
testPolicyName,
testPolicy,
new LatchedActionListener<>(noOpListener, secondTaskComplete)
);
secondTaskBlock.countDown();
secondTaskComplete.await();
}
@ -155,23 +196,43 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
public void testMaximumPolicyExecutionLimit() throws InterruptedException {
String testPolicyBaseName = "test_policy_";
Settings testSettings = Settings.builder().put(EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.getKey(), 2).build();
EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("some_index"), "keyfield",
Collections.singletonList("valuefield"));
final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(testSettings, null, null, testTaskManager,
testThreadPool, new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong);
EnrichPolicy testPolicy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Collections.singletonList("some_index"),
"keyfield",
Collections.singletonList("valuefield")
);
final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(
testSettings,
null,
null,
testTaskManager,
testThreadPool,
new IndexNameExpressionResolver(),
ESTestCase::randomNonNegativeLong
);
// Launch a two fake policy runs that will block until counted down to use up the maximum concurrent
final CountDownLatch firstTaskComplete = new CountDownLatch(1);
final CountDownLatch firstTaskBlock = testExecutor.testRunPolicy(testPolicyBaseName + "1", testPolicy,
new LatchedActionListener<>(noOpListener, firstTaskComplete));
final CountDownLatch firstTaskBlock = testExecutor.testRunPolicy(
testPolicyBaseName + "1",
testPolicy,
new LatchedActionListener<>(noOpListener, firstTaskComplete)
);
final CountDownLatch secondTaskComplete = new CountDownLatch(1);
final CountDownLatch secondTaskBlock = testExecutor.testRunPolicy(testPolicyBaseName + "2", testPolicy,
new LatchedActionListener<>(noOpListener, secondTaskComplete));
final CountDownLatch secondTaskBlock = testExecutor.testRunPolicy(
testPolicyBaseName + "2",
testPolicy,
new LatchedActionListener<>(noOpListener, secondTaskComplete)
);
// Launch a third fake run that should fail immediately because the lock is obtained.
EsRejectedExecutionException expected = expectThrows(EsRejectedExecutionException.class,
"Expected exception but nothing was thrown", () -> {
EsRejectedExecutionException expected = expectThrows(
EsRejectedExecutionException.class,
"Expected exception but nothing was thrown",
() -> {
CountDownLatch countDownLatch = testExecutor.testRunPolicy(testPolicyBaseName + "3", testPolicy, noOpListener);
// Should throw exception on the previous statement, but if it doesn't, be a
// good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions
@ -180,7 +241,8 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
secondTaskBlock.countDown();
firstTaskComplete.await();
secondTaskComplete.await();
});
}
);
// Conclude the first mock run
firstTaskBlock.countDown();
@ -189,13 +251,20 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
secondTaskComplete.await();
// Validate exception from second run
assertThat(expected.getMessage(), containsString("Policy execution failed. Policy execution for [test_policy_3] would exceed " +
"maximum concurrent policy executions [2]"));
assertThat(
expected.getMessage(),
containsString(
"Policy execution failed. Policy execution for [test_policy_3] would exceed " + "maximum concurrent policy executions [2]"
)
);
// Ensure that the lock from the previous run has been cleared
CountDownLatch finalTaskComplete = new CountDownLatch(1);
CountDownLatch finalTaskBlock = testExecutor.testRunPolicy(testPolicyBaseName + "1", testPolicy,
new LatchedActionListener<>(noOpListener, finalTaskComplete));
CountDownLatch finalTaskBlock = testExecutor.testRunPolicy(
testPolicyBaseName + "1",
testPolicy,
new LatchedActionListener<>(noOpListener, finalTaskComplete)
);
finalTaskBlock.countDown();
finalTaskComplete.await();
}

View File

@ -22,17 +22,19 @@ public class EnrichPolicyLocksTests extends ESTestCase {
policyLocks.lockPolicy(policy1);
// Ensure that locked policies are rejected
EsRejectedExecutionException exception1 = expectThrows(EsRejectedExecutionException.class,
() -> policyLocks.lockPolicy(policy1));
assertThat(exception1.getMessage(), is(equalTo("Could not obtain lock because policy execution for [policy1]" +
" is already in progress.")));
EsRejectedExecutionException exception1 = expectThrows(EsRejectedExecutionException.class, () -> policyLocks.lockPolicy(policy1));
assertThat(
exception1.getMessage(),
is(equalTo("Could not obtain lock because policy execution for [policy1]" + " is already in progress."))
);
policyLocks.lockPolicy(policy2);
EsRejectedExecutionException exception2 = expectThrows(EsRejectedExecutionException.class,
() -> policyLocks.lockPolicy(policy2));
EsRejectedExecutionException exception2 = expectThrows(EsRejectedExecutionException.class, () -> policyLocks.lockPolicy(policy2));
assertThat(exception2.getMessage(), is(equalTo("Could not obtain lock because policy execution for [policy2]" +
" is already in progress.")));
assertThat(
exception2.getMessage(),
is(equalTo("Could not obtain lock because policy execution for [policy2]" + " is already in progress."))
);
}
public void testSafePoint() {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.plugins.Plugin;
@ -118,20 +119,19 @@ public class EnrichPolicyMaintenanceServiceTests extends ESSingleNodeTestCase {
enrichKeys.add(randomAlphaOfLength(10));
}
String sourceIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
return new EnrichPolicy(MATCH_TYPE, null, Collections.singletonList(sourceIndex), randomAlphaOfLength(10),
enrichKeys);
return new EnrichPolicy(MATCH_TYPE, null, Collections.singletonList(sourceIndex), randomAlphaOfLength(10), enrichKeys);
}
private void addPolicy(String policyName, EnrichPolicy policy) throws InterruptedException {
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
createSourceIndices(client(), policy);
doSyncronously((clusterService, exceptionConsumer) ->
EnrichStore.putPolicy(policyName, policy, clusterService, resolver, exceptionConsumer));
doSyncronously(
(clusterService, exceptionConsumer) -> EnrichStore.putPolicy(policyName, policy, clusterService, resolver, exceptionConsumer)
);
}
private void removePolicy(String policyName) throws InterruptedException {
doSyncronously((clusterService, exceptionConsumer) ->
EnrichStore.deletePolicy(policyName, clusterService, exceptionConsumer));
doSyncronously((clusterService, exceptionConsumer) -> EnrichStore.deletePolicy(policyName, clusterService, exceptionConsumer));
}
private void doSyncronously(BiConsumer<ClusterService, Consumer<Exception>> function) throws InterruptedException {
@ -151,18 +151,20 @@ public class EnrichPolicyMaintenanceServiceTests extends ESSingleNodeTestCase {
}
private String fakeRunPolicy(String forPolicy) throws IOException {
XContentBuilder source = JsonXContent.contentBuilder();
source.startObject();
{
source.startObject(MapperService.SINGLE_MAPPING_NAME);
{
source.startObject("_meta");
source.field(EnrichPolicyRunner.ENRICH_POLICY_NAME_FIELD_NAME, forPolicy);
source.endObject();
}
source.endObject();
}
source.endObject();
String newIndexName = EnrichPolicy.getBaseName(forPolicy) + "-" + indexNameAutoIncrementingCounter++;
CreateIndexRequest request = new CreateIndexRequest(newIndexName)
.mapping(
MapperService.SINGLE_MAPPING_NAME, JsonXContent.contentBuilder()
.startObject()
.startObject(MapperService.SINGLE_MAPPING_NAME)
.startObject("_meta")
.field(EnrichPolicyRunner.ENRICH_POLICY_NAME_FIELD_NAME, forPolicy)
.endObject()
.endObject()
.endObject()
);
CreateIndexRequest request = new CreateIndexRequest(newIndexName).mapping(MapperService.SINGLE_MAPPING_NAME, source);
client().admin().indices().create(request).actionGet();
promoteFakePolicyIndex(newIndexName, forPolicy);
return newIndexName;

View File

@ -40,13 +40,24 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
IngestService ingestService = getInstanceFromNode(IngestService.class);
createIndex("index", Settings.EMPTY, "_doc", "key1", "type=keyword", "field1", "type=keyword");
EnrichPolicy instance1 = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("index"),
"key1", Collections.singletonList("field1"));
EnrichPolicy instance1 = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Collections.singletonList("index"),
"key1",
Collections.singletonList("field1")
);
createSourceIndices(client(), instance1);
PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1);
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
assertThat("Execute failed", client().execute(ExecuteEnrichPolicyAction.INSTANCE,
new ExecuteEnrichPolicyAction.Request("my_policy")).actionGet().getStatus().isCompleted(), equalTo(true));
assertThat(
"Execute failed",
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("my_policy"))
.actionGet()
.getStatus()
.isCompleted(),
equalTo(true)
);
String pipelineConfig =
"{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"field\": \"key\", \"target_field\": \"target\"}}]}";
@ -55,11 +66,18 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
Pipeline pipelineInstance1 = ingestService.getPipeline("1");
assertThat(pipelineInstance1.getProcessors().get(0), instanceOf(MatchProcessor.class));
EnrichPolicy instance2 = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("index2"),
"key2", Collections.singletonList("field2"));
EnrichPolicy instance2 = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Collections.singletonList("index2"),
"key2",
Collections.singletonList("field2")
);
createSourceIndices(client(), instance2);
ResourceAlreadyExistsException exc = expectThrows(ResourceAlreadyExistsException.class, () ->
client().execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request("my_policy", instance2)).actionGet());
ResourceAlreadyExistsException exc = expectThrows(
ResourceAlreadyExistsException.class,
() -> client().execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request("my_policy", instance2)).actionGet()
);
assertTrue(exc.getMessage().contains("policy [my_policy] already exists"));
}
}

View File

@ -31,8 +31,13 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
public void testCreateProcessorInstance() throws Exception {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("source_index"), "my_key",
enrichValues);
EnrichPolicy policy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Collections.singletonList("source_index"),
"my_key",
enrichValues
);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.metaData = createMetaData("majestic", policy);
@ -145,8 +150,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
public void testUnsupportedPolicy() throws Exception {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy =
new EnrichPolicy("unsupported", null, Collections.singletonList("source_index"), "my_key", enrichValues);
EnrichPolicy policy = new EnrichPolicy("unsupported", null, Collections.singletonList("source_index"), "my_key", enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.metaData = createMetaData("majestic", policy);
@ -165,8 +169,13 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
public void testCompactEnrichValuesFormat() throws Exception {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
Collections.singletonList("source_index"), "host", enrichValues);
EnrichPolicy policy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Collections.singletonList("source_index"),
"host",
enrichValues
);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.metaData = createMetaData("majestic", policy);
@ -184,8 +193,13 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
public void testNoTargetField() throws Exception {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
Collections.singletonList("source_index"), "host", enrichValues);
EnrichPolicy policy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Collections.singletonList("source_index"),
"host",
enrichValues
);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.metaData = createMetaData("majestic", policy);
@ -199,8 +213,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
public void testIllegalMaxMatches() throws Exception {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Arrays.asList("source_index"), "my_key",
enrichValues);
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Arrays.asList("source_index"), "my_key", enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.metaData = createMetaData("majestic", policy);
@ -222,8 +235,14 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
.build();
IndexMetaData.Builder builder = IndexMetaData.builder(EnrichPolicy.getBaseName(name) + "-1");
builder.settings(settings);
builder.putMapping("_doc", "{\"_meta\": {\"enrich_match_field\": \"" + policy.getMatchField() +
"\", \"enrich_policy_type\": \"" + policy.getType() + "\"}}");
builder.putMapping(
"_doc",
"{\"_meta\": {\"enrich_match_field\": \""
+ policy.getMatchField()
+ "\", \"enrich_policy_type\": \""
+ policy.getType()
+ "\"}}"
);
builder.putAlias(AliasMetaData.builder(EnrichPolicy.getBaseName(name)).build());
return MetaData.builder().put(builder).build();
}

View File

@ -49,8 +49,13 @@ public class EnrichRestartIT extends ESIntegTestCase {
final int numPolicies = randomIntBetween(2, 4);
internalCluster().startNode();
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
Collections.singletonList(SOURCE_INDEX_NAME), MATCH_FIELD, Arrays.asList(DECORATE_FIELDS));
EnrichPolicy enrichPolicy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Collections.singletonList(SOURCE_INDEX_NAME),
MATCH_FIELD,
Arrays.asList(DECORATE_FIELDS)
);
createSourceIndices(client(), enrichPolicy);
for (int i = 0; i < numPolicies; i++) {
String policyName = POLICY_NAME + i;
@ -65,12 +70,13 @@ public class EnrichRestartIT extends ESIntegTestCase {
}
private static void verifyPolicies(int numPolicies, EnrichPolicy enrichPolicy) {
GetEnrichPolicyAction.Response response =
client().execute(GetEnrichPolicyAction.INSTANCE, new GetEnrichPolicyAction.Request()).actionGet();
GetEnrichPolicyAction.Response response = client().execute(GetEnrichPolicyAction.INSTANCE, new GetEnrichPolicyAction.Request())
.actionGet();
assertThat(response.getPolicies().size(), equalTo(numPolicies));
for (int i = 0; i < numPolicies; i++) {
String policyName = POLICY_NAME + i;
Optional<EnrichPolicy.NamedPolicy> result = response.getPolicies().stream()
Optional<EnrichPolicy.NamedPolicy> result = response.getPolicies()
.stream()
.filter(namedPolicy -> namedPolicy.getName().equals(policyName))
.findFirst();
assertThat(result.isPresent(), is(true));

View File

@ -50,7 +50,8 @@ public class EnrichStoreCrudTests extends AbstractEnrichTestCase {
assertThat(error.get(), nullValue());
error = saveEnrichPolicy(name, policy, clusterService);
assertTrue(error.get().getMessage().contains("policy [my-policy] already exists"));;
assertTrue(error.get().getMessage().contains("policy [my-policy] already exists"));
;
deleteEnrichPolicy(name, clusterService);
EnrichPolicy result = EnrichStore.getPolicy(name, clusterService.state());
@ -64,37 +65,54 @@ public class EnrichStoreCrudTests extends AbstractEnrichTestCase {
{
String nullOrEmptyName = randomBoolean() ? "" : null;
IllegalArgumentException error = expectThrows(IllegalArgumentException.class,
() -> saveEnrichPolicy(nullOrEmptyName, policy, clusterService));
IllegalArgumentException error = expectThrows(
IllegalArgumentException.class,
() -> saveEnrichPolicy(nullOrEmptyName, policy, clusterService)
);
assertThat(error.getMessage(), equalTo("name is missing or empty"));
}
{
IllegalArgumentException error = expectThrows(IllegalArgumentException.class,
() -> saveEnrichPolicy("my-policy", null, clusterService));
IllegalArgumentException error = expectThrows(
IllegalArgumentException.class,
() -> saveEnrichPolicy("my-policy", null, clusterService)
);
assertThat(error.getMessage(), equalTo("policy is missing"));
}
{
IllegalArgumentException error =
expectThrows(IllegalArgumentException.class, () -> saveEnrichPolicy("my#policy", policy, clusterService));
IllegalArgumentException error = expectThrows(
IllegalArgumentException.class,
() -> saveEnrichPolicy("my#policy", policy, clusterService)
);
assertThat(error.getMessage(), equalTo("Invalid policy name [my#policy], must not contain '#'"));
}
{
IllegalArgumentException error =
expectThrows(IllegalArgumentException.class, () -> saveEnrichPolicy("..", policy, clusterService));
IllegalArgumentException error = expectThrows(
IllegalArgumentException.class,
() -> saveEnrichPolicy("..", policy, clusterService)
);
assertThat(error.getMessage(), equalTo("Invalid policy name [..], must not be '.' or '..'"));
}
{
IllegalArgumentException error =
expectThrows(IllegalArgumentException.class, () -> saveEnrichPolicy("myPolicy", policy, clusterService));
IllegalArgumentException error = expectThrows(
IllegalArgumentException.class,
() -> saveEnrichPolicy("myPolicy", policy, clusterService)
);
assertThat(error.getMessage(), equalTo("Invalid policy name [myPolicy], must be lowercase"));
}
{
EnrichPolicy invalidPolicy = new EnrichPolicy("unsupported_type", null, Collections.singletonList("index"),
"field", Collections.singletonList("field"));
IllegalArgumentException error =
expectThrows(IllegalArgumentException.class, () -> saveEnrichPolicy("name", invalidPolicy, clusterService));
EnrichPolicy invalidPolicy = new EnrichPolicy(
"unsupported_type",
null,
Collections.singletonList("index"),
"field",
Collections.singletonList("field")
);
IllegalArgumentException error = expectThrows(
IllegalArgumentException.class,
() -> saveEnrichPolicy("name", invalidPolicy, clusterService)
);
assertThat(error.getMessage(), equalTo("unsupported policy type [unsupported_type], supported types are [match, geo_match]"));
}
}
@ -105,14 +123,18 @@ public class EnrichStoreCrudTests extends AbstractEnrichTestCase {
{
String nullOrEmptyName = randomBoolean() ? "" : null;
IllegalArgumentException error = expectThrows(IllegalArgumentException.class,
() -> deleteEnrichPolicy(nullOrEmptyName, clusterService));
IllegalArgumentException error = expectThrows(
IllegalArgumentException.class,
() -> deleteEnrichPolicy(nullOrEmptyName, clusterService)
);
assertThat(error.getMessage(), equalTo("name is missing or empty"));
}
{
ResourceNotFoundException error = expectThrows(ResourceNotFoundException.class,
() -> deleteEnrichPolicy("my-policy", clusterService));
ResourceNotFoundException error = expectThrows(
ResourceNotFoundException.class,
() -> deleteEnrichPolicy("my-policy", clusterService)
);
assertThat(error.getMessage(), equalTo("policy [my-policy] not found"));
}
@ -122,8 +144,10 @@ public class EnrichStoreCrudTests extends AbstractEnrichTestCase {
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
String nullOrEmptyName = randomBoolean() ? "" : null;
IllegalArgumentException error = expectThrows(IllegalArgumentException.class,
() -> EnrichStore.getPolicy(nullOrEmptyName, clusterService.state()));
IllegalArgumentException error = expectThrows(
IllegalArgumentException.class,
() -> EnrichStore.getPolicy(nullOrEmptyName, clusterService.state())
);
assertThat(error.getMessage(), equalTo("name is missing or empty"));
@ -131,7 +155,7 @@ public class EnrichStoreCrudTests extends AbstractEnrichTestCase {
assertNull(policy);
}
public void testListValidation() {
public void testListValidation() {
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
Map<String, EnrichPolicy> policies = EnrichStore.getPolicies(clusterService.state());
assertTrue(policies.isEmpty());

View File

@ -55,20 +55,38 @@ public class GeoMatchProcessorTests extends ESTestCase {
testBasicsForFieldValue("37.386637, -122.084110", expectedPoint);
testBasicsForFieldValue("POINT (-122.084110 37.386637)", expectedPoint);
testBasicsForFieldValue(Arrays.asList(-122.084110, 37.386637), expectedPoint);
testBasicsForFieldValue(Arrays.asList(Arrays.asList(-122.084110, 37.386637),
"37.386637, -122.084110", "POINT (-122.084110 37.386637)"),
new MultiPoint(Arrays.asList(expectedPoint, expectedPoint, expectedPoint)));
testBasicsForFieldValue(
Arrays.asList(Arrays.asList(-122.084110, 37.386637), "37.386637, -122.084110", "POINT (-122.084110 37.386637)"),
new MultiPoint(Arrays.asList(expectedPoint, expectedPoint, expectedPoint))
);
testBasicsForFieldValue("not a point", null);
}
private void testBasicsForFieldValue(Object fieldValue, Geometry expectedGeometry) {
int maxMatches = randomIntBetween(1, 8);
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("key", mapOf("shape", "object", "zipcode",94040)));
GeoMatchProcessor processor = new GeoMatchProcessor("_tag", mockSearch, "_name", "location", "entry",
false, false, "shape", maxMatches, ShapeRelation.INTERSECTS);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
mapOf("location", fieldValue));
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("key", mapOf("shape", "object", "zipcode", 94040)));
GeoMatchProcessor processor = new GeoMatchProcessor(
"_tag",
mockSearch,
"_name",
"location",
"entry",
false,
false,
"shape",
maxMatches,
ShapeRelation.INTERSECTS
);
IngestDocument ingestDocument = new IngestDocument(
"_index",
"_type",
"_id",
"_routing",
1L,
VersionType.INTERNAL,
mapOf("location", fieldValue)
);
// Run
IngestDocument[] holder = new IngestDocument[1];
processor.execute(ingestDocument, (result, e) -> holder[0] = result);
@ -107,7 +125,7 @@ public class GeoMatchProcessorTests extends ESTestCase {
}
private static final class MockSearchFunction implements BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> {
private static final class MockSearchFunction implements BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> {
private final SearchResponse mockResponse;
private final SetOnce<SearchRequest> capturedRequest;
private final Exception exception;
@ -153,8 +171,12 @@ public class GeoMatchProcessorTests extends ESTestCase {
public SearchResponse mockResponse(Map<String, Map<String, ?>> documents) {
SearchHit[] searchHits = documents.entrySet().stream().map(e -> {
SearchHit searchHit = new SearchHit(randomInt(100), e.getKey(), new Text(MapperService.SINGLE_MAPPING_NAME),
Collections.emptyMap());
SearchHit searchHit = new SearchHit(
randomInt(100),
e.getKey(),
new Text(MapperService.SINGLE_MAPPING_NAME),
Collections.emptyMap()
);
try (XContentBuilder builder = XContentBuilder.builder(XContentType.SMILE.xContent())) {
builder.map(e.getValue());
builder.flush();
@ -165,9 +187,23 @@ public class GeoMatchProcessorTests extends ESTestCase {
}
return searchHit;
}).toArray(SearchHit[]::new);
return new SearchResponse(new SearchResponseSections(
new SearchHits(searchHits, new TotalHits(documents.size(), TotalHits.Relation.EQUAL_TO), 1.0f),
new Aggregations(Collections.emptyList()), new Suggest(Collections.emptyList()),
false, false, null, 1), null, 1, 1, 0, 1, ShardSearchFailure.EMPTY_ARRAY, new SearchResponse.Clusters(1, 1, 0));
return new SearchResponse(
new SearchResponseSections(
new SearchHits(searchHits, new TotalHits(documents.size(), TotalHits.Relation.EQUAL_TO), 1.0f),
new Aggregations(Collections.emptyList()),
new Suggest(Collections.emptyList()),
false,
false,
null,
1
),
null,
1,
1,
0,
1,
ShardSearchFailure.EMPTY_ARRAY,
new SearchResponse.Clusters(1, 1, 0)
);
}
}

View File

@ -49,10 +49,17 @@ public class MatchProcessorTests extends ESTestCase {
public void testBasics() throws Exception {
int maxMatches = randomIntBetween(1, 8);
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank",23, "tld", "co")));
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank", 23, "tld", "co")));
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", true, false, "domain", maxMatches);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co"));
IngestDocument ingestDocument = new IngestDocument(
"_index",
"_type",
"_id",
"_routing",
1L,
VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co")
);
// Run
IngestDocument[] holder = new IngestDocument[1];
processor.execute(ingestDocument, (result, e) -> holder[0] = result);
@ -89,8 +96,15 @@ public class MatchProcessorTests extends ESTestCase {
public void testNoMatch() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction();
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", true, false, "domain", 1);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.com"));
IngestDocument ingestDocument = new IngestDocument(
"_index",
"_type",
"_id",
"_routing",
1L,
VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.com")
);
int numProperties = ingestDocument.getSourceAndMetadata().size();
// Run
IngestDocument[] holder = new IngestDocument[1];
@ -119,8 +133,15 @@ public class MatchProcessorTests extends ESTestCase {
String indexName = ".enrich-_name";
MockSearchFunction mockSearch = mockedSearchFunction(new IndexNotFoundException(indexName));
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", true, false, "domain", 1);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.com"));
IngestDocument ingestDocument = new IngestDocument(
"_index",
"_type",
"_id",
"_routing",
1L,
VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.com")
);
// Run
IngestDocument[] resultHolder = new IngestDocument[1];
Exception[] exceptionHolder = new Exception[1];
@ -152,8 +173,17 @@ public class MatchProcessorTests extends ESTestCase {
public void testIgnoreKeyMissing() throws Exception {
{
MatchProcessor processor =
new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", true, true, "domain", 1);
MatchProcessor processor = new MatchProcessor(
"_tag",
mockedSearchFunction(),
"_name",
"domain",
"entry",
true,
true,
"domain",
1
);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, mapOf());
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
@ -163,8 +193,17 @@ public class MatchProcessorTests extends ESTestCase {
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
}
{
MatchProcessor processor =
new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", true, false, "domain", 1);
MatchProcessor processor = new MatchProcessor(
"_tag",
mockedSearchFunction(),
"_name",
"domain",
"entry",
true,
false,
"domain",
1
);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, mapOf());
IngestDocument[] resultHolder = new IngestDocument[1];
Exception[] exceptionHolder = new Exception[1];
@ -179,7 +218,7 @@ public class MatchProcessorTests extends ESTestCase {
}
public void testExistingFieldWithOverrideDisabled() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank",23, "tld", "co")));
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank", 23, "tld", "co")));
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, false, "domain", 1);
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(mapOf("domain", "elastic.co", "tld", "tld")), mapOf());
@ -195,7 +234,7 @@ public class MatchProcessorTests extends ESTestCase {
}
public void testExistingNullFieldWithOverrideDisabled() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank",23, "tld", "co")));
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank", 23, "tld", "co")));
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, false, "domain", 1);
Map<String, Object> source = new HashMap<>();
@ -215,10 +254,16 @@ public class MatchProcessorTests extends ESTestCase {
public void testNumericValue() {
MockSearchFunction mockSearch = mockedSearchFunction(mapOf(2, mapOf("globalRank", 451, "tldRank", 23, "tld", "co")));
MatchProcessor processor =
new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, true, "domain", 1);
IngestDocument ingestDocument =
new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, mapOf("domain", 2));
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, true, "domain", 1);
IngestDocument ingestDocument = new IngestDocument(
"_index",
"_type",
"_id",
"_routing",
1L,
VersionType.INTERNAL,
mapOf("domain", 2)
);
// Execute
IngestDocument[] holder = new IngestDocument[1];
@ -242,12 +287,19 @@ public class MatchProcessorTests extends ESTestCase {
}
public void testArray() {
MockSearchFunction mockSearch =
mockedSearchFunction(mapOf(Arrays.asList("1", "2"), mapOf("globalRank", 451, "tldRank", 23, "tld", "co")));
MatchProcessor processor =
new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, true, "domain", 1);
IngestDocument ingestDocument =
new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, mapOf("domain", Arrays.asList("1", "2")));
MockSearchFunction mockSearch = mockedSearchFunction(
mapOf(Arrays.asList("1", "2"), mapOf("globalRank", 451, "tldRank", 23, "tld", "co"))
);
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, true, "domain", 1);
IngestDocument ingestDocument = new IngestDocument(
"_index",
"_type",
"_id",
"_routing",
1L,
VersionType.INTERNAL,
mapOf("domain", Arrays.asList("1", "2"))
);
// Execute
IngestDocument[] holder = new IngestDocument[1];
@ -272,7 +324,7 @@ public class MatchProcessorTests extends ESTestCase {
assertThat(entry.get("tld"), equalTo("co"));
}
private static final class MockSearchFunction implements BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> {
private static final class MockSearchFunction implements BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> {
private final SearchResponse mockResponse;
private final SetOnce<SearchRequest> capturedRequest;
private final Exception exception;
@ -318,8 +370,12 @@ public class MatchProcessorTests extends ESTestCase {
public SearchResponse mockResponse(Map<?, Map<String, ?>> documents) {
SearchHit[] searchHits = documents.entrySet().stream().map(e -> {
SearchHit searchHit = new SearchHit(randomInt(100), e.getKey().toString(), new Text(MapperService.SINGLE_MAPPING_NAME),
Collections.emptyMap());
SearchHit searchHit = new SearchHit(
randomInt(100),
e.getKey().toString(),
new Text(MapperService.SINGLE_MAPPING_NAME),
Collections.emptyMap()
);
try (XContentBuilder builder = XContentBuilder.builder(XContentType.SMILE.xContent())) {
builder.map(e.getValue());
builder.flush();
@ -330,23 +386,37 @@ public class MatchProcessorTests extends ESTestCase {
}
return searchHit;
}).toArray(SearchHit[]::new);
return new SearchResponse(new SearchResponseSections(
new SearchHits(searchHits, new TotalHits(documents.size(), TotalHits.Relation.EQUAL_TO), 1.0f),
new Aggregations(Collections.emptyList()), new Suggest(Collections.emptyList()),
false, false, null, 1), null, 1, 1, 0, 1, ShardSearchFailure.EMPTY_ARRAY, new SearchResponse.Clusters(1, 1, 0));
return new SearchResponse(
new SearchResponseSections(
new SearchHits(searchHits, new TotalHits(documents.size(), TotalHits.Relation.EQUAL_TO), 1.0f),
new Aggregations(Collections.emptyList()),
new Suggest(Collections.emptyList()),
false,
false,
null,
1
),
null,
1,
1,
0,
1,
ShardSearchFailure.EMPTY_ARRAY,
new SearchResponse.Clusters(1, 1, 0)
);
}
static <K, V> Map<K, V> mapOf() {
static <K, V> Map<K, V> mapOf() {
return Collections.emptyMap();
}
static <K, V> Map<K, V> mapOf(K key1, V value1) {
static <K, V> Map<K, V> mapOf(K key1, V value1) {
Map<K, V> map = new HashMap<>();
map.put(key1, value1);
return map;
}
static <K, V> Map<K, V> mapOf(K key1, V value1, K key2, V value2) {
static <K, V> Map<K, V> mapOf(K key1, V value1, K key2, V value2) {
Map<K, V> map = new HashMap<>();
map.put(key1, value1);
map.put(key2, value2);
@ -361,7 +431,7 @@ public class MatchProcessorTests extends ESTestCase {
return map;
}
static <K, V> Map<K, V> mapOf(K key1, V value1, K key2, V value2, K key3, V value3, K key4, V value4) {
static <K, V> Map<K, V> mapOf(K key1, V value1, K key2, V value2, K key3, V value3, K key4, V value4) {
Map<K, V> map = new HashMap<>();
map.put(key1, value1);
map.put(key2, value2);

View File

@ -208,12 +208,13 @@ public class CoordinatorTests extends ESTestCase {
});
coordinator.coordinateLookups();
assertBusy(() -> {
assertThat(completed.get(), is(true));
});
assertBusy(() -> { assertThat(completed.get(), is(true)); });
lookupFunction.capturedConsumers.get(0).accept(
new MultiSearchResponse(new MultiSearchResponse.Item[]{new MultiSearchResponse.Item(emptySearchResponse(), null)}, 1L), null);
lookupFunction.capturedConsumers.get(0)
.accept(
new MultiSearchResponse(new MultiSearchResponse.Item[] { new MultiSearchResponse.Item(emptySearchResponse(), null) }, 1L),
null
);
assertThat(coordinator.queue.size(), equalTo(0));
assertThat(lookupFunction.capturedRequests.size(), equalTo(2));
assertThat(lookupFunction.capturedRequests.get(1).requests().get(0), sameInstance(searchRequest));
@ -232,14 +233,18 @@ public class CoordinatorTests extends ESTestCase {
@Override
public <Request extends ActionRequest, Response extends ActionResponse> ActionFuture<Response> execute(
ActionType<Response> action, Request request) {
ActionType<Response> action,
Request request
) {
throw new UnsupportedOperationException();
}
@Override
public <Request extends ActionRequest, Response extends ActionResponse> void execute(ActionType<Response> action,
Request request,
ActionListener<Response> listener) {
public <Request extends ActionRequest, Response extends ActionResponse> void execute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
requests.add((EnrichShardMultiSearchAction.Request) request);
}
@ -268,7 +273,7 @@ public class CoordinatorTests extends ESTestCase {
MultiSearchResponse.Item item1 = new MultiSearchResponse.Item(emptySearchResponse(), null);
itemsPerIndex.put("index1", Arrays.asList(new Tuple<>(0, null), new Tuple<>(1, null), new Tuple<>(2, null)));
shardResponses.put("index1", new Tuple<>(new MultiSearchResponse(new MultiSearchResponse.Item[]{item1, item1, item1}, 1), null));
shardResponses.put("index1", new Tuple<>(new MultiSearchResponse(new MultiSearchResponse.Item[] { item1, item1, item1 }, 1), null));
Exception failure = new RuntimeException();
itemsPerIndex.put("index2", Arrays.asList(new Tuple<>(3, null), new Tuple<>(4, null), new Tuple<>(5, null)));
@ -276,7 +281,7 @@ public class CoordinatorTests extends ESTestCase {
MultiSearchResponse.Item item2 = new MultiSearchResponse.Item(emptySearchResponse(), null);
itemsPerIndex.put("index3", Arrays.asList(new Tuple<>(6, null), new Tuple<>(7, null), new Tuple<>(8, null)));
shardResponses.put("index3", new Tuple<>(new MultiSearchResponse(new MultiSearchResponse.Item[]{item2, item2, item2}, 1), null));
shardResponses.put("index3", new Tuple<>(new MultiSearchResponse(new MultiSearchResponse.Item[] { item2, item2, item2 }, 1), null));
MultiSearchResponse result = Coordinator.reduce(9, itemsPerIndex, shardResponses);
assertThat(result.getResponses().length, equalTo(9));
@ -292,8 +297,15 @@ public class CoordinatorTests extends ESTestCase {
}
private static SearchResponse emptySearchResponse() {
InternalSearchResponse response = new InternalSearchResponse(new SearchHits(new SearchHit[0],
new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), InternalAggregations.EMPTY, null, null, false, null, 1);
InternalSearchResponse response = new InternalSearchResponse(
new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN),
InternalAggregations.EMPTY,
null,
null,
false,
null,
1
);
return new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
}

View File

@ -57,8 +57,10 @@ public class EnrichShardMultiSearchActionTests extends ESSingleNodeTestCase {
request.add(searchRequest);
}
MultiSearchResponse result =
client().execute(EnrichShardMultiSearchAction.INSTANCE, new EnrichShardMultiSearchAction.Request(request)).actionGet();
MultiSearchResponse result = client().execute(
EnrichShardMultiSearchAction.INSTANCE,
new EnrichShardMultiSearchAction.Request(request)
).actionGet();
assertThat(result.getResponses().length, equalTo(numSearches));
for (int i = 0; i < numSearches; i++) {
assertThat(result.getResponses()[i].isFailure(), is(false));
@ -72,8 +74,10 @@ public class EnrichShardMultiSearchActionTests extends ESSingleNodeTestCase {
createIndex("index");
MultiSearchRequest request = new MultiSearchRequest();
request.add(new SearchRequest("index"));
Exception e = expectThrows(ActionRequestValidationException.class,
() -> client().execute(EnrichShardMultiSearchAction.INSTANCE, new EnrichShardMultiSearchAction.Request(request)).actionGet());
Exception e = expectThrows(
ActionRequestValidationException.class,
() -> client().execute(EnrichShardMultiSearchAction.INSTANCE, new EnrichShardMultiSearchAction.Request(request)).actionGet()
);
assertThat(e.getMessage(), equalTo("Validation Failed: 1: index [index] is not an enrich index;"));
}
@ -82,8 +86,10 @@ public class EnrichShardMultiSearchActionTests extends ESSingleNodeTestCase {
createIndex(indexName, Settings.builder().put("index.number_of_shards", 2).build());
MultiSearchRequest request = new MultiSearchRequest();
request.add(new SearchRequest(indexName));
Exception e = expectThrows(IllegalStateException.class,
() -> client().execute(EnrichShardMultiSearchAction.INSTANCE, new EnrichShardMultiSearchAction.Request(request)).actionGet());
Exception e = expectThrows(
IllegalStateException.class,
() -> client().execute(EnrichShardMultiSearchAction.INSTANCE, new EnrichShardMultiSearchAction.Request(request)).actionGet()
);
assertThat(e.getMessage(), equalTo("index [.enrich-1] should have 1 shard, but has 2 shards"));
}

View File

@ -31,8 +31,13 @@ public class EnrichStatsResponseTests extends AbstractWireSerializingTestCase<En
int numCoordinatingStats = randomIntBetween(0, 16);
List<CoordinatorStats> coordinatorStats = new ArrayList<>(numCoordinatingStats);
for (int i = 0; i < numCoordinatingStats; i++) {
CoordinatorStats stats = new CoordinatorStats(randomAlphaOfLength(4), randomIntBetween(0, 8096),
randomIntBetween(0, 8096), randomNonNegativeLong(), randomNonNegativeLong());
CoordinatorStats stats = new CoordinatorStats(
randomAlphaOfLength(4),
randomIntBetween(0, 8096),
randomIntBetween(0, 8096),
randomNonNegativeLong(),
randomNonNegativeLong()
);
coordinatorStats.add(stats);
}
return new EnrichStatsAction.Response(executingPolicies, coordinatorStats);
@ -52,9 +57,9 @@ public class EnrichStatsResponseTests extends AbstractWireSerializingTestCase<En
long runningTimeNanos = randomLong();
boolean cancellable = randomBoolean();
TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
Map<String, String> headers = randomBoolean() ?
Collections.emptyMap() :
Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
Map<String, String> headers = randomBoolean()
? Collections.emptyMap()
: Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
return new TaskInfo(taskId, type, action, description, null, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
}
}

View File

@ -54,19 +54,17 @@ public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCa
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> reference = new AtomicReference<>();
final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
transportAction.execute(null,
new DeleteEnrichPolicyAction.Request(fakeId),
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
fail();
}
transportAction.execute(null, new DeleteEnrichPolicyAction.Request(fakeId), new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
fail();
}
public void onFailure(final Exception e) {
reference.set(e);
latch.countDown();
}
});
public void onFailure(final Exception e) {
reference.set(e);
latch.countDown();
}
});
latch.await();
assertNotNull(reference.get());
assertThat(reference.get(), instanceOf(ResourceNotFoundException.class));
@ -88,19 +86,17 @@ public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCa
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();
final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
transportAction.execute(null,
new DeleteEnrichPolicyAction.Request(name),
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
reference.set(acknowledgedResponse);
latch.countDown();
}
transportAction.execute(null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
reference.set(acknowledgedResponse);
latch.countDown();
}
public void onFailure(final Exception e) {
fail();
}
});
public void onFailure(final Exception e) {
fail();
}
});
latch.await();
assertNotNull(reference.get());
assertTrue(reference.get().isAcknowledged());
@ -122,33 +118,38 @@ public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCa
createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
createIndex(EnrichPolicy.getBaseName(name) + "-foo2");
client().admin().indices().prepareGetIndex().setIndices(
EnrichPolicy.getBaseName(name) + "-foo1",
EnrichPolicy.getBaseName(name) + "-foo2").get();
client().admin()
.indices()
.prepareGetIndex()
.setIndices(EnrichPolicy.getBaseName(name) + "-foo1", EnrichPolicy.getBaseName(name) + "-foo2")
.get();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();
final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
transportAction.execute(null,
new DeleteEnrichPolicyAction.Request(name),
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
reference.set(acknowledgedResponse);
latch.countDown();
}
transportAction.execute(null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
reference.set(acknowledgedResponse);
latch.countDown();
}
public void onFailure(final Exception e) {
fail();
}
});
public void onFailure(final Exception e) {
fail();
}
});
latch.await();
assertNotNull(reference.get());
assertTrue(reference.get().isAcknowledged());
expectThrows(IndexNotFoundException.class, () -> client().admin().indices().prepareGetIndex().setIndices(
EnrichPolicy.getBaseName(name) + "-foo1",
EnrichPolicy.getBaseName(name) + "-foo2").get());
expectThrows(
IndexNotFoundException.class,
() -> client().admin()
.indices()
.prepareGetIndex()
.setIndices(EnrichPolicy.getBaseName(name) + "-foo1", EnrichPolicy.getBaseName(name) + "-foo2")
.get()
);
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
@ -176,24 +177,24 @@ public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCa
{
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> reference = new AtomicReference<>();
transportAction.execute(null,
new DeleteEnrichPolicyAction.Request(name),
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
fail();
}
transportAction.execute(null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
fail();
}
public void onFailure(final Exception e) {
reference.set(e);
latch.countDown();
}
});
public void onFailure(final Exception e) {
reference.set(e);
latch.countDown();
}
});
latch.await();
assertNotNull(reference.get());
assertThat(reference.get(), instanceOf(EsRejectedExecutionException.class));
assertThat(reference.get().getMessage(),
equalTo("Could not obtain lock because policy execution for [my-policy] is already in progress."));
assertThat(
reference.get().getMessage(),
equalTo("Could not obtain lock because policy execution for [my-policy] is already in progress.")
);
}
{
enrichPolicyLocks.releasePolicy(name);
@ -202,19 +203,17 @@ public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCa
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();
transportAction.execute(null,
new DeleteEnrichPolicyAction.Request(name),
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
reference.set(acknowledgedResponse);
latch.countDown();
}
transportAction.execute(null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
reference.set(acknowledgedResponse);
latch.countDown();
}
public void onFailure(final Exception e) {
fail();
}
});
public void onFailure(final Exception e) {
fail();
}
});
latch.await();
assertNotNull(reference.get());
assertTrue(reference.get().isAcknowledged());

View File

@ -32,25 +32,23 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<GetEnrichPolicyAction.Response> reference = new AtomicReference<>();
final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class);
transportAction.execute(null,
new GetEnrichPolicyAction.Request(),
new ActionListener<GetEnrichPolicyAction.Response>() {
@Override
public void onResponse(GetEnrichPolicyAction.Response response) {
reference.set(response);
latch.countDown();
transportAction.execute(null, new GetEnrichPolicyAction.Request(), new ActionListener<GetEnrichPolicyAction.Response>() {
@Override
public void onResponse(GetEnrichPolicyAction.Response response) {
reference.set(response);
latch.countDown();
}
}
public void onFailure(final Exception e) {
fail();
}
});
public void onFailure(final Exception e) {
fail();
}
});
latch.await();
assertNotNull(reference.get());
GetEnrichPolicyAction.Response response = reference.get();
for (EnrichPolicy.NamedPolicy policy: response.getPolicies()) {
for (EnrichPolicy.NamedPolicy policy : response.getPolicies()) {
try {
deleteEnrichPolicy(policy.getName(), clusterService);
} catch (Exception e) {
@ -74,10 +72,10 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<GetEnrichPolicyAction.Response> reference = new AtomicReference<>();
final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class);
transportAction.execute(null,
transportAction.execute(
null,
// empty or null should return the same
randomBoolean() ? new GetEnrichPolicyAction.Request() :
new GetEnrichPolicyAction.Request(new String[]{}),
randomBoolean() ? new GetEnrichPolicyAction.Request() : new GetEnrichPolicyAction.Request(new String[] {}),
new ActionListener<GetEnrichPolicyAction.Response>() {
@Override
public void onResponse(GetEnrichPolicyAction.Response response) {
@ -88,7 +86,8 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
public void onFailure(final Exception e) {
fail();
}
});
}
);
latch.await();
assertNotNull(reference.get());
GetEnrichPolicyAction.Response response = reference.get();
@ -104,20 +103,18 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<GetEnrichPolicyAction.Response> reference = new AtomicReference<>();
final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class);
transportAction.execute(null,
new GetEnrichPolicyAction.Request(),
new ActionListener<GetEnrichPolicyAction.Response>() {
@Override
public void onResponse(GetEnrichPolicyAction.Response response) {
reference.set(response);
latch.countDown();
transportAction.execute(null, new GetEnrichPolicyAction.Request(), new ActionListener<GetEnrichPolicyAction.Response>() {
@Override
public void onResponse(GetEnrichPolicyAction.Response response) {
reference.set(response);
latch.countDown();
}
}
public void onFailure(final Exception e) {
fail();
}
});
public void onFailure(final Exception e) {
fail();
}
});
latch.await();
assertNotNull(reference.get());
GetEnrichPolicyAction.Response response = reference.get();
@ -140,8 +137,9 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<GetEnrichPolicyAction.Response> reference = new AtomicReference<>();
final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class);
transportAction.execute(null,
new GetEnrichPolicyAction.Request(new String[]{name}),
transportAction.execute(
null,
new GetEnrichPolicyAction.Request(new String[] { name }),
new ActionListener<GetEnrichPolicyAction.Response>() {
@Override
public void onResponse(GetEnrichPolicyAction.Response response) {
@ -152,7 +150,8 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
public void onFailure(final Exception e) {
fail();
}
});
}
);
latch.await();
assertNotNull(reference.get());
GetEnrichPolicyAction.Response response = reference.get();
@ -183,8 +182,9 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<GetEnrichPolicyAction.Response> reference = new AtomicReference<>();
final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class);
transportAction.execute(null,
new GetEnrichPolicyAction.Request(new String[]{name, anotherName}),
transportAction.execute(
null,
new GetEnrichPolicyAction.Request(new String[] { name, anotherName }),
new ActionListener<GetEnrichPolicyAction.Response>() {
@Override
public void onResponse(GetEnrichPolicyAction.Response response) {
@ -195,7 +195,8 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
public void onFailure(final Exception e) {
fail();
}
});
}
);
latch.await();
assertNotNull(reference.get());
GetEnrichPolicyAction.Response response = reference.get();
@ -214,8 +215,9 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<GetEnrichPolicyAction.Response> reference = new AtomicReference<>();
final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class);
transportAction.execute(null,
new GetEnrichPolicyAction.Request(new String[]{"non-exists"}),
transportAction.execute(
null,
new GetEnrichPolicyAction.Request(new String[] { "non-exists" }),
new ActionListener<GetEnrichPolicyAction.Response>() {
@Override
public void onResponse(GetEnrichPolicyAction.Response response) {
@ -226,7 +228,8 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
public void onFailure(final Exception e) {
fail();
}
});
}
);
latch.await();
assertNotNull(reference.get());
assertThat(reference.get().getPolicies().size(), equalTo(0));

View File

@ -48,13 +48,15 @@ public class EnrichCoordinatorDocTests extends BaseMonitoringDocTestCase<EnrichC
}
@Override
protected EnrichCoordinatorDoc createMonitoringDoc(String cluster,
long timestamp,
long interval,
MonitoringDoc.Node node,
MonitoredSystem system,
String type,
String id) {
protected EnrichCoordinatorDoc createMonitoringDoc(
String cluster,
long timestamp,
long interval,
MonitoringDoc.Node node,
MonitoredSystem system,
String type,
String id
) {
return new EnrichCoordinatorDoc(cluster, timestamp, interval, node, stats);
}
@ -76,29 +78,47 @@ public class EnrichCoordinatorDocTests extends BaseMonitoringDocTestCase<EnrichC
final EnrichCoordinatorDoc document = new EnrichCoordinatorDoc("_cluster", timestamp, intervalMillis, node, stats);
final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
assertThat(xContent.utf8ToString(), equalTo(
"{"
+ "\"cluster_uuid\":\"_cluster\","
+ "\"timestamp\":\"" + DATE_TIME_FORMATTER.formatMillis(timestamp) + "\","
+ "\"interval_ms\":" + intervalMillis + ","
+ "\"type\":\"enrich_coordinator_stats\","
+ "\"source_node\":{"
assertThat(
xContent.utf8ToString(),
equalTo(
"{"
+ "\"cluster_uuid\":\"_cluster\","
+ "\"timestamp\":\""
+ DATE_TIME_FORMATTER.formatMillis(timestamp)
+ "\","
+ "\"interval_ms\":"
+ intervalMillis
+ ","
+ "\"type\":\"enrich_coordinator_stats\","
+ "\"source_node\":{"
+ "\"uuid\":\"_uuid\","
+ "\"host\":\"_host\","
+ "\"transport_address\":\"_addr\","
+ "\"ip\":\"_ip\","
+ "\"name\":\"_name\","
+ "\"timestamp\":\"" + DATE_TIME_FORMATTER.formatMillis(nodeTimestamp) + "\""
+ "},"
+ "\"enrich_coordinator_stats\":{"
+ "\"node_id\":\"" + stats.getNodeId() + "\","
+ "\"queue_size\":" + stats.getQueueSize() + ","
+ "\"remote_requests_current\":" + stats.getRemoteRequestsCurrent() + ","
+ "\"remote_requests_total\":" + stats.getRemoteRequestsTotal() + ","
+ "\"executed_searches_total\":" + stats.getExecutedSearchesTotal()
+ "}"
+ "}"
));
+ "\"timestamp\":\""
+ DATE_TIME_FORMATTER.formatMillis(nodeTimestamp)
+ "\""
+ "},"
+ "\"enrich_coordinator_stats\":{"
+ "\"node_id\":\""
+ stats.getNodeId()
+ "\","
+ "\"queue_size\":"
+ stats.getQueueSize()
+ ","
+ "\"remote_requests_current\":"
+ stats.getRemoteRequestsCurrent()
+ ","
+ "\"remote_requests_total\":"
+ stats.getRemoteRequestsTotal()
+ ","
+ "\"executed_searches_total\":"
+ stats.getExecutedSearchesTotal()
+ "}"
+ "}"
)
);
}
public void testEnrichCoordinatorStatsFieldsMapped() throws IOException {
@ -108,10 +128,15 @@ public class EnrichCoordinatorDocTests extends BaseMonitoringDocTestCase<EnrichC
builder.endObject();
Map<String, Object> serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false);
Map<String, Object> template =
XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false);
Map<?, ?> followStatsMapping = (Map<?, ?>) XContentMapValues
.extractValue("mappings._doc.properties.enrich_coordinator_stats.properties", template);
Map<String, Object> template = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
MonitoringTemplateUtils.loadTemplate("es"),
false
);
Map<?, ?> followStatsMapping = (Map<?, ?>) XContentMapValues.extractValue(
"mappings._doc.properties.enrich_coordinator_stats.properties",
template
);
assertThat(serializedStatus.size(), equalTo(followStatsMapping.size()));
for (Map.Entry<String, Object> entry : serializedStatus.entrySet()) {
String fieldName = entry.getKey();
@ -121,11 +146,13 @@ public class EnrichCoordinatorDocTests extends BaseMonitoringDocTestCase<EnrichC
Object fieldValue = entry.getValue();
String fieldType = (String) fieldMapping.get("type");
if (fieldValue instanceof Long || fieldValue instanceof Integer) {
assertThat("expected long field type for field [" + fieldName + "]", fieldType,
anyOf(equalTo("long"), equalTo("integer")));
assertThat("expected long field type for field [" + fieldName + "]", fieldType, anyOf(equalTo("long"), equalTo("integer")));
} else if (fieldValue instanceof String) {
assertThat("expected keyword field type for field [" + fieldName + "]", fieldType,
anyOf(equalTo("keyword"), equalTo("text")));
assertThat(
"expected keyword field type for field [" + fieldName + "]",
fieldType,
anyOf(equalTo("keyword"), equalTo("text"))
);
} else {
// Manual test specific object fields and if not just fail:
fail("unexpected field value type [" + fieldValue.getClass() + "] for field [" + fieldName + "]");

View File

@ -140,16 +140,17 @@ public class EnrichStatsCollectorTests extends BaseCollectorTestCase {
int numCoordinatorStats = randomIntBetween(0, 8);
List<CoordinatorStats> coordinatorStats = new ArrayList<>(numCoordinatorStats);
for (int i = 0; i < numCoordinatorStats; i++) {
coordinatorStats.add(new CoordinatorStats(
randomAlphaOfLength(4),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
randomNonNegativeLong(),
randomNonNegativeLong()
));
coordinatorStats.add(
new CoordinatorStats(
randomAlphaOfLength(4),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
randomNonNegativeLong(),
randomNonNegativeLong()
)
);
}
@SuppressWarnings("unchecked")
final ActionFuture<EnrichStatsAction.Response> future = (ActionFuture<EnrichStatsAction.Response>) mock(ActionFuture.class);
final EnrichStatsAction.Response response = new EnrichStatsAction.Response(executingPolicies, coordinatorStats);
@ -157,8 +158,7 @@ public class EnrichStatsCollectorTests extends BaseCollectorTestCase {
when(client.execute(eq(EnrichStatsAction.INSTANCE), any(EnrichStatsAction.Request.class))).thenReturn(future);
when(future.actionGet(timeout)).thenReturn(response);
final EnrichStatsCollector collector =
new EnrichStatsCollector(clusterService, licenseState, client, threadContext, settings);
final EnrichStatsCollector collector = new EnrichStatsCollector(clusterService, licenseState, client, threadContext, settings);
assertEquals(timeout, collector.getCollectionTimeout());
final long interval = randomNonNegativeLong();
@ -197,10 +197,12 @@ public class EnrichStatsCollectorTests extends BaseCollectorTestCase {
}
}
private EnrichStatsCollector createCollector(Settings settings,
ClusterService clusterService,
XPackLicenseState licenseState,
Client client) {
private EnrichStatsCollector createCollector(
Settings settings,
ClusterService clusterService,
XPackLicenseState licenseState,
Client client
) {
return new EnrichStatsCollector(clusterService, licenseState, client, settings);
}

View File

@ -38,20 +38,19 @@ public class ExecutingPolicyDocTests extends BaseMonitoringDocTestCase<Executing
@Override
public void setUp() throws Exception {
super.setUp();
executingPolicy = new ExecutingPolicy(
randomAlphaOfLength(4),
randomTaskInfo()
);
executingPolicy = new ExecutingPolicy(randomAlphaOfLength(4), randomTaskInfo());
}
@Override
protected ExecutingPolicyDoc createMonitoringDoc(String cluster,
long timestamp,
long interval,
MonitoringDoc.Node node,
MonitoredSystem system,
String type,
String id) {
protected ExecutingPolicyDoc createMonitoringDoc(
String cluster,
long timestamp,
long interval,
MonitoringDoc.Node node,
MonitoredSystem system,
String type,
String id
) {
return new ExecutingPolicyDoc(cluster, timestamp, interval, node, executingPolicy);
}
@ -73,40 +72,65 @@ public class ExecutingPolicyDocTests extends BaseMonitoringDocTestCase<Executing
final ExecutingPolicyDoc document = new ExecutingPolicyDoc("_cluster", timestamp, intervalMillis, node, executingPolicy);
final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
Optional<Map.Entry<String, String>> header =
executingPolicy.getTaskInfo().getHeaders().entrySet().stream().findAny();
assertThat(xContent.utf8ToString(), equalTo(
"{"
+ "\"cluster_uuid\":\"_cluster\","
+ "\"timestamp\":\"" + DATE_TIME_FORMATTER.formatMillis(timestamp) + "\","
+ "\"interval_ms\":" + intervalMillis + ","
+ "\"type\":\"enrich_executing_policy_stats\","
+ "\"source_node\":{"
Optional<Map.Entry<String, String>> header = executingPolicy.getTaskInfo().getHeaders().entrySet().stream().findAny();
assertThat(
xContent.utf8ToString(),
equalTo(
"{"
+ "\"cluster_uuid\":\"_cluster\","
+ "\"timestamp\":\""
+ DATE_TIME_FORMATTER.formatMillis(timestamp)
+ "\","
+ "\"interval_ms\":"
+ intervalMillis
+ ","
+ "\"type\":\"enrich_executing_policy_stats\","
+ "\"source_node\":{"
+ "\"uuid\":\"_uuid\","
+ "\"host\":\"_host\","
+ "\"transport_address\":\"_addr\","
+ "\"ip\":\"_ip\","
+ "\"name\":\"_name\","
+ "\"timestamp\":\"" + DATE_TIME_FORMATTER.formatMillis(nodeTimestamp) + "\""
+ "},"
+ "\"enrich_executing_policy_stats\":{"
+ "\"name\":\"" + executingPolicy.getName() + "\","
+ "\"timestamp\":\""
+ DATE_TIME_FORMATTER.formatMillis(nodeTimestamp)
+ "\""
+ "},"
+ "\"enrich_executing_policy_stats\":{"
+ "\"name\":\""
+ executingPolicy.getName()
+ "\","
+ "\"task\":{"
+ "\"node\":\"" + executingPolicy.getTaskInfo().getTaskId().getNodeId() + "\","
+ "\"id\":" + executingPolicy.getTaskInfo().getTaskId().getId() + ","
+ "\"type\":\"" + executingPolicy.getTaskInfo().getType() + "\","
+ "\"action\":\"" + executingPolicy.getTaskInfo().getAction() + "\","
+ "\"description\":\"" + executingPolicy.getTaskInfo().getDescription() + "\","
+ "\"start_time_in_millis\":" + executingPolicy.getTaskInfo().getStartTime() + ","
+ "\"running_time_in_nanos\":" + executingPolicy.getTaskInfo().getRunningTimeNanos() + ","
+ "\"cancellable\":" + executingPolicy.getTaskInfo().isCancellable() + ","
+ header
.map(entry -> String.format(Locale.ROOT, "\"headers\":{\"%s\":\"%s\"}", entry.getKey(), entry.getValue()))
.orElse("\"headers\":{}")
+ "\"node\":\""
+ executingPolicy.getTaskInfo().getTaskId().getNodeId()
+ "\","
+ "\"id\":"
+ executingPolicy.getTaskInfo().getTaskId().getId()
+ ","
+ "\"type\":\""
+ executingPolicy.getTaskInfo().getType()
+ "\","
+ "\"action\":\""
+ executingPolicy.getTaskInfo().getAction()
+ "\","
+ "\"description\":\""
+ executingPolicy.getTaskInfo().getDescription()
+ "\","
+ "\"start_time_in_millis\":"
+ executingPolicy.getTaskInfo().getStartTime()
+ ","
+ "\"running_time_in_nanos\":"
+ executingPolicy.getTaskInfo().getRunningTimeNanos()
+ ","
+ "\"cancellable\":"
+ executingPolicy.getTaskInfo().isCancellable()
+ ","
+ header.map(entry -> String.format(Locale.ROOT, "\"headers\":{\"%s\":\"%s\"}", entry.getKey(), entry.getValue()))
.orElse("\"headers\":{}")
+ "}"
+ "}"
+ "}"
));
+ "}"
+ "}"
)
);
}
public void testEnrichCoordinatorStatsFieldsMapped() throws IOException {
@ -116,10 +140,15 @@ public class ExecutingPolicyDocTests extends BaseMonitoringDocTestCase<Executing
builder.endObject();
Map<String, Object> serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false);
Map<String, Object> template =
XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false);
Map<?, ?> followStatsMapping = (Map<?, ?>) XContentMapValues
.extractValue("mappings._doc.properties.enrich_executing_policy_stats.properties", template);
Map<String, Object> template = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
MonitoringTemplateUtils.loadTemplate("es"),
false
);
Map<?, ?> followStatsMapping = (Map<?, ?>) XContentMapValues.extractValue(
"mappings._doc.properties.enrich_executing_policy_stats.properties",
template
);
assertThat(serializedStatus.size(), equalTo(followStatsMapping.size()));
for (Map.Entry<String, Object> entry : serializedStatus.entrySet()) {
String fieldName = entry.getKey();
@ -129,11 +158,13 @@ public class ExecutingPolicyDocTests extends BaseMonitoringDocTestCase<Executing
Object fieldValue = entry.getValue();
String fieldType = (String) fieldMapping.get("type");
if (fieldValue instanceof Long || fieldValue instanceof Integer) {
assertThat("expected long field type for field [" + fieldName + "]", fieldType,
anyOf(equalTo("long"), equalTo("integer")));
assertThat("expected long field type for field [" + fieldName + "]", fieldType, anyOf(equalTo("long"), equalTo("integer")));
} else if (fieldValue instanceof String) {
assertThat("expected keyword field type for field [" + fieldName + "]", fieldType,
anyOf(equalTo("keyword"), equalTo("text")));
assertThat(
"expected keyword field type for field [" + fieldName + "]",
fieldType,
anyOf(equalTo("keyword"), equalTo("text"))
);
} else {
if (fieldName.equals("task")) {
assertThat(fieldType, equalTo("object"));