Move the pipeline configuration from the dedicated index to the cluster state.
Closes #15842
This commit is contained in:
parent
5e185befa8
commit
f3883343cb
|
@ -610,7 +610,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
RETRY_ON_REPLICA_EXCEPTION(org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnReplicaException.class, org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnReplicaException::new, 136),
|
||||
TYPE_MISSING_EXCEPTION(org.elasticsearch.indices.TypeMissingException.class, org.elasticsearch.indices.TypeMissingException::new, 137),
|
||||
FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class, org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException::new, 140),
|
||||
QUERY_SHARD_EXCEPTION(org.elasticsearch.index.query.QueryShardException.class, org.elasticsearch.index.query.QueryShardException::new, 141);
|
||||
QUERY_SHARD_EXCEPTION(org.elasticsearch.index.query.QueryShardException.class, org.elasticsearch.index.query.QueryShardException::new, 141),
|
||||
PIPELINE_MISSING_EXCEPTION(org.elasticsearch.ingest.PipelineMissingException.class, org.elasticsearch.ingest.PipelineMissingException::new, 142);
|
||||
|
||||
final Class<? extends ElasticsearchException> exceptionClass;
|
||||
final FunctionThatThrowsIOException<StreamInput, ? extends ElasticsearchException> constructor;
|
||||
|
|
|
@ -20,10 +20,9 @@
|
|||
package org.elasticsearch.action.ingest;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public class DeletePipelineAction extends Action<DeletePipelineRequest, DeleteResponse, DeletePipelineRequestBuilder> {
|
||||
public class DeletePipelineAction extends Action<DeletePipelineRequest, WritePipelineResponse, DeletePipelineRequestBuilder> {
|
||||
|
||||
public static final DeletePipelineAction INSTANCE = new DeletePipelineAction();
|
||||
public static final String NAME = "cluster:admin/ingest/pipeline/delete";
|
||||
|
@ -38,7 +37,7 @@ public class DeletePipelineAction extends Action<DeletePipelineRequest, DeleteRe
|
|||
}
|
||||
|
||||
@Override
|
||||
public DeleteResponse newResponse() {
|
||||
return new DeleteResponse();
|
||||
public WritePipelineResponse newResponse() {
|
||||
return new WritePipelineResponse();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.action.ingest;
|
|||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
|
@ -28,7 +29,7 @@ import java.io.IOException;
|
|||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
|
||||
public class DeletePipelineRequest extends ActionRequest {
|
||||
public class DeletePipelineRequest extends AcknowledgedRequest {
|
||||
|
||||
private String id;
|
||||
|
||||
|
|
|
@ -20,10 +20,9 @@
|
|||
package org.elasticsearch.action.ingest;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public class DeletePipelineRequestBuilder extends ActionRequestBuilder<DeletePipelineRequest, DeleteResponse, DeletePipelineRequestBuilder> {
|
||||
public class DeletePipelineRequestBuilder extends ActionRequestBuilder<DeletePipelineRequest, WritePipelineResponse, DeletePipelineRequestBuilder> {
|
||||
|
||||
public DeletePipelineRequestBuilder(ElasticsearchClient client, DeletePipelineAction action) {
|
||||
super(client, action, new DeletePipelineRequest());
|
||||
|
|
|
@ -20,9 +20,12 @@
|
|||
package org.elasticsearch.action.ingest;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -31,18 +34,36 @@ import org.elasticsearch.ingest.PipelineStore;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
public class DeletePipelineTransportAction extends HandledTransportAction<DeletePipelineRequest, DeleteResponse> {
|
||||
public class DeletePipelineTransportAction extends TransportMasterNodeAction<DeletePipelineRequest, WritePipelineResponse> {
|
||||
|
||||
private final PipelineStore pipelineStore;
|
||||
|
||||
@Inject
|
||||
public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) {
|
||||
super(settings, DeletePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new);
|
||||
public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||
TransportService transportService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) {
|
||||
super(settings, DeletePipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new);
|
||||
this.pipelineStore = bootstrapper.getPipelineStore();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(DeletePipelineRequest request, ActionListener<DeleteResponse> listener) {
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WritePipelineResponse newResponse() {
|
||||
return new WritePipelineResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(DeletePipelineRequest request, ClusterState state, ActionListener<WritePipelineResponse> listener) throws Exception {
|
||||
pipelineStore.delete(request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(DeletePipelineRequest request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,8 +19,8 @@
|
|||
|
||||
package org.elasticsearch.action.ingest;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
|
@ -28,7 +28,7 @@ import java.io.IOException;
|
|||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
|
||||
public class GetPipelineRequest extends ActionRequest {
|
||||
public class GetPipelineRequest extends MasterNodeReadRequest<GetPipelineRequest> {
|
||||
|
||||
private String[] ids;
|
||||
|
||||
|
|
|
@ -19,10 +19,10 @@
|
|||
|
||||
package org.elasticsearch.action.ingest;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public class GetPipelineRequestBuilder extends ActionRequestBuilder<GetPipelineRequest, GetPipelineResponse, GetPipelineRequestBuilder> {
|
||||
public class GetPipelineRequestBuilder extends MasterNodeReadOperationRequestBuilder<GetPipelineRequest, GetPipelineResponse, GetPipelineRequestBuilder> {
|
||||
|
||||
public GetPipelineRequestBuilder(ElasticsearchClient client, GetPipelineAction action) {
|
||||
super(client, action, new GetPipelineRequest());
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.StatusToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.ingest.PipelineDefinition;
|
||||
import org.elasticsearch.ingest.PipelineConfiguration;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -33,16 +33,16 @@ import java.util.List;
|
|||
|
||||
public class GetPipelineResponse extends ActionResponse implements StatusToXContent {
|
||||
|
||||
private List<PipelineDefinition> pipelines;
|
||||
private List<PipelineConfiguration> pipelines;
|
||||
|
||||
public GetPipelineResponse() {
|
||||
}
|
||||
|
||||
public GetPipelineResponse(List<PipelineDefinition> pipelines) {
|
||||
public GetPipelineResponse(List<PipelineConfiguration> pipelines) {
|
||||
this.pipelines = pipelines;
|
||||
}
|
||||
|
||||
public List<PipelineDefinition> pipelines() {
|
||||
public List<PipelineConfiguration> pipelines() {
|
||||
return pipelines;
|
||||
}
|
||||
|
||||
|
@ -52,7 +52,7 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont
|
|||
int size = in.readVInt();
|
||||
pipelines = new ArrayList<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
pipelines.add(PipelineDefinition.readPipelineDefinitionFrom(in));
|
||||
pipelines.add(PipelineConfiguration.readPipelineConfiguration(in));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -60,7 +60,7 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont
|
|||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeVInt(pipelines.size());
|
||||
for (PipelineDefinition pipeline : pipelines) {
|
||||
for (PipelineConfiguration pipeline : pipelines) {
|
||||
pipeline.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
@ -76,9 +76,11 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont
|
|||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
for (PipelineDefinition definition : pipelines) {
|
||||
definition.toXContent(builder, params);
|
||||
builder.startArray("pipelines");
|
||||
for (PipelineConfiguration pipeline : pipelines) {
|
||||
pipeline.toXContent(builder, params);
|
||||
}
|
||||
builder.endArray();
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,31 +21,49 @@ package org.elasticsearch.action.ingest;
|
|||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.IngestBootstrapper;
|
||||
import org.elasticsearch.ingest.PipelineDefinition;
|
||||
import org.elasticsearch.ingest.PipelineStore;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class GetPipelineTransportAction extends HandledTransportAction<GetPipelineRequest, GetPipelineResponse> {
|
||||
public class GetPipelineTransportAction extends TransportMasterNodeReadAction<GetPipelineRequest, GetPipelineResponse> {
|
||||
|
||||
private final PipelineStore pipelineStore;
|
||||
|
||||
@Inject
|
||||
public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) {
|
||||
super(settings, GetPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, GetPipelineRequest::new);
|
||||
public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||
TransportService transportService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) {
|
||||
super(settings, GetPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, GetPipelineRequest::new);
|
||||
this.pipelineStore = bootstrapper.getPipelineStore();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(GetPipelineRequest request, ActionListener<GetPipelineResponse> listener) {
|
||||
List<PipelineDefinition> references = pipelineStore.getReference(request.ids());
|
||||
listener.onResponse(new GetPipelineResponse(references));
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected GetPipelineResponse newResponse() {
|
||||
return new GetPipelineResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(GetPipelineRequest request, ClusterState state, ActionListener<GetPipelineResponse> listener) throws Exception {
|
||||
listener.onResponse(new GetPipelineResponse(pipelineStore.getPipelines(request.ids())));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(GetPipelineRequest request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.elasticsearch.action.Action;
|
|||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public class PutPipelineAction extends Action<PutPipelineRequest, IndexResponse, PutPipelineRequestBuilder> {
|
||||
public class PutPipelineAction extends Action<PutPipelineRequest, WritePipelineResponse, PutPipelineRequestBuilder> {
|
||||
|
||||
public static final PutPipelineAction INSTANCE = new PutPipelineAction();
|
||||
public static final String NAME = "cluster:admin/ingest/pipeline/put";
|
||||
|
@ -38,7 +38,7 @@ public class PutPipelineAction extends Action<PutPipelineRequest, IndexResponse,
|
|||
}
|
||||
|
||||
@Override
|
||||
public IndexResponse newResponse() {
|
||||
return new IndexResponse();
|
||||
public WritePipelineResponse newResponse() {
|
||||
return new WritePipelineResponse();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.elasticsearch.action.ingest;
|
|||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.cluster.ack.AckedRequest;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -29,7 +31,7 @@ import java.io.IOException;
|
|||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
|
||||
public class PutPipelineRequest extends ActionRequest {
|
||||
public class PutPipelineRequest extends AcknowledgedRequest {
|
||||
|
||||
private String id;
|
||||
private BytesReference source;
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.elasticsearch.action.index.IndexResponse;
|
|||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
||||
public class PutPipelineRequestBuilder extends ActionRequestBuilder<PutPipelineRequest, IndexResponse, PutPipelineRequestBuilder> {
|
||||
public class PutPipelineRequestBuilder extends ActionRequestBuilder<PutPipelineRequest, WritePipelineResponse, PutPipelineRequestBuilder> {
|
||||
|
||||
public PutPipelineRequestBuilder(ElasticsearchClient client, PutPipelineAction action) {
|
||||
super(client, action, new PutPipelineRequest());
|
||||
|
|
|
@ -23,6 +23,11 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -31,18 +36,36 @@ import org.elasticsearch.ingest.PipelineStore;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
public class PutPipelineTransportAction extends HandledTransportAction<PutPipelineRequest, IndexResponse> {
|
||||
public class PutPipelineTransportAction extends TransportMasterNodeAction<PutPipelineRequest, WritePipelineResponse> {
|
||||
|
||||
private final PipelineStore pipelineStore;
|
||||
|
||||
@Inject
|
||||
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) {
|
||||
super(settings, PutPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new);
|
||||
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||
TransportService transportService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) {
|
||||
super(settings, PutPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new);
|
||||
this.pipelineStore = bootstrapper.getPipelineStore();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(PutPipelineRequest request, ActionListener<IndexResponse> listener) {
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WritePipelineResponse newResponse() {
|
||||
return new WritePipelineResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(PutPipelineRequest request, ClusterState state, ActionListener<WritePipelineResponse> listener) throws Exception {
|
||||
pipelineStore.put(request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(PutPipelineRequest request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,117 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.ingest;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.PipelineStore;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportRequestHandler;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* An internal api that refreshes the in-memory representation of all the pipelines on all ingest nodes.
|
||||
*/
|
||||
public class ReloadPipelinesAction extends AbstractComponent implements TransportRequestHandler<ReloadPipelinesAction.ReloadPipelinesRequest> {
|
||||
|
||||
public static final String ACTION_NAME = "internal:admin/ingest/reload/pipelines";
|
||||
|
||||
private final ClusterService clusterService;
|
||||
private final TransportService transportService;
|
||||
private final PipelineStore pipelineStore;
|
||||
|
||||
public ReloadPipelinesAction(Settings settings, PipelineStore pipelineStore, ClusterService clusterService, TransportService transportService) {
|
||||
super(settings);
|
||||
this.pipelineStore = pipelineStore;
|
||||
this.clusterService = clusterService;
|
||||
this.transportService = transportService;
|
||||
transportService.registerRequestHandler(ACTION_NAME, ReloadPipelinesRequest::new, ThreadPool.Names.MANAGEMENT, this);
|
||||
}
|
||||
|
||||
public void reloadPipelinesOnAllNodes(Consumer<Boolean> listener) {
|
||||
AtomicBoolean failed = new AtomicBoolean();
|
||||
DiscoveryNodes nodes = clusterService.state().getNodes();
|
||||
AtomicInteger expectedResponses = new AtomicInteger(nodes.size());
|
||||
for (DiscoveryNode node : nodes) {
|
||||
ReloadPipelinesRequest nodeRequest = new ReloadPipelinesRequest();
|
||||
transportService.sendRequest(node, ACTION_NAME, nodeRequest, new TransportResponseHandler<ReloadPipelinesResponse>() {
|
||||
@Override
|
||||
public ReloadPipelinesResponse newInstance() {
|
||||
return new ReloadPipelinesResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(ReloadPipelinesResponse response) {
|
||||
decrementAndReturn();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.warn("failed to update pipelines on remote node [{}]", exp, node);
|
||||
failed.set(true);
|
||||
decrementAndReturn();
|
||||
}
|
||||
|
||||
void decrementAndReturn() {
|
||||
if (expectedResponses.decrementAndGet() == 0) {
|
||||
listener.accept(!failed.get());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ReloadPipelinesRequest request, TransportChannel channel) throws Exception {
|
||||
try {
|
||||
pipelineStore.updatePipelines();
|
||||
channel.sendResponse(new ReloadPipelinesResponse());
|
||||
} catch (Throwable e) {
|
||||
logger.warn("failed to update pipelines", e);
|
||||
channel.sendResponse(e);
|
||||
}
|
||||
}
|
||||
|
||||
final static class ReloadPipelinesRequest extends TransportRequest {
|
||||
|
||||
}
|
||||
|
||||
final static class ReloadPipelinesResponse extends TransportResponse {
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.ingest;
|
||||
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class WritePipelineResponse extends AcknowledgedResponse {
|
||||
|
||||
WritePipelineResponse() {
|
||||
}
|
||||
|
||||
public WritePipelineResponse(boolean acknowledge) {
|
||||
super(acknowledge);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
readAcknowledged(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
writeAcknowledged(out);
|
||||
}
|
||||
}
|
|
@ -61,6 +61,7 @@ import org.elasticsearch.action.ingest.PutPipelineRequestBuilder;
|
|||
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
|
||||
import org.elasticsearch.action.ingest.SimulatePipelineRequestBuilder;
|
||||
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
|
||||
import org.elasticsearch.action.ingest.WritePipelineResponse;
|
||||
import org.elasticsearch.action.percolate.MultiPercolateRequest;
|
||||
import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder;
|
||||
import org.elasticsearch.action.percolate.MultiPercolateResponse;
|
||||
|
@ -605,12 +606,12 @@ public interface Client extends ElasticsearchClient, Releasable {
|
|||
/**
|
||||
* Stores an ingest pipeline
|
||||
*/
|
||||
void putPipeline(PutPipelineRequest request, ActionListener<IndexResponse> listener);
|
||||
void putPipeline(PutPipelineRequest request, ActionListener<WritePipelineResponse> listener);
|
||||
|
||||
/**
|
||||
* Stores an ingest pipeline
|
||||
*/
|
||||
ActionFuture<IndexResponse> putPipeline(PutPipelineRequest request);
|
||||
ActionFuture<WritePipelineResponse> putPipeline(PutPipelineRequest request);
|
||||
|
||||
/**
|
||||
* Stores an ingest pipeline
|
||||
|
@ -620,12 +621,12 @@ public interface Client extends ElasticsearchClient, Releasable {
|
|||
/**
|
||||
* Deletes a stored ingest pipeline
|
||||
*/
|
||||
void deletePipeline(DeletePipelineRequest request, ActionListener<DeleteResponse> listener);
|
||||
void deletePipeline(DeletePipelineRequest request, ActionListener<WritePipelineResponse> listener);
|
||||
|
||||
/**
|
||||
* Deletes a stored ingest pipeline
|
||||
*/
|
||||
ActionFuture<DeleteResponse> deletePipeline(DeletePipelineRequest request);
|
||||
ActionFuture<WritePipelineResponse> deletePipeline(DeletePipelineRequest request);
|
||||
|
||||
/**
|
||||
* Deletes a stored ingest pipeline
|
||||
|
|
|
@ -286,6 +286,7 @@ import org.elasticsearch.action.ingest.SimulatePipelineAction;
|
|||
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
|
||||
import org.elasticsearch.action.ingest.SimulatePipelineRequestBuilder;
|
||||
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
|
||||
import org.elasticsearch.action.ingest.WritePipelineResponse;
|
||||
import org.elasticsearch.action.percolate.MultiPercolateAction;
|
||||
import org.elasticsearch.action.percolate.MultiPercolateRequest;
|
||||
import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder;
|
||||
|
@ -806,12 +807,12 @@ public abstract class AbstractClient extends AbstractComponent implements Client
|
|||
}
|
||||
|
||||
@Override
|
||||
public void putPipeline(PutPipelineRequest request, ActionListener<IndexResponse> listener) {
|
||||
public void putPipeline(PutPipelineRequest request, ActionListener<WritePipelineResponse> listener) {
|
||||
execute(PutPipelineAction.INSTANCE, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionFuture<IndexResponse> putPipeline(PutPipelineRequest request) {
|
||||
public ActionFuture<WritePipelineResponse> putPipeline(PutPipelineRequest request) {
|
||||
return execute(PutPipelineAction.INSTANCE, request);
|
||||
}
|
||||
|
||||
|
@ -821,12 +822,12 @@ public abstract class AbstractClient extends AbstractComponent implements Client
|
|||
}
|
||||
|
||||
@Override
|
||||
public void deletePipeline(DeletePipelineRequest request, ActionListener<DeleteResponse> listener) {
|
||||
public void deletePipeline(DeletePipelineRequest request, ActionListener<WritePipelineResponse> listener) {
|
||||
execute(DeletePipelineAction.INSTANCE, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionFuture<DeleteResponse> deletePipeline(DeletePipelineRequest request) {
|
||||
public ActionFuture<WritePipelineResponse> deletePipeline(DeletePipelineRequest request) {
|
||||
return execute(DeletePipelineAction.INSTANCE, request);
|
||||
}
|
||||
|
||||
|
|
|
@ -54,6 +54,7 @@ import org.elasticsearch.index.IndexNotFoundException;
|
|||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||
import org.elasticsearch.indices.ttl.IndicesTTLService;
|
||||
import org.elasticsearch.ingest.IngestMetadata;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -111,6 +112,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, Fr
|
|||
static {
|
||||
// register non plugin custom metadata
|
||||
registerPrototype(RepositoriesMetaData.TYPE, RepositoriesMetaData.PROTO);
|
||||
registerPrototype(IngestMetadata.TYPE, IngestMetadata.PROTO);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,93 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common;
|
||||
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchScrollRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
* An iterator that easily helps to consume all hits from a scroll search.
|
||||
*/
|
||||
public final class SearchScrollIterator implements Iterator<SearchHit> {
|
||||
|
||||
/**
|
||||
* Creates an iterator that returns all matching hits of a scroll search via an iterator.
|
||||
* The iterator will return all hits per scroll search and execute additional scroll searches
|
||||
* to get more hits until all hits have been returned by the scroll search on the ES side.
|
||||
*/
|
||||
public static Iterable<SearchHit> createIterator(Client client, TimeValue scrollTimeout, SearchRequest searchRequest) {
|
||||
searchRequest.scroll(scrollTimeout);
|
||||
SearchResponse searchResponse = client.search(searchRequest).actionGet(scrollTimeout);
|
||||
if (searchResponse.getHits().getTotalHits() == 0) {
|
||||
return Collections.emptyList();
|
||||
} else {
|
||||
return () -> new SearchScrollIterator(client, scrollTimeout, searchResponse);
|
||||
}
|
||||
}
|
||||
|
||||
private final Client client;
|
||||
private final TimeValue scrollTimeout;
|
||||
|
||||
private int currentIndex;
|
||||
private SearchHit[] currentHits;
|
||||
private SearchResponse searchResponse;
|
||||
|
||||
private SearchScrollIterator(Client client, TimeValue scrollTimeout, SearchResponse searchResponse) {
|
||||
this.client = client;
|
||||
this.scrollTimeout = scrollTimeout;
|
||||
this.searchResponse = searchResponse;
|
||||
this.currentHits = searchResponse.getHits().getHits();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
if (currentIndex < currentHits.length) {
|
||||
return true;
|
||||
} else {
|
||||
if (searchResponse == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SearchScrollRequest request = new SearchScrollRequest(searchResponse.getScrollId());
|
||||
request.scroll(scrollTimeout);
|
||||
searchResponse = client.searchScroll(request).actionGet(scrollTimeout);
|
||||
if (searchResponse.getHits().getHits().length == 0) {
|
||||
searchResponse = null;
|
||||
return false;
|
||||
} else {
|
||||
currentHits = searchResponse.getHits().getHits();
|
||||
currentIndex = 0;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchHit next() {
|
||||
return currentHits[currentIndex++];
|
||||
}
|
||||
}
|
|
@ -48,11 +48,8 @@ import java.io.InputStream;
|
|||
* Instantiates and wires all the services that the ingest plugin will be needing.
|
||||
* Also the bootstrapper is in charge of starting and stopping the ingest plugin based on the cluster state.
|
||||
*/
|
||||
public class IngestBootstrapper extends AbstractLifecycleComponent implements ClusterStateListener {
|
||||
public class IngestBootstrapper extends AbstractLifecycleComponent {
|
||||
|
||||
static final String INGEST_INDEX_TEMPLATE_NAME = "ingest-template";
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
private final Environment environment;
|
||||
private final PipelineStore pipelineStore;
|
||||
private final PipelineExecutionService pipelineExecutionService;
|
||||
|
@ -64,31 +61,12 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl
|
|||
// pipelines into NodeModule? I'd really like to prevent adding yet another module.
|
||||
@Inject
|
||||
public IngestBootstrapper(Settings settings, ThreadPool threadPool, Environment environment,
|
||||
ClusterService clusterService, TransportService transportService,
|
||||
ProcessorsRegistry processorsRegistry) {
|
||||
ClusterService clusterService, ProcessorsRegistry processorsRegistry) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.environment = environment;
|
||||
this.processorsRegistry = processorsRegistry;
|
||||
this.pipelineStore = new PipelineStore(settings, clusterService, transportService);
|
||||
this.pipelineStore = new PipelineStore(settings, clusterService);
|
||||
this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool);
|
||||
|
||||
boolean isNoTribeNode = settings.getByPrefix("tribe.").getAsMap().isEmpty();
|
||||
if (isNoTribeNode) {
|
||||
clusterService.add(this);
|
||||
}
|
||||
}
|
||||
|
||||
// for testing:
|
||||
IngestBootstrapper(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||
PipelineStore pipelineStore, PipelineExecutionService pipelineExecutionService) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.environment = null;
|
||||
clusterService.add(this);
|
||||
this.pipelineStore = pipelineStore;
|
||||
this.pipelineExecutionService = pipelineExecutionService;
|
||||
this.processorsRegistry = null;
|
||||
}
|
||||
|
||||
public PipelineStore getPipelineStore() {
|
||||
|
@ -99,49 +77,11 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl
|
|||
return pipelineExecutionService;
|
||||
}
|
||||
|
||||
@Inject
|
||||
public void setClient(Client client) {
|
||||
pipelineStore.setClient(client);
|
||||
}
|
||||
|
||||
@Inject
|
||||
public void setScriptService(ScriptService scriptService) {
|
||||
pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, environment, scriptService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
ClusterState state = event.state();
|
||||
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (pipelineStore.isStarted()) {
|
||||
if (validClusterState(state) == false) {
|
||||
stopPipelineStore("cluster state invalid [" + state + "]");
|
||||
}
|
||||
} else {
|
||||
if (validClusterState(state)) {
|
||||
startPipelineStore(state.metaData());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
boolean validClusterState(ClusterState state) {
|
||||
if (state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_WRITES) ||
|
||||
state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ALL)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (state.getMetaData().hasConcreteIndex(PipelineStore.INDEX)) {
|
||||
IndexRoutingTable routingTable = state.getRoutingTable().index(PipelineStore.INDEX);
|
||||
return routingTable.allPrimaryShardsActive();
|
||||
} else {
|
||||
// it will be ready when auto create index kicks in before the first pipeline doc gets added
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
}
|
||||
|
@ -159,33 +99,4 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl
|
|||
}
|
||||
}
|
||||
|
||||
void startPipelineStore(MetaData metaData) {
|
||||
try {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
try {
|
||||
pipelineStore.start();
|
||||
} catch (Exception e1) {
|
||||
logger.warn("pipeline store failed to start, retrying...", e1);
|
||||
startPipelineStore(metaData);
|
||||
}
|
||||
});
|
||||
} catch (EsRejectedExecutionException e) {
|
||||
logger.debug("async pipeline store start failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
void stopPipelineStore(String reason) {
|
||||
try {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
try {
|
||||
pipelineStore.stop(reason);
|
||||
} catch (Exception e) {
|
||||
logger.error("pipeline store stop failure", e);
|
||||
}
|
||||
});
|
||||
} catch (EsRejectedExecutionException e) {
|
||||
logger.debug("async pipeline store stop failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.cluster.AbstractDiffable;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Holds the ingest pipelines that are available in the cluster
|
||||
*/
|
||||
public final class IngestMetadata extends AbstractDiffable<MetaData.Custom> implements MetaData.Custom {
|
||||
|
||||
public final static String TYPE = "ingest";
|
||||
public final static IngestMetadata PROTO = new IngestMetadata();
|
||||
|
||||
// We can't use Pipeline class directly in cluster state, because we don't have the processor factories around when
|
||||
// IngestMetadata is registered as custom metadata.
|
||||
private final Map<String, PipelineConfiguration> pipelines;
|
||||
|
||||
private IngestMetadata() {
|
||||
this.pipelines = Collections.emptyMap();
|
||||
}
|
||||
|
||||
public IngestMetadata(Map<String, PipelineConfiguration> pipelines) {
|
||||
this.pipelines = Collections.unmodifiableMap(pipelines);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
public Map<String, PipelineConfiguration> getPipelines() {
|
||||
return pipelines;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetaData.Custom readFrom(StreamInput in) throws IOException {
|
||||
int size = in.readVInt();
|
||||
Map<String, PipelineConfiguration> pipelines = new HashMap<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
PipelineConfiguration pipeline = PipelineConfiguration.readPipelineConfiguration(in);
|
||||
pipelines.put(pipeline.getId(), pipeline);
|
||||
}
|
||||
return new IngestMetadata(pipelines);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(pipelines.size());
|
||||
for (PipelineConfiguration pipeline : pipelines.values()) {
|
||||
pipeline.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetaData.Custom fromXContent(XContentParser parser) throws IOException {
|
||||
XContentParser.Token token;
|
||||
String currentFieldName = null;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
switch (token) {
|
||||
case FIELD_NAME:
|
||||
currentFieldName = parser.currentName();
|
||||
break;
|
||||
case START_ARRAY:
|
||||
if ("pipelines".equals(currentFieldName)) {
|
||||
Map<String, PipelineConfiguration> pipelines = new HashMap<>();
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
|
||||
if (token == XContentParser.Token.START_OBJECT) {
|
||||
PipelineConfiguration pipeline = new PipelineConfiguration(parser);
|
||||
pipelines.put(pipeline.getId(), pipeline);
|
||||
}
|
||||
}
|
||||
return new IngestMetadata(pipelines);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
return PROTO;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startArray("pipelines");
|
||||
for (PipelineConfiguration pipeline : pipelines.values()) {
|
||||
pipeline.toXContent(builder, params);
|
||||
}
|
||||
builder.endArray();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EnumSet<MetaData.XContentContext> context() {
|
||||
return MetaData.API_AND_GATEWAY;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,112 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Encapsulates a pipeline's id and configuration as a blob
|
||||
*/
|
||||
public final class PipelineConfiguration implements Writeable<PipelineConfiguration>, ToXContent {
|
||||
|
||||
private final static PipelineConfiguration PROTOTYPE = new PipelineConfiguration(null, null);
|
||||
|
||||
public static PipelineConfiguration readPipelineConfiguration(StreamInput in) throws IOException {
|
||||
return PROTOTYPE.readFrom(in);
|
||||
}
|
||||
|
||||
private final String id;
|
||||
// Store config as bytes reference, because the config is only used when the pipeline store reads the cluster state
|
||||
// and the way the map of maps config is read requires a deep copy (it removes instead of gets entries to check for unused options)
|
||||
// also the get pipeline api just directly returns this to the caller
|
||||
private final BytesReference config;
|
||||
|
||||
PipelineConfiguration(XContentParser parser) throws IOException {
|
||||
String id = null;
|
||||
BytesReference config = null;
|
||||
|
||||
XContentParser.Token token;
|
||||
String currentFieldName = null;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
switch (token) {
|
||||
case FIELD_NAME:
|
||||
currentFieldName = parser.currentName();
|
||||
break;
|
||||
case VALUE_STRING:
|
||||
if ("id".equals(currentFieldName)) {
|
||||
id = parser.text();
|
||||
}
|
||||
break;
|
||||
case START_OBJECT:
|
||||
XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent());
|
||||
XContentHelper.copyCurrentEvent(builder.generator(), parser);
|
||||
config = builder.bytes();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
this.id = Objects.requireNonNull(id);
|
||||
this.config = Objects.requireNonNull(config);
|
||||
}
|
||||
|
||||
public PipelineConfiguration(String id, BytesReference config) {
|
||||
this.id = id;
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public Map<String, Object> getConfigAsMap() {
|
||||
return XContentHelper.convertToMap(config, true).v2();
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field("id", id);
|
||||
builder.field("config", getConfigAsMap());
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PipelineConfiguration readFrom(StreamInput in) throws IOException {
|
||||
return new PipelineConfiguration(in.readString(), in.readBytesReference());
|
||||
}
|
||||
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(id);
|
||||
out.writeBytesReference(config);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,114 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.ingest.core.Pipeline;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class PipelineDefinition implements Writeable<PipelineDefinition>, ToXContent {
|
||||
|
||||
private static final PipelineDefinition PROTOTYPE = new PipelineDefinition((String) null, -1, null);
|
||||
|
||||
public static PipelineDefinition readPipelineDefinitionFrom(StreamInput in) throws IOException {
|
||||
return PROTOTYPE.readFrom(in);
|
||||
}
|
||||
|
||||
private final String id;
|
||||
private final long version;
|
||||
private final BytesReference source;
|
||||
|
||||
private final Pipeline pipeline;
|
||||
|
||||
PipelineDefinition(Pipeline pipeline, long version, BytesReference source) {
|
||||
this.id = pipeline.getId();
|
||||
this.version = version;
|
||||
this.source = source;
|
||||
this.pipeline = pipeline;
|
||||
}
|
||||
|
||||
PipelineDefinition(String id, long version, BytesReference source) {
|
||||
this.id = id;
|
||||
this.version = version;
|
||||
this.source = source;
|
||||
this.pipeline = null;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public long getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public BytesReference getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
Pipeline getPipeline() {
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
PipelineDefinition holder = (PipelineDefinition) o;
|
||||
return source.equals(holder.source);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return source.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PipelineDefinition readFrom(StreamInput in) throws IOException {
|
||||
String id = in.readString();
|
||||
long version = in.readLong();
|
||||
BytesReference source = in.readBytesReference();
|
||||
return new PipelineDefinition(id, version, source);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(id);
|
||||
out.writeLong(version);
|
||||
out.writeBytesReference(source);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(id);
|
||||
XContentHelper.writeRawField("_source", source, builder, params);
|
||||
builder.field("_version", version);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class PipelineMissingException extends ElasticsearchException {
|
||||
|
||||
public PipelineMissingException(String id) {
|
||||
super("pipeline [{}] is missing", id);
|
||||
}
|
||||
|
||||
public PipelineMissingException(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestStatus status() {
|
||||
return RestStatus.NOT_FOUND;
|
||||
}
|
||||
}
|
|
@ -21,43 +21,24 @@ package org.elasticsearch.ingest;
|
|||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.action.ingest.WritePipelineResponse;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.Booleans;
|
||||
import org.elasticsearch.common.SearchScrollIterator;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.action.ingest.DeletePipelineRequest;
|
||||
import org.elasticsearch.action.ingest.PutPipelineRequest;
|
||||
import org.elasticsearch.action.ingest.ReloadPipelinesAction;
|
||||
import org.elasticsearch.ingest.core.Pipeline;
|
||||
import org.elasticsearch.ingest.core.Processor;
|
||||
import org.elasticsearch.ingest.core.TemplateService;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
@ -68,66 +49,22 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
public class PipelineStore extends AbstractComponent implements Closeable {
|
||||
public class PipelineStore extends AbstractComponent implements Closeable, ClusterStateListener {
|
||||
|
||||
public final static String INDEX = ".ingest";
|
||||
public final static String TYPE = "pipeline";
|
||||
|
||||
final static Settings INGEST_INDEX_SETTING = Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
|
||||
.put("index.mapper.dynamic", false)
|
||||
.build();
|
||||
|
||||
final static String PIPELINE_MAPPING;
|
||||
|
||||
static {
|
||||
try {
|
||||
PIPELINE_MAPPING = XContentFactory.jsonBuilder().startObject()
|
||||
.field("dynamic", "strict")
|
||||
.startObject("_all")
|
||||
.field("enabled", false)
|
||||
.endObject()
|
||||
.startObject("properties")
|
||||
.startObject("processors")
|
||||
.field("type", "object")
|
||||
.field("enabled", false)
|
||||
.field("dynamic", "true")
|
||||
.endObject()
|
||||
.startObject("on_failure")
|
||||
.field("type", "object")
|
||||
.field("enabled", false)
|
||||
.field("dynamic", "true")
|
||||
.endObject()
|
||||
.startObject("description")
|
||||
.field("type", "string")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject().string();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private Client client;
|
||||
private final TimeValue scrollTimeout;
|
||||
private final ClusterService clusterService;
|
||||
private final ReloadPipelinesAction reloadPipelinesAction;
|
||||
private final Pipeline.Factory factory = new Pipeline.Factory();
|
||||
private Map<String, Processor.Factory> processorFactoryRegistry;
|
||||
|
||||
private volatile boolean started = false;
|
||||
private volatile Map<String, PipelineDefinition> pipelines = new HashMap<>();
|
||||
// Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
|
||||
// We know of all the processor factories when a node with all its plugin have been initialized. Also some
|
||||
// processor factories rely on other node services. Custom metadata is statically registered when classes
|
||||
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
|
||||
volatile Map<String, Pipeline> pipelines = new HashMap<>();
|
||||
|
||||
public PipelineStore(Settings settings, ClusterService clusterService, TransportService transportService) {
|
||||
public PipelineStore(Settings settings, ClusterService clusterService) {
|
||||
super(settings);
|
||||
this.clusterService = clusterService;
|
||||
this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30));
|
||||
this.reloadPipelinesAction = new ReloadPipelinesAction(settings, this, clusterService, transportService);
|
||||
}
|
||||
|
||||
public void setClient(Client client) {
|
||||
this.client = client;
|
||||
clusterService.add(this);
|
||||
}
|
||||
|
||||
public void buildProcessorFactoryRegistry(ProcessorsRegistry processorsRegistry, Environment environment, ScriptService scriptService) {
|
||||
|
@ -142,7 +79,6 @@ public class PipelineStore extends AbstractComponent implements Closeable {
|
|||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
stop("closing");
|
||||
// TODO: When org.elasticsearch.node.Node can close Closable instances we should try to remove this code,
|
||||
// since any wired closable should be able to close itself
|
||||
List<Closeable> closeables = new ArrayList<>();
|
||||
|
@ -154,18 +90,63 @@ public class PipelineStore extends AbstractComponent implements Closeable {
|
|||
IOUtils.close(closeables);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
innerUpdatePipelines(event.state());
|
||||
}
|
||||
|
||||
void innerUpdatePipelines(ClusterState state) {
|
||||
IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
|
||||
if (ingestMetadata == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
Map<String, Pipeline> pipelines = new HashMap<>();
|
||||
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
|
||||
try {
|
||||
pipelines.put(pipeline.getId(), constructPipeline(pipeline.getId(), pipeline.getConfigAsMap()));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
this.pipelines = Collections.unmodifiableMap(pipelines);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the pipeline specified by id in the request.
|
||||
*/
|
||||
public void delete(DeletePipelineRequest request, ActionListener<DeleteResponse> listener) {
|
||||
ensureReady();
|
||||
public void delete(DeletePipelineRequest request, ActionListener<WritePipelineResponse> listener) {
|
||||
clusterService.submitStateUpdateTask("delete-pipeline-" + request.id(), new AckedClusterStateUpdateTask<WritePipelineResponse>(request, listener) {
|
||||
|
||||
DeleteRequest deleteRequest = new DeleteRequest(request);
|
||||
deleteRequest.index(PipelineStore.INDEX);
|
||||
deleteRequest.type(PipelineStore.TYPE);
|
||||
deleteRequest.id(request.id());
|
||||
deleteRequest.refresh(true);
|
||||
client.delete(deleteRequest, handleWriteResponseAndReloadPipelines(listener));
|
||||
@Override
|
||||
protected WritePipelineResponse newResponse(boolean acknowledged) {
|
||||
return new WritePipelineResponse(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return innerDelete(request, currentState);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
ClusterState innerDelete(DeletePipelineRequest request, ClusterState currentState) {
|
||||
IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE);
|
||||
if (currentIngestMetadata == null) {
|
||||
return currentState;
|
||||
}
|
||||
Map<String, PipelineConfiguration> pipelines = currentIngestMetadata.getPipelines();
|
||||
if (pipelines.containsKey(request.id()) == false) {
|
||||
throw new PipelineMissingException(request.id());
|
||||
} else {
|
||||
pipelines = new HashMap<>(pipelines);
|
||||
pipelines.remove(request.id());
|
||||
ClusterState.Builder newState = ClusterState.builder(currentState);
|
||||
newState.metaData(MetaData.builder(currentState.getMetaData())
|
||||
.putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines))
|
||||
.build());
|
||||
return newState.build();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -173,280 +154,92 @@ public class PipelineStore extends AbstractComponent implements Closeable {
|
|||
*
|
||||
* @throws IllegalArgumentException If the pipeline holds incorrect configuration
|
||||
*/
|
||||
public void put(PutPipelineRequest request, ActionListener<IndexResponse> listener) throws IllegalArgumentException {
|
||||
ensureReady();
|
||||
|
||||
public void put(PutPipelineRequest request, ActionListener<WritePipelineResponse> listener) throws IllegalArgumentException {
|
||||
try {
|
||||
// validates the pipeline and processor configuration:
|
||||
// validates the pipeline and processor configuration before submitting a cluster update task:
|
||||
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.source(), false).v2();
|
||||
constructPipeline(request.id(), pipelineConfig);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("Invalid pipeline configuration", e);
|
||||
}
|
||||
clusterService.submitStateUpdateTask("put-pipeline-" + request.id(), new AckedClusterStateUpdateTask<WritePipelineResponse>(request, listener) {
|
||||
|
||||
ClusterState state = clusterService.state();
|
||||
if (isIngestIndexPresent(state)) {
|
||||
innerPut(request, listener);
|
||||
} else {
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX);
|
||||
createIndexRequest.settings(INGEST_INDEX_SETTING);
|
||||
createIndexRequest.mapping(TYPE, PIPELINE_MAPPING);
|
||||
client.admin().indices().create(createIndexRequest, new ActionListener<CreateIndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(CreateIndexResponse createIndexResponse) {
|
||||
innerPut(request, listener);
|
||||
}
|
||||
@Override
|
||||
protected WritePipelineResponse newResponse(boolean acknowledged) {
|
||||
return new WritePipelineResponse(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return innerPut(request, currentState);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void innerPut(PutPipelineRequest request, ActionListener<IndexResponse> listener) {
|
||||
IndexRequest indexRequest = new IndexRequest(request);
|
||||
indexRequest.index(PipelineStore.INDEX);
|
||||
indexRequest.type(PipelineStore.TYPE);
|
||||
indexRequest.id(request.id());
|
||||
indexRequest.source(request.source());
|
||||
indexRequest.refresh(true);
|
||||
client.index(indexRequest, handleWriteResponseAndReloadPipelines(listener));
|
||||
ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
|
||||
IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE);
|
||||
Map<String, PipelineConfiguration> pipelines;
|
||||
if (currentIngestMetadata != null) {
|
||||
pipelines = new HashMap<>(currentIngestMetadata.getPipelines());
|
||||
} else {
|
||||
pipelines = new HashMap<>();
|
||||
}
|
||||
|
||||
pipelines.put(request.id(), new PipelineConfiguration(request.id(), request.source()));
|
||||
ClusterState.Builder newState = ClusterState.builder(currentState);
|
||||
newState.metaData(MetaData.builder(currentState.getMetaData())
|
||||
.putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines))
|
||||
.build());
|
||||
return newState.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the pipeline by the specified id
|
||||
*/
|
||||
public Pipeline get(String id) {
|
||||
ensureReady();
|
||||
|
||||
PipelineDefinition ref = pipelines.get(id);
|
||||
if (ref != null) {
|
||||
return ref.getPipeline();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
return pipelines.get(id);
|
||||
}
|
||||
|
||||
public Map<String, Processor.Factory> getProcessorFactoryRegistry() {
|
||||
return processorFactoryRegistry;
|
||||
}
|
||||
|
||||
public List<PipelineDefinition> getReference(String... ids) {
|
||||
ensureReady();
|
||||
/**
|
||||
* @return pipeline configuration specified by id. If multiple ids or wildcards are specified multiple pipelines
|
||||
* may be returned
|
||||
*/
|
||||
// Returning PipelineConfiguration instead of Pipeline, because Pipeline and Processor interface don't
|
||||
// know how to serialize themselves.
|
||||
public List<PipelineConfiguration> getPipelines(String... ids) {
|
||||
IngestMetadata ingestMetadata = clusterService.state().getMetaData().custom(IngestMetadata.TYPE);
|
||||
return innerGetPipelines(ingestMetadata, ids);
|
||||
}
|
||||
|
||||
List<PipelineDefinition> result = new ArrayList<>(ids.length);
|
||||
List<PipelineConfiguration> innerGetPipelines(IngestMetadata ingestMetadata, String... ids) {
|
||||
if (ingestMetadata == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
List<PipelineConfiguration> result = new ArrayList<>(ids.length);
|
||||
for (String id : ids) {
|
||||
if (Regex.isSimpleMatchPattern(id)) {
|
||||
for (Map.Entry<String, PipelineDefinition> entry : pipelines.entrySet()) {
|
||||
for (Map.Entry<String, PipelineConfiguration> entry : ingestMetadata.getPipelines().entrySet()) {
|
||||
if (Regex.simpleMatch(id, entry.getKey())) {
|
||||
result.add(entry.getValue());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
PipelineDefinition reference = pipelines.get(id);
|
||||
if (reference != null) {
|
||||
result.add(reference);
|
||||
PipelineConfiguration pipeline = ingestMetadata.getPipelines().get(id);
|
||||
if (pipeline != null) {
|
||||
result.add(pipeline);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public synchronized void updatePipelines() throws Exception {
|
||||
// note: this process isn't fast or smart, but the idea is that there will not be many pipelines,
|
||||
// so for that reason the goal is to keep the update logic simple.
|
||||
|
||||
int changed = 0;
|
||||
Map<String, PipelineDefinition> newPipelines = new HashMap<>(pipelines);
|
||||
for (SearchHit hit : readAllPipelines()) {
|
||||
String pipelineId = hit.getId();
|
||||
BytesReference pipelineSource = hit.getSourceRef();
|
||||
PipelineDefinition current = newPipelines.get(pipelineId);
|
||||
if (current != null) {
|
||||
// If we first read from a primary shard copy and then from a replica copy,
|
||||
// and a write did not yet make it into the replica shard
|
||||
// then the source is not equal but we don't update because the current pipeline is the latest:
|
||||
if (current.getVersion() > hit.getVersion()) {
|
||||
continue;
|
||||
}
|
||||
if (current.getSource().equals(pipelineSource)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
changed++;
|
||||
Pipeline pipeline = constructPipeline(hit.getId(), hit.sourceAsMap());
|
||||
newPipelines.put(pipelineId, new PipelineDefinition(pipeline, hit.getVersion(), pipelineSource));
|
||||
}
|
||||
|
||||
int removed = 0;
|
||||
for (String existingPipelineId : pipelines.keySet()) {
|
||||
if (pipelineExists(existingPipelineId) == false) {
|
||||
newPipelines.remove(existingPipelineId);
|
||||
removed++;
|
||||
}
|
||||
}
|
||||
|
||||
if (changed != 0 || removed != 0) {
|
||||
logger.debug("adding or updating [{}] pipelines and [{}] pipelines removed", changed, removed);
|
||||
pipelines = newPipelines;
|
||||
} else {
|
||||
logger.debug("no pipelines changes detected");
|
||||
}
|
||||
}
|
||||
|
||||
private Pipeline constructPipeline(String id, Map<String, Object> config) throws Exception {
|
||||
return factory.create(id, config, processorFactoryRegistry);
|
||||
}
|
||||
|
||||
boolean pipelineExists(String pipelineId) {
|
||||
GetRequest request = new GetRequest(PipelineStore.INDEX, PipelineStore.TYPE, pipelineId);
|
||||
try {
|
||||
GetResponse response = client.get(request).actionGet();
|
||||
return response.isExists();
|
||||
} catch (IndexNotFoundException e) {
|
||||
// the ingest index doesn't exist, so the pipeline doesn't either:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param clusterState The cluster just to check whether the ingest index exists and the state of the ingest index
|
||||
* @throws IllegalStateException If the ingest template exists, but is in an invalid state
|
||||
* @return <code>true</code> when the ingest index exists and has the expected settings and mappings or returns
|
||||
* <code>false</code> when the ingest index doesn't exists and needs to be created.
|
||||
*/
|
||||
boolean isIngestIndexPresent(ClusterState clusterState) throws IllegalStateException {
|
||||
if (clusterState.getMetaData().hasIndex(INDEX)) {
|
||||
IndexMetaData indexMetaData = clusterState.getMetaData().index(INDEX);
|
||||
Settings indexSettings = indexMetaData.getSettings();
|
||||
int numberOfShards = indexSettings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, -1);
|
||||
if (numberOfShards != 1) {
|
||||
throw new IllegalStateException("illegal ingest index setting, [" + IndexMetaData.SETTING_NUMBER_OF_SHARDS + "] setting is [" + numberOfShards + "] while [1] is expected");
|
||||
}
|
||||
int numberOfReplicas = indexSettings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, -1);
|
||||
if (numberOfReplicas != 1) {
|
||||
throw new IllegalStateException("illegal ingest index setting, [" + IndexMetaData.SETTING_NUMBER_OF_REPLICAS + "] setting is [" + numberOfReplicas + "] while [1] is expected");
|
||||
}
|
||||
boolean dynamicMappings = indexSettings.getAsBoolean("index.mapper.dynamic", true);
|
||||
if (dynamicMappings != false) {
|
||||
throw new IllegalStateException("illegal ingest index setting, [index.mapper.dynamic] setting is [" + dynamicMappings + "] while [false] is expected");
|
||||
}
|
||||
|
||||
if (indexMetaData.getMappings().size() != 1 && indexMetaData.getMappings().containsKey(TYPE) == false) {
|
||||
throw new IllegalStateException("illegal ingest mappings, only [" + TYPE + "] mapping is allowed to exist in the " + INDEX +" index");
|
||||
}
|
||||
|
||||
try {
|
||||
Map<String, Object> pipelineMapping = indexMetaData.getMappings().get(TYPE).getSourceAsMap();
|
||||
String dynamicMapping = (String) XContentMapValues.extractValue("dynamic", pipelineMapping);
|
||||
if ("strict".equals(dynamicMapping) == false) {
|
||||
throw new IllegalStateException("illegal ingest mapping, pipeline mapping must be strict");
|
||||
}
|
||||
Boolean allEnabled = (Boolean) XContentMapValues.extractValue("_all.enabled", pipelineMapping);
|
||||
if (Boolean.FALSE.equals(allEnabled) == false) {
|
||||
throw new IllegalStateException("illegal ingest mapping, _all field is enabled");
|
||||
}
|
||||
|
||||
String processorsType = (String) XContentMapValues.extractValue("properties.processors.type", pipelineMapping);
|
||||
if ("object".equals(processorsType) == false) {
|
||||
throw new IllegalStateException("illegal ingest mapping, processors field's type is [" + processorsType + "] while [object] is expected");
|
||||
}
|
||||
|
||||
Boolean processorsEnabled = (Boolean) XContentMapValues.extractValue("properties.processors.enabled", pipelineMapping);
|
||||
if (Boolean.FALSE.equals(processorsEnabled) == false) {
|
||||
throw new IllegalStateException("illegal ingest mapping, processors field enabled option is [true] while [false] is expected");
|
||||
}
|
||||
|
||||
String processorsDynamic = (String) XContentMapValues.extractValue("properties.processors.dynamic", pipelineMapping);
|
||||
if ("true".equals(processorsDynamic) == false) {
|
||||
throw new IllegalStateException("illegal ingest mapping, processors field dynamic option is [false] while [true] is expected");
|
||||
}
|
||||
|
||||
String onFailureType = (String) XContentMapValues.extractValue("properties.on_failure.type", pipelineMapping);
|
||||
if ("object".equals(onFailureType) == false) {
|
||||
throw new IllegalStateException("illegal ingest mapping, on_failure field type option is [" + onFailureType + "] while [object] is expected");
|
||||
}
|
||||
|
||||
Boolean onFailureEnabled = (Boolean) XContentMapValues.extractValue("properties.on_failure.enabled", pipelineMapping);
|
||||
if (Boolean.FALSE.equals(onFailureEnabled) == false) {
|
||||
throw new IllegalStateException("illegal ingest mapping, on_failure field enabled option is [true] while [false] is expected");
|
||||
}
|
||||
|
||||
String onFailureDynamic = (String) XContentMapValues.extractValue("properties.on_failure.dynamic", pipelineMapping);
|
||||
if ("true".equals(onFailureDynamic) == false) {
|
||||
throw new IllegalStateException("illegal ingest mapping, on_failure field dynamic option is [false] while [true] is expected");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
synchronized void start() throws Exception {
|
||||
if (started) {
|
||||
logger.debug("Pipeline already started");
|
||||
} else {
|
||||
updatePipelines();
|
||||
started = true;
|
||||
logger.debug("Pipeline store started with [{}] pipelines", pipelines.size());
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void stop(String reason) {
|
||||
if (started) {
|
||||
started = false;
|
||||
pipelines = new HashMap<>();
|
||||
logger.debug("Pipeline store stopped, reason [{}]", reason);
|
||||
} else {
|
||||
logger.debug("Pipeline alreadt stopped");
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isStarted() {
|
||||
return started;
|
||||
}
|
||||
|
||||
private Iterable<SearchHit> readAllPipelines() {
|
||||
// TODO: the search should be replaced with an ingest API when it is available
|
||||
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
||||
sourceBuilder.version(true);
|
||||
sourceBuilder.sort("_doc", SortOrder.ASC);
|
||||
SearchRequest searchRequest = new SearchRequest(PipelineStore.INDEX);
|
||||
searchRequest.source(sourceBuilder);
|
||||
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
|
||||
return SearchScrollIterator.createIterator(client, scrollTimeout, searchRequest);
|
||||
}
|
||||
|
||||
private void ensureReady() {
|
||||
if (started == false) {
|
||||
throw new IllegalStateException("pipeline store isn't ready yet");
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T> ActionListener<T> handleWriteResponseAndReloadPipelines(ActionListener<T> listener) {
|
||||
return new ActionListener<T>() {
|
||||
@Override
|
||||
public void onResponse(T result) {
|
||||
try {
|
||||
reloadPipelinesAction.reloadPipelinesOnAllNodes(reloadResult -> listener.onResponse(result));
|
||||
} catch (Throwable e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.elasticsearch.ingest.core;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -78,20 +77,16 @@ public final class Pipeline {
|
|||
}
|
||||
|
||||
public final static class Factory {
|
||||
private Processor readProcessor(Map<String, Processor.Factory> processorRegistry, String type, Map<String, Object> config) throws Exception {
|
||||
Processor.Factory factory = processorRegistry.get(type);
|
||||
if (factory != null) {
|
||||
List<Processor> onFailureProcessors = readProcessors("on_failure", processorRegistry, config);
|
||||
Processor processor = factory.create(config);
|
||||
if (config.isEmpty() == false) {
|
||||
throw new IllegalArgumentException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
|
||||
}
|
||||
if (onFailureProcessors.isEmpty()) {
|
||||
return processor;
|
||||
}
|
||||
return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors);
|
||||
|
||||
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws Exception {
|
||||
String description = ConfigurationUtils.readOptionalStringProperty(config, "description"); // TODO(simonw): can we make these strings constants?
|
||||
List<Processor> processors = readProcessors("processors", processorRegistry, config);
|
||||
List<Processor> onFailureProcessors = readProcessors("on_failure", processorRegistry, config);
|
||||
if (config.isEmpty() == false) {
|
||||
throw new IllegalArgumentException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
|
||||
}
|
||||
throw new IllegalArgumentException("No processor type exists with name [" + type + "]");
|
||||
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors));
|
||||
return new Pipeline(id, description, compoundProcessor);
|
||||
}
|
||||
|
||||
private List<Processor> readProcessors(String fieldName, Map<String, Processor.Factory> processorRegistry, Map<String, Object> config) throws Exception {
|
||||
|
@ -108,12 +103,22 @@ public final class Pipeline {
|
|||
return onFailureProcessors;
|
||||
}
|
||||
|
||||
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws Exception {
|
||||
String description = ConfigurationUtils.readOptionalStringProperty(config, "description"); // TODO(simonw): can we make these strings constants?
|
||||
List<Processor> processors = readProcessors("processors", processorRegistry, config);
|
||||
List<Processor> onFailureProcessors = readProcessors("on_failure", processorRegistry, config);
|
||||
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors));
|
||||
return new Pipeline(id, description, compoundProcessor);
|
||||
private Processor readProcessor(Map<String, Processor.Factory> processorRegistry, String type, Map<String, Object> config) throws Exception {
|
||||
Processor.Factory factory = processorRegistry.get(type);
|
||||
if (factory != null) {
|
||||
List<Processor> onFailureProcessors = readProcessors("on_failure", processorRegistry, config);
|
||||
Processor processor = factory.create(config);
|
||||
if (config.isEmpty() == false) {
|
||||
throw new IllegalArgumentException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
|
||||
}
|
||||
if (onFailureProcessors.isEmpty()) {
|
||||
return processor;
|
||||
}
|
||||
return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors);
|
||||
}
|
||||
throw new IllegalArgumentException("No processor type exists with name [" + type + "]");
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.elasticsearch.rest.BaseRestHandler;
|
|||
import org.elasticsearch.rest.RestChannel;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
|
||||
import org.elasticsearch.rest.action.support.AcknowledgedRestListener;
|
||||
|
||||
public class RestDeletePipelineAction extends BaseRestHandler {
|
||||
|
||||
|
@ -41,6 +41,8 @@ public class RestDeletePipelineAction extends BaseRestHandler {
|
|||
protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception {
|
||||
DeletePipelineRequest request = new DeletePipelineRequest();
|
||||
request.id(restRequest.param("id"));
|
||||
client.deletePipeline(request, new RestStatusToXContentListener<>(channel));
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
|
||||
client.deletePipeline(request, new AcknowledgedRestListener<>(channel));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ public class RestGetPipelineAction extends BaseRestHandler {
|
|||
protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception {
|
||||
GetPipelineRequest request = new GetPipelineRequest();
|
||||
request.ids(Strings.splitStringByCommaToArray(restRequest.param("id")));
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
client.getPipeline(request, new RestStatusToXContentListener<>(channel));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.elasticsearch.rest.BaseRestHandler;
|
|||
import org.elasticsearch.rest.RestChannel;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
|
||||
import org.elasticsearch.rest.action.support.AcknowledgedRestListener;
|
||||
|
||||
public class RestPutPipelineAction extends BaseRestHandler {
|
||||
|
||||
|
@ -44,6 +44,8 @@ public class RestPutPipelineAction extends BaseRestHandler {
|
|||
if (restRequest.hasContent()) {
|
||||
request.source(restRequest.content());
|
||||
}
|
||||
client.putPipeline(request, new RestStatusToXContentListener<>(channel));
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
|
||||
client.putPipeline(request, new AcknowledgedRestListener<>(channel));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -776,6 +776,7 @@ public class ExceptionSerializationTests extends ESTestCase {
|
|||
ids.put(139, null);
|
||||
ids.put(140, org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class);
|
||||
ids.put(141, org.elasticsearch.index.query.QueryShardException.class);
|
||||
ids.put(142, org.elasticsearch.ingest.PipelineMissingException.class);
|
||||
|
||||
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
|
||||
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
|
||||
|
|
|
@ -1,105 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.ingest;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.LocalTransportAddress;
|
||||
import org.elasticsearch.ingest.PipelineStore;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.Before;
|
||||
import org.mockito.Matchers;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class ReloadPipelinesActionTests extends ESTestCase {
|
||||
|
||||
private ClusterService clusterService;
|
||||
private TransportService transportService;
|
||||
private ReloadPipelinesAction reloadPipelinesAction;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
Settings settings = Settings.EMPTY;
|
||||
PipelineStore pipelineStore = mock(PipelineStore.class);
|
||||
clusterService = mock(ClusterService.class);
|
||||
transportService = mock(TransportService.class);
|
||||
reloadPipelinesAction = new ReloadPipelinesAction(settings, pipelineStore, clusterService, transportService);
|
||||
}
|
||||
|
||||
public void testSuccess() {
|
||||
int numNodes = randomIntBetween(1, 10);
|
||||
ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(generateDiscoNodes(numNodes)).build();
|
||||
when(clusterService.state()).thenReturn(state);
|
||||
|
||||
doAnswer(mock -> {
|
||||
TransportResponseHandler handler = (TransportResponseHandler) mock.getArguments()[3];
|
||||
for (int i = 0; i < numNodes; i++) {
|
||||
handler.handleResponse(new ReloadPipelinesAction.ReloadPipelinesResponse());
|
||||
}
|
||||
return mock;
|
||||
}).when(transportService).sendRequest(Matchers.any(), Matchers.eq(ReloadPipelinesAction.ACTION_NAME), Matchers.any(), Matchers.any());
|
||||
reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> assertThat(result, is(true)));
|
||||
}
|
||||
|
||||
public void testWithAtLeastOneFailure() {
|
||||
int numNodes = randomIntBetween(1, 10);
|
||||
|
||||
ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(generateDiscoNodes(numNodes)).build();
|
||||
when(clusterService.state()).thenReturn(state);
|
||||
|
||||
doAnswer(mock -> {
|
||||
TransportResponseHandler handler = (TransportResponseHandler) mock.getArguments()[3];
|
||||
handler.handleException(new TransportException("test failure"));
|
||||
for (int i = 1; i < numNodes; i++) {
|
||||
if (randomBoolean()) {
|
||||
handler.handleResponse(new ReloadPipelinesAction.ReloadPipelinesResponse());
|
||||
} else {
|
||||
handler.handleException(new TransportException("test failure"));
|
||||
}
|
||||
}
|
||||
return mock;
|
||||
}).when(transportService).sendRequest(Matchers.any(), Matchers.eq(ReloadPipelinesAction.ACTION_NAME), Matchers.any(), Matchers.any());
|
||||
reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> assertThat(result, is(false)));
|
||||
}
|
||||
|
||||
private static DiscoveryNodes.Builder generateDiscoNodes(int numNodes) {
|
||||
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
|
||||
for (int i = 0; i < numNodes; i++) {
|
||||
String id = Integer.toString(i);
|
||||
DiscoveryNode discoNode = new DiscoveryNode(id, id, new LocalTransportAddress(id), Collections.emptyMap(), Version.CURRENT);
|
||||
discoNodes.put(discoNode);
|
||||
}
|
||||
return discoNodes;
|
||||
}
|
||||
}
|
|
@ -1,59 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common;
|
||||
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
// Not a real unit tests with mocks, but with a single node, because we mock the scroll
|
||||
// search behaviour and it changes then this test will not catch this.
|
||||
public class SearchScrollIteratorTests extends ESSingleNodeTestCase {
|
||||
|
||||
public void testSearchScrollIterator() {
|
||||
createIndex("index");
|
||||
int numDocs = scaledRandomIntBetween(0, 128);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
client().prepareIndex("index", "type", Integer.toString(i))
|
||||
.setSource("field", "value" + i)
|
||||
.get();
|
||||
}
|
||||
client().admin().indices().prepareRefresh().get();
|
||||
|
||||
int i = 0;
|
||||
SearchRequest searchRequest = new SearchRequest("index");
|
||||
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
||||
// randomize size, because that also controls how many actual searches will happen:
|
||||
sourceBuilder.size(scaledRandomIntBetween(1, 10));
|
||||
searchRequest.source(sourceBuilder);
|
||||
Iterable<SearchHit> hits = SearchScrollIterator.createIterator(client(), TimeValue.timeValueSeconds(10), searchRequest);
|
||||
for (SearchHit hit : hits) {
|
||||
assertThat(hit.getId(), equalTo(Integer.toString(i)));
|
||||
assertThat(hit.getSource().get("field"), equalTo("value" + i));
|
||||
i++;
|
||||
}
|
||||
assertThat(i, equalTo(numDocs));
|
||||
}
|
||||
|
||||
}
|
|
@ -1,276 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.search.internal.InternalSearchHit;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class IngestBootstrapperTests extends ESTestCase {
|
||||
|
||||
private PipelineStore store;
|
||||
private IngestBootstrapper bootstrapper;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(threadPool.executor(any())).thenReturn(Runnable::run);
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
store = mock(PipelineStore.class);
|
||||
when(store.isStarted()).thenReturn(false);
|
||||
PipelineExecutionService pipelineExecutionService = mock(PipelineExecutionService.class);
|
||||
bootstrapper = new IngestBootstrapper(Settings.EMPTY, threadPool, clusterService, store, pipelineExecutionService);
|
||||
}
|
||||
|
||||
public void testStartAndStopInBackground() throws Exception {
|
||||
ThreadPool threadPool = new ThreadPool("test");
|
||||
Client client = mock(Client.class);
|
||||
TransportService transportService = mock(TransportService.class);
|
||||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
when(client.search(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(Collections.emptyList()));
|
||||
when(client.searchScroll(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(Collections.emptyList()));
|
||||
Settings settings = Settings.EMPTY;
|
||||
PipelineStore store = new PipelineStore(settings, clusterService, transportService);
|
||||
IngestBootstrapper bootstrapper = new IngestBootstrapper(
|
||||
settings, threadPool, clusterService, store, null
|
||||
);
|
||||
bootstrapper.setClient(client);
|
||||
|
||||
List<InternalSearchHit> hits = new ArrayList<>();
|
||||
hits.add(new InternalSearchHit(0, "1", new Text("type"), Collections.emptyMap())
|
||||
.sourceRef(new BytesArray("{\"description\": \"_description1\"}"))
|
||||
);
|
||||
when(client.search(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(hits));
|
||||
when(client.get(any())).thenReturn(PipelineStoreTests.expectedGetResponse(true));
|
||||
|
||||
try {
|
||||
store.get("1");
|
||||
fail("IllegalStateException expected");
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e.getMessage(), equalTo("pipeline store isn't ready yet"));
|
||||
}
|
||||
|
||||
MetaData metadata = MetaData.builder()
|
||||
.put(IndexTemplateMetaData.builder(IngestBootstrapper.INGEST_INDEX_TEMPLATE_NAME))
|
||||
.build();
|
||||
bootstrapper.startPipelineStore(metadata);
|
||||
assertBusy(() -> {
|
||||
assertThat(store.isStarted(), is(true));
|
||||
assertThat(store.get("1"), notNullValue());
|
||||
assertThat(store.get("1").getId(), equalTo("1"));
|
||||
assertThat(store.get("1").getDescription(), equalTo("_description1"));
|
||||
});
|
||||
|
||||
bootstrapper.stopPipelineStore("testing stop");
|
||||
assertBusy(() -> assertThat(store.isStarted(), is(false)));
|
||||
|
||||
// the map internal search hit holds gets emptied after use, which is ok, but in this test we need to reset the source:
|
||||
hits.get(0).sourceRef(new BytesArray("{\"description\": \"_description1\"}"));
|
||||
hits.add(new InternalSearchHit(0, "2", new Text("type"), Collections.emptyMap())
|
||||
.sourceRef(new BytesArray("{\"description\": \"_description2\"}"))
|
||||
);
|
||||
bootstrapper.startPipelineStore(metadata);
|
||||
assertBusy(() -> {
|
||||
assertThat(store.isStarted(), is(true));
|
||||
assertThat(store.get("1"), notNullValue());
|
||||
assertThat(store.get("1").getId(), equalTo("1"));
|
||||
assertThat(store.get("1").getDescription(), equalTo("_description1"));
|
||||
assertThat(store.get("2"), notNullValue());
|
||||
assertThat(store.get("2").getId(), equalTo("2"));
|
||||
assertThat(store.get("2").getDescription(), equalTo("_description2"));
|
||||
});
|
||||
threadPool.shutdown();
|
||||
}
|
||||
|
||||
public void testPipelineStoreBootstrappingGlobalStateNotRecoveredBlock() throws Exception {
|
||||
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
|
||||
csBuilder.blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK));
|
||||
ClusterState cs = csBuilder.metaData(MetaData.builder()).build();
|
||||
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
|
||||
verify(store, never()).start();
|
||||
verify(store, never()).stop(anyString());
|
||||
}
|
||||
|
||||
public void testPipelineStoreBootstrappingGlobalStateNoMasterBlock() throws Exception {
|
||||
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
|
||||
csBuilder.blocks(ClusterBlocks.builder()
|
||||
.addGlobalBlock(randomBoolean() ? DiscoverySettings.NO_MASTER_BLOCK_WRITES : DiscoverySettings.NO_MASTER_BLOCK_ALL));
|
||||
ClusterState cs = csBuilder.metaData(
|
||||
MetaData.builder()
|
||||
.put(IndexTemplateMetaData.builder(IngestBootstrapper.INGEST_INDEX_TEMPLATE_NAME))
|
||||
).build();
|
||||
|
||||
// We're not started and there is a no master block, doing nothing:
|
||||
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
|
||||
verify(store, never()).start();
|
||||
verify(store, never()).stop(anyString());
|
||||
|
||||
// We're started and there is a no master block, so we stop the store:
|
||||
when(store.isStarted()).thenReturn(true);
|
||||
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
|
||||
verify(store, never()).start();
|
||||
verify(store, times(1)).stop(anyString());
|
||||
}
|
||||
|
||||
public void testPipelineStoreBootstrappingNoIngestIndex() throws Exception {
|
||||
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
|
||||
ClusterState cs = csBuilder.metaData(MetaData.builder()
|
||||
.put(IndexTemplateMetaData.builder(IngestBootstrapper.INGEST_INDEX_TEMPLATE_NAME)))
|
||||
.build();
|
||||
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
|
||||
verify(store, times(1)).start();
|
||||
}
|
||||
|
||||
public void testPipelineStoreBootstrappingIngestIndexShardsNotStarted() throws Exception {
|
||||
// .ingest index, but not all primary shards started:
|
||||
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
|
||||
MetaData.Builder metaDateBuilder = MetaData.builder();
|
||||
metaDateBuilder.put(IndexTemplateMetaData.builder(IngestBootstrapper.INGEST_INDEX_TEMPLATE_NAME));
|
||||
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
|
||||
Settings settings = settings(Version.CURRENT)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
|
||||
.build();
|
||||
metaDateBuilder.put(IndexMetaData.builder(PipelineStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
|
||||
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(PipelineStore.INDEX);
|
||||
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(PipelineStore.INDEX, 0))
|
||||
.addShard(TestShardRouting.newShardRouting(PipelineStore.INDEX, 0, "_node_id", null, null, true, ShardRoutingState.INITIALIZING, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")))
|
||||
.build());
|
||||
indexRoutingTableBuilder.addReplica();
|
||||
routingTableBuilder.add(indexRoutingTableBuilder.build());
|
||||
csBuilder.metaData(metaDateBuilder);
|
||||
csBuilder.routingTable(routingTableBuilder.build());
|
||||
ClusterState cs = csBuilder.build();
|
||||
|
||||
// We're not running and the cluster state isn't ready, so we don't start.
|
||||
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
|
||||
verify(store, never()).start();
|
||||
verify(store, never()).stop(anyString());
|
||||
|
||||
// We're running and the cluster state indicates that all our shards are unassigned, so we stop.
|
||||
when(store.isStarted()).thenReturn(true);
|
||||
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
|
||||
verify(store, never()).start();
|
||||
verify(store, times(1)).stop(anyString());
|
||||
}
|
||||
|
||||
public void testPipelineStoreBootstrappingIngestIndexShardsStarted() throws Exception {
|
||||
// .ingest index, but not all primary shards started:
|
||||
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
|
||||
MetaData.Builder metaDateBuilder = MetaData.builder();
|
||||
metaDateBuilder.put(IndexTemplateMetaData.builder(IngestBootstrapper.INGEST_INDEX_TEMPLATE_NAME));
|
||||
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
|
||||
Settings settings = settings(Version.CURRENT)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
|
||||
.build();
|
||||
metaDateBuilder.put(IndexMetaData.builder(PipelineStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
|
||||
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(PipelineStore.INDEX);
|
||||
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(PipelineStore.INDEX, 0))
|
||||
.addShard(TestShardRouting.newShardRouting(PipelineStore.INDEX, 0, "_node_id", null, null, true, ShardRoutingState.STARTED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")))
|
||||
.build());
|
||||
indexRoutingTableBuilder.addReplica();
|
||||
routingTableBuilder.add(indexRoutingTableBuilder.build());
|
||||
csBuilder.metaData(metaDateBuilder);
|
||||
csBuilder.routingTable(routingTableBuilder.build());
|
||||
ClusterState cs = csBuilder.build();
|
||||
|
||||
// We're not running and the cluster state is ready, so we start.
|
||||
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
|
||||
verify(store, times(1)).start();
|
||||
verify(store, never()).stop(anyString());
|
||||
|
||||
// We're running and the cluster state is good, so we do nothing.
|
||||
when(store.isStarted()).thenReturn(true);
|
||||
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
|
||||
verify(store, times(1)).start();
|
||||
verify(store, never()).stop(anyString());
|
||||
}
|
||||
|
||||
public void testPipelineStoreBootstrappingFailure() throws Exception {
|
||||
// .ingest index, but not all primary shards started:
|
||||
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
|
||||
MetaData.Builder metaDateBuilder = MetaData.builder();
|
||||
metaDateBuilder.put(IndexTemplateMetaData.builder(IngestBootstrapper.INGEST_INDEX_TEMPLATE_NAME));
|
||||
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
|
||||
Settings settings = settings(Version.CURRENT)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
|
||||
.build();
|
||||
metaDateBuilder.put(IndexMetaData.builder(PipelineStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
|
||||
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(PipelineStore.INDEX);
|
||||
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(PipelineStore.INDEX, 0))
|
||||
.addShard(TestShardRouting.newShardRouting(PipelineStore.INDEX, 0, "_node_id", null, null, true, ShardRoutingState.STARTED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")))
|
||||
.build());
|
||||
indexRoutingTableBuilder.addReplica();
|
||||
routingTableBuilder.add(indexRoutingTableBuilder.build());
|
||||
csBuilder.metaData(metaDateBuilder);
|
||||
csBuilder.routingTable(routingTableBuilder.build());
|
||||
ClusterState cs = csBuilder.build();
|
||||
|
||||
// fail the first call with an runtime exception and subsequent calls just return:
|
||||
doThrow(new RuntimeException()).doNothing().when(store).start();
|
||||
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
|
||||
verify(store, times(2)).start();
|
||||
verify(store, never()).stop(anyString());
|
||||
}
|
||||
|
||||
}
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.ingest;
|
|||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.ingest.DeletePipelineRequest;
|
||||
|
@ -32,6 +31,7 @@ import org.elasticsearch.action.ingest.PutPipelineRequest;
|
|||
import org.elasticsearch.action.ingest.SimulateDocumentSimpleResult;
|
||||
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
|
||||
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
|
||||
import org.elasticsearch.action.ingest.WritePipelineResponse;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.core.IngestDocument;
|
||||
|
@ -164,7 +164,6 @@ public class IngestClientIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
public void test() throws Exception {
|
||||
|
||||
PutPipelineRequest putPipelineRequest = new PutPipelineRequest();
|
||||
putPipelineRequest.id("_id");
|
||||
putPipelineRequest.source(jsonBuilder().startObject()
|
||||
|
@ -200,9 +199,8 @@ public class IngestClientIT extends ESIntegTestCase {
|
|||
|
||||
DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest();
|
||||
deletePipelineRequest.id("_id");
|
||||
DeleteResponse response = client().deletePipeline(deletePipelineRequest).get();
|
||||
assertThat(response.isFound(), is(true));
|
||||
assertThat(response.getId(), equalTo("_id"));
|
||||
WritePipelineResponse response = client().deletePipeline(deletePipelineRequest).get();
|
||||
assertThat(response.isAcknowledged(), is(true));
|
||||
|
||||
getResponse = client().prepareGetPipeline().setIds("_id").get();
|
||||
assertThat(getResponse.isFound(), is(false));
|
||||
|
|
|
@ -19,444 +19,175 @@
|
|||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.action.ingest.DeletePipelineRequest;
|
||||
import org.elasticsearch.action.ingest.PutPipelineRequest;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.index.get.GetResult;
|
||||
import org.elasticsearch.search.internal.InternalSearchHit;
|
||||
import org.elasticsearch.search.internal.InternalSearchHits;
|
||||
import org.elasticsearch.search.internal.InternalSearchResponse;
|
||||
import org.elasticsearch.ingest.core.Pipeline;
|
||||
import org.elasticsearch.ingest.processor.SetProcessor;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.Before;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class PipelineStoreTests extends ESTestCase {
|
||||
|
||||
private PipelineStore store;
|
||||
private Client client;
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
Settings settings = Settings.EMPTY;
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
TransportService transportService = mock(TransportService.class);
|
||||
|
||||
client = mock(Client.class);
|
||||
when(client.search(any())).thenReturn(expectedSearchReponse(Collections.emptyList()));
|
||||
when(client.searchScroll(any())).thenReturn(expectedSearchReponse(Collections.emptyList()));
|
||||
store = new PipelineStore(settings, clusterService, transportService);
|
||||
store.setClient(client);
|
||||
store.start();
|
||||
store = new PipelineStore(Settings.EMPTY, clusterService);
|
||||
ProcessorsRegistry registry = new ProcessorsRegistry();
|
||||
registry.registerProcessor("set", (environment, templateService) -> new SetProcessor.Factory(TestTemplateService.instance()));
|
||||
store.buildProcessorFactoryRegistry(registry, null, null);
|
||||
}
|
||||
|
||||
public void testUpdatePipeline() throws Exception {
|
||||
List<InternalSearchHit> hits = new ArrayList<>();
|
||||
hits.add(new InternalSearchHit(0, "1", new Text("type"), Collections.emptyMap())
|
||||
.sourceRef(new BytesArray("{\"description\": \"_description1\"}"))
|
||||
public void testUpdatePipelines() {
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
|
||||
store.innerUpdatePipelines(clusterState);
|
||||
assertThat(store.pipelines.size(), is(0));
|
||||
|
||||
PipelineConfiguration pipeline = new PipelineConfiguration(
|
||||
"_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}")
|
||||
);
|
||||
IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", pipeline));
|
||||
clusterState = ClusterState.builder(clusterState)
|
||||
.metaData(MetaData.builder().putCustom(IngestMetadata.TYPE, ingestMetadata))
|
||||
.build();
|
||||
store.innerUpdatePipelines(clusterState);
|
||||
assertThat(store.pipelines.size(), is(1));
|
||||
assertThat(store.pipelines.get("_id").getId(), equalTo("_id"));
|
||||
assertThat(store.pipelines.get("_id").getDescription(), nullValue());
|
||||
assertThat(store.pipelines.get("_id").getProcessors().size(), equalTo(1));
|
||||
assertThat(store.pipelines.get("_id").getProcessors().get(0).getType(), equalTo("set"));
|
||||
}
|
||||
|
||||
when(client.search(any())).thenReturn(expectedSearchReponse(hits));
|
||||
when(client.get(any())).thenReturn(expectedGetResponse(true));
|
||||
assertThat(store.get("1"), nullValue());
|
||||
public void testPut() {
|
||||
String id = "_id";
|
||||
Pipeline pipeline = store.get(id);
|
||||
assertThat(pipeline, nullValue());
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
|
||||
|
||||
store.updatePipelines();
|
||||
assertThat(store.get("1").getId(), equalTo("1"));
|
||||
assertThat(store.get("1").getDescription(), equalTo("_description1"));
|
||||
// add a new pipeline:
|
||||
PutPipelineRequest putRequest = new PutPipelineRequest();
|
||||
putRequest.id(id);
|
||||
putRequest.source(new BytesArray("{\"processors\": []}"));
|
||||
clusterState = store.innerPut(putRequest, clusterState);
|
||||
store.innerUpdatePipelines(clusterState);
|
||||
pipeline = store.get(id);
|
||||
assertThat(pipeline, notNullValue());
|
||||
assertThat(pipeline.getId(), equalTo(id));
|
||||
assertThat(pipeline.getDescription(), nullValue());
|
||||
assertThat(pipeline.getProcessors().size(), equalTo(0));
|
||||
|
||||
when(client.get(any())).thenReturn(expectedGetResponse(true));
|
||||
hits.add(new InternalSearchHit(0, "2", new Text("type"), Collections.emptyMap())
|
||||
.sourceRef(new BytesArray("{\"description\": \"_description2\"}"))
|
||||
// overwrite existing pipeline:
|
||||
putRequest = new PutPipelineRequest();
|
||||
putRequest.id(id);
|
||||
putRequest.source(new BytesArray("{\"processors\": [], \"description\": \"_description\"}"));
|
||||
clusterState = store.innerPut(putRequest, clusterState);
|
||||
store.innerUpdatePipelines(clusterState);
|
||||
pipeline = store.get(id);
|
||||
assertThat(pipeline, notNullValue());
|
||||
assertThat(pipeline.getId(), equalTo(id));
|
||||
assertThat(pipeline.getDescription(), equalTo("_description"));
|
||||
assertThat(pipeline.getProcessors().size(), equalTo(0));
|
||||
}
|
||||
|
||||
public void testDelete() {
|
||||
PipelineConfiguration config = new PipelineConfiguration(
|
||||
"_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}")
|
||||
);
|
||||
store.updatePipelines();
|
||||
assertThat(store.get("1").getId(), equalTo("1"));
|
||||
assertThat(store.get("1").getDescription(), equalTo("_description1"));
|
||||
assertThat(store.get("2").getId(), equalTo("2"));
|
||||
assertThat(store.get("2").getDescription(), equalTo("_description2"));
|
||||
|
||||
hits.remove(1);
|
||||
when(client.get(eqGetRequest(PipelineStore.INDEX, PipelineStore.TYPE, "2"))).thenReturn(expectedGetResponse(false));
|
||||
store.updatePipelines();
|
||||
assertThat(store.get("1").getId(), equalTo("1"));
|
||||
assertThat(store.get("1").getDescription(), equalTo("_description1"));
|
||||
assertThat(store.get("2"), nullValue());
|
||||
}
|
||||
|
||||
public void testGetReference() throws Exception {
|
||||
// fill the store up for the test:
|
||||
List<InternalSearchHit> hits = new ArrayList<>();
|
||||
hits.add(new InternalSearchHit(0, "foo", new Text("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}")));
|
||||
hits.add(new InternalSearchHit(0, "bar", new Text("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}")));
|
||||
hits.add(new InternalSearchHit(0, "foobar", new Text("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}")));
|
||||
when(client.search(any())).thenReturn(expectedSearchReponse(hits));
|
||||
store.updatePipelines();
|
||||
|
||||
List<PipelineDefinition> result = store.getReference("foo");
|
||||
assertThat(result.size(), equalTo(1));
|
||||
assertThat(result.get(0).getPipeline().getId(), equalTo("foo"));
|
||||
|
||||
result = store.getReference("foo*");
|
||||
// to make sure the order is consistent in the test:
|
||||
result.sort((first, second) -> {
|
||||
return first.getPipeline().getId().compareTo(second.getPipeline().getId());
|
||||
});
|
||||
assertThat(result.size(), equalTo(2));
|
||||
assertThat(result.get(0).getPipeline().getId(), equalTo("foo"));
|
||||
assertThat(result.get(1).getPipeline().getId(), equalTo("foobar"));
|
||||
|
||||
result = store.getReference("bar*");
|
||||
assertThat(result.size(), equalTo(1));
|
||||
assertThat(result.get(0).getPipeline().getId(), equalTo("bar"));
|
||||
|
||||
result = store.getReference("*");
|
||||
// to make sure the order is consistent in the test:
|
||||
result.sort((first, second) -> {
|
||||
return first.getPipeline().getId().compareTo(second.getPipeline().getId());
|
||||
});
|
||||
assertThat(result.size(), equalTo(3));
|
||||
assertThat(result.get(0).getPipeline().getId(), equalTo("bar"));
|
||||
assertThat(result.get(1).getPipeline().getId(), equalTo("foo"));
|
||||
assertThat(result.get(2).getPipeline().getId(), equalTo("foobar"));
|
||||
|
||||
result = store.getReference("foo", "bar");
|
||||
assertThat(result.size(), equalTo(2));
|
||||
assertThat(result.get(0).getPipeline().getId(), equalTo("foo"));
|
||||
assertThat(result.get(1).getPipeline().getId(), equalTo("bar"));
|
||||
}
|
||||
|
||||
public void testValidateIngestIndex() throws Exception {
|
||||
// ingest index doesn't exist:
|
||||
ClusterState state = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder())
|
||||
IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", config));
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().putCustom(IngestMetadata.TYPE, ingestMetadata))
|
||||
.build();
|
||||
assertThat(store.isIngestIndexPresent(state), equalTo(false));
|
||||
store.innerUpdatePipelines(clusterState);
|
||||
assertThat(store.get("_id"), notNullValue());
|
||||
|
||||
// ingest index does exist and is valid:
|
||||
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(PipelineStore.INDEX)
|
||||
.settings(Settings.builder()
|
||||
.put(PipelineStore.INGEST_INDEX_SETTING)
|
||||
.put("index.version.created", Version.CURRENT)
|
||||
)
|
||||
.putMapping(PipelineStore.TYPE, PipelineStore.PIPELINE_MAPPING);
|
||||
state = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().put(indexMetaData))
|
||||
.build();
|
||||
assertThat(store.isIngestIndexPresent(state), equalTo(true));
|
||||
// Delete pipeline:
|
||||
DeletePipelineRequest deleteRequest = new DeletePipelineRequest();
|
||||
deleteRequest.id("_id");
|
||||
clusterState = store.innerDelete(deleteRequest, clusterState);
|
||||
store.innerUpdatePipelines(clusterState);
|
||||
assertThat(store.get("_id"), nullValue());
|
||||
|
||||
// fails, has dynamic mapping
|
||||
indexMetaData = IndexMetaData.builder(PipelineStore.INDEX)
|
||||
.settings(Settings.builder()
|
||||
.put(PipelineStore.INGEST_INDEX_SETTING)
|
||||
.put("index.mapper.dynamic", true)
|
||||
.put("index.version.created", Version.CURRENT)
|
||||
)
|
||||
.putMapping(PipelineStore.TYPE, PipelineStore.PIPELINE_MAPPING);
|
||||
state = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().put(indexMetaData))
|
||||
.build();
|
||||
// Delete existing pipeline:
|
||||
try {
|
||||
store.isIngestIndexPresent(state);
|
||||
store.innerDelete(deleteRequest, clusterState);
|
||||
fail("exception expected");
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e.getMessage(), equalTo("illegal ingest index setting, [index.mapper.dynamic] setting is [true] while [false] is expected"));
|
||||
}
|
||||
|
||||
// fails, incorrect number of primary shards
|
||||
indexMetaData = IndexMetaData.builder(PipelineStore.INDEX)
|
||||
.settings(Settings.builder()
|
||||
.put(PipelineStore.INGEST_INDEX_SETTING)
|
||||
.put("index.number_of_shards", 2)
|
||||
.put("index.version.created", Version.CURRENT)
|
||||
)
|
||||
.putMapping(PipelineStore.TYPE, PipelineStore.PIPELINE_MAPPING);
|
||||
state = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().put(indexMetaData))
|
||||
.build();
|
||||
try {
|
||||
store.isIngestIndexPresent(state);
|
||||
fail("exception expected");
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e.getMessage(), equalTo("illegal ingest index setting, [index.number_of_shards] setting is [2] while [1] is expected"));
|
||||
}
|
||||
|
||||
// fails, incorrect number of replica shards
|
||||
indexMetaData = IndexMetaData.builder(PipelineStore.INDEX)
|
||||
.settings(Settings.builder()
|
||||
.put(PipelineStore.INGEST_INDEX_SETTING)
|
||||
.put("index.number_of_replicas", 2)
|
||||
.put("index.version.created", Version.CURRENT)
|
||||
)
|
||||
.putMapping(PipelineStore.TYPE, PipelineStore.PIPELINE_MAPPING);
|
||||
state = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().put(indexMetaData))
|
||||
.build();
|
||||
try {
|
||||
store.isIngestIndexPresent(state);
|
||||
fail("exception expected");
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e.getMessage(), equalTo("illegal ingest index setting, [index.number_of_replicas] setting is [2] while [1] is expected"));
|
||||
}
|
||||
|
||||
// fails not a strict mapping:
|
||||
String mapping = XContentFactory.jsonBuilder().startObject()
|
||||
.startObject("_all")
|
||||
.field("enabled", false)
|
||||
.endObject()
|
||||
.startObject("properties")
|
||||
.startObject("processors")
|
||||
.field("type", "object")
|
||||
.field("enabled", false)
|
||||
.field("dynamic", true)
|
||||
.endObject()
|
||||
.startObject("on_failure")
|
||||
.field("type", "object")
|
||||
.field("enabled", false)
|
||||
.field("dynamic", true)
|
||||
.endObject()
|
||||
.startObject("description")
|
||||
.field("type", "string")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject().string();
|
||||
indexMetaData = IndexMetaData.builder(PipelineStore.INDEX)
|
||||
.settings(Settings.builder()
|
||||
.put(PipelineStore.INGEST_INDEX_SETTING)
|
||||
.put("index.version.created", Version.CURRENT)
|
||||
)
|
||||
.putMapping(PipelineStore.TYPE, mapping);
|
||||
state = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().put(indexMetaData))
|
||||
.build();
|
||||
try {
|
||||
store.isIngestIndexPresent(state);
|
||||
fail("exception expected");
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e.getMessage(), equalTo("illegal ingest mapping, pipeline mapping must be strict"));
|
||||
}
|
||||
|
||||
// fails _all field is enabled:
|
||||
mapping = XContentFactory.jsonBuilder().startObject()
|
||||
.field("dynamic", "strict")
|
||||
.startObject("_all")
|
||||
.field("enabled", true)
|
||||
.endObject()
|
||||
.startObject("properties")
|
||||
.startObject("processors")
|
||||
.field("type", "object")
|
||||
.field("enabled", false)
|
||||
.field("dynamic", "true")
|
||||
.endObject()
|
||||
.startObject("on_failure")
|
||||
.field("type", "object")
|
||||
.field("enabled", false)
|
||||
.field("dynamic", "true")
|
||||
.endObject()
|
||||
.startObject("description")
|
||||
.field("type", "string")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject().string();
|
||||
indexMetaData = IndexMetaData.builder(PipelineStore.INDEX)
|
||||
.settings(Settings.builder()
|
||||
.put(PipelineStore.INGEST_INDEX_SETTING)
|
||||
.put("index.version.created", Version.CURRENT)
|
||||
)
|
||||
.putMapping(PipelineStore.TYPE, mapping);
|
||||
state = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().put(indexMetaData))
|
||||
.build();
|
||||
try {
|
||||
store.isIngestIndexPresent(state);
|
||||
fail("exception expected");
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e.getMessage(), equalTo("illegal ingest mapping, _all field is enabled"));
|
||||
}
|
||||
|
||||
// fails processor field not of type object:
|
||||
mapping = XContentFactory.jsonBuilder().startObject()
|
||||
.field("dynamic", "strict")
|
||||
.startObject("_all")
|
||||
.field("enabled", false)
|
||||
.endObject()
|
||||
.startObject("properties")
|
||||
.startObject("processors")
|
||||
.field("type", "nested")
|
||||
.field("enabled", false)
|
||||
.field("dynamic", "true")
|
||||
.endObject()
|
||||
.startObject("on_failure")
|
||||
.field("type", "object")
|
||||
.field("enabled", false)
|
||||
.field("dynamic", "true")
|
||||
.endObject()
|
||||
.startObject("description")
|
||||
.field("type", "string")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject().string();
|
||||
indexMetaData = IndexMetaData.builder(PipelineStore.INDEX)
|
||||
.settings(Settings.builder()
|
||||
.put(PipelineStore.INGEST_INDEX_SETTING)
|
||||
.put("index.version.created", Version.CURRENT)
|
||||
)
|
||||
.putMapping(PipelineStore.TYPE, mapping);
|
||||
state = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().put(indexMetaData))
|
||||
.build();
|
||||
try {
|
||||
store.isIngestIndexPresent(state);
|
||||
fail("exception expected");
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e.getMessage(), equalTo("illegal ingest mapping, processors field's type is [nested] while [object] is expected"));
|
||||
}
|
||||
|
||||
// fails processor field enabled option is true:
|
||||
mapping = XContentFactory.jsonBuilder().startObject()
|
||||
.field("dynamic", "strict")
|
||||
.startObject("_all")
|
||||
.field("enabled", false)
|
||||
.endObject()
|
||||
.startObject("properties")
|
||||
.startObject("processors")
|
||||
.field("type", "object")
|
||||
.field("enabled", true)
|
||||
.field("dynamic", "true")
|
||||
.endObject()
|
||||
.startObject("on_failure")
|
||||
.field("type", "object")
|
||||
.field("enabled", false)
|
||||
.field("dynamic", "true")
|
||||
.endObject()
|
||||
.startObject("description")
|
||||
.field("type", "string")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject().string();
|
||||
indexMetaData = IndexMetaData.builder(PipelineStore.INDEX)
|
||||
.settings(Settings.builder()
|
||||
.put(PipelineStore.INGEST_INDEX_SETTING)
|
||||
.put("index.version.created", Version.CURRENT)
|
||||
)
|
||||
.putMapping(PipelineStore.TYPE, mapping);
|
||||
state = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().put(indexMetaData))
|
||||
.build();
|
||||
try {
|
||||
store.isIngestIndexPresent(state);
|
||||
fail("exception expected");
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e.getMessage(), equalTo("illegal ingest mapping, processors field enabled option is [true] while [false] is expected"));
|
||||
}
|
||||
|
||||
// fails processor field dynamic option is false:
|
||||
mapping = XContentFactory.jsonBuilder().startObject()
|
||||
.field("dynamic", "strict")
|
||||
.startObject("_all")
|
||||
.field("enabled", false)
|
||||
.endObject()
|
||||
.startObject("properties")
|
||||
.startObject("processors")
|
||||
.field("type", "object")
|
||||
.field("enabled", false)
|
||||
.field("dynamic", "false")
|
||||
.endObject()
|
||||
.startObject("on_failure")
|
||||
.field("type", "object")
|
||||
.field("enabled", false)
|
||||
.field("dynamic", "true")
|
||||
.endObject()
|
||||
.startObject("description")
|
||||
.field("type", "string")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject().string();
|
||||
indexMetaData = IndexMetaData.builder(PipelineStore.INDEX)
|
||||
.settings(Settings.builder()
|
||||
.put(PipelineStore.INGEST_INDEX_SETTING)
|
||||
.put("index.version.created", Version.CURRENT)
|
||||
)
|
||||
.putMapping(PipelineStore.TYPE, mapping);
|
||||
state = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().put(indexMetaData))
|
||||
.build();
|
||||
try {
|
||||
store.isIngestIndexPresent(state);
|
||||
fail("exception expected");
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e.getMessage(), equalTo("illegal ingest mapping, processors field dynamic option is [false] while [true] is expected"));
|
||||
} catch (PipelineMissingException e) {
|
||||
assertThat(e.getMessage(), equalTo("pipeline [_id] is missing"));
|
||||
}
|
||||
}
|
||||
|
||||
static ActionFuture<SearchResponse> expectedSearchReponse(List<InternalSearchHit> hits) {
|
||||
return new PlainActionFuture<SearchResponse>() {
|
||||
public void testGetPipelines() {
|
||||
Map<String, PipelineConfiguration> configs = new HashMap<>();
|
||||
configs.put("_id1", new PipelineConfiguration(
|
||||
"_id1", new BytesArray("{\"processors\": []}")
|
||||
));
|
||||
configs.put("_id2", new PipelineConfiguration(
|
||||
"_id2", new BytesArray("{\"processors\": []}")
|
||||
));
|
||||
|
||||
@Override
|
||||
public SearchResponse get(long timeout, TimeUnit unit) {
|
||||
InternalSearchHits hits1 = new InternalSearchHits(hits.toArray(new InternalSearchHit[0]), hits.size(), 1f);
|
||||
return new SearchResponse(new InternalSearchResponse(hits1, null, null, null, false, null), "_scrollId", 1, 1, 1, null);
|
||||
}
|
||||
};
|
||||
assertThat(store.innerGetPipelines(null, "_id1").isEmpty(), is(true));
|
||||
|
||||
IngestMetadata ingestMetadata = new IngestMetadata(configs);
|
||||
List<PipelineConfiguration> pipelines = store.innerGetPipelines(ingestMetadata, "_id1");
|
||||
assertThat(pipelines.size(), equalTo(1));
|
||||
assertThat(pipelines.get(0).getId(), equalTo("_id1"));
|
||||
|
||||
pipelines = store.innerGetPipelines(ingestMetadata, "_id1", "_id2");
|
||||
assertThat(pipelines.size(), equalTo(2));
|
||||
assertThat(pipelines.get(0).getId(), equalTo("_id1"));
|
||||
assertThat(pipelines.get(1).getId(), equalTo("_id2"));
|
||||
|
||||
pipelines = store.innerGetPipelines(ingestMetadata, "_id*");
|
||||
pipelines.sort((o1, o2) -> o1.getId().compareTo(o2.getId()));
|
||||
assertThat(pipelines.size(), equalTo(2));
|
||||
assertThat(pipelines.get(0).getId(), equalTo("_id1"));
|
||||
assertThat(pipelines.get(1).getId(), equalTo("_id2"));
|
||||
}
|
||||
|
||||
static ActionFuture<GetResponse> expectedGetResponse(boolean exists) {
|
||||
return new PlainActionFuture<GetResponse>() {
|
||||
@Override
|
||||
public GetResponse get() throws InterruptedException, ExecutionException {
|
||||
return new GetResponse(new GetResult("_index", "_type", "_id", 1, exists, null, null));
|
||||
}
|
||||
};
|
||||
}
|
||||
public void testCrud() throws Exception {
|
||||
String id = "_id";
|
||||
Pipeline pipeline = store.get(id);
|
||||
assertThat(pipeline, nullValue());
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
|
||||
|
||||
static GetRequest eqGetRequest(String index, String type, String id) {
|
||||
return argThat(new GetRequestMatcher(index, type, id));
|
||||
}
|
||||
PutPipelineRequest putRequest = new PutPipelineRequest();
|
||||
putRequest.id(id);
|
||||
putRequest.source(new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"));
|
||||
clusterState = store.innerPut(putRequest, clusterState);
|
||||
store.innerUpdatePipelines(clusterState);
|
||||
pipeline = store.get(id);
|
||||
assertThat(pipeline, notNullValue());
|
||||
assertThat(pipeline.getId(), equalTo(id));
|
||||
assertThat(pipeline.getDescription(), nullValue());
|
||||
assertThat(pipeline.getProcessors().size(), equalTo(1));
|
||||
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("set"));
|
||||
|
||||
static class GetRequestMatcher extends ArgumentMatcher<GetRequest> {
|
||||
|
||||
private final String index;
|
||||
private final String type;
|
||||
private final String id;
|
||||
|
||||
public GetRequestMatcher(String index, String type, String id) {
|
||||
this.index = index;
|
||||
this.type = type;
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matches(Object o) {
|
||||
GetRequest getRequest = (GetRequest) o;
|
||||
return Objects.equals(getRequest.index(), index) &&
|
||||
Objects.equals(getRequest.type(), type) &&
|
||||
Objects.equals(getRequest.id(), id);
|
||||
}
|
||||
DeletePipelineRequest deleteRequest = new DeletePipelineRequest();
|
||||
deleteRequest.id(id);
|
||||
clusterState = store.innerDelete(deleteRequest, clusterState);
|
||||
store.innerUpdatePipelines(clusterState);
|
||||
pipeline = store.get(id);
|
||||
assertThat(pipeline, nullValue());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
@ -54,7 +54,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
@ -91,7 +91,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
@ -58,7 +58,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
@ -103,7 +103,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
|
|
@ -15,25 +15,18 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _index: ".ingest" }
|
||||
- match: { _type: "pipeline" }
|
||||
- match: { _version: 1 }
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
ingest.get_pipeline:
|
||||
id: "my_pipeline"
|
||||
- match: { my_pipeline._source.description: "_description" }
|
||||
- match: { my_pipeline._version: 1 }
|
||||
- match: { pipelines.0.id: "my_pipeline" }
|
||||
- match: { pipelines.0.config.description: "_description" }
|
||||
|
||||
- do:
|
||||
ingest.delete_pipeline:
|
||||
id: "my_pipeline"
|
||||
- match: { _index: ".ingest" }
|
||||
- match: { _type: "pipeline" }
|
||||
- match: { _version: 2 }
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { found: true }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
---
|
||||
"Test ingest simulate API works fine when node.ingest is set to false":
|
||||
|
@ -52,7 +45,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
ingest.simulate:
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline_1" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
@ -72,7 +72,7 @@
|
|||
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline_1" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
|
@ -89,7 +89,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline_2" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
|
@ -105,7 +105,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline_3" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
@ -198,7 +198,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_handled_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
|
|
@ -13,6 +13,14 @@
|
|||
}
|
||||
},
|
||||
"params": {
|
||||
"master_timeout": {
|
||||
"type" : "time",
|
||||
"description" : "Explicit operation timeout for connection to master node"
|
||||
},
|
||||
"timeout": {
|
||||
"type" : "time",
|
||||
"description" : "Explicit operation timeout"
|
||||
}
|
||||
}
|
||||
},
|
||||
"body": null
|
||||
|
|
|
@ -13,6 +13,10 @@
|
|||
}
|
||||
},
|
||||
"params": {
|
||||
"master_timeout": {
|
||||
"type" : "time",
|
||||
"description" : "Explicit operation timeout for connection to master node"
|
||||
}
|
||||
}
|
||||
},
|
||||
"body": null
|
||||
|
|
|
@ -13,6 +13,14 @@
|
|||
}
|
||||
},
|
||||
"params": {
|
||||
"master_timeout": {
|
||||
"type" : "time",
|
||||
"description" : "Explicit operation timeout for connection to master node"
|
||||
},
|
||||
"timeout": {
|
||||
"type" : "time",
|
||||
"description" : "Explicit operation timeout"
|
||||
}
|
||||
}
|
||||
},
|
||||
"body": {
|
||||
|
|
|
@ -15,25 +15,18 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _index: ".ingest" }
|
||||
- match: { _type: "pipeline" }
|
||||
- match: { _version: 1 }
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
ingest.get_pipeline:
|
||||
id: "my_pipeline"
|
||||
- match: { my_pipeline._source.description: "_description" }
|
||||
- match: { my_pipeline._version: 1 }
|
||||
- match: { pipelines.0.id: "my_pipeline" }
|
||||
- match: { pipelines.0.config.description: "_description" }
|
||||
|
||||
- do:
|
||||
ingest.delete_pipeline:
|
||||
id: "my_pipeline"
|
||||
- match: { _index: ".ingest" }
|
||||
- match: { _type: "pipeline" }
|
||||
- match: { _version: 2 }
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { found: true }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
catch: missing
|
||||
|
@ -82,25 +75,18 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _index: ".ingest" }
|
||||
- match: { _type: "pipeline" }
|
||||
- match: { _version: 1 }
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
ingest.get_pipeline:
|
||||
id: "my_pipeline"
|
||||
- match: { my_pipeline._source.description: "_description" }
|
||||
- match: { my_pipeline._version: 1 }
|
||||
- match: { pipelines.0.id: "my_pipeline" }
|
||||
- match: { pipelines.0.config.description: "_description" }
|
||||
|
||||
- do:
|
||||
ingest.delete_pipeline:
|
||||
id: "my_pipeline"
|
||||
- match: { _index: ".ingest" }
|
||||
- match: { _type: "pipeline" }
|
||||
- match: { _version: 2 }
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { found: true }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
catch: missing
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
|
|
@ -72,7 +72,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
@ -130,7 +130,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
ingest.simulate:
|
||||
|
|
|
@ -30,7 +30,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
@ -88,7 +88,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
catch: request
|
||||
|
@ -49,7 +49,7 @@
|
|||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
|
Loading…
Reference in New Issue