added put, get and delete pipeline APIs.

This commit is contained in:
Martijn van Groningen 2015-10-09 15:02:12 +02:00
parent 11f17c0d7d
commit 5a3c75ebac
31 changed files with 1467 additions and 56 deletions

View File

@ -0,0 +1,74 @@
[[ingest]]
== Ingest Plugin
TODO
=== Put pipeline API
The put pipeline api adds pipelines and updates existing pipelines in the cluster.
[source,js]
--------------------------------------------------
PUT _ingest/pipeline/my-pipeline-id
{
"description" : "describe pipeline",
"processors" : [
{
"simple" : {
// settings
}
},
// other processors
]
}
--------------------------------------------------
// AUTOSENSE
NOTE: Each ingest node updates its processors asynchronously in the background, so it may take a few seconds for all
nodes to have the latest version of the pipeline.
=== Get pipeline API
The get pipeline api returns pipelines based on id. This api always returns a local reference of the pipeline.
[source,js]
--------------------------------------------------
GET _ingest/pipeline/my-pipeline-id
--------------------------------------------------
// AUTOSENSE
Example response:
[source,js]
--------------------------------------------------
{
"my-pipeline-id": {
"_source" : {
"description": "describe pipeline",
"processors": [
{
"simple" : {
// settings
}
},
// other processors
]
},
"_version" : 0
}
}
--------------------------------------------------
For each returned pipeline the source and the version is returned.
The version is useful for knowing what version of the pipeline the node has.
Multiple ids can be provided at the same time. Also wildcards are supported.
=== Delete pipeline API
The delete pipeline api deletes pipelines by id.
[source,js]
--------------------------------------------------
DELETE _ingest/pipeline/my-pipeline-id
--------------------------------------------------
// AUTOSENSE

View File

