From 5747badaa8b85f6316ce6e40a23713faedea0348 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 2 Sep 2019 08:17:03 +0200 Subject: [PATCH] Allow ingest processors access to node client. (#46077) This is the first PR that merges changes made to server module from the enrich branch (see #32789) into the master branch. The plan is to merge changes made to the server module separately from the pr that will merge enrich into master, so that these changes can be reviewed in isolation. --- .../org/elasticsearch/ingest/IngestService.java | 5 +++-- .../java/org/elasticsearch/ingest/Processor.java | 9 ++++++++- .../src/main/java/org/elasticsearch/node/Node.java | 3 ++- .../elasticsearch/ingest/IngestServiceTests.java | 13 +++++++++---- .../snapshots/SnapshotResiliencyTests.java | 2 +- 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 312f2967144..9f196c9cfe8 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -86,7 +87,7 @@ public class IngestService implements ClusterStateApplier { public IngestService(ClusterService clusterService, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, - List ingestPlugins) { + List ingestPlugins, Client client) { this.clusterService = clusterService; this.scriptService = scriptService; this.processorFactories = processorFactories( @@ -96,7 +97,7 @@ public class IngestService implements ClusterStateApplier { threadPool.getThreadContext(), threadPool::relativeTimeInMillis, (delay, command) -> threadPool.schedule( command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC - ), this + ), this, client ) ); this.threadPool = threadPool; diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index c064ddb35a1..10bd530e3c1 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; +import org.elasticsearch.client.Client; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; @@ -110,9 +111,14 @@ public interface Processor { */ public final BiFunction scheduler; + /** + * Provides access to the node client + */ + public final Client client; + public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, LongSupplier relativeTimeSupplier, BiFunction scheduler, - IngestService ingestService) { + IngestService ingestService, Client client) { this.env = env; this.scriptService = scriptService; this.threadContext = threadContext; @@ -120,6 +126,7 @@ public interface Processor { this.relativeTimeSupplier = relativeTimeSupplier; this.scheduler = scheduler; this.ingestService = ingestService; + this.client = client; } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index e8a68928188..6ad393d7121 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -366,7 +366,8 @@ public class Node implements Closeable { new ConsistentSettingsService(settings, clusterService, settingsModule.getConsistentSettings()) .newHashPublisher()); final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment, - scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); + scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), + pluginsService.filterPlugins(IngestPlugin.class), client); final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client); final UsageService usageService = new UsageService(); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index c2f03081584..674c1f78153 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; @@ -106,8 +107,9 @@ public class IngestServiceTests extends ESTestCase { public void testIngestPlugin() { ThreadPool tp = mock(ThreadPool.class); + Client client = mock(Client.class); IngestService ingestService = new IngestService(mock(ClusterService.class), tp, null, null, - null, Collections.singletonList(DUMMY_PLUGIN)); + null, Collections.singletonList(DUMMY_PLUGIN), client); Map factories = ingestService.getProcessorFactories(); assertTrue(factories.containsKey("foo")); assertEquals(1, factories.size()); @@ -115,18 +117,20 @@ public class IngestServiceTests extends ESTestCase { public void testIngestPluginDuplicate() { ThreadPool tp = mock(ThreadPool.class); + Client client = mock(Client.class); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new IngestService(mock(ClusterService.class), tp, null, null, - null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))); + null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN), client)); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); } public void testExecuteIndexPipelineDoesNotExist() { ThreadPool threadPool = mock(ThreadPool.class); + Client client = mock(Client.class); final ExecutorService executorService = EsExecutors.newDirectExecutorService(); when(threadPool.executor(anyString())).thenReturn(executorService); IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null, - null, Collections.singletonList(DUMMY_PLUGIN)); + null, Collections.singletonList(DUMMY_PLUGIN), client); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); final SetOnce failure = new SetOnce<>(); @@ -1157,6 +1161,7 @@ public class IngestServiceTests extends ESTestCase { private static IngestService createWithProcessors(Map processors) { ThreadPool threadPool = mock(ThreadPool.class); + Client client = mock(Client.class); final ExecutorService executorService = EsExecutors.newDirectExecutorService(); when(threadPool.executor(anyString())).thenReturn(executorService); return new IngestService(mock(ClusterService.class), threadPool, null, null, @@ -1165,7 +1170,7 @@ public class IngestServiceTests extends ESTestCase { public Map getProcessors(final Processor.Parameters parameters) { return processors; } - })); + }), client); } private class IngestDocumentMatcher extends ArgumentMatcher { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 34a1507e4e2..3ffa975a203 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -983,7 +983,7 @@ public class SnapshotResiliencyTests extends ESTestCase { new IngestService( clusterService, threadPool, environment, scriptService, new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(), - Collections.emptyList()), + Collections.emptyList(), client), transportShardBulkAction, client, actionFilters, indexNameExpressionResolver, new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver) ));