This was missing and caused nodes to drop out of the cluster on serialization failures when ever one tried to get an enrich policy task by name. The test in here is a little dirty but I figured it would be nice to have an actual reproducer for the issue and I couldn't find any infrastructure to nicely time the tasks so I put this on top of existing test infra.
This commit is contained in:
parent
d0c7b0a3a8
commit
76f56c1264
|
@ -51,6 +51,7 @@ import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
|
|||
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
|
||||
import org.elasticsearch.xpack.core.eql.EqlFeatureSetUsage;
|
||||
import org.elasticsearch.xpack.core.flattened.FlattenedFeatureSetUsage;
|
||||
import org.elasticsearch.xpack.core.frozen.FrozenIndicesFeatureSetUsage;
|
||||
|
@ -641,7 +642,8 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
|
|||
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.ANALYTICS, AnalyticsFeatureSetUsage::new),
|
||||
// Enrich
|
||||
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.ENRICH, EnrichFeatureSet.Usage::new),
|
||||
// Searchable snapshots
|
||||
new NamedWriteableRegistry.Entry(Task.Status.class, ExecuteEnrichPolicyStatus.NAME, ExecuteEnrichPolicyStatus::new),
|
||||
// Searchable snapshots
|
||||
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.SEARCHABLE_SNAPSHOTS,
|
||||
SearchableSnapshotFeatureSetUsage::new),
|
||||
// Data Streams
|
||||
|
|
|
@ -6,6 +6,9 @@
|
|||
package org.elasticsearch.xpack.enrich;
|
||||
|
||||
import org.apache.lucene.search.TotalHits;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
|
||||
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
|
@ -23,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|||
import org.elasticsearch.index.reindex.ReindexPlugin;
|
||||
import org.elasticsearch.ingest.common.IngestCommonPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
||||
|
@ -222,7 +226,27 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
|
|||
);
|
||||
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
|
||||
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
|
||||
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(POLICY_NAME)).actionGet();
|
||||
final ActionFuture<ExecuteEnrichPolicyAction.Response> policyExecuteFuture = client().execute(
|
||||
ExecuteEnrichPolicyAction.INSTANCE,
|
||||
new ExecuteEnrichPolicyAction.Request(POLICY_NAME)
|
||||
);
|
||||
// Make sure we can deserialize enrich policy execution task status
|
||||
final List<TaskInfo> tasks = client().admin()
|
||||
.cluster()
|
||||
.prepareListTasks()
|
||||
.setActions(EnrichPolicyExecutor.TASK_ACTION)
|
||||
.get()
|
||||
.getTasks();
|
||||
// Best effort, sometimes the enrich policy task will not be visible yet or will have already finished
|
||||
if (tasks.isEmpty() == false) {
|
||||
try {
|
||||
final GetTaskResponse getTaskResponse = client().admin().cluster().prepareGetTask(tasks.get(0).getTaskId()).get();
|
||||
assertEquals(getTaskResponse.getTask().getTask().getAction(), EnrichPolicyExecutor.TASK_ACTION);
|
||||
} catch (ResourceNotFoundException e) {
|
||||
// ignored, could be the task has already finished
|
||||
}
|
||||
}
|
||||
policyExecuteFuture.actionGet();
|
||||
}
|
||||
|
||||
private static void createPipeline() {
|
||||
|
|
Loading…
Reference in New Issue