@ -72,16 +72,15 @@ public final class Pipeline {
public final static class Builder {
private final String name;
private final String id;
private String description;
private List<Processor> processors = new ArrayList<>();
public Builder(String name) {
this.name = name;
public Builder(String id) {
this.id = id;
}
public Builder(Map<String, Object> config, Map<String, Processor.Builder.Factory> processorRegistry) {
name = (String) config.get("name");
public void fromMap(Map<String, Object> config, Map<String, Processor.Builder.Factory> processorRegistry) {
description = (String) config.get("description");
@SuppressWarnings("unchecked")
List<Map<String, Map<String, Object>>> processors = (List<Map<String, Map<String, Object>>>) config.get("processors");
@ -111,7 +110,7 @@ public final class Pipeline {
}
public Pipeline build() {
return new Pipeline(name, description, Collections.unmodifiableList(processors));
return new Pipeline(id, description, Collections.unmodifiableList(processors));
}
}
}

View File

@ -21,11 +21,9 @@ package org.elasticsearch.plugin.ingest;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.SimpleProcessor;
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
import org.elasticsearch.plugin.ingest.transport.IngestActionFilter;
import java.util.HashMap;
import java.util.Map;
@ -39,7 +37,7 @@ public class IngestModule extends AbstractModule {
binder().bind(IngestRestFilter.class).asEagerSingleton();
binder().bind(PipelineExecutionService.class).asEagerSingleton();
binder().bind(PipelineStore.class).asEagerSingleton();
binder().bind(PipelineConfigDocReader.class).asEagerSingleton();
binder().bind(PipelineStoreClient.class).asEagerSingleton();
registerProcessor(SimpleProcessor.TYPE, SimpleProcessor.Builder.Factory.class);

View File

@ -21,12 +21,22 @@
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.rest.RestDeletePipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestGetPipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestPutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.IngestActionFilter;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineTransportAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineTransportAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineTransportAction;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.action.RestActionModule;
import org.elasticsearch.rest.RestModule;
import java.util.Arrays;
import java.util.Collection;
@ -42,9 +52,11 @@ public class IngestPlugin extends Plugin {
public static final String NAME = "ingest";
private final Settings nodeSettings;
private final boolean transportClient;
public IngestPlugin(Settings nodeSettings) {
this.nodeSettings = nodeSettings;
transportClient = "transport".equals(nodeSettings.get(Client.CLIENT_TYPE_SETTING));
}
@Override
@ -59,12 +71,20 @@ public class IngestPlugin extends Plugin {
@Override
public Collection<Module> nodeModules() {
return Collections.singletonList(new IngestModule());
if (transportClient) {
return Collections.emptyList();
} else {
return Collections.singletonList(new IngestModule());
}
}
@Override
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
return Arrays.asList(PipelineStore.class, PipelineConfigDocReader.class);
if (transportClient) {
return Collections.emptyList();
} else {
return Arrays.asList(PipelineStore.class, PipelineStoreClient.class);
}
}
@Override
@ -75,7 +95,18 @@ public class IngestPlugin extends Plugin {
}
public void onModule(ActionModule module) {
module.registerFilter(IngestActionFilter.class);
if (!transportClient) {
module.registerFilter(IngestActionFilter.class);
}
module.registerAction(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
module.registerAction(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);
module.registerAction(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class);
}
public void onModule(RestModule restModule) {
restModule.addRestAction(RestPutPipelineAction.class);
restModule.addRestAction(RestGetPipelineAction.class);
restModule.addRestAction(RestDeletePipelineAction.class);
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayService;
@ -34,10 +35,7 @@ import org.elasticsearch.ingest.Processor;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.*;
public class PipelineStore extends AbstractLifecycleComponent {
@ -47,13 +45,13 @@ public class PipelineStore extends AbstractLifecycleComponent {
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final TimeValue pipelineUpdateInterval;
private final PipelineConfigDocReader configDocReader;
private final PipelineStoreClient configDocReader;
private final Map<String, Processor.Builder.Factory> processorFactoryRegistry;
private volatile Map<String, PipelineReference> pipelines = new HashMap<>();
@Inject
public PipelineStore(Settings settings, ThreadPool threadPool, ClusterService clusterService, PipelineConfigDocReader configDocReader, Map<String, Processor.Builder.Factory> processors) {
public PipelineStore(Settings settings, ThreadPool threadPool, ClusterService clusterService, PipelineStoreClient configDocReader, Map<String, Processor.Builder.Factory> processors) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
@ -84,10 +82,32 @@ public class PipelineStore extends AbstractLifecycleComponent {
}
}
public List<PipelineReference> getReference(String... ids) {
List<PipelineReference> result = new ArrayList<>(ids.length);
for (String id : ids) {
if (Regex.isSimpleMatchPattern(id)) {
for (Map.Entry<String, PipelineReference> entry : pipelines.entrySet()) {
if (Regex.simpleMatch(id, entry.getKey())) {
result.add(entry.getValue());
}
}
} else {
PipelineReference reference = pipelines.get(id);
if (reference != null) {
result.add(reference);
}
}
}
return result;
}
void updatePipelines() {
// note: this process isn't fast or smart, but the idea is that there will not be many pipelines,
// so for that reason the goal is to keep the update logic simple.
int changed = 0;
Map<String, PipelineReference> newPipelines = new HashMap<>(pipelines);
for (SearchHit hit : configDocReader.readAll()) {
for (SearchHit hit : configDocReader.readAllPipelines()) {
String pipelineId = hit.getId();
BytesReference pipelineSource = hit.getSourceRef();
PipelineReference previous = newPipelines.get(pipelineId);
@ -98,15 +118,24 @@ public class PipelineStore extends AbstractLifecycleComponent {
}
changed++;
Pipeline.Builder builder = new Pipeline.Builder(hit.sourceAsMap(), processorFactoryRegistry);
Pipeline.Builder builder = new Pipeline.Builder(hit.getId());
builder.fromMap(hit.sourceAsMap(), processorFactoryRegistry);
newPipelines.put(pipelineId, new PipelineReference(builder.build(), hit.getVersion(), pipelineSource));
}
if (changed != 0) {
logger.debug("adding or updating [{}] pipelines", changed);
int removed = 0;
for (String existingPipelineId : pipelines.keySet()) {
if (!configDocReader.existPipeline(existingPipelineId)) {
newPipelines.remove(existingPipelineId);
removed++;
}
}
if (changed != 0 || removed != 0) {
logger.debug("adding or updating [{}] pipelines and [{}] pipelines removed", changed, removed);
pipelines = newPipelines;
} else {
logger.debug("adding no new pipelines");
logger.debug("no pipelines changes detected");
}
}
@ -142,7 +171,7 @@ public class PipelineStore extends AbstractLifecycleComponent {
}
}
static class PipelineReference {
public static class PipelineReference {
private final Pipeline pipeline;
private final long version;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
@ -33,14 +34,14 @@ import org.elasticsearch.search.sort.SortOrder;
import java.util.Collections;
import java.util.Iterator;
public class PipelineConfigDocReader extends AbstractLifecycleComponent {
public class PipelineStoreClient extends AbstractLifecycleComponent {
private volatile Client client;
private final Injector injector;
private final TimeValue scrollTimeout;
@Inject
public PipelineConfigDocReader(Settings settings, Injector injector) {
public PipelineStoreClient(Settings settings, Injector injector) {
super(settings);
this.injector = injector;
this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30));
@ -60,7 +61,7 @@ public class PipelineConfigDocReader extends AbstractLifecycleComponent {
protected void doClose() {
}
public Iterable<SearchHit> readAll() {
public Iterable<SearchHit> readAllPipelines() {
// TODO: the search should be replaced with an ingest API when it is available
SearchResponse searchResponse = client.prepareSearch(PipelineStore.INDEX)
.setVersion(true)
@ -81,6 +82,11 @@ public class PipelineConfigDocReader extends AbstractLifecycleComponent {
};
}
public boolean existPipeline(String pipelineId) {
GetResponse response = client.prepareGet(PipelineStore.INDEX, PipelineStore.TYPE, pipelineId).get();
return response.isExists();
}
class SearchScrollIterator implements Iterator<SearchHit> {
private SearchResponse searchResponse;

View File

@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestToXContentListener;
public class RestDeletePipelineAction extends BaseRestHandler {
@Inject
public RestDeletePipelineAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(RestRequest.Method.DELETE, "/_ingest/pipeline/{id}", this);
}
@Override
protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception {
DeletePipelineRequest request = new DeletePipelineRequest();
request.id(restRequest.param("id"));
client.execute(DeletePipelineAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequest;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
import org.elasticsearch.rest.action.support.RestToXContentListener;
public class RestGetPipelineAction extends BaseRestHandler {
@Inject
public RestGetPipelineAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{ids}", this);
}
@Override
protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception {
GetPipelineRequest request = new GetPipelineRequest();
request.ids(Strings.splitStringByCommaToArray(restRequest.param("ids")));
client.execute(GetPipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestToXContentListener;
public class RestPutPipelineAction extends BaseRestHandler {
@Inject
public RestPutPipelineAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(RestRequest.Method.PUT, "/_ingest/pipeline/{id}", this);
}
@Override
protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception {
PutPipelineRequest request = new PutPipelineRequest();
request.id(restRequest.param("id"));
if (restRequest.hasContent()) {
request.source(restRequest.content());
}
client.execute(PutPipelineAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.delete;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
public class DeletePipelineAction extends Action<DeletePipelineRequest, DeletePipelineResponse, DeletePipelineRequestBuilder> {
public static final DeletePipelineAction INSTANCE = new DeletePipelineAction();
public static final String NAME = "cluster:admin/ingest/pipeline/delete";
public DeletePipelineAction() {
super(NAME);
}
@Override
public DeletePipelineRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new DeletePipelineRequestBuilder(client, this);
}
@Override
public DeletePipelineResponse newResponse() {
return new DeletePipelineResponse();
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.delete;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class DeletePipelineRequest extends ActionRequest {
private String id;
public void id(String id) {
this.id = id;
}
public String id() {
return id;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (id == null) {
validationException = addValidationError("id is missing", validationException);
}
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.delete;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
public class DeletePipelineRequestBuilder extends ActionRequestBuilder<DeletePipelineRequest, DeletePipelineResponse, DeletePipelineRequestBuilder> {
public DeletePipelineRequestBuilder(ElasticsearchClient client, DeletePipelineAction action) {
super(client, action, new DeletePipelineRequest());
}
public DeletePipelineRequestBuilder setId(String id) {
request.id(id);
return this;
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.delete;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentHelper;
import java.io.IOException;
import java.util.Map;
public class DeletePipelineResponse extends ActionResponse implements ToXContent {
private String id;
private boolean found;
DeletePipelineResponse() {
}
public DeletePipelineResponse(String id, boolean found) {
this.id = id;
this.found = found;
}
public String id() {
return id;
}
public boolean found() {
return found;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.id = in.readString();
this.found = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeBoolean(found);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.ID, id);
builder.field(Fields.FOUND, found);
return builder;
}
static final class Fields {
static final XContentBuilderString ID = new XContentBuilderString("_id");
static final XContentBuilderString FOUND = new XContentBuilderString("_found");
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.delete;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DeletePipelineTransportAction extends HandledTransportAction<DeletePipelineRequest, DeletePipelineResponse> {
private final TransportDeleteAction deleteAction;
@Inject
public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportDeleteAction deleteAction) {
super(settings, DeletePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new);
this.deleteAction = deleteAction;
}
@Override
protected void doExecute(DeletePipelineRequest request, ActionListener<DeletePipelineResponse> listener) {
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.index(PipelineStore.INDEX);
deleteRequest.type(PipelineStore.TYPE);
deleteRequest.id(request.id());
deleteRequest.refresh(true);
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
listener.onResponse(new DeletePipelineResponse(deleteResponse.getId(), deleteResponse.isFound()));
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.get;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
public class GetPipelineAction extends Action<GetPipelineRequest, GetPipelineResponse, GetPipelineRequestBuilder> {
public static final GetPipelineAction INSTANCE = new GetPipelineAction();
public static final String NAME = "cluster:admin/ingest/pipeline/get";
public GetPipelineAction() {
super(NAME);
}
@Override
public GetPipelineRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new GetPipelineRequestBuilder(client, this);
}
@Override
public GetPipelineResponse newResponse() {
return new GetPipelineResponse();
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.get;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class GetPipelineRequest extends ActionRequest {
private String[] ids;
public void ids(String... ids) {
this.ids = ids;
}
public String[] ids() {
return ids;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (ids == null || ids.length == 0) {
validationException = addValidationError("ids is missing", validationException);
}
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
ids = in.readStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(ids);
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.get;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
public class GetPipelineRequestBuilder extends ActionRequestBuilder<GetPipelineRequest, GetPipelineResponse, GetPipelineRequestBuilder> {
public GetPipelineRequestBuilder(ElasticsearchClient client, GetPipelineAction action) {
super(client, action, new GetPipelineRequest());
}
public GetPipelineRequestBuilder setIds(String... ids) {
request.ids(ids);
return this;
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.get;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class GetPipelineResponse extends ActionResponse implements StatusToXContent {
private Map<String, BytesReference> pipelines;
private Map<String, Long> versions;
public GetPipelineResponse() {
}
public GetPipelineResponse(Map<String, BytesReference> pipelines, Map<String, Long> versions) {
this.pipelines = pipelines;
this.versions = versions;
}
public Map<String, BytesReference> pipelines() {
return pipelines;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
pipelines = new HashMap<>(size);
for (int i = 0; i < size; i++) {
pipelines.put(in.readString(), in.readBytesReference());
}
size = in.readVInt();
versions = new HashMap<>(size);
for (int i = 0; i < size; i++) {
versions.put(in.readString(), in.readVLong());
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(pipelines.size());
for (Map.Entry<String, BytesReference> entry : pipelines.entrySet()) {
out.writeString(entry.getKey());
out.writeBytesReference(entry.getValue());
}
out.writeVInt(versions.size());
for (Map.Entry<String, Long> entry : versions.entrySet()) {
out.writeString(entry.getKey());
out.writeVLong(entry.getValue());
}
}
public boolean isFound() {
return !pipelines.isEmpty();
}
@Override
public RestStatus status() {
return isFound() ? RestStatus.OK : RestStatus.NOT_FOUND;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
for (Map.Entry<String, BytesReference> entry : pipelines.entrySet()) {
builder.startObject(entry.getKey());
XContentHelper.writeRawField("_source", entry.getValue(), builder, params);
builder.field("_version", versions.get(entry.getKey()));
builder.endObject();
}
return builder;
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.get;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class GetPipelineTransportAction extends HandledTransportAction<GetPipelineRequest, GetPipelineResponse> {
private final PipelineStore pipelineStore;
@Inject
public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore) {
super(settings, GetPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, GetPipelineRequest::new);
this.pipelineStore = pipelineStore;
}
@Override
protected void doExecute(GetPipelineRequest request, ActionListener<GetPipelineResponse> listener) {
List<PipelineStore.PipelineReference> references = pipelineStore.getReference(request.ids());
Map<String, BytesReference> result = new HashMap<>();
Map<String, Long> versions = new HashMap<>();
for (PipelineStore.PipelineReference reference : references) {
result.put(reference.getPipeline().getId(), reference.getSource());
versions.put(reference.getPipeline().getId(), reference.getVersion());
}
listener.onResponse(new GetPipelineResponse(result, versions));
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.put;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
public class PutPipelineAction extends Action<PutPipelineRequest, PutPipelineResponse, PutPipelineRequestBuilder> {
public static final PutPipelineAction INSTANCE = new PutPipelineAction();
public static final String NAME = "cluster:admin/ingest/pipeline/put";
public PutPipelineAction() {
super(NAME);
}
@Override
public PutPipelineRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new PutPipelineRequestBuilder(client, this);
}
@Override
public PutPipelineResponse newResponse() {
return new PutPipelineResponse();
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.put;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class PutPipelineRequest extends ActionRequest {
private String id;
private BytesReference source;
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (id == null) {
validationException = addValidationError("id is missing", validationException);
}
if (source == null) {
validationException = addValidationError("source is missing", validationException);
}
return validationException;
}
public String id() {
return id;
}
public void id(String id) {
this.id = id;
}
public BytesReference source() {
return source;
}
public void source(BytesReference source) {
this.source = source;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readString();
source = in.readBytesReference();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeBytesReference(source);
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.put;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
public class PutPipelineRequestBuilder extends ActionRequestBuilder<PutPipelineRequest, PutPipelineResponse, PutPipelineRequestBuilder> {
public PutPipelineRequestBuilder(ElasticsearchClient client, PutPipelineAction action) {
super(client, action, new PutPipelineRequest());
}
public PutPipelineRequestBuilder setId(String id) {
request.id(id);
return this;
}
public PutPipelineRequestBuilder setSource(BytesReference source) {
request.source(source);
return this;
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.put;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
public class PutPipelineResponse extends ActionResponse implements ToXContent {
private String id;
private long version;
public String id() {
return id;
}
public PutPipelineResponse id(String id) {
this.id = id;
return this;
}
public long version() {
return version;
}
public PutPipelineResponse version(long version) {
this.version = version;
return this;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeLong(version);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readString();
version = in.readLong();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.ID, id);
builder.field(Fields.VERSION, version);
return builder;
}
static final class Fields {
static final XContentBuilderString ID = new XContentBuilderString("_id");
static final XContentBuilderString VERSION = new XContentBuilderString("_version");
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.put;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.function.Supplier;
public class PutPipelineTransportAction extends HandledTransportAction<PutPipelineRequest, PutPipelineResponse> {
private final TransportIndexAction indexAction;
@Inject
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportIndexAction indexAction) {
super(settings, PutPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new);
this.indexAction = indexAction;
}
@Override
protected void doExecute(PutPipelineRequest request, ActionListener<PutPipelineResponse> listener) {
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(PipelineStore.INDEX);
indexRequest.type(PipelineStore.TYPE);
indexRequest.id(request.id());
indexRequest.source(request.source());
indexRequest.refresh(true);
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
PutPipelineResponse response = new PutPipelineResponse();
response.id(indexResponse.getId());
response.version(indexResponse.getVersion());
listener.onResponse(response);
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
}
}

View File

@ -21,6 +21,14 @@ package org.elasticsearch.ingest;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineResponse;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineResponse;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequestBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@ -29,20 +37,25 @@ import java.util.Collections;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
import static org.hamcrest.Matchers.*;
@ESIntegTestCase.ClusterScope(numDataNodes = 1, numClientNodes = 0)
public class BasicTests extends ESIntegTestCase {
public class IngestClientIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(IngestPlugin.class);
return pluginList(IngestPlugin.class);
}
public void test() throws Exception {
client().prepareIndex(PipelineStore.INDEX, PipelineStore.TYPE, "_id")
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
public void testBasics() throws Exception {
new PutPipelineRequestBuilder(client(), PutPipelineAction.INSTANCE)
.setId("_id")
.setSource(jsonBuilder().startObject()
.field("name", "my_pipeline")
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
@ -54,10 +67,18 @@ public class BasicTests extends ESIntegTestCase {
.endObject()
.endObject()
.endArray()
.endObject())
.setRefresh(true)
.endObject().bytes())
.get();
Thread.sleep(5000);
assertBusy(new Runnable() {
@Override
public void run() {
GetPipelineResponse response = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
.setIds("_id")
.get();
assertThat(response.isFound(), is(true));
assertThat(response.pipelines().get("_id"), notNullValue());
}
});
createIndex("test");
client().prepareIndex("test", "type", "1").setSource("field2", "abc")
@ -83,6 +104,23 @@ public class BasicTests extends ESIntegTestCase {
assertThat(doc.get("field3"), equalTo("xyz"));
}
});
DeletePipelineResponse response = new DeletePipelineRequestBuilder(client(), DeletePipelineAction.INSTANCE)
.setId("_id")
.get();
assertThat(response.found(), is(true));
assertThat(response.id(), equalTo("_id"));
assertBusy(new Runnable() {
@Override
public void run() {
GetPipelineResponse response = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
.setIds("_id")
.get();
assertThat(response.isFound(), is(false));
assertThat(response.pipelines().get("_id"), nullValue());
}
});
}
@Override

View File

@ -25,10 +25,10 @@ import org.elasticsearch.test.ESSingleNodeTestCase;
import static org.hamcrest.Matchers.equalTo;
public class PipelineConfigDocReaderTests extends ESSingleNodeTestCase {
public class PipelineStoreClientTests extends ESSingleNodeTestCase {
public void testReadAll() {
PipelineConfigDocReader reader = new PipelineConfigDocReader(Settings.EMPTY, node().injector());
PipelineStoreClient reader = new PipelineStoreClient(Settings.EMPTY, node().injector());
reader.start();
createIndex(PipelineStore.INDEX);
@ -41,7 +41,7 @@ public class PipelineConfigDocReaderTests extends ESSingleNodeTestCase {
client().admin().indices().prepareRefresh().get();
int i = 0;
for (SearchHit hit : reader.readAll()) {
for (SearchHit hit : reader.readAllPipelines()) {
assertThat(hit.getId(), equalTo(Integer.toString(i)));
assertThat(hit.getVersion(), equalTo(1l));
assertThat(hit.getSource().get("field"), equalTo("value" + i));

View File

@ -33,11 +33,13 @@ import org.junit.Before;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -45,14 +47,14 @@ public class PipelineStoreTests extends ESTestCase {
private PipelineStore store;
private ThreadPool threadPool;
private PipelineConfigDocReader docReader;
private PipelineStoreClient client;
@Before
public void init() {
threadPool = new ThreadPool("test");
ClusterService clusterService = mock(ClusterService.class);
docReader = mock(PipelineConfigDocReader.class);
store = new PipelineStore(Settings.EMPTY, threadPool, clusterService, docReader, Collections.singletonMap(SimpleProcessor.TYPE, new SimpleProcessor.Builder.Factory()));
client = mock(PipelineStoreClient.class);
store = new PipelineStore(Settings.EMPTY, threadPool, clusterService, client, Collections.singletonMap(SimpleProcessor.TYPE, new SimpleProcessor.Builder.Factory()));
store.start();
}
@ -66,52 +68,110 @@ public class PipelineStoreTests extends ESTestCase {
public void testUpdatePipeline() {
List<SearchHit> hits = new ArrayList<>();
hits.add(new InternalSearchHit(0, "1", new StringText("type"), Collections.emptyMap())
.sourceRef(new BytesArray("{\"name\": \"_name1\", \"description\": \"_description1\"}"))
.sourceRef(new BytesArray("{\"description\": \"_description1\"}"))
);
when(docReader.readAll()).thenReturn(hits);
when(client.readAllPipelines()).thenReturn(hits);
when(client.existPipeline("1")).thenReturn(true);
assertThat(store.get("1"), nullValue());
store.updatePipelines();
assertThat(store.get("1").getId(), equalTo("_name1"));
assertThat(store.get("1").getId(), equalTo("1"));
assertThat(store.get("1").getDescription(), equalTo("_description1"));
when(client.existPipeline("2")).thenReturn(true);
hits.add(new InternalSearchHit(0, "2", new StringText("type"), Collections.emptyMap())
.sourceRef(new BytesArray("{\"name\": \"_name2\", \"description\": \"_description2\"}"))
.sourceRef(new BytesArray("{\"description\": \"_description2\"}"))
);
store.updatePipelines();
assertThat(store.get("1").getId(), equalTo("_name1"));
assertThat(store.get("1").getId(), equalTo("1"));
assertThat(store.get("1").getDescription(), equalTo("_description1"));
assertThat(store.get("2").getId(), equalTo("_name2"));
assertThat(store.get("2").getId(), equalTo("2"));
assertThat(store.get("2").getDescription(), equalTo("_description2"));
hits.remove(1);
when(client.existPipeline("2")).thenReturn(false);
store.updatePipelines();
assertThat(store.get("1").getId(), equalTo("1"));
assertThat(store.get("1").getDescription(), equalTo("_description1"));
assertThat(store.get("2"), nullValue());
}
public void testPipelineUpdater() throws Exception {
List<SearchHit> hits = new ArrayList<>();
hits.add(new InternalSearchHit(0, "1", new StringText("type"), Collections.emptyMap())
.sourceRef(new BytesArray("{\"name\": \"_name1\", \"description\": \"_description1\"}"))
.sourceRef(new BytesArray("{\"description\": \"_description1\"}"))
);
when(docReader.readAll()).thenReturn(hits);
when(client.readAllPipelines()).thenReturn(hits);
when(client.existPipeline(anyString())).thenReturn(true);
assertThat(store.get("1"), nullValue());
store.startUpdateWorker();
assertBusy(() -> {
assertThat(store.get("1"), notNullValue());
assertThat(store.get("1").getId(), equalTo("_name1"));
assertThat(store.get("1").getId(), equalTo("1"));
assertThat(store.get("1").getDescription(), equalTo("_description1"));
});
hits.add(new InternalSearchHit(0, "2", new StringText("type"), Collections.emptyMap())
.sourceRef(new BytesArray("{\"name\": \"_name2\", \"description\": \"_description2\"}"))
.sourceRef(new BytesArray("{\"description\": \"_description2\"}"))
);
assertBusy(() -> {
assertThat(store.get("1"), notNullValue());
assertThat(store.get("1").getId(), equalTo("_name1"));
assertThat(store.get("1").getId(), equalTo("1"));
assertThat(store.get("1").getDescription(), equalTo("_description1"));
assertThat(store.get("2"), notNullValue());
assertThat(store.get("2").getId(), equalTo("_name2"));
assertThat(store.get("2").getId(), equalTo("2"));
assertThat(store.get("2").getDescription(), equalTo("_description2"));
});
}
public void testGetReference() {
// fill the store up for the test:
List<SearchHit> hits = new ArrayList<>();
hits.add(new InternalSearchHit(0, "foo", new StringText("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}")));
hits.add(new InternalSearchHit(0, "bar", new StringText("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}")));
hits.add(new InternalSearchHit(0, "foobar", new StringText("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}")));
when(client.readAllPipelines()).thenReturn(hits);
store.updatePipelines();
List<PipelineStore.PipelineReference> result = store.getReference("foo");
assertThat(result.size(), equalTo(1));
assertThat(result.get(0).getPipeline().getId(), equalTo("foo"));
result = store.getReference("foo*");
// to make sure the order is consistent in the test:
Collections.sort(result, new Comparator<PipelineStore.PipelineReference>() {
@Override
public int compare(PipelineStore.PipelineReference first, PipelineStore.PipelineReference second) {
return first.getPipeline().getId().compareTo(second.getPipeline().getId());
}
});
assertThat(result.size(), equalTo(2));
assertThat(result.get(0).getPipeline().getId(), equalTo("foo"));
assertThat(result.get(1).getPipeline().getId(), equalTo("foobar"));
result = store.getReference("bar*");
assertThat(result.size(), equalTo(1));
assertThat(result.get(0).getPipeline().getId(), equalTo("bar"));
result = store.getReference("*");
// to make sure the order is consistent in the test:
Collections.sort(result, new Comparator<PipelineStore.PipelineReference>() {
@Override
public int compare(PipelineStore.PipelineReference first, PipelineStore.PipelineReference second) {
return first.getPipeline().getId().compareTo(second.getPipeline().getId());
}
});
assertThat(result.size(), equalTo(3));
assertThat(result.get(0).getPipeline().getId(), equalTo("bar"));
assertThat(result.get(1).getPipeline().getId(), equalTo("foo"));
assertThat(result.get(2).getPipeline().getId(), equalTo("foobar"));
result = store.getReference("foo", "bar");
assertThat(result.size(), equalTo(2));
assertThat(result.get(0).getPipeline().getId(), equalTo("foo"));
assertThat(result.get(1).getPipeline().getId(), equalTo("bar"));
}
}

View File

@ -0,0 +1,20 @@
{
"ingest.delete_pipeline": {
"documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/ingest.html",
"methods": [ "DELETE" ],
"url": {
"path": "/_ingest/pipeline/{id}",
"paths": [ "/_ingest/pipeline/{id}" ],
"parts": {
"id": {
"type" : "string",
"description" : "Pipeline ID",
"required" : true
}
},
"params": {
}
},
"body": null
}
}

View File

@ -0,0 +1,20 @@
{
"ingest.get_pipeline": {
"documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/ingest.html",
"methods": [ "GET" ],
"url": {
"path": "/_ingest/pipeline/{ids}",
"paths": [ "/_ingest/pipeline/{ids}" ],
"parts": {
"ids": {
"type" : "string",
"description" : "Comma separated list of pipeline ids. Wildcards supported",
"required" : true
}
},
"params": {
}
},
"body": null
}
}

View File

@ -0,0 +1,23 @@
{
"ingest.put_pipeline": {
"documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/ingest.html",
"methods": [ "PUT" ],
"url": {
"path": "/_ingest/pipeline/{id}",
"paths": [ "/_ingest/pipeline/{id}" ],
"parts": {
"id": {
"type" : "string",
"description" : "Pipeline ID",
"required" : true
}
},
"params": {
}
},
"body": {
"description" : "The ingest definition",
"required" : true
}
}
}

View File

@ -0,0 +1,57 @@
---
"Test basic pipeline crud":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"simple" : {
"path" : "field1",
"value" : "_value",
"add_field" : "field2",
"add_field_value" : "_value"
}
}
]
}
- match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
ingest.get_pipeline:
ids: "my_pipeline"
- match: { my_pipeline._source.description: "_description" }
- match: { my_pipeline._version: 1 }
- do:
ingest.delete_pipeline:
id: "my_pipeline"
- match: { _id: "my_pipeline" }
- match: { _found: true }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
catch: missing
ingest.get_pipeline:
ids: "my_pipeline"