Allow ingest processors access to node client. (#46077)

This is the first PR that merges changes made to server module from
the enrich branch (see #32789) into the master branch.

The plan is to merge changes made to the server module separately from
the pr that will merge enrich into master, so that these changes can
be reviewed in isolation.
This commit is contained in:
Martijn van Groningen 2019-09-02 08:17:03 +02:00
parent 6fed082148
commit 5747badaa8
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
5 changed files with 23 additions and 9 deletions

View File

@ -31,6 +31,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@ -86,7 +87,7 @@ public class IngestService implements ClusterStateApplier {
public IngestService(ClusterService clusterService, ThreadPool threadPool,
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
List<IngestPlugin> ingestPlugins) {
List<IngestPlugin> ingestPlugins, Client client) {
this.clusterService = clusterService;
this.scriptService = scriptService;
this.processorFactories = processorFactories(
@ -96,7 +97,7 @@ public class IngestService implements ClusterStateApplier {
threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
(delay, command) -> threadPool.schedule(
command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC
), this
), this, client
)
);
this.threadPool = threadPool;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
@ -110,9 +111,14 @@ public interface Processor {
*/
public final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler;
/**
* Provides access to the node client
*/
public final Client client;
public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
IngestService ingestService) {
IngestService ingestService, Client client) {
this.env = env;
this.scriptService = scriptService;
this.threadContext = threadContext;
@ -120,6 +126,7 @@ public interface Processor {
this.relativeTimeSupplier = relativeTimeSupplier;
this.scheduler = scheduler;
this.ingestService = ingestService;
this.client = client;
}
}

View File

@ -366,7 +366,8 @@ public class Node implements Closeable {
new ConsistentSettingsService(settings, clusterService, settingsModule.getConsistentSettings())
.newHashPublisher());
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(),
pluginsService.filterPlugins(IngestPlugin.class), client);
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
final UsageService usageService = new UsageService();

View File

@ -35,6 +35,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
@ -106,8 +107,9 @@ public class IngestServiceTests extends ESTestCase {
public void testIngestPlugin() {
ThreadPool tp = mock(ThreadPool.class);
Client client = mock(Client.class);
IngestService ingestService = new IngestService(mock(ClusterService.class), tp, null, null,
null, Collections.singletonList(DUMMY_PLUGIN));
null, Collections.singletonList(DUMMY_PLUGIN), client);
Map<String, Processor.Factory> factories = ingestService.getProcessorFactories();
assertTrue(factories.containsKey("foo"));
assertEquals(1, factories.size());
@ -115,18 +117,20 @@ public class IngestServiceTests extends ESTestCase {
public void testIngestPluginDuplicate() {
ThreadPool tp = mock(ThreadPool.class);
Client client = mock(Client.class);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
new IngestService(mock(ClusterService.class), tp, null, null,
null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN)));
null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN), client));
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
}
public void testExecuteIndexPipelineDoesNotExist() {
ThreadPool threadPool = mock(ThreadPool.class);
Client client = mock(Client.class);
final ExecutorService executorService = EsExecutors.newDirectExecutorService();
when(threadPool.executor(anyString())).thenReturn(executorService);
IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null,
null, Collections.singletonList(DUMMY_PLUGIN));
null, Collections.singletonList(DUMMY_PLUGIN), client);
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
final SetOnce<Boolean> failure = new SetOnce<>();
@ -1157,6 +1161,7 @@ public class IngestServiceTests extends ESTestCase {
private static IngestService createWithProcessors(Map<String, Processor.Factory> processors) {
ThreadPool threadPool = mock(ThreadPool.class);
Client client = mock(Client.class);
final ExecutorService executorService = EsExecutors.newDirectExecutorService();
when(threadPool.executor(anyString())).thenReturn(executorService);
return new IngestService(mock(ClusterService.class), threadPool, null, null,
@ -1165,7 +1170,7 @@ public class IngestServiceTests extends ESTestCase {
public Map<String, Processor.Factory> getProcessors(final Processor.Parameters parameters) {
return processors;
}
}));
}), client);
}
private class IngestDocumentMatcher extends ArgumentMatcher<IngestDocument> {

View File

@ -983,7 +983,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
new IngestService(
clusterService, threadPool, environment, scriptService,
new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(),
Collections.emptyList()),
Collections.emptyList(), client),
transportShardBulkAction, client, actionFilters, indexNameExpressionResolver,
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver)
));