s/PipelineStoreBootstrapper/IngestBootstrapper

This commit is contained in:
Martijn van Groningen 2015-12-20 20:18:46 +01:00
parent 6dfcee6937
commit 66d8b5342f
10 changed files with 29 additions and 35 deletions

View File

@ -38,7 +38,11 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
public class PipelineStoreBootstrapper extends AbstractLifecycleComponent implements ClusterStateListener { /**
* Instantiates and wires all the services that the ingest plugin will be needing.
* Also the bootstrapper is in charge of starting and stopping the ingest plugin based on the cluster state.
*/
public class IngestBootstrapper extends AbstractLifecycleComponent implements ClusterStateListener {
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final Environment environment; private final Environment environment;
@ -47,9 +51,9 @@ public class PipelineStoreBootstrapper extends AbstractLifecycleComponent implem
private final Map<String, ProcessorFactoryProvider> processorFactoryProvider; private final Map<String, ProcessorFactoryProvider> processorFactoryProvider;
@Inject @Inject
public PipelineStoreBootstrapper(Settings settings, ThreadPool threadPool, Environment environment, public IngestBootstrapper(Settings settings, ThreadPool threadPool, Environment environment,
ClusterService clusterService, TransportService transportService, ClusterService clusterService, TransportService transportService,
Map<String, ProcessorFactoryProvider> processorFactoryProvider) { Map<String, ProcessorFactoryProvider> processorFactoryProvider) {
super(settings); super(settings);
this.threadPool = threadPool; this.threadPool = threadPool;
this.environment = environment; this.environment = environment;
@ -61,8 +65,8 @@ public class PipelineStoreBootstrapper extends AbstractLifecycleComponent implem
} }
// for testing: // for testing:
PipelineStoreBootstrapper(Settings settings, ThreadPool threadPool, ClusterService clusterService, IngestBootstrapper(Settings settings, ThreadPool threadPool, ClusterService clusterService,
PipelineStore pipelineStore, PipelineExecutionService pipelineExecutionService) { PipelineStore pipelineStore, PipelineExecutionService pipelineExecutionService) {
super(settings); super(settings);
this.threadPool = threadPool; this.threadPool = threadPool;
this.environment = null; this.environment = null;

View File

@ -46,7 +46,7 @@ public class IngestModule extends AbstractModule {
@Override @Override
protected void configure() { protected void configure() {
binder().bind(IngestRestFilter.class).asEagerSingleton(); binder().bind(IngestRestFilter.class).asEagerSingleton();
binder().bind(PipelineStoreBootstrapper.class).asEagerSingleton(); binder().bind(IngestBootstrapper.class).asEagerSingleton();
addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile())); addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile())); addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));

View File

@ -87,7 +87,7 @@ public class IngestPlugin extends Plugin {
if (transportClient) { if (transportClient) {
return Collections.emptyList(); return Collections.emptyList();
} else { } else {
return Collections.singletonList(PipelineStoreBootstrapper.class); return Collections.singletonList(IngestBootstrapper.class);
} }
} }

View File

@ -33,7 +33,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.IngestPlugin; import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugin.ingest.PipelineExecutionService; import org.elasticsearch.plugin.ingest.PipelineExecutionService;
import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper; import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import java.util.*; import java.util.*;
@ -42,7 +42,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
private final PipelineExecutionService executionService; private final PipelineExecutionService executionService;
@Inject @Inject
public IngestActionFilter(Settings settings, PipelineStoreBootstrapper bootstrapper) { public IngestActionFilter(Settings settings, IngestBootstrapper bootstrapper) {
super(settings); super(settings);
this.executionService = bootstrapper.getPipelineExecutionService(); this.executionService = bootstrapper.getPipelineExecutionService();
} }

View File

@ -27,7 +27,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.PipelineStore; import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper; import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -36,7 +36,7 @@ public class DeletePipelineTransportAction extends HandledTransportAction<Delete
private final PipelineStore pipelineStore; private final PipelineStore pipelineStore;
@Inject @Inject
public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStoreBootstrapper bootstrapper) { public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) {
super(settings, DeletePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new); super(settings, DeletePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new);
this.pipelineStore = bootstrapper.getPipelineStore(); this.pipelineStore = bootstrapper.getPipelineStore();
} }

View File

@ -23,25 +23,22 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.PipelineDefinition; import org.elasticsearch.plugin.ingest.PipelineDefinition;
import org.elasticsearch.plugin.ingest.PipelineStore; import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper; import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
public class GetPipelineTransportAction extends HandledTransportAction<GetPipelineRequest, GetPipelineResponse> { public class GetPipelineTransportAction extends HandledTransportAction<GetPipelineRequest, GetPipelineResponse> {
private final PipelineStore pipelineStore; private final PipelineStore pipelineStore;
@Inject @Inject
public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStoreBootstrapper bootstrapper) { public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) {
super(settings, GetPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, GetPipelineRequest::new); super(settings, GetPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, GetPipelineRequest::new);
this.pipelineStore = bootstrapper.getPipelineStore(); this.pipelineStore = bootstrapper.getPipelineStore();
} }

View File

