diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java index 4a35bcbcfb0..98c992cf2bc 100644 --- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -610,7 +610,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte RETRY_ON_REPLICA_EXCEPTION(org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnReplicaException.class, org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnReplicaException::new, 136), TYPE_MISSING_EXCEPTION(org.elasticsearch.indices.TypeMissingException.class, org.elasticsearch.indices.TypeMissingException::new, 137), FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class, org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException::new, 140), - QUERY_SHARD_EXCEPTION(org.elasticsearch.index.query.QueryShardException.class, org.elasticsearch.index.query.QueryShardException::new, 141); + QUERY_SHARD_EXCEPTION(org.elasticsearch.index.query.QueryShardException.class, org.elasticsearch.index.query.QueryShardException::new, 141), + PIPELINE_MISSING_EXCEPTION(org.elasticsearch.ingest.PipelineMissingException.class, org.elasticsearch.ingest.PipelineMissingException::new, 142); final Class exceptionClass; final FunctionThatThrowsIOException constructor; diff --git a/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineAction.java b/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineAction.java index 8456d7e0e6a..ba1dd5d385f 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineAction.java @@ -20,10 +20,9 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.action.Action; -import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.client.ElasticsearchClient; -public class DeletePipelineAction extends Action { +public class DeletePipelineAction extends Action { public static final DeletePipelineAction INSTANCE = new DeletePipelineAction(); public static final String NAME = "cluster:admin/ingest/pipeline/delete"; @@ -38,7 +37,7 @@ public class DeletePipelineAction extends Action { +public class DeletePipelineRequestBuilder extends ActionRequestBuilder { public DeletePipelineRequestBuilder(ElasticsearchClient client, DeletePipelineAction action) { super(client, action, new DeletePipelineRequest()); diff --git a/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java index 4f25a9d330c..03f63ff26b7 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java @@ -20,9 +20,12 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -31,18 +34,36 @@ import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -public class DeletePipelineTransportAction extends HandledTransportAction { +public class DeletePipelineTransportAction extends TransportMasterNodeAction { private final PipelineStore pipelineStore; @Inject - public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) { - super(settings, DeletePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new); + public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) { + super(settings, DeletePipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new); this.pipelineStore = bootstrapper.getPipelineStore(); } @Override - protected void doExecute(DeletePipelineRequest request, ActionListener listener) { + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected WritePipelineResponse newResponse() { + return new WritePipelineResponse(); + } + + @Override + protected void masterOperation(DeletePipelineRequest request, ClusterState state, ActionListener listener) throws Exception { pipelineStore.delete(request, listener); } + + @Override + protected ClusterBlockException checkBlock(DeletePipelineRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } diff --git a/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequest.java b/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequest.java index e0bfca6cac4..afde709d699 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequest.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequest.java @@ -19,8 +19,8 @@ package org.elasticsearch.action.ingest; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -28,7 +28,7 @@ import java.io.IOException; import static org.elasticsearch.action.ValidateActions.addValidationError; -public class GetPipelineRequest extends ActionRequest { +public class GetPipelineRequest extends MasterNodeReadRequest { private String[] ids; diff --git a/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequestBuilder.java index c339603104e..21fa974cd3c 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequestBuilder.java @@ -19,10 +19,10 @@ package org.elasticsearch.action.ingest; -import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; -public class GetPipelineRequestBuilder extends ActionRequestBuilder { +public class GetPipelineRequestBuilder extends MasterNodeReadOperationRequestBuilder { public GetPipelineRequestBuilder(ElasticsearchClient client, GetPipelineAction action) { super(client, action, new GetPipelineRequest()); diff --git a/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java b/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java index 9a12f4b1d03..9f0b229d322 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java @@ -24,7 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.StatusToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.ingest.PipelineDefinition; +import org.elasticsearch.ingest.PipelineConfiguration; import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -33,16 +33,16 @@ import java.util.List; public class GetPipelineResponse extends ActionResponse implements StatusToXContent { - private List pipelines; + private List pipelines; public GetPipelineResponse() { } - public GetPipelineResponse(List pipelines) { + public GetPipelineResponse(List pipelines) { this.pipelines = pipelines; } - public List pipelines() { + public List pipelines() { return pipelines; } @@ -52,7 +52,7 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont int size = in.readVInt(); pipelines = new ArrayList<>(size); for (int i = 0; i < size; i++) { - pipelines.add(PipelineDefinition.readPipelineDefinitionFrom(in)); + pipelines.add(PipelineConfiguration.readPipelineConfiguration(in)); } } @@ -60,7 +60,7 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeVInt(pipelines.size()); - for (PipelineDefinition pipeline : pipelines) { + for (PipelineConfiguration pipeline : pipelines) { pipeline.writeTo(out); } } @@ -76,9 +76,11 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - for (PipelineDefinition definition : pipelines) { - definition.toXContent(builder, params); + builder.startArray("pipelines"); + for (PipelineConfiguration pipeline : pipelines) { + pipeline.toXContent(builder, params); } + builder.endArray(); return builder; } } diff --git a/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java index 471238e0587..cb962550255 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java @@ -21,31 +21,49 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.IngestBootstrapper; -import org.elasticsearch.ingest.PipelineDefinition; import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.List; - -public class GetPipelineTransportAction extends HandledTransportAction { +public class GetPipelineTransportAction extends TransportMasterNodeReadAction { private final PipelineStore pipelineStore; @Inject - public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) { - super(settings, GetPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, GetPipelineRequest::new); + public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) { + super(settings, GetPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, GetPipelineRequest::new); this.pipelineStore = bootstrapper.getPipelineStore(); } @Override - protected void doExecute(GetPipelineRequest request, ActionListener listener) { - List references = pipelineStore.getReference(request.ids()); - listener.onResponse(new GetPipelineResponse(references)); + protected String executor() { + return ThreadPool.Names.SAME; } + + @Override + protected GetPipelineResponse newResponse() { + return new GetPipelineResponse(); + } + + @Override + protected void masterOperation(GetPipelineRequest request, ClusterState state, ActionListener listener) throws Exception { + listener.onResponse(new GetPipelineResponse(pipelineStore.getPipelines(request.ids()))); + } + + @Override + protected ClusterBlockException checkBlock(GetPipelineRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + } diff --git a/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineAction.java b/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineAction.java index 7f37009577e..8f4b4170f51 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineAction.java @@ -23,7 +23,7 @@ import org.elasticsearch.action.Action; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.ElasticsearchClient; -public class PutPipelineAction extends Action { +public class PutPipelineAction extends Action { public static final PutPipelineAction INSTANCE = new PutPipelineAction(); public static final String NAME = "cluster:admin/ingest/pipeline/put"; @@ -38,7 +38,7 @@ public class PutPipelineAction extends Action { +public class PutPipelineRequestBuilder extends ActionRequestBuilder { public PutPipelineRequestBuilder(ElasticsearchClient client, PutPipelineAction action) { super(client, action, new PutPipelineRequest()); diff --git a/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index 8f7da7eff07..3b9e738f69b 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -23,6 +23,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -31,18 +36,36 @@ import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -public class PutPipelineTransportAction extends HandledTransportAction { +public class PutPipelineTransportAction extends TransportMasterNodeAction { private final PipelineStore pipelineStore; @Inject - public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) { - super(settings, PutPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new); + public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) { + super(settings, PutPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new); this.pipelineStore = bootstrapper.getPipelineStore(); } @Override - protected void doExecute(PutPipelineRequest request, ActionListener listener) { + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected WritePipelineResponse newResponse() { + return new WritePipelineResponse(); + } + + @Override + protected void masterOperation(PutPipelineRequest request, ClusterState state, ActionListener listener) throws Exception { pipelineStore.put(request, listener); } + + @Override + protected ClusterBlockException checkBlock(PutPipelineRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } diff --git a/core/src/main/java/org/elasticsearch/action/ingest/ReloadPipelinesAction.java b/core/src/main/java/org/elasticsearch/action/ingest/ReloadPipelinesAction.java deleted file mode 100644 index 452f3a3341f..00000000000 --- a/core/src/main/java/org/elasticsearch/action/ingest/ReloadPipelinesAction.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.ingest; - -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.PipelineStore; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportResponseHandler; -import org.elasticsearch.transport.TransportService; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; - -/** - * An internal api that refreshes the in-memory representation of all the pipelines on all ingest nodes. - */ -public class ReloadPipelinesAction extends AbstractComponent implements TransportRequestHandler { - - public static final String ACTION_NAME = "internal:admin/ingest/reload/pipelines"; - - private final ClusterService clusterService; - private final TransportService transportService; - private final PipelineStore pipelineStore; - - public ReloadPipelinesAction(Settings settings, PipelineStore pipelineStore, ClusterService clusterService, TransportService transportService) { - super(settings); - this.pipelineStore = pipelineStore; - this.clusterService = clusterService; - this.transportService = transportService; - transportService.registerRequestHandler(ACTION_NAME, ReloadPipelinesRequest::new, ThreadPool.Names.MANAGEMENT, this); - } - - public void reloadPipelinesOnAllNodes(Consumer listener) { - AtomicBoolean failed = new AtomicBoolean(); - DiscoveryNodes nodes = clusterService.state().getNodes(); - AtomicInteger expectedResponses = new AtomicInteger(nodes.size()); - for (DiscoveryNode node : nodes) { - ReloadPipelinesRequest nodeRequest = new ReloadPipelinesRequest(); - transportService.sendRequest(node, ACTION_NAME, nodeRequest, new TransportResponseHandler() { - @Override - public ReloadPipelinesResponse newInstance() { - return new ReloadPipelinesResponse(); - } - - @Override - public void handleResponse(ReloadPipelinesResponse response) { - decrementAndReturn(); - } - - @Override - public void handleException(TransportException exp) { - logger.warn("failed to update pipelines on remote node [{}]", exp, node); - failed.set(true); - decrementAndReturn(); - } - - void decrementAndReturn() { - if (expectedResponses.decrementAndGet() == 0) { - listener.accept(!failed.get()); - } - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } - } - - @Override - public void messageReceived(ReloadPipelinesRequest request, TransportChannel channel) throws Exception { - try { - pipelineStore.updatePipelines(); - channel.sendResponse(new ReloadPipelinesResponse()); - } catch (Throwable e) { - logger.warn("failed to update pipelines", e); - channel.sendResponse(e); - } - } - - final static class ReloadPipelinesRequest extends TransportRequest { - - } - - final static class ReloadPipelinesResponse extends TransportResponse { - - } - -} diff --git a/core/src/main/java/org/elasticsearch/action/ingest/WritePipelineResponse.java b/core/src/main/java/org/elasticsearch/action/ingest/WritePipelineResponse.java new file mode 100644 index 00000000000..885fd9f35d6 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/ingest/WritePipelineResponse.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.ingest; + +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class WritePipelineResponse extends AcknowledgedResponse { + + WritePipelineResponse() { + } + + public WritePipelineResponse(boolean acknowledge) { + super(acknowledge); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + readAcknowledged(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + writeAcknowledged(out); + } +} diff --git a/core/src/main/java/org/elasticsearch/client/Client.java b/core/src/main/java/org/elasticsearch/client/Client.java index 2d7e8bde0a6..984f79308a8 100644 --- a/core/src/main/java/org/elasticsearch/client/Client.java +++ b/core/src/main/java/org/elasticsearch/client/Client.java @@ -61,6 +61,7 @@ import org.elasticsearch.action.ingest.PutPipelineRequestBuilder; import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.ingest.SimulatePipelineRequestBuilder; import org.elasticsearch.action.ingest.SimulatePipelineResponse; +import org.elasticsearch.action.ingest.WritePipelineResponse; import org.elasticsearch.action.percolate.MultiPercolateRequest; import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder; import org.elasticsearch.action.percolate.MultiPercolateResponse; @@ -605,12 +606,12 @@ public interface Client extends ElasticsearchClient, Releasable { /** * Stores an ingest pipeline */ - void putPipeline(PutPipelineRequest request, ActionListener listener); + void putPipeline(PutPipelineRequest request, ActionListener listener); /** * Stores an ingest pipeline */ - ActionFuture putPipeline(PutPipelineRequest request); + ActionFuture putPipeline(PutPipelineRequest request); /** * Stores an ingest pipeline @@ -620,12 +621,12 @@ public interface Client extends ElasticsearchClient, Releasable { /** * Deletes a stored ingest pipeline */ - void deletePipeline(DeletePipelineRequest request, ActionListener listener); + void deletePipeline(DeletePipelineRequest request, ActionListener listener); /** * Deletes a stored ingest pipeline */ - ActionFuture deletePipeline(DeletePipelineRequest request); + ActionFuture deletePipeline(DeletePipelineRequest request); /** * Deletes a stored ingest pipeline diff --git a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java index bc5ed9410d1..69806aeab84 100644 --- a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -286,6 +286,7 @@ import org.elasticsearch.action.ingest.SimulatePipelineAction; import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.ingest.SimulatePipelineRequestBuilder; import org.elasticsearch.action.ingest.SimulatePipelineResponse; +import org.elasticsearch.action.ingest.WritePipelineResponse; import org.elasticsearch.action.percolate.MultiPercolateAction; import org.elasticsearch.action.percolate.MultiPercolateRequest; import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder; @@ -806,12 +807,12 @@ public abstract class AbstractClient extends AbstractComponent implements Client } @Override - public void putPipeline(PutPipelineRequest request, ActionListener listener) { + public void putPipeline(PutPipelineRequest request, ActionListener listener) { execute(PutPipelineAction.INSTANCE, request, listener); } @Override - public ActionFuture putPipeline(PutPipelineRequest request) { + public ActionFuture putPipeline(PutPipelineRequest request) { return execute(PutPipelineAction.INSTANCE, request); } @@ -821,12 +822,12 @@ public abstract class AbstractClient extends AbstractComponent implements Client } @Override - public void deletePipeline(DeletePipelineRequest request, ActionListener listener) { + public void deletePipeline(DeletePipelineRequest request, ActionListener listener) { execute(DeletePipelineAction.INSTANCE, request, listener); } @Override - public ActionFuture deletePipeline(DeletePipelineRequest request) { + public ActionFuture deletePipeline(DeletePipelineRequest request) { return execute(DeletePipelineAction.INSTANCE, request); } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 002d1a51107..0e41dda1888 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -54,6 +54,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.store.IndexStoreConfig; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.ttl.IndicesTTLService; +import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -111,6 +112,7 @@ public class MetaData implements Iterable, Diffable, Fr static { // register non plugin custom metadata registerPrototype(RepositoriesMetaData.TYPE, RepositoriesMetaData.PROTO); + registerPrototype(IngestMetadata.TYPE, IngestMetadata.PROTO); } /** diff --git a/core/src/main/java/org/elasticsearch/common/SearchScrollIterator.java b/core/src/main/java/org/elasticsearch/common/SearchScrollIterator.java deleted file mode 100644 index 18535d1626e..00000000000 --- a/core/src/main/java/org/elasticsearch/common/SearchScrollIterator.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common; - -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.search.SearchHit; - -import java.util.Collections; -import java.util.Iterator; - -/** - * An iterator that easily helps to consume all hits from a scroll search. - */ -public final class SearchScrollIterator implements Iterator { - - /** - * Creates an iterator that returns all matching hits of a scroll search via an iterator. - * The iterator will return all hits per scroll search and execute additional scroll searches - * to get more hits until all hits have been returned by the scroll search on the ES side. - */ - public static Iterable createIterator(Client client, TimeValue scrollTimeout, SearchRequest searchRequest) { - searchRequest.scroll(scrollTimeout); - SearchResponse searchResponse = client.search(searchRequest).actionGet(scrollTimeout); - if (searchResponse.getHits().getTotalHits() == 0) { - return Collections.emptyList(); - } else { - return () -> new SearchScrollIterator(client, scrollTimeout, searchResponse); - } - } - - private final Client client; - private final TimeValue scrollTimeout; - - private int currentIndex; - private SearchHit[] currentHits; - private SearchResponse searchResponse; - - private SearchScrollIterator(Client client, TimeValue scrollTimeout, SearchResponse searchResponse) { - this.client = client; - this.scrollTimeout = scrollTimeout; - this.searchResponse = searchResponse; - this.currentHits = searchResponse.getHits().getHits(); - } - - @Override - public boolean hasNext() { - if (currentIndex < currentHits.length) { - return true; - } else { - if (searchResponse == null) { - return false; - } - - SearchScrollRequest request = new SearchScrollRequest(searchResponse.getScrollId()); - request.scroll(scrollTimeout); - searchResponse = client.searchScroll(request).actionGet(scrollTimeout); - if (searchResponse.getHits().getHits().length == 0) { - searchResponse = null; - return false; - } else { - currentHits = searchResponse.getHits().getHits(); - currentIndex = 0; - return true; - } - } - } - - @Override - public SearchHit next() { - return currentHits[currentIndex++]; - } -} diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestBootstrapper.java b/core/src/main/java/org/elasticsearch/ingest/IngestBootstrapper.java index e824786bd82..cf145821859 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestBootstrapper.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestBootstrapper.java @@ -48,11 +48,8 @@ import java.io.InputStream; * Instantiates and wires all the services that the ingest plugin will be needing. * Also the bootstrapper is in charge of starting and stopping the ingest plugin based on the cluster state. */ -public class IngestBootstrapper extends AbstractLifecycleComponent implements ClusterStateListener { +public class IngestBootstrapper extends AbstractLifecycleComponent { - static final String INGEST_INDEX_TEMPLATE_NAME = "ingest-template"; - - private final ThreadPool threadPool; private final Environment environment; private final PipelineStore pipelineStore; private final PipelineExecutionService pipelineExecutionService; @@ -64,31 +61,12 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl // pipelines into NodeModule? I'd really like to prevent adding yet another module. @Inject public IngestBootstrapper(Settings settings, ThreadPool threadPool, Environment environment, - ClusterService clusterService, TransportService transportService, - ProcessorsRegistry processorsRegistry) { + ClusterService clusterService, ProcessorsRegistry processorsRegistry) { super(settings); - this.threadPool = threadPool; this.environment = environment; this.processorsRegistry = processorsRegistry; - this.pipelineStore = new PipelineStore(settings, clusterService, transportService); + this.pipelineStore = new PipelineStore(settings, clusterService); this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool); - - boolean isNoTribeNode = settings.getByPrefix("tribe.").getAsMap().isEmpty(); - if (isNoTribeNode) { - clusterService.add(this); - } - } - - // for testing: - IngestBootstrapper(Settings settings, ThreadPool threadPool, ClusterService clusterService, - PipelineStore pipelineStore, PipelineExecutionService pipelineExecutionService) { - super(settings); - this.threadPool = threadPool; - this.environment = null; - clusterService.add(this); - this.pipelineStore = pipelineStore; - this.pipelineExecutionService = pipelineExecutionService; - this.processorsRegistry = null; } public PipelineStore getPipelineStore() { @@ -99,49 +77,11 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl return pipelineExecutionService; } - @Inject - public void setClient(Client client) { - pipelineStore.setClient(client); - } - @Inject public void setScriptService(ScriptService scriptService) { pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, environment, scriptService); } - @Override - public void clusterChanged(ClusterChangedEvent event) { - ClusterState state = event.state(); - if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { - return; - } - - if (pipelineStore.isStarted()) { - if (validClusterState(state) == false) { - stopPipelineStore("cluster state invalid [" + state + "]"); - } - } else { - if (validClusterState(state)) { - startPipelineStore(state.metaData()); - } - } - } - - boolean validClusterState(ClusterState state) { - if (state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_WRITES) || - state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ALL)) { - return false; - } - - if (state.getMetaData().hasConcreteIndex(PipelineStore.INDEX)) { - IndexRoutingTable routingTable = state.getRoutingTable().index(PipelineStore.INDEX); - return routingTable.allPrimaryShardsActive(); - } else { - // it will be ready when auto create index kicks in before the first pipeline doc gets added - return true; - } - } - @Override protected void doStart() { } @@ -159,33 +99,4 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl } } - void startPipelineStore(MetaData metaData) { - try { - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - try { - pipelineStore.start(); - } catch (Exception e1) { - logger.warn("pipeline store failed to start, retrying...", e1); - startPipelineStore(metaData); - } - }); - } catch (EsRejectedExecutionException e) { - logger.debug("async pipeline store start failed", e); - } - } - - void stopPipelineStore(String reason) { - try { - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - try { - pipelineStore.stop(reason); - } catch (Exception e) { - logger.error("pipeline store stop failure", e); - } - }); - } catch (EsRejectedExecutionException e) { - logger.debug("async pipeline store stop failed", e); - } - } - } diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestMetadata.java b/core/src/main/java/org/elasticsearch/ingest/IngestMetadata.java new file mode 100644 index 00000000000..4ee41c58ad7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/ingest/IngestMetadata.java @@ -0,0 +1,124 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; + +/** + * Holds the ingest pipelines that are available in the cluster + */ +public final class IngestMetadata extends AbstractDiffable implements MetaData.Custom { + + public final static String TYPE = "ingest"; + public final static IngestMetadata PROTO = new IngestMetadata(); + + // We can't use Pipeline class directly in cluster state, because we don't have the processor factories around when + // IngestMetadata is registered as custom metadata. + private final Map pipelines; + + private IngestMetadata() { + this.pipelines = Collections.emptyMap(); + } + + public IngestMetadata(Map pipelines) { + this.pipelines = Collections.unmodifiableMap(pipelines); + } + + @Override + public String type() { + return TYPE; + } + + public Map getPipelines() { + return pipelines; + } + + @Override + public MetaData.Custom readFrom(StreamInput in) throws IOException { + int size = in.readVInt(); + Map pipelines = new HashMap<>(size); + for (int i = 0; i < size; i++) { + PipelineConfiguration pipeline = PipelineConfiguration.readPipelineConfiguration(in); + pipelines.put(pipeline.getId(), pipeline); + } + return new IngestMetadata(pipelines); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(pipelines.size()); + for (PipelineConfiguration pipeline : pipelines.values()) { + pipeline.writeTo(out); + } + } + + @Override + public MetaData.Custom fromXContent(XContentParser parser) throws IOException { + XContentParser.Token token; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + switch (token) { + case FIELD_NAME: + currentFieldName = parser.currentName(); + break; + case START_ARRAY: + if ("pipelines".equals(currentFieldName)) { + Map pipelines = new HashMap<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token == XContentParser.Token.START_OBJECT) { + PipelineConfiguration pipeline = new PipelineConfiguration(parser); + pipelines.put(pipeline.getId(), pipeline); + } + } + return new IngestMetadata(pipelines); + } + break; + } + } + return PROTO; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray("pipelines"); + for (PipelineConfiguration pipeline : pipelines.values()) { + pipeline.toXContent(builder, params); + } + builder.endArray(); + return builder; + } + + @Override + public EnumSet context() { + return MetaData.API_AND_GATEWAY; + } + +} diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/core/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java new file mode 100644 index 00000000000..da0ff4c3e2a --- /dev/null +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -0,0 +1,112 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +/** + * Encapsulates a pipeline's id and configuration as a blob + */ +public final class PipelineConfiguration implements Writeable, ToXContent { + + private final static PipelineConfiguration PROTOTYPE = new PipelineConfiguration(null, null); + + public static PipelineConfiguration readPipelineConfiguration(StreamInput in) throws IOException { + return PROTOTYPE.readFrom(in); + } + + private final String id; + // Store config as bytes reference, because the config is only used when the pipeline store reads the cluster state + // and the way the map of maps config is read requires a deep copy (it removes instead of gets entries to check for unused options) + // also the get pipeline api just directly returns this to the caller + private final BytesReference config; + + PipelineConfiguration(XContentParser parser) throws IOException { + String id = null; + BytesReference config = null; + + XContentParser.Token token; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + switch (token) { + case FIELD_NAME: + currentFieldName = parser.currentName(); + break; + case VALUE_STRING: + if ("id".equals(currentFieldName)) { + id = parser.text(); + } + break; + case START_OBJECT: + XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent()); + XContentHelper.copyCurrentEvent(builder.generator(), parser); + config = builder.bytes(); + break; + } + } + + this.id = Objects.requireNonNull(id); + this.config = Objects.requireNonNull(config); + } + + public PipelineConfiguration(String id, BytesReference config) { + this.id = id; + this.config = config; + } + + public String getId() { + return id; + } + + public Map getConfigAsMap() { + return XContentHelper.convertToMap(config, true).v2(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("id", id); + builder.field("config", getConfigAsMap()); + builder.endObject(); + return builder; + } + + @Override + public PipelineConfiguration readFrom(StreamInput in) throws IOException { + return new PipelineConfiguration(in.readString(), in.readBytesReference()); + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeString(id); + out.writeBytesReference(config); + } + +} diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineDefinition.java b/core/src/main/java/org/elasticsearch/ingest/PipelineDefinition.java deleted file mode 100644 index 94c584ad121..00000000000 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineDefinition.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest; - -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.ingest.core.Pipeline; - -import java.io.IOException; - -public class PipelineDefinition implements Writeable, ToXContent { - - private static final PipelineDefinition PROTOTYPE = new PipelineDefinition((String) null, -1, null); - - public static PipelineDefinition readPipelineDefinitionFrom(StreamInput in) throws IOException { - return PROTOTYPE.readFrom(in); - } - - private final String id; - private final long version; - private final BytesReference source; - - private final Pipeline pipeline; - - PipelineDefinition(Pipeline pipeline, long version, BytesReference source) { - this.id = pipeline.getId(); - this.version = version; - this.source = source; - this.pipeline = pipeline; - } - - PipelineDefinition(String id, long version, BytesReference source) { - this.id = id; - this.version = version; - this.source = source; - this.pipeline = null; - } - - public String getId() { - return id; - } - - public long getVersion() { - return version; - } - - public BytesReference getSource() { - return source; - } - - Pipeline getPipeline() { - return pipeline; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - PipelineDefinition holder = (PipelineDefinition) o; - return source.equals(holder.source); - } - - @Override - public int hashCode() { - return source.hashCode(); - } - - @Override - public PipelineDefinition readFrom(StreamInput in) throws IOException { - String id = in.readString(); - long version = in.readLong(); - BytesReference source = in.readBytesReference(); - return new PipelineDefinition(id, version, source); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(id); - out.writeLong(version); - out.writeBytesReference(source); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(id); - XContentHelper.writeRawField("_source", source, builder, params); - builder.field("_version", version); - builder.endObject(); - return builder; - } -} diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineMissingException.java b/core/src/main/java/org/elasticsearch/ingest/PipelineMissingException.java new file mode 100644 index 00000000000..82637ae2ded --- /dev/null +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineMissingException.java @@ -0,0 +1,42 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; + +public class PipelineMissingException extends ElasticsearchException { + + public PipelineMissingException(String id) { + super("pipeline [{}] is missing", id); + } + + public PipelineMissingException(StreamInput in) throws IOException { + super(in); + } + + @Override + public RestStatus status() { + return RestStatus.NOT_FOUND; + } +} diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java index 785eb5829bc..ab11f99246e 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java @@ -21,43 +21,24 @@ package org.elasticsearch.ingest; import org.apache.lucene.util.IOUtils; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.delete.DeleteResponse; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.client.Client; +import org.elasticsearch.action.ingest.WritePipelineResponse; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.Booleans; -import org.elasticsearch.common.SearchScrollIterator; -import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.env.Environment; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; -import org.elasticsearch.action.ingest.ReloadPipelinesAction; import org.elasticsearch.ingest.core.Pipeline; import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.TemplateService; import org.elasticsearch.script.ScriptService; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.search.sort.SortOrder; -import org.elasticsearch.transport.TransportService; import java.io.Closeable; import java.io.IOException; @@ -68,66 +49,22 @@ import java.util.List; import java.util.Map; import java.util.function.BiFunction; -public class PipelineStore extends AbstractComponent implements Closeable { +public class PipelineStore extends AbstractComponent implements Closeable, ClusterStateListener { - public final static String INDEX = ".ingest"; - public final static String TYPE = "pipeline"; - - final static Settings INGEST_INDEX_SETTING = Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .put("index.mapper.dynamic", false) - .build(); - - final static String PIPELINE_MAPPING; - - static { - try { - PIPELINE_MAPPING = XContentFactory.jsonBuilder().startObject() - .field("dynamic", "strict") - .startObject("_all") - .field("enabled", false) - .endObject() - .startObject("properties") - .startObject("processors") - .field("type", "object") - .field("enabled", false) - .field("dynamic", "true") - .endObject() - .startObject("on_failure") - .field("type", "object") - .field("enabled", false) - .field("dynamic", "true") - .endObject() - .startObject("description") - .field("type", "string") - .endObject() - .endObject() - .endObject().string(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private Client client; - private final TimeValue scrollTimeout; private final ClusterService clusterService; - private final ReloadPipelinesAction reloadPipelinesAction; private final Pipeline.Factory factory = new Pipeline.Factory(); private Map processorFactoryRegistry; - private volatile boolean started = false; - private volatile Map pipelines = new HashMap<>(); + // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there. + // We know of all the processor factories when a node with all its plugin have been initialized. Also some + // processor factories rely on other node services. Custom metadata is statically registered when classes + // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around. + volatile Map pipelines = new HashMap<>(); - public PipelineStore(Settings settings, ClusterService clusterService, TransportService transportService) { + public PipelineStore(Settings settings, ClusterService clusterService) { super(settings); this.clusterService = clusterService; - this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30)); - this.reloadPipelinesAction = new ReloadPipelinesAction(settings, this, clusterService, transportService); - } - - public void setClient(Client client) { - this.client = client; + clusterService.add(this); } public void buildProcessorFactoryRegistry(ProcessorsRegistry processorsRegistry, Environment environment, ScriptService scriptService) { @@ -142,7 +79,6 @@ public class PipelineStore extends AbstractComponent implements Closeable { @Override public void close() throws IOException { - stop("closing"); // TODO: When org.elasticsearch.node.Node can close Closable instances we should try to remove this code, // since any wired closable should be able to close itself List closeables = new ArrayList<>(); @@ -154,18 +90,63 @@ public class PipelineStore extends AbstractComponent implements Closeable { IOUtils.close(closeables); } + @Override + public void clusterChanged(ClusterChangedEvent event) { + innerUpdatePipelines(event.state()); + } + + void innerUpdatePipelines(ClusterState state) { + IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); + if (ingestMetadata == null) { + return; + } + + Map pipelines = new HashMap<>(); + for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { + try { + pipelines.put(pipeline.getId(), constructPipeline(pipeline.getId(), pipeline.getConfigAsMap())); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + this.pipelines = Collections.unmodifiableMap(pipelines); + } + /** * Deletes the pipeline specified by id in the request. */ - public void delete(DeletePipelineRequest request, ActionListener listener) { - ensureReady(); + public void delete(DeletePipelineRequest request, ActionListener listener) { + clusterService.submitStateUpdateTask("delete-pipeline-" + request.id(), new AckedClusterStateUpdateTask(request, listener) { - DeleteRequest deleteRequest = new DeleteRequest(request); - deleteRequest.index(PipelineStore.INDEX); - deleteRequest.type(PipelineStore.TYPE); - deleteRequest.id(request.id()); - deleteRequest.refresh(true); - client.delete(deleteRequest, handleWriteResponseAndReloadPipelines(listener)); + @Override + protected WritePipelineResponse newResponse(boolean acknowledged) { + return new WritePipelineResponse(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return innerDelete(request, currentState); + } + }); + } + + ClusterState innerDelete(DeletePipelineRequest request, ClusterState currentState) { + IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); + if (currentIngestMetadata == null) { + return currentState; + } + Map pipelines = currentIngestMetadata.getPipelines(); + if (pipelines.containsKey(request.id()) == false) { + throw new PipelineMissingException(request.id()); + } else { + pipelines = new HashMap<>(pipelines); + pipelines.remove(request.id()); + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)) + .build()); + return newState.build(); + } } /** @@ -173,280 +154,92 @@ public class PipelineStore extends AbstractComponent implements Closeable { * * @throws IllegalArgumentException If the pipeline holds incorrect configuration */ - public void put(PutPipelineRequest request, ActionListener listener) throws IllegalArgumentException { - ensureReady(); - + public void put(PutPipelineRequest request, ActionListener listener) throws IllegalArgumentException { try { - // validates the pipeline and processor configuration: + // validates the pipeline and processor configuration before submitting a cluster update task: Map pipelineConfig = XContentHelper.convertToMap(request.source(), false).v2(); constructPipeline(request.id(), pipelineConfig); } catch (Exception e) { throw new IllegalArgumentException("Invalid pipeline configuration", e); } + clusterService.submitStateUpdateTask("put-pipeline-" + request.id(), new AckedClusterStateUpdateTask(request, listener) { - ClusterState state = clusterService.state(); - if (isIngestIndexPresent(state)) { - innerPut(request, listener); - } else { - CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX); - createIndexRequest.settings(INGEST_INDEX_SETTING); - createIndexRequest.mapping(TYPE, PIPELINE_MAPPING); - client.admin().indices().create(createIndexRequest, new ActionListener() { - @Override - public void onResponse(CreateIndexResponse createIndexResponse) { - innerPut(request, listener); - } + @Override + protected WritePipelineResponse newResponse(boolean acknowledged) { + return new WritePipelineResponse(acknowledged); + } - @Override - public void onFailure(Throwable e) { - listener.onFailure(e); - } - }); - } + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return innerPut(request, currentState); + } + }); } - private void innerPut(PutPipelineRequest request, ActionListener listener) { - IndexRequest indexRequest = new IndexRequest(request); - indexRequest.index(PipelineStore.INDEX); - indexRequest.type(PipelineStore.TYPE); - indexRequest.id(request.id()); - indexRequest.source(request.source()); - indexRequest.refresh(true); - client.index(indexRequest, handleWriteResponseAndReloadPipelines(listener)); + ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) { + IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); + Map pipelines; + if (currentIngestMetadata != null) { + pipelines = new HashMap<>(currentIngestMetadata.getPipelines()); + } else { + pipelines = new HashMap<>(); + } + + pipelines.put(request.id(), new PipelineConfiguration(request.id(), request.source())); + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)) + .build()); + return newState.build(); } /** * Returns the pipeline by the specified id */ public Pipeline get(String id) { - ensureReady(); - - PipelineDefinition ref = pipelines.get(id); - if (ref != null) { - return ref.getPipeline(); - } else { - return null; - } + return pipelines.get(id); } public Map getProcessorFactoryRegistry() { return processorFactoryRegistry; } - public List getReference(String... ids) { - ensureReady(); + /** + * @return pipeline configuration specified by id. If multiple ids or wildcards are specified multiple pipelines + * may be returned + */ + // Returning PipelineConfiguration instead of Pipeline, because Pipeline and Processor interface don't + // know how to serialize themselves. + public List getPipelines(String... ids) { + IngestMetadata ingestMetadata = clusterService.state().getMetaData().custom(IngestMetadata.TYPE); + return innerGetPipelines(ingestMetadata, ids); + } - List result = new ArrayList<>(ids.length); + List innerGetPipelines(IngestMetadata ingestMetadata, String... ids) { + if (ingestMetadata == null) { + return Collections.emptyList(); + } + + List result = new ArrayList<>(ids.length); for (String id : ids) { if (Regex.isSimpleMatchPattern(id)) { - for (Map.Entry entry : pipelines.entrySet()) { + for (Map.Entry entry : ingestMetadata.getPipelines().entrySet()) { if (Regex.simpleMatch(id, entry.getKey())) { result.add(entry.getValue()); } } } else { - PipelineDefinition reference = pipelines.get(id); - if (reference != null) { - result.add(reference); + PipelineConfiguration pipeline = ingestMetadata.getPipelines().get(id); + if (pipeline != null) { + result.add(pipeline); } } } return result; } - public synchronized void updatePipelines() throws Exception { - // note: this process isn't fast or smart, but the idea is that there will not be many pipelines, - // so for that reason the goal is to keep the update logic simple. - - int changed = 0; - Map newPipelines = new HashMap<>(pipelines); - for (SearchHit hit : readAllPipelines()) { - String pipelineId = hit.getId(); - BytesReference pipelineSource = hit.getSourceRef(); - PipelineDefinition current = newPipelines.get(pipelineId); - if (current != null) { - // If we first read from a primary shard copy and then from a replica copy, - // and a write did not yet make it into the replica shard - // then the source is not equal but we don't update because the current pipeline is the latest: - if (current.getVersion() > hit.getVersion()) { - continue; - } - if (current.getSource().equals(pipelineSource)) { - continue; - } - } - - changed++; - Pipeline pipeline = constructPipeline(hit.getId(), hit.sourceAsMap()); - newPipelines.put(pipelineId, new PipelineDefinition(pipeline, hit.getVersion(), pipelineSource)); - } - - int removed = 0; - for (String existingPipelineId : pipelines.keySet()) { - if (pipelineExists(existingPipelineId) == false) { - newPipelines.remove(existingPipelineId); - removed++; - } - } - - if (changed != 0 || removed != 0) { - logger.debug("adding or updating [{}] pipelines and [{}] pipelines removed", changed, removed); - pipelines = newPipelines; - } else { - logger.debug("no pipelines changes detected"); - } - } - private Pipeline constructPipeline(String id, Map config) throws Exception { return factory.create(id, config, processorFactoryRegistry); } - boolean pipelineExists(String pipelineId) { - GetRequest request = new GetRequest(PipelineStore.INDEX, PipelineStore.TYPE, pipelineId); - try { - GetResponse response = client.get(request).actionGet(); - return response.isExists(); - } catch (IndexNotFoundException e) { - // the ingest index doesn't exist, so the pipeline doesn't either: - return false; - } - } - - /** - * @param clusterState The cluster just to check whether the ingest index exists and the state of the ingest index - * @throws IllegalStateException If the ingest template exists, but is in an invalid state - * @return true when the ingest index exists and has the expected settings and mappings or returns - * false when the ingest index doesn't exists and needs to be created. - */ - boolean isIngestIndexPresent(ClusterState clusterState) throws IllegalStateException { - if (clusterState.getMetaData().hasIndex(INDEX)) { - IndexMetaData indexMetaData = clusterState.getMetaData().index(INDEX); - Settings indexSettings = indexMetaData.getSettings(); - int numberOfShards = indexSettings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, -1); - if (numberOfShards != 1) { - throw new IllegalStateException("illegal ingest index setting, [" + IndexMetaData.SETTING_NUMBER_OF_SHARDS + "] setting is [" + numberOfShards + "] while [1] is expected"); - } - int numberOfReplicas = indexSettings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, -1); - if (numberOfReplicas != 1) { - throw new IllegalStateException("illegal ingest index setting, [" + IndexMetaData.SETTING_NUMBER_OF_REPLICAS + "] setting is [" + numberOfReplicas + "] while [1] is expected"); - } - boolean dynamicMappings = indexSettings.getAsBoolean("index.mapper.dynamic", true); - if (dynamicMappings != false) { - throw new IllegalStateException("illegal ingest index setting, [index.mapper.dynamic] setting is [" + dynamicMappings + "] while [false] is expected"); - } - - if (indexMetaData.getMappings().size() != 1 && indexMetaData.getMappings().containsKey(TYPE) == false) { - throw new IllegalStateException("illegal ingest mappings, only [" + TYPE + "] mapping is allowed to exist in the " + INDEX +" index"); - } - - try { - Map pipelineMapping = indexMetaData.getMappings().get(TYPE).getSourceAsMap(); - String dynamicMapping = (String) XContentMapValues.extractValue("dynamic", pipelineMapping); - if ("strict".equals(dynamicMapping) == false) { - throw new IllegalStateException("illegal ingest mapping, pipeline mapping must be strict"); - } - Boolean allEnabled = (Boolean) XContentMapValues.extractValue("_all.enabled", pipelineMapping); - if (Boolean.FALSE.equals(allEnabled) == false) { - throw new IllegalStateException("illegal ingest mapping, _all field is enabled"); - } - - String processorsType = (String) XContentMapValues.extractValue("properties.processors.type", pipelineMapping); - if ("object".equals(processorsType) == false) { - throw new IllegalStateException("illegal ingest mapping, processors field's type is [" + processorsType + "] while [object] is expected"); - } - - Boolean processorsEnabled = (Boolean) XContentMapValues.extractValue("properties.processors.enabled", pipelineMapping); - if (Boolean.FALSE.equals(processorsEnabled) == false) { - throw new IllegalStateException("illegal ingest mapping, processors field enabled option is [true] while [false] is expected"); - } - - String processorsDynamic = (String) XContentMapValues.extractValue("properties.processors.dynamic", pipelineMapping); - if ("true".equals(processorsDynamic) == false) { - throw new IllegalStateException("illegal ingest mapping, processors field dynamic option is [false] while [true] is expected"); - } - - String onFailureType = (String) XContentMapValues.extractValue("properties.on_failure.type", pipelineMapping); - if ("object".equals(onFailureType) == false) { - throw new IllegalStateException("illegal ingest mapping, on_failure field type option is [" + onFailureType + "] while [object] is expected"); - } - - Boolean onFailureEnabled = (Boolean) XContentMapValues.extractValue("properties.on_failure.enabled", pipelineMapping); - if (Boolean.FALSE.equals(onFailureEnabled) == false) { - throw new IllegalStateException("illegal ingest mapping, on_failure field enabled option is [true] while [false] is expected"); - } - - String onFailureDynamic = (String) XContentMapValues.extractValue("properties.on_failure.dynamic", pipelineMapping); - if ("true".equals(onFailureDynamic) == false) { - throw new IllegalStateException("illegal ingest mapping, on_failure field dynamic option is [false] while [true] is expected"); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - return true; - } else { - return false; - } - } - - - synchronized void start() throws Exception { - if (started) { - logger.debug("Pipeline already started"); - } else { - updatePipelines(); - started = true; - logger.debug("Pipeline store started with [{}] pipelines", pipelines.size()); - } - } - - synchronized void stop(String reason) { - if (started) { - started = false; - pipelines = new HashMap<>(); - logger.debug("Pipeline store stopped, reason [{}]", reason); - } else { - logger.debug("Pipeline alreadt stopped"); - } - } - - public boolean isStarted() { - return started; - } - - private Iterable readAllPipelines() { - // TODO: the search should be replaced with an ingest API when it is available - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); - sourceBuilder.version(true); - sourceBuilder.sort("_doc", SortOrder.ASC); - SearchRequest searchRequest = new SearchRequest(PipelineStore.INDEX); - searchRequest.source(sourceBuilder); - searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); - return SearchScrollIterator.createIterator(client, scrollTimeout, searchRequest); - } - - private void ensureReady() { - if (started == false) { - throw new IllegalStateException("pipeline store isn't ready yet"); - } - } - - @SuppressWarnings("unchecked") - private ActionListener handleWriteResponseAndReloadPipelines(ActionListener listener) { - return new ActionListener() { - @Override - public void onResponse(T result) { - try { - reloadPipelinesAction.reloadPipelinesOnAllNodes(reloadResult -> listener.onResponse(result)); - } catch (Throwable e) { - listener.onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - listener.onFailure(e); - } - }; - } - } diff --git a/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java b/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java index 96e91737af7..84ddb6aa9e7 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java @@ -17,7 +17,6 @@ * under the License. */ - package org.elasticsearch.ingest.core; import java.util.ArrayList; @@ -78,20 +77,16 @@ public final class Pipeline { } public final static class Factory { - private Processor readProcessor(Map processorRegistry, String type, Map config) throws Exception { - Processor.Factory factory = processorRegistry.get(type); - if (factory != null) { - List onFailureProcessors = readProcessors("on_failure", processorRegistry, config); - Processor processor = factory.create(config); - if (config.isEmpty() == false) { - throw new IllegalArgumentException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); - } - if (onFailureProcessors.isEmpty()) { - return processor; - } - return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors); + + public Pipeline create(String id, Map config, Map processorRegistry) throws Exception { + String description = ConfigurationUtils.readOptionalStringProperty(config, "description"); // TODO(simonw): can we make these strings constants? + List processors = readProcessors("processors", processorRegistry, config); + List onFailureProcessors = readProcessors("on_failure", processorRegistry, config); + if (config.isEmpty() == false) { + throw new IllegalArgumentException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); } - throw new IllegalArgumentException("No processor type exists with name [" + type + "]"); + CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors)); + return new Pipeline(id, description, compoundProcessor); } private List readProcessors(String fieldName, Map processorRegistry, Map config) throws Exception { @@ -108,12 +103,22 @@ public final class Pipeline { return onFailureProcessors; } - public Pipeline create(String id, Map config, Map processorRegistry) throws Exception { - String description = ConfigurationUtils.readOptionalStringProperty(config, "description"); // TODO(simonw): can we make these strings constants? - List processors = readProcessors("processors", processorRegistry, config); - List onFailureProcessors = readProcessors("on_failure", processorRegistry, config); - CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors)); - return new Pipeline(id, description, compoundProcessor); + private Processor readProcessor(Map processorRegistry, String type, Map config) throws Exception { + Processor.Factory factory = processorRegistry.get(type); + if (factory != null) { + List onFailureProcessors = readProcessors("on_failure", processorRegistry, config); + Processor processor = factory.create(config); + if (config.isEmpty() == false) { + throw new IllegalArgumentException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); + } + if (onFailureProcessors.isEmpty()) { + return processor; + } + return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors); + } + throw new IllegalArgumentException("No processor type exists with name [" + type + "]"); } + + } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestDeletePipelineAction.java b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestDeletePipelineAction.java index cb70b5a79c8..723e3eb6840 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestDeletePipelineAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestDeletePipelineAction.java @@ -27,7 +27,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.support.RestStatusToXContentListener; +import org.elasticsearch.rest.action.support.AcknowledgedRestListener; public class RestDeletePipelineAction extends BaseRestHandler { @@ -41,6 +41,8 @@ public class RestDeletePipelineAction extends BaseRestHandler { protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception { DeletePipelineRequest request = new DeletePipelineRequest(); request.id(restRequest.param("id")); - client.deletePipeline(request, new RestStatusToXContentListener<>(channel)); + request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout())); + request.timeout(restRequest.paramAsTime("timeout", request.timeout())); + client.deletePipeline(request, new AcknowledgedRestListener<>(channel)); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java index 7fae61eaed5..4860f5e7931 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java @@ -42,6 +42,7 @@ public class RestGetPipelineAction extends BaseRestHandler { protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception { GetPipelineRequest request = new GetPipelineRequest(); request.ids(Strings.splitStringByCommaToArray(restRequest.param("id"))); + request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout())); client.getPipeline(request, new RestStatusToXContentListener<>(channel)); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java index 98ec67782d5..7c2d9a717dc 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java @@ -27,7 +27,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.support.RestStatusToXContentListener; +import org.elasticsearch.rest.action.support.AcknowledgedRestListener; public class RestPutPipelineAction extends BaseRestHandler { @@ -44,6 +44,8 @@ public class RestPutPipelineAction extends BaseRestHandler { if (restRequest.hasContent()) { request.source(restRequest.content()); } - client.putPipeline(request, new RestStatusToXContentListener<>(channel)); + request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout())); + request.timeout(restRequest.paramAsTime("timeout", request.timeout())); + client.putPipeline(request, new AcknowledgedRestListener<>(channel)); } } diff --git a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index db7b5df6662..be229310043 100644 --- a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -776,6 +776,7 @@ public class ExceptionSerializationTests extends ESTestCase { ids.put(139, null); ids.put(140, org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class); ids.put(141, org.elasticsearch.index.query.QueryShardException.class); + ids.put(142, org.elasticsearch.ingest.PipelineMissingException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/core/src/test/java/org/elasticsearch/action/ingest/ReloadPipelinesActionTests.java b/core/src/test/java/org/elasticsearch/action/ingest/ReloadPipelinesActionTests.java deleted file mode 100644 index 8a0284d80cf..00000000000 --- a/core/src/test/java/org/elasticsearch/action/ingest/ReloadPipelinesActionTests.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.ingest; - -import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.LocalTransportAddress; -import org.elasticsearch.ingest.PipelineStore; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportResponseHandler; -import org.elasticsearch.transport.TransportService; -import org.junit.Before; -import org.mockito.Matchers; - -import java.util.Collections; - -import static org.hamcrest.CoreMatchers.is; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class ReloadPipelinesActionTests extends ESTestCase { - - private ClusterService clusterService; - private TransportService transportService; - private ReloadPipelinesAction reloadPipelinesAction; - - @Before - public void init() { - Settings settings = Settings.EMPTY; - PipelineStore pipelineStore = mock(PipelineStore.class); - clusterService = mock(ClusterService.class); - transportService = mock(TransportService.class); - reloadPipelinesAction = new ReloadPipelinesAction(settings, pipelineStore, clusterService, transportService); - } - - public void testSuccess() { - int numNodes = randomIntBetween(1, 10); - ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(generateDiscoNodes(numNodes)).build(); - when(clusterService.state()).thenReturn(state); - - doAnswer(mock -> { - TransportResponseHandler handler = (TransportResponseHandler) mock.getArguments()[3]; - for (int i = 0; i < numNodes; i++) { - handler.handleResponse(new ReloadPipelinesAction.ReloadPipelinesResponse()); - } - return mock; - }).when(transportService).sendRequest(Matchers.any(), Matchers.eq(ReloadPipelinesAction.ACTION_NAME), Matchers.any(), Matchers.any()); - reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> assertThat(result, is(true))); - } - - public void testWithAtLeastOneFailure() { - int numNodes = randomIntBetween(1, 10); - - ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(generateDiscoNodes(numNodes)).build(); - when(clusterService.state()).thenReturn(state); - - doAnswer(mock -> { - TransportResponseHandler handler = (TransportResponseHandler) mock.getArguments()[3]; - handler.handleException(new TransportException("test failure")); - for (int i = 1; i < numNodes; i++) { - if (randomBoolean()) { - handler.handleResponse(new ReloadPipelinesAction.ReloadPipelinesResponse()); - } else { - handler.handleException(new TransportException("test failure")); - } - } - return mock; - }).when(transportService).sendRequest(Matchers.any(), Matchers.eq(ReloadPipelinesAction.ACTION_NAME), Matchers.any(), Matchers.any()); - reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> assertThat(result, is(false))); - } - - private static DiscoveryNodes.Builder generateDiscoNodes(int numNodes) { - DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); - for (int i = 0; i < numNodes; i++) { - String id = Integer.toString(i); - DiscoveryNode discoNode = new DiscoveryNode(id, id, new LocalTransportAddress(id), Collections.emptyMap(), Version.CURRENT); - discoNodes.put(discoNode); - } - return discoNodes; - } -} diff --git a/core/src/test/java/org/elasticsearch/common/SearchScrollIteratorTests.java b/core/src/test/java/org/elasticsearch/common/SearchScrollIteratorTests.java deleted file mode 100644 index 886d9b94e84..00000000000 --- a/core/src/test/java/org/elasticsearch/common/SearchScrollIteratorTests.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common; - -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.test.ESSingleNodeTestCase; - -import static org.hamcrest.Matchers.equalTo; - -// Not a real unit tests with mocks, but with a single node, because we mock the scroll -// search behaviour and it changes then this test will not catch this. -public class SearchScrollIteratorTests extends ESSingleNodeTestCase { - - public void testSearchScrollIterator() { - createIndex("index"); - int numDocs = scaledRandomIntBetween(0, 128); - for (int i = 0; i < numDocs; i++) { - client().prepareIndex("index", "type", Integer.toString(i)) - .setSource("field", "value" + i) - .get(); - } - client().admin().indices().prepareRefresh().get(); - - int i = 0; - SearchRequest searchRequest = new SearchRequest("index"); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); - // randomize size, because that also controls how many actual searches will happen: - sourceBuilder.size(scaledRandomIntBetween(1, 10)); - searchRequest.source(sourceBuilder); - Iterable hits = SearchScrollIterator.createIterator(client(), TimeValue.timeValueSeconds(10), searchRequest); - for (SearchHit hit : hits) { - assertThat(hit.getId(), equalTo(Integer.toString(i))); - assertThat(hit.getSource().get("field"), equalTo("value" + i)); - i++; - } - assertThat(i, equalTo(numDocs)); - } - -} diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestBootstrapperTests.java b/core/src/test/java/org/elasticsearch/ingest/IngestBootstrapperTests.java deleted file mode 100644 index a352c3af723..00000000000 --- a/core/src/test/java/org/elasticsearch/ingest/IngestBootstrapperTests.java +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest; - -import org.elasticsearch.Version; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.text.Text; -import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.gateway.GatewayService; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.search.internal.InternalSearchHit; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.junit.Before; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.core.Is.is; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class IngestBootstrapperTests extends ESTestCase { - - private PipelineStore store; - private IngestBootstrapper bootstrapper; - - @Before - public void init() { - ThreadPool threadPool = mock(ThreadPool.class); - when(threadPool.executor(any())).thenReturn(Runnable::run); - ClusterService clusterService = mock(ClusterService.class); - store = mock(PipelineStore.class); - when(store.isStarted()).thenReturn(false); - PipelineExecutionService pipelineExecutionService = mock(PipelineExecutionService.class); - bootstrapper = new IngestBootstrapper(Settings.EMPTY, threadPool, clusterService, store, pipelineExecutionService); - } - - public void testStartAndStopInBackground() throws Exception { - ThreadPool threadPool = new ThreadPool("test"); - Client client = mock(Client.class); - TransportService transportService = mock(TransportService.class); - - ClusterService clusterService = mock(ClusterService.class); - when(client.search(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(Collections.emptyList())); - when(client.searchScroll(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(Collections.emptyList())); - Settings settings = Settings.EMPTY; - PipelineStore store = new PipelineStore(settings, clusterService, transportService); - IngestBootstrapper bootstrapper = new IngestBootstrapper( - settings, threadPool, clusterService, store, null - ); - bootstrapper.setClient(client); - - List hits = new ArrayList<>(); - hits.add(new InternalSearchHit(0, "1", new Text("type"), Collections.emptyMap()) - .sourceRef(new BytesArray("{\"description\": \"_description1\"}")) - ); - when(client.search(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(hits)); - when(client.get(any())).thenReturn(PipelineStoreTests.expectedGetResponse(true)); - - try { - store.get("1"); - fail("IllegalStateException expected"); - } catch (IllegalStateException e) { - assertThat(e.getMessage(), equalTo("pipeline store isn't ready yet")); - } - - MetaData metadata = MetaData.builder() - .put(IndexTemplateMetaData.builder(IngestBootstrapper.INGEST_INDEX_TEMPLATE_NAME)) - .build(); - bootstrapper.startPipelineStore(metadata); - assertBusy(() -> { - assertThat(store.isStarted(), is(true)); - assertThat(store.get("1"), notNullValue()); - assertThat(store.get("1").getId(), equalTo("1")); - assertThat(store.get("1").getDescription(), equalTo("_description1")); - }); - - bootstrapper.stopPipelineStore("testing stop"); - assertBusy(() -> assertThat(store.isStarted(), is(false))); - - // the map internal search hit holds gets emptied after use, which is ok, but in this test we need to reset the source: - hits.get(0).sourceRef(new BytesArray("{\"description\": \"_description1\"}")); - hits.add(new InternalSearchHit(0, "2", new Text("type"), Collections.emptyMap()) - .sourceRef(new BytesArray("{\"description\": \"_description2\"}")) - ); - bootstrapper.startPipelineStore(metadata); - assertBusy(() -> { - assertThat(store.isStarted(), is(true)); - assertThat(store.get("1"), notNullValue()); - assertThat(store.get("1").getId(), equalTo("1")); - assertThat(store.get("1").getDescription(), equalTo("_description1")); - assertThat(store.get("2"), notNullValue()); - assertThat(store.get("2").getId(), equalTo("2")); - assertThat(store.get("2").getDescription(), equalTo("_description2")); - }); - threadPool.shutdown(); - } - - public void testPipelineStoreBootstrappingGlobalStateNotRecoveredBlock() throws Exception { - ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - csBuilder.blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)); - ClusterState cs = csBuilder.metaData(MetaData.builder()).build(); - bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); - verify(store, never()).start(); - verify(store, never()).stop(anyString()); - } - - public void testPipelineStoreBootstrappingGlobalStateNoMasterBlock() throws Exception { - ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - csBuilder.blocks(ClusterBlocks.builder() - .addGlobalBlock(randomBoolean() ? DiscoverySettings.NO_MASTER_BLOCK_WRITES : DiscoverySettings.NO_MASTER_BLOCK_ALL)); - ClusterState cs = csBuilder.metaData( - MetaData.builder() - .put(IndexTemplateMetaData.builder(IngestBootstrapper.INGEST_INDEX_TEMPLATE_NAME)) - ).build(); - - // We're not started and there is a no master block, doing nothing: - bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); - verify(store, never()).start(); - verify(store, never()).stop(anyString()); - - // We're started and there is a no master block, so we stop the store: - when(store.isStarted()).thenReturn(true); - bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); - verify(store, never()).start(); - verify(store, times(1)).stop(anyString()); - } - - public void testPipelineStoreBootstrappingNoIngestIndex() throws Exception { - ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - ClusterState cs = csBuilder.metaData(MetaData.builder() - .put(IndexTemplateMetaData.builder(IngestBootstrapper.INGEST_INDEX_TEMPLATE_NAME))) - .build(); - bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); - verify(store, times(1)).start(); - } - - public void testPipelineStoreBootstrappingIngestIndexShardsNotStarted() throws Exception { - // .ingest index, but not all primary shards started: - ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - metaDateBuilder.put(IndexTemplateMetaData.builder(IngestBootstrapper.INGEST_INDEX_TEMPLATE_NAME)); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(PipelineStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(PipelineStore.INDEX); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(PipelineStore.INDEX, 0)) - .addShard(TestShardRouting.newShardRouting(PipelineStore.INDEX, 0, "_node_id", null, null, true, ShardRoutingState.INITIALIZING, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""))) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); - ClusterState cs = csBuilder.build(); - - // We're not running and the cluster state isn't ready, so we don't start. - bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); - verify(store, never()).start(); - verify(store, never()).stop(anyString()); - - // We're running and the cluster state indicates that all our shards are unassigned, so we stop. - when(store.isStarted()).thenReturn(true); - bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); - verify(store, never()).start(); - verify(store, times(1)).stop(anyString()); - } - - public void testPipelineStoreBootstrappingIngestIndexShardsStarted() throws Exception { - // .ingest index, but not all primary shards started: - ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - metaDateBuilder.put(IndexTemplateMetaData.builder(IngestBootstrapper.INGEST_INDEX_TEMPLATE_NAME)); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(PipelineStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(PipelineStore.INDEX); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(PipelineStore.INDEX, 0)) - .addShard(TestShardRouting.newShardRouting(PipelineStore.INDEX, 0, "_node_id", null, null, true, ShardRoutingState.STARTED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""))) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); - ClusterState cs = csBuilder.build(); - - // We're not running and the cluster state is ready, so we start. - bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); - verify(store, times(1)).start(); - verify(store, never()).stop(anyString()); - - // We're running and the cluster state is good, so we do nothing. - when(store.isStarted()).thenReturn(true); - bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); - verify(store, times(1)).start(); - verify(store, never()).stop(anyString()); - } - - public void testPipelineStoreBootstrappingFailure() throws Exception { - // .ingest index, but not all primary shards started: - ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - metaDateBuilder.put(IndexTemplateMetaData.builder(IngestBootstrapper.INGEST_INDEX_TEMPLATE_NAME)); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(PipelineStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(PipelineStore.INDEX); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(PipelineStore.INDEX, 0)) - .addShard(TestShardRouting.newShardRouting(PipelineStore.INDEX, 0, "_node_id", null, null, true, ShardRoutingState.STARTED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""))) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); - ClusterState cs = csBuilder.build(); - - // fail the first call with an runtime exception and subsequent calls just return: - doThrow(new RuntimeException()).doNothing().when(store).start(); - bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); - verify(store, times(2)).start(); - verify(store, never()).stop(anyString()); - } - -} diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index 1c50bc38b6a..9742dd1b978 100644 --- a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -22,7 +22,6 @@ package org.elasticsearch.ingest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.ingest.DeletePipelineRequest; @@ -32,6 +31,7 @@ import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.SimulateDocumentSimpleResult; import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.ingest.SimulatePipelineResponse; +import org.elasticsearch.action.ingest.WritePipelineResponse; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.core.IngestDocument; @@ -164,7 +164,6 @@ public class IngestClientIT extends ESIntegTestCase { } public void test() throws Exception { - PutPipelineRequest putPipelineRequest = new PutPipelineRequest(); putPipelineRequest.id("_id"); putPipelineRequest.source(jsonBuilder().startObject() @@ -200,9 +199,8 @@ public class IngestClientIT extends ESIntegTestCase { DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest(); deletePipelineRequest.id("_id"); - DeleteResponse response = client().deletePipeline(deletePipelineRequest).get(); - assertThat(response.isFound(), is(true)); - assertThat(response.getId(), equalTo("_id")); + WritePipelineResponse response = client().deletePipeline(deletePipelineRequest).get(); + assertThat(response.isAcknowledged(), is(true)); getResponse = client().prepareGetPipeline().setIds("_id").get(); assertThat(getResponse.isFound(), is(false)); diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java index 57086cc02a9..19d56858c8f 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java @@ -19,444 +19,175 @@ package org.elasticsearch.ingest; -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.client.Client; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.text.Text; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.get.GetResult; -import org.elasticsearch.search.internal.InternalSearchHit; -import org.elasticsearch.search.internal.InternalSearchHits; -import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.ingest.core.Pipeline; +import org.elasticsearch.ingest.processor.SetProcessor; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.TransportService; import org.junit.Before; -import org.mockito.ArgumentMatcher; -import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; -import java.util.Objects; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.Map; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class PipelineStoreTests extends ESTestCase { private PipelineStore store; - private Client client; @Before public void init() throws Exception { - Settings settings = Settings.EMPTY; ClusterService clusterService = mock(ClusterService.class); - TransportService transportService = mock(TransportService.class); - - client = mock(Client.class); - when(client.search(any())).thenReturn(expectedSearchReponse(Collections.emptyList())); - when(client.searchScroll(any())).thenReturn(expectedSearchReponse(Collections.emptyList())); - store = new PipelineStore(settings, clusterService, transportService); - store.setClient(client); - store.start(); + store = new PipelineStore(Settings.EMPTY, clusterService); + ProcessorsRegistry registry = new ProcessorsRegistry(); + registry.registerProcessor("set", (environment, templateService) -> new SetProcessor.Factory(TestTemplateService.instance())); + store.buildProcessorFactoryRegistry(registry, null, null); } - public void testUpdatePipeline() throws Exception { - List hits = new ArrayList<>(); - hits.add(new InternalSearchHit(0, "1", new Text("type"), Collections.emptyMap()) - .sourceRef(new BytesArray("{\"description\": \"_description1\"}")) + public void testUpdatePipelines() { + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + store.innerUpdatePipelines(clusterState); + assertThat(store.pipelines.size(), is(0)); + + PipelineConfiguration pipeline = new PipelineConfiguration( + "_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}") ); + IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", pipeline)); + clusterState = ClusterState.builder(clusterState) + .metaData(MetaData.builder().putCustom(IngestMetadata.TYPE, ingestMetadata)) + .build(); + store.innerUpdatePipelines(clusterState); + assertThat(store.pipelines.size(), is(1)); + assertThat(store.pipelines.get("_id").getId(), equalTo("_id")); + assertThat(store.pipelines.get("_id").getDescription(), nullValue()); + assertThat(store.pipelines.get("_id").getProcessors().size(), equalTo(1)); + assertThat(store.pipelines.get("_id").getProcessors().get(0).getType(), equalTo("set")); + } - when(client.search(any())).thenReturn(expectedSearchReponse(hits)); - when(client.get(any())).thenReturn(expectedGetResponse(true)); - assertThat(store.get("1"), nullValue()); + public void testPut() { + String id = "_id"; + Pipeline pipeline = store.get(id); + assertThat(pipeline, nullValue()); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); - store.updatePipelines(); - assertThat(store.get("1").getId(), equalTo("1")); - assertThat(store.get("1").getDescription(), equalTo("_description1")); + // add a new pipeline: + PutPipelineRequest putRequest = new PutPipelineRequest(); + putRequest.id(id); + putRequest.source(new BytesArray("{\"processors\": []}")); + clusterState = store.innerPut(putRequest, clusterState); + store.innerUpdatePipelines(clusterState); + pipeline = store.get(id); + assertThat(pipeline, notNullValue()); + assertThat(pipeline.getId(), equalTo(id)); + assertThat(pipeline.getDescription(), nullValue()); + assertThat(pipeline.getProcessors().size(), equalTo(0)); - when(client.get(any())).thenReturn(expectedGetResponse(true)); - hits.add(new InternalSearchHit(0, "2", new Text("type"), Collections.emptyMap()) - .sourceRef(new BytesArray("{\"description\": \"_description2\"}")) + // overwrite existing pipeline: + putRequest = new PutPipelineRequest(); + putRequest.id(id); + putRequest.source(new BytesArray("{\"processors\": [], \"description\": \"_description\"}")); + clusterState = store.innerPut(putRequest, clusterState); + store.innerUpdatePipelines(clusterState); + pipeline = store.get(id); + assertThat(pipeline, notNullValue()); + assertThat(pipeline.getId(), equalTo(id)); + assertThat(pipeline.getDescription(), equalTo("_description")); + assertThat(pipeline.getProcessors().size(), equalTo(0)); + } + + public void testDelete() { + PipelineConfiguration config = new PipelineConfiguration( + "_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}") ); - store.updatePipelines(); - assertThat(store.get("1").getId(), equalTo("1")); - assertThat(store.get("1").getDescription(), equalTo("_description1")); - assertThat(store.get("2").getId(), equalTo("2")); - assertThat(store.get("2").getDescription(), equalTo("_description2")); - - hits.remove(1); - when(client.get(eqGetRequest(PipelineStore.INDEX, PipelineStore.TYPE, "2"))).thenReturn(expectedGetResponse(false)); - store.updatePipelines(); - assertThat(store.get("1").getId(), equalTo("1")); - assertThat(store.get("1").getDescription(), equalTo("_description1")); - assertThat(store.get("2"), nullValue()); - } - - public void testGetReference() throws Exception { - // fill the store up for the test: - List hits = new ArrayList<>(); - hits.add(new InternalSearchHit(0, "foo", new Text("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}"))); - hits.add(new InternalSearchHit(0, "bar", new Text("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}"))); - hits.add(new InternalSearchHit(0, "foobar", new Text("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}"))); - when(client.search(any())).thenReturn(expectedSearchReponse(hits)); - store.updatePipelines(); - - List result = store.getReference("foo"); - assertThat(result.size(), equalTo(1)); - assertThat(result.get(0).getPipeline().getId(), equalTo("foo")); - - result = store.getReference("foo*"); - // to make sure the order is consistent in the test: - result.sort((first, second) -> { - return first.getPipeline().getId().compareTo(second.getPipeline().getId()); - }); - assertThat(result.size(), equalTo(2)); - assertThat(result.get(0).getPipeline().getId(), equalTo("foo")); - assertThat(result.get(1).getPipeline().getId(), equalTo("foobar")); - - result = store.getReference("bar*"); - assertThat(result.size(), equalTo(1)); - assertThat(result.get(0).getPipeline().getId(), equalTo("bar")); - - result = store.getReference("*"); - // to make sure the order is consistent in the test: - result.sort((first, second) -> { - return first.getPipeline().getId().compareTo(second.getPipeline().getId()); - }); - assertThat(result.size(), equalTo(3)); - assertThat(result.get(0).getPipeline().getId(), equalTo("bar")); - assertThat(result.get(1).getPipeline().getId(), equalTo("foo")); - assertThat(result.get(2).getPipeline().getId(), equalTo("foobar")); - - result = store.getReference("foo", "bar"); - assertThat(result.size(), equalTo(2)); - assertThat(result.get(0).getPipeline().getId(), equalTo("foo")); - assertThat(result.get(1).getPipeline().getId(), equalTo("bar")); - } - - public void testValidateIngestIndex() throws Exception { - // ingest index doesn't exist: - ClusterState state = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder()) + IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", config)); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(IngestMetadata.TYPE, ingestMetadata)) .build(); - assertThat(store.isIngestIndexPresent(state), equalTo(false)); + store.innerUpdatePipelines(clusterState); + assertThat(store.get("_id"), notNullValue()); - // ingest index does exist and is valid: - IndexMetaData.Builder indexMetaData = IndexMetaData.builder(PipelineStore.INDEX) - .settings(Settings.builder() - .put(PipelineStore.INGEST_INDEX_SETTING) - .put("index.version.created", Version.CURRENT) - ) - .putMapping(PipelineStore.TYPE, PipelineStore.PIPELINE_MAPPING); - state = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().put(indexMetaData)) - .build(); - assertThat(store.isIngestIndexPresent(state), equalTo(true)); + // Delete pipeline: + DeletePipelineRequest deleteRequest = new DeletePipelineRequest(); + deleteRequest.id("_id"); + clusterState = store.innerDelete(deleteRequest, clusterState); + store.innerUpdatePipelines(clusterState); + assertThat(store.get("_id"), nullValue()); - // fails, has dynamic mapping - indexMetaData = IndexMetaData.builder(PipelineStore.INDEX) - .settings(Settings.builder() - .put(PipelineStore.INGEST_INDEX_SETTING) - .put("index.mapper.dynamic", true) - .put("index.version.created", Version.CURRENT) - ) - .putMapping(PipelineStore.TYPE, PipelineStore.PIPELINE_MAPPING); - state = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().put(indexMetaData)) - .build(); + // Delete existing pipeline: try { - store.isIngestIndexPresent(state); + store.innerDelete(deleteRequest, clusterState); fail("exception expected"); - } catch (IllegalStateException e) { - assertThat(e.getMessage(), equalTo("illegal ingest index setting, [index.mapper.dynamic] setting is [true] while [false] is expected")); - } - - // fails, incorrect number of primary shards - indexMetaData = IndexMetaData.builder(PipelineStore.INDEX) - .settings(Settings.builder() - .put(PipelineStore.INGEST_INDEX_SETTING) - .put("index.number_of_shards", 2) - .put("index.version.created", Version.CURRENT) - ) - .putMapping(PipelineStore.TYPE, PipelineStore.PIPELINE_MAPPING); - state = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().put(indexMetaData)) - .build(); - try { - store.isIngestIndexPresent(state); - fail("exception expected"); - } catch (IllegalStateException e) { - assertThat(e.getMessage(), equalTo("illegal ingest index setting, [index.number_of_shards] setting is [2] while [1] is expected")); - } - - // fails, incorrect number of replica shards - indexMetaData = IndexMetaData.builder(PipelineStore.INDEX) - .settings(Settings.builder() - .put(PipelineStore.INGEST_INDEX_SETTING) - .put("index.number_of_replicas", 2) - .put("index.version.created", Version.CURRENT) - ) - .putMapping(PipelineStore.TYPE, PipelineStore.PIPELINE_MAPPING); - state = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().put(indexMetaData)) - .build(); - try { - store.isIngestIndexPresent(state); - fail("exception expected"); - } catch (IllegalStateException e) { - assertThat(e.getMessage(), equalTo("illegal ingest index setting, [index.number_of_replicas] setting is [2] while [1] is expected")); - } - - // fails not a strict mapping: - String mapping = XContentFactory.jsonBuilder().startObject() - .startObject("_all") - .field("enabled", false) - .endObject() - .startObject("properties") - .startObject("processors") - .field("type", "object") - .field("enabled", false) - .field("dynamic", true) - .endObject() - .startObject("on_failure") - .field("type", "object") - .field("enabled", false) - .field("dynamic", true) - .endObject() - .startObject("description") - .field("type", "string") - .endObject() - .endObject() - .endObject().string(); - indexMetaData = IndexMetaData.builder(PipelineStore.INDEX) - .settings(Settings.builder() - .put(PipelineStore.INGEST_INDEX_SETTING) - .put("index.version.created", Version.CURRENT) - ) - .putMapping(PipelineStore.TYPE, mapping); - state = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().put(indexMetaData)) - .build(); - try { - store.isIngestIndexPresent(state); - fail("exception expected"); - } catch (IllegalStateException e) { - assertThat(e.getMessage(), equalTo("illegal ingest mapping, pipeline mapping must be strict")); - } - - // fails _all field is enabled: - mapping = XContentFactory.jsonBuilder().startObject() - .field("dynamic", "strict") - .startObject("_all") - .field("enabled", true) - .endObject() - .startObject("properties") - .startObject("processors") - .field("type", "object") - .field("enabled", false) - .field("dynamic", "true") - .endObject() - .startObject("on_failure") - .field("type", "object") - .field("enabled", false) - .field("dynamic", "true") - .endObject() - .startObject("description") - .field("type", "string") - .endObject() - .endObject() - .endObject().string(); - indexMetaData = IndexMetaData.builder(PipelineStore.INDEX) - .settings(Settings.builder() - .put(PipelineStore.INGEST_INDEX_SETTING) - .put("index.version.created", Version.CURRENT) - ) - .putMapping(PipelineStore.TYPE, mapping); - state = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().put(indexMetaData)) - .build(); - try { - store.isIngestIndexPresent(state); - fail("exception expected"); - } catch (IllegalStateException e) { - assertThat(e.getMessage(), equalTo("illegal ingest mapping, _all field is enabled")); - } - - // fails processor field not of type object: - mapping = XContentFactory.jsonBuilder().startObject() - .field("dynamic", "strict") - .startObject("_all") - .field("enabled", false) - .endObject() - .startObject("properties") - .startObject("processors") - .field("type", "nested") - .field("enabled", false) - .field("dynamic", "true") - .endObject() - .startObject("on_failure") - .field("type", "object") - .field("enabled", false) - .field("dynamic", "true") - .endObject() - .startObject("description") - .field("type", "string") - .endObject() - .endObject() - .endObject().string(); - indexMetaData = IndexMetaData.builder(PipelineStore.INDEX) - .settings(Settings.builder() - .put(PipelineStore.INGEST_INDEX_SETTING) - .put("index.version.created", Version.CURRENT) - ) - .putMapping(PipelineStore.TYPE, mapping); - state = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().put(indexMetaData)) - .build(); - try { - store.isIngestIndexPresent(state); - fail("exception expected"); - } catch (IllegalStateException e) { - assertThat(e.getMessage(), equalTo("illegal ingest mapping, processors field's type is [nested] while [object] is expected")); - } - - // fails processor field enabled option is true: - mapping = XContentFactory.jsonBuilder().startObject() - .field("dynamic", "strict") - .startObject("_all") - .field("enabled", false) - .endObject() - .startObject("properties") - .startObject("processors") - .field("type", "object") - .field("enabled", true) - .field("dynamic", "true") - .endObject() - .startObject("on_failure") - .field("type", "object") - .field("enabled", false) - .field("dynamic", "true") - .endObject() - .startObject("description") - .field("type", "string") - .endObject() - .endObject() - .endObject().string(); - indexMetaData = IndexMetaData.builder(PipelineStore.INDEX) - .settings(Settings.builder() - .put(PipelineStore.INGEST_INDEX_SETTING) - .put("index.version.created", Version.CURRENT) - ) - .putMapping(PipelineStore.TYPE, mapping); - state = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().put(indexMetaData)) - .build(); - try { - store.isIngestIndexPresent(state); - fail("exception expected"); - } catch (IllegalStateException e) { - assertThat(e.getMessage(), equalTo("illegal ingest mapping, processors field enabled option is [true] while [false] is expected")); - } - - // fails processor field dynamic option is false: - mapping = XContentFactory.jsonBuilder().startObject() - .field("dynamic", "strict") - .startObject("_all") - .field("enabled", false) - .endObject() - .startObject("properties") - .startObject("processors") - .field("type", "object") - .field("enabled", false) - .field("dynamic", "false") - .endObject() - .startObject("on_failure") - .field("type", "object") - .field("enabled", false) - .field("dynamic", "true") - .endObject() - .startObject("description") - .field("type", "string") - .endObject() - .endObject() - .endObject().string(); - indexMetaData = IndexMetaData.builder(PipelineStore.INDEX) - .settings(Settings.builder() - .put(PipelineStore.INGEST_INDEX_SETTING) - .put("index.version.created", Version.CURRENT) - ) - .putMapping(PipelineStore.TYPE, mapping); - state = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().put(indexMetaData)) - .build(); - try { - store.isIngestIndexPresent(state); - fail("exception expected"); - } catch (IllegalStateException e) { - assertThat(e.getMessage(), equalTo("illegal ingest mapping, processors field dynamic option is [false] while [true] is expected")); + } catch (PipelineMissingException e) { + assertThat(e.getMessage(), equalTo("pipeline [_id] is missing")); } } - static ActionFuture expectedSearchReponse(List hits) { - return new PlainActionFuture() { + public void testGetPipelines() { + Map configs = new HashMap<>(); + configs.put("_id1", new PipelineConfiguration( + "_id1", new BytesArray("{\"processors\": []}") + )); + configs.put("_id2", new PipelineConfiguration( + "_id2", new BytesArray("{\"processors\": []}") + )); - @Override - public SearchResponse get(long timeout, TimeUnit unit) { - InternalSearchHits hits1 = new InternalSearchHits(hits.toArray(new InternalSearchHit[0]), hits.size(), 1f); - return new SearchResponse(new InternalSearchResponse(hits1, null, null, null, false, null), "_scrollId", 1, 1, 1, null); - } - }; + assertThat(store.innerGetPipelines(null, "_id1").isEmpty(), is(true)); + + IngestMetadata ingestMetadata = new IngestMetadata(configs); + List pipelines = store.innerGetPipelines(ingestMetadata, "_id1"); + assertThat(pipelines.size(), equalTo(1)); + assertThat(pipelines.get(0).getId(), equalTo("_id1")); + + pipelines = store.innerGetPipelines(ingestMetadata, "_id1", "_id2"); + assertThat(pipelines.size(), equalTo(2)); + assertThat(pipelines.get(0).getId(), equalTo("_id1")); + assertThat(pipelines.get(1).getId(), equalTo("_id2")); + + pipelines = store.innerGetPipelines(ingestMetadata, "_id*"); + pipelines.sort((o1, o2) -> o1.getId().compareTo(o2.getId())); + assertThat(pipelines.size(), equalTo(2)); + assertThat(pipelines.get(0).getId(), equalTo("_id1")); + assertThat(pipelines.get(1).getId(), equalTo("_id2")); } - static ActionFuture expectedGetResponse(boolean exists) { - return new PlainActionFuture() { - @Override - public GetResponse get() throws InterruptedException, ExecutionException { - return new GetResponse(new GetResult("_index", "_type", "_id", 1, exists, null, null)); - } - }; - } + public void testCrud() throws Exception { + String id = "_id"; + Pipeline pipeline = store.get(id); + assertThat(pipeline, nullValue()); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty - static GetRequest eqGetRequest(String index, String type, String id) { - return argThat(new GetRequestMatcher(index, type, id)); - } + PutPipelineRequest putRequest = new PutPipelineRequest(); + putRequest.id(id); + putRequest.source(new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}")); + clusterState = store.innerPut(putRequest, clusterState); + store.innerUpdatePipelines(clusterState); + pipeline = store.get(id); + assertThat(pipeline, notNullValue()); + assertThat(pipeline.getId(), equalTo(id)); + assertThat(pipeline.getDescription(), nullValue()); + assertThat(pipeline.getProcessors().size(), equalTo(1)); + assertThat(pipeline.getProcessors().get(0).getType(), equalTo("set")); - static class GetRequestMatcher extends ArgumentMatcher { - - private final String index; - private final String type; - private final String id; - - public GetRequestMatcher(String index, String type, String id) { - this.index = index; - this.type = type; - this.id = id; - } - - @Override - public boolean matches(Object o) { - GetRequest getRequest = (GetRequest) o; - return Objects.equals(getRequest.index(), index) && - Objects.equals(getRequest.type(), type) && - Objects.equals(getRequest.id(), id); - } + DeletePipelineRequest deleteRequest = new DeletePipelineRequest(); + deleteRequest.id(id); + clusterState = store.innerDelete(deleteRequest, clusterState); + store.innerUpdatePipelines(clusterState); + pipeline = store.get(id); + assertThat(pipeline, nullValue()); } } diff --git a/modules/ingest-grok/src/test/resources/rest-api-spec/test/ingest_grok/20_grok.yaml b/modules/ingest-grok/src/test/resources/rest-api-spec/test/ingest_grok/20_grok.yaml index 70b1c8a9d40..f88136d8a79 100644 --- a/modules/ingest-grok/src/test/resources/rest-api-spec/test/ingest_grok/20_grok.yaml +++ b/modules/ingest-grok/src/test/resources/rest-api-spec/test/ingest_grok/20_grok.yaml @@ -15,7 +15,7 @@ } ] } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: index: @@ -54,7 +54,7 @@ } ] } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: index: @@ -91,7 +91,7 @@ } ] } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: index: diff --git a/plugins/ingest-geoip/src/test/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yaml b/plugins/ingest-geoip/src/test/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yaml index 91e0c7ac844..e8da23e0edd 100644 --- a/plugins/ingest-geoip/src/test/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yaml +++ b/plugins/ingest-geoip/src/test/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yaml @@ -14,7 +14,7 @@ } ] } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: index: @@ -58,7 +58,7 @@ } ] } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: index: @@ -103,7 +103,7 @@ } ] } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: index: diff --git a/qa/ingest-disabled/src/test/resources/rest-api-spec/test/ingest_mustache/10_ingest_disabled.yaml b/qa/ingest-disabled/src/test/resources/rest-api-spec/test/ingest_mustache/10_ingest_disabled.yaml index 24ac604989f..01d674053ae 100644 --- a/qa/ingest-disabled/src/test/resources/rest-api-spec/test/ingest_mustache/10_ingest_disabled.yaml +++ b/qa/ingest-disabled/src/test/resources/rest-api-spec/test/ingest_mustache/10_ingest_disabled.yaml @@ -15,25 +15,18 @@ } ] } - - match: { _index: ".ingest" } - - match: { _type: "pipeline" } - - match: { _version: 1 } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: ingest.get_pipeline: id: "my_pipeline" - - match: { my_pipeline._source.description: "_description" } - - match: { my_pipeline._version: 1 } + - match: { pipelines.0.id: "my_pipeline" } + - match: { pipelines.0.config.description: "_description" } - do: ingest.delete_pipeline: id: "my_pipeline" - - match: { _index: ".ingest" } - - match: { _type: "pipeline" } - - match: { _version: 2 } - - match: { _id: "my_pipeline" } - - match: { found: true } + - match: { acknowledged: true } --- "Test ingest simulate API works fine when node.ingest is set to false": @@ -52,7 +45,7 @@ } ] } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: ingest.simulate: diff --git a/qa/ingest-with-mustache/src/test/resources/rest-api-spec/test/ingest_mustache/10_pipeline_with_mustache_templates.yaml b/qa/ingest-with-mustache/src/test/resources/rest-api-spec/test/ingest_mustache/10_pipeline_with_mustache_templates.yaml index e65e68fdd45..9e644773c6a 100644 --- a/qa/ingest-with-mustache/src/test/resources/rest-api-spec/test/ingest_mustache/10_pipeline_with_mustache_templates.yaml +++ b/qa/ingest-with-mustache/src/test/resources/rest-api-spec/test/ingest_mustache/10_pipeline_with_mustache_templates.yaml @@ -25,7 +25,7 @@ } ] } - - match: { _id: "my_pipeline_1" } + - match: { acknowledged: true } - do: index: @@ -72,7 +72,7 @@ ] } - - match: { _id: "my_pipeline_1" } + - match: { acknowledged: true } - do: ingest.put_pipeline: @@ -89,7 +89,7 @@ } ] } - - match: { _id: "my_pipeline_2" } + - match: { acknowledged: true } - do: ingest.put_pipeline: @@ -105,7 +105,7 @@ } ] } - - match: { _id: "my_pipeline_3" } + - match: { acknowledged: true } - do: index: @@ -198,7 +198,7 @@ } ] } - - match: { _id: "my_handled_pipeline" } + - match: { acknowledged: true } - do: index: diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.delete_pipeline.json b/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.delete_pipeline.json index 69b8f53d63a..1c515e45095 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.delete_pipeline.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.delete_pipeline.json @@ -13,6 +13,14 @@ } }, "params": { + "master_timeout": { + "type" : "time", + "description" : "Explicit operation timeout for connection to master node" + }, + "timeout": { + "type" : "time", + "description" : "Explicit operation timeout" + } } }, "body": null diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.get_pipeline.json b/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.get_pipeline.json index 71772a28a76..6c50657ae1a 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.get_pipeline.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.get_pipeline.json @@ -13,6 +13,10 @@ } }, "params": { + "master_timeout": { + "type" : "time", + "description" : "Explicit operation timeout for connection to master node" + } } }, "body": null diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.put_pipeline.json b/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.put_pipeline.json index fd88d352731..e4c3c2eb3f9 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.put_pipeline.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.put_pipeline.json @@ -13,6 +13,14 @@ } }, "params": { + "master_timeout": { + "type" : "time", + "description" : "Explicit operation timeout for connection to master node" + }, + "timeout": { + "type" : "time", + "description" : "Explicit operation timeout" + } } }, "body": { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/10_crud.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/10_crud.yaml index 5a62247df41..bf0817f2da1 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/10_crud.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/10_crud.yaml @@ -15,25 +15,18 @@ } ] } - - match: { _index: ".ingest" } - - match: { _type: "pipeline" } - - match: { _version: 1 } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: ingest.get_pipeline: id: "my_pipeline" - - match: { my_pipeline._source.description: "_description" } - - match: { my_pipeline._version: 1 } + - match: { pipelines.0.id: "my_pipeline" } + - match: { pipelines.0.config.description: "_description" } - do: ingest.delete_pipeline: id: "my_pipeline" - - match: { _index: ".ingest" } - - match: { _type: "pipeline" } - - match: { _version: 2 } - - match: { _id: "my_pipeline" } - - match: { found: true } + - match: { acknowledged: true } - do: catch: missing @@ -82,25 +75,18 @@ } ] } - - match: { _index: ".ingest" } - - match: { _type: "pipeline" } - - match: { _version: 1 } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: ingest.get_pipeline: id: "my_pipeline" - - match: { my_pipeline._source.description: "_description" } - - match: { my_pipeline._version: 1 } + - match: { pipelines.0.id: "my_pipeline" } + - match: { pipelines.0.config.description: "_description" } - do: ingest.delete_pipeline: id: "my_pipeline" - - match: { _index: ".ingest" } - - match: { _type: "pipeline" } - - match: { _version: 2 } - - match: { _id: "my_pipeline" } - - match: { found: true } + - match: { acknowledged: true } - do: catch: missing diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/20_date_processor.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/20_date_processor.yaml index 8852e5e5749..71c5c4069b2 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/20_date_processor.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/20_date_processor.yaml @@ -17,7 +17,7 @@ } ] } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: index: diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/30_mutate.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/30_mutate.yaml index a0a29e9c050..1e7911e519a 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/30_mutate.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/30_mutate.yaml @@ -72,7 +72,7 @@ } ] } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: index: @@ -130,7 +130,7 @@ } ] } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: index: diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/40_simulate.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/40_simulate.yaml index 9947129788b..a5965adec6e 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/40_simulate.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/40_simulate.yaml @@ -15,7 +15,7 @@ } ] } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: ingest.simulate: diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/50_on_failure.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/50_on_failure.yaml index a01b0dacac0..7bce12d2ec5 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/50_on_failure.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/50_on_failure.yaml @@ -30,7 +30,7 @@ } ] } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: index: @@ -88,7 +88,7 @@ } ] } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: index: diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/60_fail.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/60_fail.yaml index d491a95686e..019c229ae38 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/60_fail.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/60_fail.yaml @@ -14,7 +14,7 @@ } ] } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: catch: request @@ -49,7 +49,7 @@ } ] } - - match: { _id: "my_pipeline" } + - match: { acknowledged: true } - do: index: