diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 312f2967144..9f196c9cfe8 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -86,7 +87,7 @@ public class IngestService implements ClusterStateApplier { public IngestService(ClusterService clusterService, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, - List ingestPlugins) { + List ingestPlugins, Client client) { this.clusterService = clusterService; this.scriptService = scriptService; this.processorFactories = processorFactories( @@ -96,7 +97,7 @@ public class IngestService implements ClusterStateApplier { threadPool.getThreadContext(), threadPool::relativeTimeInMillis, (delay, command) -> threadPool.schedule( command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC - ), this + ), this, client ) ); this.threadPool = threadPool; diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index c064ddb35a1..10bd530e3c1 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; +import org.elasticsearch.client.Client; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; @@ -110,9 +111,14 @@ public interface Processor { */ public final BiFunction scheduler; + /** + * Provides access to the node client + */ + public final Client client; + public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, LongSupplier relativeTimeSupplier, BiFunction scheduler, - IngestService ingestService) { + IngestService ingestService, Client client) { this.env = env; this.scriptService = scriptService; this.threadContext = threadContext; @@ -120,6 +126,7 @@ public interface Processor { this.relativeTimeSupplier = relativeTimeSupplier; this.scheduler = scheduler; this.ingestService = ingestService; + this.client = client; } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index e8a68928188..6ad393d7121 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -366,7 +366,8 @@ public class Node implements Closeable { new ConsistentSettingsService(settings, clusterService, settingsModule.getConsistentSettings()) .newHashPublisher()); final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment, - scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); + scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), + pluginsService.filterPlugins(IngestPlugin.class), client); final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client); final UsageService usageService = new UsageService(); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index c2f03081584..674c1f78153 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; @@ -106,8 +107,9 @@ public class IngestServiceTests extends ESTestCase { public void testIngestPlugin() { ThreadPool tp = mock(ThreadPool.class); + Client client = mock(Client.class); IngestService ingestService = new IngestService(mock(ClusterService.class), tp, null, null, - null, Collections.singletonList(DUMMY_PLUGIN)); + null, Collections.singletonList(DUMMY_PLUGIN), client); Map factories = ingestService.getProcessorFactories(); assertTrue(factories.containsKey("foo")); assertEquals(1, factories.size()); @@ -115,18 +117,20 @@ public class IngestServiceTests extends ESTestCase { public void testIngestPluginDuplicate() { ThreadPool tp = mock(ThreadPool.class); + Client client = mock(Client.class); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new IngestService(mock(ClusterService.class), tp, null, null, - null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))); + null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN), client)); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); } public void testExecuteIndexPipelineDoesNotExist() { ThreadPool threadPool = mock(ThreadPool.class); + Client client = mock(Client.class); final ExecutorService executorService = EsExecutors.newDirectExecutorService(); when(threadPool.executor(anyString())).thenReturn(executorService); IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null, - null, Collections.singletonList(DUMMY_PLUGIN)); + null, Collections.singletonList(DUMMY_PLUGIN), client); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); final SetOnce failure = new SetOnce<>(); @@ -1157,6 +1161,7 @@ public class IngestServiceTests extends ESTestCase { private static IngestService createWithProcessors(Map processors) { ThreadPool threadPool = mock(ThreadPool.class); + Client client = mock(Client.class); final ExecutorService executorService = EsExecutors.newDirectExecutorService(); when(threadPool.executor(anyString())).thenReturn(executorService); return new IngestService(mock(ClusterService.class), threadPool, null, null, @@ -1165,7 +1170,7 @@ public class IngestServiceTests extends ESTestCase { public Map getProcessors(final Processor.Parameters parameters) { return processors; } - })); + }), client); } private class IngestDocumentMatcher extends ArgumentMatcher { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 34a1507e4e2..3ffa975a203 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -983,7 +983,7 @@ public class SnapshotResiliencyTests extends ESTestCase { new IngestService( clusterService, threadPool, environment, scriptService, new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(), - Collections.emptyList()), + Collections.emptyList(), client), transportShardBulkAction, client, actionFilters, indexNameExpressionResolver, new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver) ));