@ -20,29 +20,23 @@
package org.elasticsearch.plugin.ingest.transport.put; package org.elasticsearch.plugin.ingest.transport.put;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.plugin.ingest.PipelineStore; import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper; import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Map;
public class PutPipelineTransportAction extends HandledTransportAction<PutPipelineRequest, IndexResponse> { public class PutPipelineTransportAction extends HandledTransportAction<PutPipelineRequest, IndexResponse> {
private final PipelineStore pipelineStore; private final PipelineStore pipelineStore;
@Inject @Inject
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStoreBootstrapper bootstrapper) { public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) {
super(settings, PutPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new); super(settings, PutPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new);
this.pipelineStore = bootstrapper.getPipelineStore(); this.pipelineStore = bootstrapper.getPipelineStore();
} }

View File

@ -27,11 +27,10 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.plugin.ingest.PipelineStore; import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper; import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Map; import java.util.Map;
public class SimulatePipelineTransportAction extends HandledTransportAction<SimulatePipelineRequest, SimulatePipelineResponse> { public class SimulatePipelineTransportAction extends HandledTransportAction<SimulatePipelineRequest, SimulatePipelineResponse> {
@ -40,7 +39,7 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
private final SimulateExecutionService executionService; private final SimulateExecutionService executionService;
@Inject @Inject
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStoreBootstrapper bootstrapper) { public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) {
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new); super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new);
this.pipelineStore = bootstrapper.getPipelineStore(); this.pipelineStore = bootstrapper.getPipelineStore();
this.executionService = new SimulateExecutionService(threadPool); this.executionService = new SimulateExecutionService(threadPool);

View File

@ -51,10 +51,10 @@ import static org.hamcrest.core.Is.is;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
public class PipelineBootstrapperTests extends ESTestCase { public class IngestBootstrapperTests extends ESTestCase {
private PipelineStore store; private PipelineStore store;
private PipelineStoreBootstrapper bootstrapper; private IngestBootstrapper bootstrapper;
@Before @Before
public void init() { public void init() {
@ -64,7 +64,7 @@ public class PipelineBootstrapperTests extends ESTestCase {
store = mock(PipelineStore.class); store = mock(PipelineStore.class);
when(store.isStarted()).thenReturn(false); when(store.isStarted()).thenReturn(false);
PipelineExecutionService pipelineExecutionService = mock(PipelineExecutionService.class); PipelineExecutionService pipelineExecutionService = mock(PipelineExecutionService.class);
bootstrapper = new PipelineStoreBootstrapper(Settings.EMPTY, threadPool, clusterService, store, pipelineExecutionService); bootstrapper = new IngestBootstrapper(Settings.EMPTY, threadPool, clusterService, store, pipelineExecutionService);
} }
public void testStartAndStopInBackground() throws Exception { public void testStartAndStopInBackground() throws Exception {
@ -77,7 +77,7 @@ public class PipelineBootstrapperTests extends ESTestCase {
when(client.searchScroll(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(Collections.emptyList())); when(client.searchScroll(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(Collections.emptyList()));
Settings settings = Settings.EMPTY; Settings settings = Settings.EMPTY;
PipelineStore store = new PipelineStore(settings, clusterService, transportService); PipelineStore store = new PipelineStore(settings, clusterService, transportService);
PipelineStoreBootstrapper bootstrapper = new PipelineStoreBootstrapper( IngestBootstrapper bootstrapper = new IngestBootstrapper(
settings, threadPool, clusterService, store, null settings, threadPool, clusterService, store, null
); );
bootstrapper.setClient(client); bootstrapper.setClient(client);

View File

@ -36,7 +36,7 @@ import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.plugin.ingest.IngestPlugin; import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugin.ingest.PipelineExecutionService; import org.elasticsearch.plugin.ingest.PipelineExecutionService;
import org.elasticsearch.plugin.ingest.PipelineStore; import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper; import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before; import org.junit.Before;
@ -61,7 +61,7 @@ public class IngestActionFilterTests extends ESTestCase {
@Before @Before
public void setup() { public void setup() {
executionService = mock(PipelineExecutionService.class); executionService = mock(PipelineExecutionService.class);
PipelineStoreBootstrapper bootstrapper = mock(PipelineStoreBootstrapper.class); IngestBootstrapper bootstrapper = mock(IngestBootstrapper.class);
when(bootstrapper.getPipelineExecutionService()).thenReturn(executionService); when(bootstrapper.getPipelineExecutionService()).thenReturn(executionService);
filter = new IngestActionFilter(Settings.EMPTY, bootstrapper); filter = new IngestActionFilter(Settings.EMPTY, bootstrapper);
} }
@ -184,7 +184,7 @@ public class IngestActionFilterTests extends ESTestCase {
}; };
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor)));
executionService = new PipelineExecutionService(store, threadPool); executionService = new PipelineExecutionService(store, threadPool);
PipelineStoreBootstrapper bootstrapper = mock(PipelineStoreBootstrapper.class); IngestBootstrapper bootstrapper = mock(IngestBootstrapper.class);
when(bootstrapper.getPipelineExecutionService()).thenReturn(executionService); when(bootstrapper.getPipelineExecutionService()).thenReturn(executionService);
filter = new IngestActionFilter(Settings.EMPTY, bootstrapper); filter = new IngestActionFilter(Settings.EMPTY, bootstrapper);