wip
This commit is contained in:
parent
83366e7017
commit
18c1cc678a
|
@ -0,0 +1 @@
|
|||
This plugin has no third party dependencies
|
|
@ -0,0 +1,33 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.elasticsearch.plugin</groupId>
|
||||
<artifactId>plugins</artifactId>
|
||||
<version>3.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>ingest</artifactId>
|
||||
<name>Plugin: Node ingest</name>
|
||||
<description>Plugin that allows to configure pipelines to preprocess documents before indexing</description>
|
||||
|
||||
<properties>
|
||||
<elasticsearch.plugin.classname>org.elasticsearch.plugin.ingest.IngestPlugin</elasticsearch.plugin.classname>
|
||||
<tests.rest.suite>ingest</tests.rest.suite>
|
||||
<tests.rest.load_packaged>false</tests.rest.load_packaged>
|
||||
<xlint.options>-Xlint:-rawtypes</xlint.options>
|
||||
</properties>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public final class Data {
|
||||
|
||||
private final String index;
|
||||
private final String type;
|
||||
private final String id;
|
||||
private final Map<String, Object> document;
|
||||
|
||||
public Data(String index, String type, String id, Map<String, Object> document) {
|
||||
this.index = index;
|
||||
this.type = type;
|
||||
this.id = id;
|
||||
this.document = document;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> T getProperty(String path) {
|
||||
return (T) XContentMapValues.extractValue(path, document);
|
||||
}
|
||||
|
||||
public void addField(String field, String value) {
|
||||
document.put(field, value);
|
||||
}
|
||||
|
||||
public String getIndex() {
|
||||
return index;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public Map<String, Object> getDocument() {
|
||||
return document;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public final class Pipeline {
|
||||
|
||||
private final String id;
|
||||
private final String description;
|
||||
private final List<Processor> processors;
|
||||
|
||||
private Pipeline(String id, String description, List<Processor> processors) {
|
||||
this.id = id;
|
||||
this.description = description;
|
||||
this.processors = processors;
|
||||
}
|
||||
|
||||
public void execute(Data data) {
|
||||
for (Processor processor : processors) {
|
||||
processor.execute(data);
|
||||
}
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
public List<Processor> getProcessors() {
|
||||
return processors;
|
||||
}
|
||||
|
||||
public final static class Builder {
|
||||
|
||||
private final String name;
|
||||
private String description;
|
||||
private List<Processor> processors = new ArrayList<>();
|
||||
|
||||
public Builder(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public Builder(Map<String, Object> config) {
|
||||
name = (String) config.get("name");
|
||||
description = (String) config.get("description");
|
||||
@SuppressWarnings("unchecked")
|
||||
List<Map<String, Map<String, Object>>> processors = (List<Map<String, Map<String, Object>>>) config.get("processors");
|
||||
if (processors != null ) {
|
||||
for (Map<String, Map<String, Object>> processor : processors) {
|
||||
for (Map.Entry<String, Map<String, Object>> entry : processor.entrySet()) {
|
||||
// TODO: add lookup service...
|
||||
if ("simple".equals(entry.getKey())) {
|
||||
SimpleProcessor.Builder builder = new SimpleProcessor.Builder();
|
||||
builder.fromMap(entry.getValue());
|
||||
this.processors.add(builder.build());
|
||||
} else {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void setDescription(String description) {
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
public void addProcessors(Processor.Builder... processors) {
|
||||
for (Processor.Builder processor : processors) {
|
||||
this.processors.add(processor.build());
|
||||
}
|
||||
}
|
||||
|
||||
public Pipeline build() {
|
||||
return new Pipeline(name, description, Collections.unmodifiableList(processors));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
public interface Processor {
|
||||
|
||||
void execute(Data data);
|
||||
|
||||
String type();
|
||||
|
||||
interface Builder {
|
||||
|
||||
Processor build();
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public final class SimpleProcessor implements Processor {
|
||||
|
||||
private final String path;
|
||||
private final String expectedValue;
|
||||
|
||||
private final String addField;
|
||||
private final String addFieldValue;
|
||||
|
||||
public SimpleProcessor(String path, String expectedValue, String addField, String addFieldValue) {
|
||||
this.path = path;
|
||||
this.expectedValue = expectedValue;
|
||||
this.addField = addField;
|
||||
this.addFieldValue = addFieldValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Data data) {
|
||||
Object value = data.getProperty(path);
|
||||
if (value != null) {
|
||||
if (value.toString().equals(this.expectedValue)) {
|
||||
data.addField(addField, addFieldValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
return "logging";
|
||||
}
|
||||
|
||||
public static class Builder implements Processor.Builder {
|
||||
|
||||
private String path;
|
||||
private String value;
|
||||
private String addField;
|
||||
private String addFieldValue;
|
||||
|
||||
public void setPath(String path) {
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
public void setValue(String value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public void setAddField(String addField) {
|
||||
this.addField = addField;
|
||||
}
|
||||
|
||||
public void setAddFieldValue(String addFieldValue) {
|
||||
this.addFieldValue = addFieldValue;
|
||||
}
|
||||
|
||||
public void fromMap(Map<String, Object> config) {
|
||||
this.path = (String) config.get("path");
|
||||
this.value = (String) config.get("value");
|
||||
this.addField = (String) config.get("add_field");
|
||||
this.addFieldValue = (String) config.get("add_field_value");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Processor build() {
|
||||
return new SimpleProcessor(path, value, addField, addFieldValue);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
|
||||
import org.elasticsearch.plugin.ingest.transport.IngestActionFilter;
|
||||
|
||||
public class IngestModule extends AbstractModule {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
binder().bind(IngestRestFilter.class).asEagerSingleton();
|
||||
binder().bind(PipelineStore.class).asEagerSingleton();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.action.ActionModule;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.plugin.ingest.transport.IngestActionFilter;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.rest.action.RestActionModule;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
public class IngestPlugin extends Plugin {
|
||||
|
||||
public static final String INGEST_CONTEXT_KEY = "__ingest__";
|
||||
public static final String INGEST_HTTP_PARAM = "ingest";
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return "ingest";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "Plugin that allows to configure pipelines to preprocess documents before indexing";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Module> nodeModules() {
|
||||
return Collections.singletonList(new IngestModule());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
|
||||
return Collections.singletonList(PipelineStore.class);
|
||||
}
|
||||
|
||||
public void onModule(ActionModule module) {
|
||||
module.registerFilter(IngestActionFilter.class);
|
||||
}
|
||||
|
||||
public void onModule(RestActionModule module) {
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,136 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.collect.CopyOnWriteHashMap;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.inject.Injector;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class PipelineStore extends AbstractLifecycleComponent {
|
||||
|
||||
public final static String INDEX = ".pipelines";
|
||||
public final static String TYPE = "pipeline";
|
||||
|
||||
private Client client;
|
||||
private final Injector injector;
|
||||
|
||||
private volatile Updater updater;
|
||||
private volatile CopyOnWriteHashMap<String, Pipeline> pipelines = new CopyOnWriteHashMap<>();
|
||||
|
||||
@Inject
|
||||
public PipelineStore(Settings settings, Injector injector) {
|
||||
super(settings);
|
||||
this.injector = injector;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
client = injector.getInstance(Client.class);
|
||||
updater = new Updater();
|
||||
// TODO: start when local cluster state isn't blocked: ([SERVICE_UNAVAILABLE/1/state not recovered / initialized])
|
||||
updater.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
updater.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
}
|
||||
|
||||
public Pipeline get(String id) {
|
||||
return pipelines.get(id);
|
||||
}
|
||||
|
||||
void updatePipelines() {
|
||||
Map<String, Pipeline> pipelines = new HashMap<>();
|
||||
SearchResponse searchResponse = client.prepareSearch(INDEX)
|
||||
.setScroll(TimeValue.timeValueMinutes(1))
|
||||
.addSort("_doc", SortOrder.ASC)
|
||||
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
|
||||
.get();
|
||||
logger.info("Loading [{}] pipelines", searchResponse.getHits().totalHits());
|
||||
do {
|
||||
for (SearchHit hit : searchResponse.getHits()) {
|
||||
logger.info("Loading pipeline [{}] with source [{}]", hit.getId(), hit.sourceAsString());
|
||||
Pipeline.Builder builder = new Pipeline.Builder(hit.sourceAsMap());
|
||||
pipelines.put(hit.getId(), builder.build());
|
||||
}
|
||||
searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).get();
|
||||
} while (searchResponse.getHits().getHits().length != 0);
|
||||
PipelineStore.this.pipelines = PipelineStore.this.pipelines.copyAndPutAll(pipelines);
|
||||
}
|
||||
|
||||
class Updater extends Thread {
|
||||
|
||||
private volatile boolean running = true;
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
public Updater() {
|
||||
super(EsExecutors.threadName(settings, "[updater]"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (running) {
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
updatePipelines();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
logger.error("update error", e);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
running = false;
|
||||
try {
|
||||
interrupt();
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest.rest;
|
||||
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.rest.*;
|
||||
|
||||
import static org.elasticsearch.plugin.ingest.IngestPlugin.*;
|
||||
import static org.elasticsearch.plugin.ingest.IngestPlugin.INGEST_CONTEXT_KEY;
|
||||
|
||||
public class IngestRestFilter extends RestFilter {
|
||||
|
||||
@Inject
|
||||
public IngestRestFilter(RestController controller) {
|
||||
controller.registerFilter(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception {
|
||||
request.putInContext(INGEST_CONTEXT_KEY, request.param(INGEST_HTTP_PARAM));
|
||||
filterChain.continueProcessing(request, channel);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest.transport;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.ActionFilter;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
||||
import org.elasticsearch.plugin.ingest.PipelineStore;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class IngestActionFilter extends ActionFilter.Simple {
|
||||
|
||||
private final PipelineStore pipelineStore;
|
||||
|
||||
@Inject
|
||||
public IngestActionFilter(Settings settings, PipelineStore pipelineStore) {
|
||||
super(settings);
|
||||
this.pipelineStore = pipelineStore;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean apply(String action, ActionRequest request, ActionListener listener) {
|
||||
String pipelineId = request.getFromContext(IngestPlugin.INGEST_CONTEXT_KEY);
|
||||
if (pipelineId == null) {
|
||||
pipelineId = request.getHeader(IngestPlugin.INGEST_HTTP_PARAM);
|
||||
if (pipelineId == null) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
Pipeline pipeline = pipelineStore.get(pipelineId);
|
||||
if (pipeline == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (request instanceof IndexRequest) {
|
||||
processIndexRequest((IndexRequest) request, pipeline);
|
||||
} else if (request instanceof BulkRequest) {
|
||||
BulkRequest bulkRequest = (BulkRequest) request;
|
||||
List<ActionRequest> actionRequests = bulkRequest.requests();
|
||||
for (ActionRequest actionRequest : actionRequests) {
|
||||
if (actionRequest instanceof IndexRequest) {
|
||||
processIndexRequest((IndexRequest) actionRequest, pipeline);
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void processIndexRequest(IndexRequest indexRequest, Pipeline pipeline) {
|
||||
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
|
||||
Data data = new Data(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap);
|
||||
pipeline.execute(data);
|
||||
indexRequest.source(data.getDocument());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean apply(String action, ActionResponse response, ActionListener listener) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int order() {
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.node.MockNode;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class IngestRunner {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Settings.Builder settings = Settings.builder();
|
||||
settings.put("http.cors.enabled", "true");
|
||||
settings.put("http.cors.allow-origin", "*");
|
||||
settings.put("script.inline", "on");
|
||||
settings.put("cluster.name", IngestRunner.class.getSimpleName());
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final Node node = new MockNode(settings.build(), Version.CURRENT, Collections.singleton(IngestPlugin.class));
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
node.close();
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
node.start();
|
||||
latch.await();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
||||
import org.elasticsearch.plugin.ingest.PipelineStore;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(numDataNodes = 1, numClientNodes = 0)
|
||||
public class BasicTests extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Collections.singletonList(IngestPlugin.class);
|
||||
}
|
||||
|
||||
public void test() throws Exception {
|
||||
client().prepareIndex(PipelineStore.INDEX, PipelineStore.TYPE, "_id")
|
||||
.setSource(jsonBuilder().startObject()
|
||||
.field("name", "my_pipeline")
|
||||
.field("description", "my_pipeline")
|
||||
.startArray("processors")
|
||||
.startObject()
|
||||
.startObject("simple")
|
||||
.field("path", "field2")
|
||||
.field("value", "abc")
|
||||
.field("add_field", "field3")
|
||||
.field("add_field_value", "xyz")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endArray()
|
||||
.endObject())
|
||||
.setRefresh(true)
|
||||
.get();
|
||||
Thread.sleep(5000);
|
||||
|
||||
createIndex("test");
|
||||
client().prepareIndex("test", "type", "1").setSource("field2", "abc")
|
||||
.putHeader("ingest", "_id")
|
||||
.get();
|
||||
|
||||
Map<String, Object> doc = client().prepareGet("test", "type", "1")
|
||||
.get().getSourceAsMap();
|
||||
assertThat(doc.get("field3"), equalTo("xyz"));
|
||||
|
||||
client().prepareBulk().add(
|
||||
client().prepareIndex("test", "type", "2").setSource("field2", "abc")
|
||||
).putHeader("ingest", "_id").get();
|
||||
|
||||
doc = client().prepareGet("test", "type", "2").get().getSourceAsMap();
|
||||
assertThat(doc.get("field3"), equalTo("xyz"));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean enableMockModules() {
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Name;
|
||||
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
|
||||
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
import org.elasticsearch.test.rest.RestTestCandidate;
|
||||
import org.elasticsearch.test.rest.parser.RestTestParseException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
public class IngestRestIT extends ESRestTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(IngestPlugin.class);
|
||||
}
|
||||
|
||||
public IngestRestIT(@Name("yaml") RestTestCandidate testCandidate) {
|
||||
super(testCandidate);
|
||||
}
|
||||
|
||||
@ParametersFactory
|
||||
public static Iterable<Object[]> parameters() throws IOException, RestTestParseException {
|
||||
return ESRestTestCase.createParameters(0, 1);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
"Ingest plugin installed":
|
||||
- do:
|
||||
cluster.stats: {}
|
||||
|
||||
- match: { nodes.plugins.0.name: ingest }
|
||||
- match: { nodes.plugins.0.jvm: true }
|
|
@ -228,6 +228,7 @@
|
|||
<include>api/info.json</include>
|
||||
<include>api/cluster.health.json</include>
|
||||
<include>api/cluster.state.json</include>
|
||||
<include>api/cluster.stats.json</include>
|
||||
<!-- used in plugin REST tests -->
|
||||
<include>api/index.json</include>
|
||||
<include>api/get.json</include>
|
||||
|
@ -402,6 +403,7 @@
|
|||
<module>repository-azure</module>
|
||||
<module>repository-s3</module>
|
||||
<module>store-smb</module>
|
||||
<module>ingest</module>
|
||||
|
||||
<!-- Internal plugins for test only -->
|
||||
<module>jvm-example</module>
|
||||
|
|
Loading…
Reference in New Issue