only update pipeline if the content has been changed
split the actual fetching of pipeline docs from the pipeline store to make unit testing easier intoduced factory for builders replaced hardcoded processor lookups with simple factory based registry
This commit is contained in:
parent
18c1cc678a
commit
2071db688c
|
@ -23,6 +23,9 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Represents the data and meta data (like id and type) of a single document that is going to be indexed.
|
||||
*/
|
||||
public final class Data {
|
||||
|
||||
private final String index;
|
||||
|
|
|
@ -25,6 +25,9 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A pipeline is a list of {@link Processor} instances grouped under a unique id.
|
||||
*/
|
||||
public final class Pipeline {
|
||||
|
||||
private final String id;
|
||||
|
@ -37,20 +40,32 @@ public final class Pipeline {
|
|||
this.processors = processors;
|
||||
}
|
||||
|
||||
/**
|
||||
* Modifies the data of a document to be indexed based on the processor this pipeline holds
|
||||
*/
|
||||
public void execute(Data data) {
|
||||
for (Processor processor : processors) {
|
||||
processor.execute(data);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The unique id of this pipeline
|
||||
*/
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
/**
|
||||
* An optional description of what this pipeline is doing to the data gets processed by this pipeline.
|
||||
*/
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
/**
|
||||
* Unmodifiable list containing each processor that operates on the data.
|
||||
*/
|
||||
public List<Processor> getProcessors() {
|
||||
return processors;
|
||||
}
|
||||
|
@ -65,7 +80,7 @@ public final class Pipeline {
|
|||
this.name = name;
|
||||
}
|
||||
|
||||
public Builder(Map<String, Object> config) {
|
||||
public Builder(Map<String, Object> config, Map<String, Processor.Builder.Factory> processorRegistry) {
|
||||
name = (String) config.get("name");
|
||||
description = (String) config.get("description");
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -73,13 +88,12 @@ public final class Pipeline {
|
|||
if (processors != null ) {
|
||||
for (Map<String, Map<String, Object>> processor : processors) {
|
||||
for (Map.Entry<String, Map<String, Object>> entry : processor.entrySet()) {
|
||||
// TODO: add lookup service...
|
||||
if ("simple".equals(entry.getKey())) {
|
||||
SimpleProcessor.Builder builder = new SimpleProcessor.Builder();
|
||||
Processor.Builder builder = processorRegistry.get(entry.getKey()).create();
|
||||
if (builder != null) {
|
||||
builder.fromMap(entry.getValue());
|
||||
this.processors.add(builder.build());
|
||||
} else {
|
||||
throw new UnsupportedOperationException();
|
||||
throw new IllegalArgumentException("No processor type exist with name [" + entry.getKey() + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,16 +20,46 @@
|
|||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* An processor implementation may modify the data belonging to a document.
|
||||
* If and what exactly is modified is upto the implementation.
|
||||
*/
|
||||
public interface Processor {
|
||||
|
||||
/**
|
||||
* Introspect and potentially modify the incoming data.
|
||||
*/
|
||||
void execute(Data data);
|
||||
|
||||
String type();
|
||||
|
||||
/**
|
||||
* A builder to contruct a processor to be used in a pipeline.
|
||||
*/
|
||||
interface Builder {
|
||||
|
||||
/**
|
||||
* A general way to set processor related settings based on the config map.
|
||||
*/
|
||||
void fromMap(Map<String, Object> config);
|
||||
|
||||
/**
|
||||
* Builds the processor based on previous set settings.
|
||||
*/
|
||||
Processor build();
|
||||
|
||||
/**
|
||||
* A factory that creates a processor builder when processor instances for pipelines are being created.
|
||||
*/
|
||||
interface Factory {
|
||||
|
||||
/**
|
||||
* Creates the builder.
|
||||
*/
|
||||
Builder create();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ import java.util.Map;
|
|||
|
||||
public final class SimpleProcessor implements Processor {
|
||||
|
||||
public static final String TYPE = "simple";
|
||||
|
||||
private final String path;
|
||||
private final String expectedValue;
|
||||
|
||||
|
@ -46,11 +48,6 @@ public final class SimpleProcessor implements Processor {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
return "logging";
|
||||
}
|
||||
|
||||
public static class Builder implements Processor.Builder {
|
||||
|
||||
private String path;
|
||||
|
@ -85,6 +82,15 @@ public final class SimpleProcessor implements Processor {
|
|||
public Processor build() {
|
||||
return new SimpleProcessor(path, value, addField, addFieldValue);
|
||||
}
|
||||
|
||||
public static class Factory implements Processor.Builder.Factory {
|
||||
|
||||
@Override
|
||||
public Processor.Builder create() {
|
||||
return new Builder();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,15 +20,36 @@
|
|||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.multibindings.MapBinder;
|
||||
import org.elasticsearch.common.inject.multibindings.Multibinder;
|
||||
import org.elasticsearch.ingest.Processor;
|
||||
import org.elasticsearch.ingest.SimpleProcessor;
|
||||
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
|
||||
import org.elasticsearch.plugin.ingest.transport.IngestActionFilter;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class IngestModule extends AbstractModule {
|
||||
|
||||
private final Map<String, Class<? extends Processor.Builder.Factory>> processors = new HashMap<>();
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
binder().bind(IngestRestFilter.class).asEagerSingleton();
|
||||
binder().bind(PipelineStore.class).asEagerSingleton();
|
||||
binder().bind(PipelineConfigDocReader.class).asEagerSingleton();
|
||||
|
||||
registerProcessor(SimpleProcessor.TYPE, SimpleProcessor.Builder.Factory.class);
|
||||
|
||||
MapBinder<String, Processor.Builder.Factory> mapBinder = MapBinder.newMapBinder(binder(), String.class, Processor.Builder.Factory.class);
|
||||
for (Map.Entry<String, Class<? extends Processor.Builder.Factory>> entry : processors.entrySet()) {
|
||||
mapBinder.addBinding(entry.getKey()).to(entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
public void registerProcessor(String processorType, Class<? extends Processor.Builder.Factory> processorFactory) {
|
||||
processors.put(processorType, processorFactory);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -27,13 +27,14 @@ import org.elasticsearch.plugin.ingest.transport.IngestActionFilter;
|
|||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.rest.action.RestActionModule;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
public class IngestPlugin extends Plugin {
|
||||
|
||||
public static final String INGEST_CONTEXT_KEY = "__ingest__";
|
||||
public static final String INGEST_HTTP_PARAM = "ingest";
|
||||
public static final String INGEST_PARAM = "ingest";
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
|
@ -52,14 +53,11 @@ public class IngestPlugin extends Plugin {
|
|||
|
||||
@Override
|
||||
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
|
||||
return Collections.singletonList(PipelineStore.class);
|
||||
return Arrays.asList(PipelineStore.class, PipelineConfigDocReader.class);
|
||||
}
|
||||
|
||||
public void onModule(ActionModule module) {
|
||||
module.registerFilter(IngestActionFilter.class);
|
||||
}
|
||||
|
||||
public void onModule(RestActionModule module) {
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,129 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.inject.Injector;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
|
||||
public class PipelineConfigDocReader extends AbstractLifecycleComponent {
|
||||
|
||||
private volatile Client client;
|
||||
private final Injector injector;
|
||||
private final TimeValue scrollTimeout;
|
||||
|
||||
@Inject
|
||||
public PipelineConfigDocReader(Settings settings, Injector injector) {
|
||||
super(settings);
|
||||
this.injector = injector;
|
||||
this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
client = injector.getInstance(Client.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
client.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
}
|
||||
|
||||
public Iterable<SearchHit> readAll() {
|
||||
// TODO: the search should be replaced with an ingest API when it is available
|
||||
SearchResponse searchResponse = client.prepareSearch(PipelineStore.INDEX)
|
||||
.setVersion(true)
|
||||
.setScroll(scrollTimeout)
|
||||
.addSort("_doc", SortOrder.ASC)
|
||||
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
|
||||
.get();
|
||||
|
||||
if (searchResponse.getHits().getTotalHits() == 0) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
logger.debug("reading [{}] pipeline documents", searchResponse.getHits().totalHits());
|
||||
return new Iterable<SearchHit>() {
|
||||
@Override
|
||||
public Iterator<SearchHit> iterator() {
|
||||
return new SearchScrollIterator(searchResponse);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
class SearchScrollIterator implements Iterator<SearchHit> {
|
||||
|
||||
private SearchResponse searchResponse;
|
||||
|
||||
private int currentIndex;
|
||||
private SearchHit[] currentHits;
|
||||
|
||||
SearchScrollIterator(SearchResponse searchResponse) {
|
||||
this.searchResponse = searchResponse;
|
||||
this.currentHits = searchResponse.getHits().getHits();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
if (currentIndex < currentHits.length) {
|
||||
return true;
|
||||
} else {
|
||||
if (searchResponse == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
|
||||
.setScroll(scrollTimeout)
|
||||
.get();
|
||||
if (searchResponse.getHits().getHits().length == 0) {
|
||||
searchResponse = null;
|
||||
return false;
|
||||
} else {
|
||||
currentHits = searchResponse.getHits().getHits();
|
||||
currentIndex = 0;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchHit next() {
|
||||
SearchHit hit = currentHits[currentIndex++];
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("reading pipeline document [{}] with source [{}]", hit.getId(), hit.sourceAsString());
|
||||
}
|
||||
return hit;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -19,52 +19,56 @@
|
|||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.collect.CopyOnWriteHashMap;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.inject.Injector;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.Processor;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.Set;
|
||||
|
||||
public class PipelineStore extends AbstractLifecycleComponent {
|
||||
|
||||
public final static String INDEX = ".pipelines";
|
||||
public final static String INDEX = ".ingest";
|
||||
public final static String TYPE = "pipeline";
|
||||
|
||||
private Client client;
|
||||
private final Injector injector;
|
||||
private final ThreadPool threadPool;
|
||||
private final ClusterService clusterService;
|
||||
private final TimeValue pipelineUpdateInterval;
|
||||
private final PipelineConfigDocReader configDocReader;
|
||||
private final Map<String, Processor.Builder.Factory> processorFactoryRegistry;
|
||||
|
||||
private volatile Updater updater;
|
||||
private volatile CopyOnWriteHashMap<String, Pipeline> pipelines = new CopyOnWriteHashMap<>();
|
||||
private volatile Map<String, PipelineReference> pipelines = new HashMap<>();
|
||||
|
||||
@Inject
|
||||
public PipelineStore(Settings settings, Injector injector) {
|
||||
public PipelineStore(Settings settings, ThreadPool threadPool, ClusterService clusterService, PipelineConfigDocReader configDocReader, Map<String, Processor.Builder.Factory> processors) {
|
||||
super(settings);
|
||||
this.injector = injector;
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
this.pipelineUpdateInterval = settings.getAsTime("ingest.pipeline.store.update.interval", TimeValue.timeValueSeconds(1));
|
||||
this.configDocReader = configDocReader;
|
||||
this.processorFactoryRegistry = Collections.unmodifiableMap(processors);
|
||||
clusterService.add(new PipelineStoreListener());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
client = injector.getInstance(Client.class);
|
||||
updater = new Updater();
|
||||
// TODO: start when local cluster state isn't blocked: ([SERVICE_UNAVAILABLE/1/state not recovered / initialized])
|
||||
updater.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
updater.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -72,65 +76,109 @@ public class PipelineStore extends AbstractLifecycleComponent {
|
|||
}
|
||||
|
||||
public Pipeline get(String id) {
|
||||
return pipelines.get(id);
|
||||
PipelineReference ref = pipelines.get(id);
|
||||
if (ref != null) {
|
||||
return ref.getPipeline();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
void updatePipelines() {
|
||||
Map<String, Pipeline> pipelines = new HashMap<>();
|
||||
SearchResponse searchResponse = client.prepareSearch(INDEX)
|
||||
.setScroll(TimeValue.timeValueMinutes(1))
|
||||
.addSort("_doc", SortOrder.ASC)
|
||||
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
|
||||
.get();
|
||||
logger.info("Loading [{}] pipelines", searchResponse.getHits().totalHits());
|
||||
do {
|
||||
for (SearchHit hit : searchResponse.getHits()) {
|
||||
logger.info("Loading pipeline [{}] with source [{}]", hit.getId(), hit.sourceAsString());
|
||||
Pipeline.Builder builder = new Pipeline.Builder(hit.sourceAsMap());
|
||||
pipelines.put(hit.getId(), builder.build());
|
||||
int changed = 0;
|
||||
Map<String, PipelineReference> newPipelines = new HashMap<>(pipelines);
|
||||
for (SearchHit hit : configDocReader.readAll()) {
|
||||
String pipelineId = hit.getId();
|
||||
BytesReference pipelineSource = hit.getSourceRef();
|
||||
PipelineReference previous = newPipelines.get(pipelineId);
|
||||
if (previous != null) {
|
||||
if (previous.getSource().equals(pipelineSource)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).get();
|
||||
} while (searchResponse.getHits().getHits().length != 0);
|
||||
PipelineStore.this.pipelines = PipelineStore.this.pipelines.copyAndPutAll(pipelines);
|
||||
|
||||
changed++;
|
||||
Pipeline.Builder builder = new Pipeline.Builder(hit.sourceAsMap(), processorFactoryRegistry);
|
||||
newPipelines.put(pipelineId, new PipelineReference(builder.build(), hit.getVersion(), pipelineSource));
|
||||
}
|
||||
|
||||
if (changed != 0) {
|
||||
logger.debug("adding or updating [{}] pipelines", changed);
|
||||
pipelines = newPipelines;
|
||||
} else {
|
||||
logger.debug("adding no new pipelines");
|
||||
}
|
||||
}
|
||||
|
||||
class Updater extends Thread {
|
||||
|
||||
private volatile boolean running = true;
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
public Updater() {
|
||||
super(EsExecutors.threadName(settings, "[updater]"));
|
||||
void startUpdateWorker() {
|
||||
if (lifecycleState() == Lifecycle.State.STARTED) {
|
||||
threadPool.schedule(pipelineUpdateInterval, ThreadPool.Names.GENERIC, new Updater());
|
||||
}
|
||||
}
|
||||
|
||||
class Updater implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (running) {
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
updatePipelines();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
logger.error("update error", e);
|
||||
}
|
||||
}
|
||||
updatePipelines();
|
||||
} catch (Exception e) {
|
||||
logger.error("pipeline store update failure", e);
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
running = false;
|
||||
try {
|
||||
interrupt();
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
startUpdateWorker();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class PipelineStoreListener implements ClusterStateListener {
|
||||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false) {
|
||||
startUpdateWorker();
|
||||
clusterService.remove(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static class PipelineReference {
|
||||
|
||||
private final Pipeline pipeline;
|
||||
private final long version;
|
||||
private final BytesReference source;
|
||||
|
||||
PipelineReference(Pipeline pipeline, long version, BytesReference source) {
|
||||
this.pipeline = pipeline;
|
||||
this.version = version;
|
||||
this.source = source;
|
||||
}
|
||||
|
||||
public Pipeline getPipeline() {
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
public long getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public BytesReference getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
PipelineReference holder = (PipelineReference) o;
|
||||
return source.equals(holder.source);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return source.hashCode();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ public class IngestRestFilter extends RestFilter {
|
|||
|
||||
@Override
|
||||
public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception {
|
||||
request.putInContext(INGEST_CONTEXT_KEY, request.param(INGEST_HTTP_PARAM));
|
||||
request.putInContext(INGEST_CONTEXT_KEY, request.param(INGEST_PARAM));
|
||||
filterChain.continueProcessing(request, channel);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,7 +49,7 @@ public class IngestActionFilter extends ActionFilter.Simple {
|
|||
protected boolean apply(String action, ActionRequest request, ActionListener listener) {
|
||||
String pipelineId = request.getFromContext(IngestPlugin.INGEST_CONTEXT_KEY);
|
||||
if (pipelineId == null) {
|
||||
pipelineId = request.getHeader(IngestPlugin.INGEST_HTTP_PARAM);
|
||||
pipelineId = request.getHeader(IngestPlugin.INGEST_PARAM);
|
||||
if (pipelineId == null) {
|
||||
return true;
|
||||
}
|
||||
|
@ -73,6 +73,7 @@ public class IngestActionFilter extends ActionFilter.Simple {
|
|||
return true;
|
||||
}
|
||||
|
||||
// TODO: this should be delegated to a PipelineExecutor service that executes on a different thread (pipeline TP)
|
||||
void processIndexRequest(IndexRequest indexRequest, Pipeline pipeline) {
|
||||
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
|
||||
Data data = new Data(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap);
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class PipelineConfigDocReaderTests extends ESSingleNodeTestCase {
|
||||
|
||||
public void testReadAll() {
|
||||
PipelineConfigDocReader reader = new PipelineConfigDocReader(Settings.EMPTY, node().injector());
|
||||
reader.start();
|
||||
|
||||
createIndex(PipelineStore.INDEX);
|
||||
int numDocs = scaledRandomIntBetween(32, 128);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
client().prepareIndex(PipelineStore.INDEX, PipelineStore.TYPE, Integer.toString(i))
|
||||
.setSource("field", "value" + i)
|
||||
.get();
|
||||
}
|
||||
client().admin().indices().prepareRefresh().get();
|
||||
|
||||
int i = 0;
|
||||
for (SearchHit hit : reader.readAll()) {
|
||||
assertThat(hit.getId(), equalTo(Integer.toString(i)));
|
||||
assertThat(hit.getVersion(), equalTo(1l));
|
||||
assertThat(hit.getSource().get("field"), equalTo("value" + i));
|
||||
i++;
|
||||
}
|
||||
assertThat(i, equalTo(numDocs));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,117 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.text.StringText;
|
||||
import org.elasticsearch.ingest.SimpleProcessor;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.internal.InternalSearchHit;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class PipelineStoreTests extends ESTestCase {
|
||||
|
||||
private PipelineStore store;
|
||||
private ThreadPool threadPool;
|
||||
private PipelineConfigDocReader docReader;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
threadPool = new ThreadPool("test");
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
docReader = mock(PipelineConfigDocReader.class);
|
||||
store = new PipelineStore(Settings.EMPTY, threadPool, clusterService, docReader, Collections.singletonMap(SimpleProcessor.TYPE, new SimpleProcessor.Builder.Factory()));
|
||||
store.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() {
|
||||
store.stop();
|
||||
threadPool.shutdown();
|
||||
}
|
||||
|
||||
|
||||
public void testUpdatePipeline() {
|
||||
List<SearchHit> hits = new ArrayList<>();
|
||||
hits.add(new InternalSearchHit(0, "1", new StringText("type"), Collections.emptyMap())
|
||||
.sourceRef(new BytesArray("{\"name\": \"_name1\", \"description\": \"_description1\"}"))
|
||||
);
|
||||
|
||||
when(docReader.readAll()).thenReturn(hits);
|
||||
assertThat(store.get("1"), nullValue());
|
||||
|
||||
store.updatePipelines();
|
||||
assertThat(store.get("1").getId(), equalTo("_name1"));
|
||||
assertThat(store.get("1").getDescription(), equalTo("_description1"));
|
||||
|
||||
hits.add(new InternalSearchHit(0, "2", new StringText("type"), Collections.emptyMap())
|
||||
.sourceRef(new BytesArray("{\"name\": \"_name2\", \"description\": \"_description2\"}"))
|
||||
);
|
||||
store.updatePipelines();
|
||||
assertThat(store.get("1").getId(), equalTo("_name1"));
|
||||
assertThat(store.get("1").getDescription(), equalTo("_description1"));
|
||||
assertThat(store.get("2").getId(), equalTo("_name2"));
|
||||
assertThat(store.get("2").getDescription(), equalTo("_description2"));
|
||||
}
|
||||
|
||||
public void testPipelineUpdater() throws Exception {
|
||||
List<SearchHit> hits = new ArrayList<>();
|
||||
hits.add(new InternalSearchHit(0, "1", new StringText("type"), Collections.emptyMap())
|
||||
.sourceRef(new BytesArray("{\"name\": \"_name1\", \"description\": \"_description1\"}"))
|
||||
);
|
||||
when(docReader.readAll()).thenReturn(hits);
|
||||
assertThat(store.get("1"), nullValue());
|
||||
|
||||
store.startUpdateWorker();
|
||||
assertBusy(() -> {
|
||||
assertThat(store.get("1"), notNullValue());
|
||||
assertThat(store.get("1").getId(), equalTo("_name1"));
|
||||
assertThat(store.get("1").getDescription(), equalTo("_description1"));
|
||||
});
|
||||
|
||||
hits.add(new InternalSearchHit(0, "2", new StringText("type"), Collections.emptyMap())
|
||||
.sourceRef(new BytesArray("{\"name\": \"_name2\", \"description\": \"_description2\"}"))
|
||||
);
|
||||
assertBusy(() -> {
|
||||
assertThat(store.get("1"), notNullValue());
|
||||
assertThat(store.get("1").getId(), equalTo("_name1"));
|
||||
assertThat(store.get("1").getDescription(), equalTo("_description1"));
|
||||
assertThat(store.get("2"), notNullValue());
|
||||
assertThat(store.get("2").getId(), equalTo("_name2"));
|
||||
assertThat(store.get("2").getDescription(), equalTo("_description2"));
|
||||
});
|
||||
}
|
||||
|
||||
}
|
|
@ -47,6 +47,12 @@
|
|||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.elasticsearch</groupId>
|
||||
<artifactId>securemock</artifactId>
|
||||
<version>1.1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- Provided dependencies by elasticsearch itself -->
|
||||
<dependency>
|
||||
|
|
Loading…
Reference in New Issue