diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/Logstash.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/Logstash.java index cc0f38c7d83..b69c683ca29 100644 --- a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/Logstash.java +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/Logstash.java @@ -7,21 +7,40 @@ package org.elasticsearch.xpack.logstash; import org.apache.logging.log4j.LogManager; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestHandler; import org.elasticsearch.xpack.core.template.TemplateUtils; +import org.elasticsearch.xpack.logstash.action.DeletePipelineAction; +import org.elasticsearch.xpack.logstash.action.GetPipelineAction; +import org.elasticsearch.xpack.logstash.action.PutPipelineAction; +import org.elasticsearch.xpack.logstash.action.TransportDeletePipelineAction; +import org.elasticsearch.xpack.logstash.action.TransportGetPipelineAction; +import org.elasticsearch.xpack.logstash.action.TransportPutPipelineAction; +import org.elasticsearch.xpack.logstash.rest.RestDeletePipelineAction; +import org.elasticsearch.xpack.logstash.rest.RestGetPipelineAction; +import org.elasticsearch.xpack.logstash.rest.RestPutPipelineAction; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import java.util.function.UnaryOperator; /** @@ -29,7 +48,7 @@ import java.util.function.UnaryOperator; */ public class Logstash extends Plugin implements SystemIndexPlugin { - private static final String LOGSTASH_CONCRETE_INDEX_NAME = ".logstash"; + public static final String LOGSTASH_CONCRETE_INDEX_NAME = ".logstash"; private static final String LOGSTASH_TEMPLATE_FILE_NAME = "logstash-management"; private static final String LOGSTASH_INDEX_TEMPLATE_NAME = ".logstash-management"; private static final String OLD_LOGSTASH_INDEX_NAME = "logstash-index-template"; @@ -43,6 +62,32 @@ public class Logstash extends Plugin implements SystemIndexPlugin { return modules; } + @Override + public List> getActions() { + return org.elasticsearch.common.collect.List.of( + new ActionHandler<>(PutPipelineAction.INSTANCE, TransportPutPipelineAction.class), + new ActionHandler<>(GetPipelineAction.INSTANCE, TransportGetPipelineAction.class), + new ActionHandler<>(DeletePipelineAction.INSTANCE, TransportDeletePipelineAction.class) + ); + } + + @Override + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + return org.elasticsearch.common.collect.List.of( + new RestPutPipelineAction(), + new RestGetPipelineAction(), + new RestDeletePipelineAction() + ); + } + public UnaryOperator> getIndexTemplateMetadataUpgrader() { return templates -> { templates.keySet().removeIf(OLD_LOGSTASH_INDEX_NAME::equals); diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/Pipeline.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/Pipeline.java new file mode 100644 index 00000000000..27d4c34c1f6 --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/Pipeline.java @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser.ValueType; + +import java.time.Instant; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; + +public class Pipeline { + + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "pipeline", + true, + (objects, id) -> { + Iterator iterator = Arrays.asList(objects).iterator(); + return new Pipeline( + id, + (Instant) iterator.next(), + (Map) iterator.next(), + (String) iterator.next(), + (String) iterator.next(), + (Map) iterator.next() + ); + } + ); + + public static final ParseField LAST_MODIFIED = new ParseField("last_modified"); + public static final ParseField PIPELINE_METADATA = new ParseField("pipeline_metadata"); + public static final ParseField USERNAME = new ParseField("username"); + public static final ParseField PIPELINE = new ParseField("pipeline"); + public static final ParseField PIPELINE_SETTINGS = new ParseField("pipeline_settings"); + + static { + PARSER.declareField(constructorArg(), (parser, s) -> { + final String instantISOString = parser.text(); + return Instant.parse(instantISOString); + }, LAST_MODIFIED, ValueType.STRING); + PARSER.declareObject(constructorArg(), (parser, s) -> parser.map(), PIPELINE_METADATA); + PARSER.declareString(constructorArg(), USERNAME); + PARSER.declareString(constructorArg(), PIPELINE); + PARSER.declareObject(constructorArg(), (parser, s) -> parser.map(), PIPELINE_SETTINGS); + } + + private final String id; + private final Instant lastModified; + private final Map pipelineMetadata; + private final String username; + private final String pipeline; + private final Map pipelineSettings; + + public Pipeline( + String id, + Instant lastModified, + Map pipelineMetadata, + String username, + String pipeline, + Map pipelineSettings + ) { + this.id = id; + this.lastModified = lastModified; + this.pipelineMetadata = pipelineMetadata; + this.username = username; + this.pipeline = pipeline; + this.pipelineSettings = pipelineSettings; + } + + public String getId() { + return id; + } + + public Instant getLastModified() { + return lastModified; + } + + public Map getPipelineMetadata() { + return pipelineMetadata; + } + + public String getUsername() { + return username; + } + + public String getPipeline() { + return pipeline; + } + + public Map getPipelineSettings() { + return pipelineSettings; + } +} diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/DeletePipelineAction.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/DeletePipelineAction.java new file mode 100644 index 00000000000..cde8c5dfca2 --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/DeletePipelineAction.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.action.ActionType; + +public class DeletePipelineAction extends ActionType { + + public static final String NAME = "cluster:admin/logstash/pipeline/delete"; + public static final DeletePipelineAction INSTANCE = new DeletePipelineAction(); + + private DeletePipelineAction() { + super(NAME, DeletePipelineResponse::new); + } +} diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/DeletePipelineRequest.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/DeletePipelineRequest.java new file mode 100644 index 00000000000..2eee54a58a2 --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/DeletePipelineRequest.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +public class DeletePipelineRequest extends ActionRequest { + + private final String id; + + public DeletePipelineRequest(String id) { + this.id = Objects.requireNonNull(id); + } + + public DeletePipelineRequest(StreamInput in) throws IOException { + super(in); + this.id = in.readString(); + } + + public String id() { + return id; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeletePipelineRequest that = (DeletePipelineRequest) o; + return Objects.equals(id, that.id); + } + + @Override + public int hashCode() { + return Objects.hash(id); + } +} diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/DeletePipelineResponse.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/DeletePipelineResponse.java new file mode 100644 index 00000000000..bb04b868a9c --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/DeletePipelineResponse.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +public class DeletePipelineResponse extends ActionResponse { + + private final boolean deleted; + + public DeletePipelineResponse(boolean deleted) { + this.deleted = deleted; + } + + public DeletePipelineResponse(StreamInput in) throws IOException { + super(in); + this.deleted = in.readBoolean(); + } + + public boolean isDeleted() { + return deleted; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(deleted); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeletePipelineResponse that = (DeletePipelineResponse) o; + return deleted == that.deleted; + } + + @Override + public int hashCode() { + return Objects.hash(deleted); + } +} diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/GetPipelineAction.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/GetPipelineAction.java new file mode 100644 index 00000000000..03b46faaf31 --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/GetPipelineAction.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.action.ActionType; + +public class GetPipelineAction extends ActionType { + + public static final String NAME = "cluster:admin/logstash/pipeline/get"; + public static final GetPipelineAction INSTANCE = new GetPipelineAction(); + + private GetPipelineAction() { + super(NAME, GetPipelineResponse::new); + } +} diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/GetPipelineRequest.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/GetPipelineRequest.java new file mode 100644 index 00000000000..8a7960679c5 --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/GetPipelineRequest.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +public class GetPipelineRequest extends ActionRequest { + + private final List ids; + + public GetPipelineRequest(List ids) { + this.ids = Objects.requireNonNull(ids); + } + + public GetPipelineRequest(StreamInput in) throws IOException { + super(in); + ids = in.readStringList(); + } + + public List ids() { + return ids; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringCollection(ids); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GetPipelineRequest that = (GetPipelineRequest) o; + return Objects.equals(ids, that.ids); + } + + @Override + public int hashCode() { + return Objects.hash(ids); + } +} diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/GetPipelineResponse.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/GetPipelineResponse.java new file mode 100644 index 00000000000..c7d58658bf0 --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/GetPipelineResponse.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; + +public class GetPipelineResponse extends ActionResponse implements ToXContentObject { + + private final Map pipelines; + + public GetPipelineResponse(Map pipelines) { + this.pipelines = pipelines; + } + + public GetPipelineResponse(StreamInput in) throws IOException { + super(in); + this.pipelines = in.readMap(StreamInput::readString, StreamInput::readBytesReference); + } + + public Map pipelines() { + return pipelines; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(pipelines, StreamOutput::writeString, StreamOutput::writeBytesReference); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + for (Entry entry : pipelines.entrySet()) { + builder.rawField(entry.getKey(), entry.getValue().streamInput()); + } + return builder.endObject(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GetPipelineResponse that = (GetPipelineResponse) o; + return Objects.equals(pipelines, that.pipelines); + } + + @Override + public int hashCode() { + return Objects.hash(pipelines); + } +} diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/PutPipelineAction.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/PutPipelineAction.java new file mode 100644 index 00000000000..7511e5daeaa --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/PutPipelineAction.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.action.ActionType; + +public class PutPipelineAction extends ActionType { + + public static final String NAME = "cluster:admin/logstash/pipeline/put"; + public static final PutPipelineAction INSTANCE = new PutPipelineAction(); + + private PutPipelineAction() { + super(NAME, PutPipelineResponse::new); + } +} diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/PutPipelineRequest.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/PutPipelineRequest.java new file mode 100644 index 00000000000..525be7f2a1c --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/PutPipelineRequest.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentType; + +import java.io.IOException; +import java.util.Objects; + +public class PutPipelineRequest extends ActionRequest { + + private final String id; + private final String source; + private final XContentType xContentType; + + public PutPipelineRequest(String id, String source, XContentType xContentType) { + this.id = id; + this.source = Objects.requireNonNull(source); + this.xContentType = Objects.requireNonNull(xContentType); + } + + public PutPipelineRequest(StreamInput in) throws IOException { + super(in); + this.id = in.readString(); + this.source = in.readString(); + this.xContentType = in.readEnum(XContentType.class); + } + + public String id() { + return id; + } + + public String source() { + return source; + } + + public XContentType xContentType() { + return xContentType; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + out.writeString(source); + out.writeEnum(xContentType); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PutPipelineRequest that = (PutPipelineRequest) o; + return Objects.equals(id, that.id) && Objects.equals(source, that.source) && xContentType == that.xContentType; + } + + @Override + public int hashCode() { + return Objects.hash(id, source, xContentType); + } +} diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/PutPipelineResponse.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/PutPipelineResponse.java new file mode 100644 index 00000000000..44d2ae940b6 --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/PutPipelineResponse.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; +import java.util.Objects; + +public class PutPipelineResponse extends ActionResponse { + + private final RestStatus status; + + public PutPipelineResponse(RestStatus status) { + this.status = Objects.requireNonNull(status); + } + + public PutPipelineResponse(StreamInput in) throws IOException { + super(in); + this.status = in.readEnum(RestStatus.class); + } + + public RestStatus status() { + return status; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeEnum(status); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PutPipelineResponse that = (PutPipelineResponse) o; + return status == that.status; + } + + @Override + public int hashCode() { + return Objects.hash(status); + } +} diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/TransportDeletePipelineAction.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/TransportDeletePipelineAction.java new file mode 100644 index 00000000000..11f3896600a --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/TransportDeletePipelineAction.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteResponse.Result; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.logstash.Logstash; + +public class TransportDeletePipelineAction extends HandledTransportAction { + + private final Client client; + + @Inject + public TransportDeletePipelineAction(TransportService transportService, ActionFilters actionFilters, Client client) { + super(DeletePipelineAction.NAME, transportService, actionFilters, DeletePipelineRequest::new); + this.client = client; + } + + @Override + protected void doExecute(Task task, DeletePipelineRequest request, ActionListener listener) { + client.prepareDelete() + .setIndex(Logstash.LOGSTASH_CONCRETE_INDEX_NAME) + .setId(request.id()) + .execute( + ActionListener.wrap( + deleteResponse -> listener.onResponse(new DeletePipelineResponse(deleteResponse.getResult() == Result.DELETED)), + listener::onFailure + ) + ); + } +} diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/TransportGetPipelineAction.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/TransportGetPipelineAction.java new file mode 100644 index 00000000000..55b837f49a5 --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/TransportGetPipelineAction.java @@ -0,0 +1,164 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.get.MultiGetItemResponse; +import org.elasticsearch.action.get.MultiGetRequestBuilder; +import org.elasticsearch.action.get.MultiGetResponse; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.logstash.Logstash; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +public class TransportGetPipelineAction extends HandledTransportAction { + + private static final Logger logger = LogManager.getLogger(TransportGetPipelineAction.class); + private final Client client; + + @Inject + public TransportGetPipelineAction(TransportService transportService, ActionFilters actionFilters, Client client) { + super(GetPipelineAction.NAME, transportService, actionFilters, GetPipelineRequest::new); + this.client = client; + } + + @Override + protected void doExecute(Task task, GetPipelineRequest request, ActionListener listener) { + if (request.ids().isEmpty()) { + client.prepareSearch(Logstash.LOGSTASH_CONCRETE_INDEX_NAME) + .setSource( + SearchSourceBuilder.searchSource() + .fetchSource(true) + .query(QueryBuilders.matchAllQuery()) + .size(1000) + .trackTotalHits(true) + ) + .setScroll(TimeValue.timeValueMinutes(1L)) + .execute(ActionListener.wrap(searchResponse -> { + final int numHits = Math.toIntExact(searchResponse.getHits().getTotalHits().value); + final Map pipelineSources = new HashMap<>(numHits); + final Consumer clearScroll = (response) -> { + if (response != null && response.getScrollId() != null) { + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(response.getScrollId()); + client.clearScroll( + clearScrollRequest, + ActionListener.wrap( + (r) -> {}, + e -> logger.warn( + new ParameterizedMessage("clear scroll failed for scroll id [{}]", response.getScrollId()), + e + ) + ) + ); + } + }; + handleSearchResponse(searchResponse, pipelineSources, clearScroll, listener); + }, listener::onFailure)); + } else if (request.ids().size() == 1) { + client.prepareGet(Logstash.LOGSTASH_CONCRETE_INDEX_NAME, "_doc", request.ids().get(0)) + .setFetchSource(true) + .execute(ActionListener.wrap(response -> { + if (response.isExists()) { + listener.onResponse( + new GetPipelineResponse( + org.elasticsearch.common.collect.Map.of(response.getId(), response.getSourceAsBytesRef()) + ) + ); + } else { + listener.onResponse(new GetPipelineResponse(org.elasticsearch.common.collect.Map.of())); + } + }, listener::onFailure)); + } else { + MultiGetRequestBuilder requestBuilder = client.prepareMultiGet(); + for (String id : request.ids()) { + requestBuilder.add(Logstash.LOGSTASH_CONCRETE_INDEX_NAME, "_doc", id); + } + requestBuilder.execute(ActionListener.wrap(mGetResponse -> { + logFailures(mGetResponse); + listener.onResponse( + new GetPipelineResponse( + Arrays.stream(mGetResponse.getResponses()) + .filter(itemResponse -> itemResponse.isFailed() == false) + .filter(itemResponse -> itemResponse.getResponse().isExists()) + .map(MultiGetItemResponse::getResponse) + .collect(Collectors.toMap(GetResponse::getId, GetResponse::getSourceAsBytesRef)) + ) + ); + }, listener::onFailure)); + } + } + + private void handleSearchResponse( + SearchResponse searchResponse, + Map pipelineSources, + Consumer clearScroll, + ActionListener listener + ) { + for (SearchHit hit : searchResponse.getHits().getHits()) { + pipelineSources.put(hit.getId(), hit.getSourceRef()); + } + + if (pipelineSources.size() > searchResponse.getHits().getTotalHits().value) { + clearScroll.accept(searchResponse); + listener.onFailure( + new IllegalStateException( + "scrolling returned more hits [" + + pipelineSources.size() + + "] than expected [" + + searchResponse.getHits().getTotalHits().value + + "] so bailing out to prevent unbounded " + + "memory consumption." + ) + ); + } else if (pipelineSources.size() == searchResponse.getHits().getTotalHits().value) { + clearScroll.accept(searchResponse); + listener.onResponse(new GetPipelineResponse(pipelineSources)); + } else { + client.prepareSearchScroll(searchResponse.getScrollId()) + .setScroll(TimeValue.timeValueMinutes(1L)) + .execute( + ActionListener.wrap( + searchResponse1 -> handleSearchResponse(searchResponse1, pipelineSources, clearScroll, listener), + listener::onFailure + ) + ); + } + } + + private void logFailures(MultiGetResponse multiGetResponse) { + List ids = Arrays.stream(multiGetResponse.getResponses()) + .filter(MultiGetItemResponse::isFailed) + .filter(itemResponse -> itemResponse.getFailure() != null) + .map(itemResponse -> itemResponse.getFailure().getId()) + .collect(Collectors.toList()); + if (ids.isEmpty() == false) { + logger.info("Could not retrieve logstash pipelines with ids: {}", ids); + } + } +} diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/TransportPutPipelineAction.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/TransportPutPipelineAction.java new file mode 100644 index 00000000000..ec37e0d8363 --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/TransportPutPipelineAction.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.logstash.Logstash; + +public class TransportPutPipelineAction extends HandledTransportAction { + + private final Client client; + + @Inject + public TransportPutPipelineAction(TransportService transportService, ActionFilters actionFilters, Client client) { + super(PutPipelineAction.NAME, transportService, actionFilters, PutPipelineRequest::new); + this.client = client; + } + + @Override + protected void doExecute(Task task, PutPipelineRequest request, ActionListener listener) { + client.prepareIndex() + .setIndex(Logstash.LOGSTASH_CONCRETE_INDEX_NAME) + .setId(request.id()) + .setSource(request.source(), request.xContentType()) + .execute( + ActionListener.wrap( + indexResponse -> listener.onResponse(new PutPipelineResponse(indexResponse.status())), + listener::onFailure + ) + ); + } +} diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/rest/RestDeletePipelineAction.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/rest/RestDeletePipelineAction.java new file mode 100644 index 00000000000..b87d6879aa4 --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/rest/RestDeletePipelineAction.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestRequest.Method; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestActionListener; +import org.elasticsearch.xpack.logstash.action.DeletePipelineAction; +import org.elasticsearch.xpack.logstash.action.DeletePipelineRequest; +import org.elasticsearch.xpack.logstash.action.DeletePipelineResponse; + +import java.io.IOException; +import java.util.List; + +public class RestDeletePipelineAction extends BaseRestHandler { + + @Override + public String getName() { + return "logstash_delete_pipeline"; + } + + @Override + public List routes() { + return org.elasticsearch.common.collect.List.of(new Route(Method.DELETE, "/_logstash/pipeline/{id}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + final String id = request.param("id"); + return restChannel -> client.execute( + DeletePipelineAction.INSTANCE, + new DeletePipelineRequest(id), + new RestActionListener(restChannel) { + @Override + protected void processResponse(DeletePipelineResponse deletePipelineResponse) { + final RestStatus status = deletePipelineResponse.isDeleted() ? RestStatus.OK : RestStatus.NOT_FOUND; + channel.sendResponse(new BytesRestResponse(status, XContentType.JSON.mediaType(), BytesArray.EMPTY)); + } + } + ); + } +} diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/rest/RestGetPipelineAction.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/rest/RestGetPipelineAction.java new file mode 100644 index 00000000000..eb24bc9fbf3 --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/rest/RestGetPipelineAction.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestRequest.Method; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.logstash.action.GetPipelineAction; +import org.elasticsearch.xpack.logstash.action.GetPipelineRequest; +import org.elasticsearch.xpack.logstash.action.GetPipelineResponse; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class RestGetPipelineAction extends BaseRestHandler { + + @Override + public String getName() { + return "logstash_get_pipeline"; + } + + @Override + public List routes() { + return org.elasticsearch.common.collect.List.of( + new Route(Method.GET, "/_logstash/pipeline"), + new Route(Method.GET, "/_logstash/pipeline/{id}") + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + final List ids = Arrays.asList(request.paramAsStringArray("id", Strings.EMPTY_ARRAY)); + return restChannel -> client.execute( + GetPipelineAction.INSTANCE, + new GetPipelineRequest(ids), + new RestToXContentListener(restChannel) { + @Override + protected RestStatus getStatus(GetPipelineResponse response) { + if (response.pipelines().isEmpty() && ids.isEmpty() == false) { + return RestStatus.NOT_FOUND; + } + return RestStatus.OK; + } + } + ); + } +} diff --git a/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/rest/RestPutPipelineAction.java b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/rest/RestPutPipelineAction.java new file mode 100644 index 00000000000..6045c879538 --- /dev/null +++ b/x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/rest/RestPutPipelineAction.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestRequest.Method; +import org.elasticsearch.rest.action.RestActionListener; +import org.elasticsearch.xpack.logstash.Pipeline; +import org.elasticsearch.xpack.logstash.action.PutPipelineAction; +import org.elasticsearch.xpack.logstash.action.PutPipelineRequest; +import org.elasticsearch.xpack.logstash.action.PutPipelineResponse; + +import java.io.IOException; +import java.util.List; + +public class RestPutPipelineAction extends BaseRestHandler { + + @Override + public String getName() { + return "logstash_put_pipeline"; + } + + @Override + public List routes() { + return org.elasticsearch.common.collect.List.of(new Route(Method.PUT, "/_logstash/pipeline/{id}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + final String id = request.param("id"); + try (XContentParser parser = request.contentParser()) { + // parse pipeline for validation + Pipeline.PARSER.apply(parser, id); + } + + return restChannel -> { + final String content = request.content().utf8ToString(); + client.execute( + PutPipelineAction.INSTANCE, + new PutPipelineRequest(id, content, request.getXContentType()), + new RestActionListener(restChannel) { + @Override + protected void processResponse(PutPipelineResponse putPipelineResponse) throws Exception { + channel.sendResponse( + new BytesRestResponse(putPipelineResponse.status(), XContentType.JSON.mediaType(), BytesArray.EMPTY) + ); + } + } + ); + }; + } +} diff --git a/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/LogstashPluginTests.java b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/LogstashPluginTests.java new file mode 100644 index 00000000000..7e4065690c4 --- /dev/null +++ b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/LogstashPluginTests.java @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.test.ESTestCase; + +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.contains; + +public class LogstashPluginTests extends ESTestCase { + + public void testSystemIndices() { + assertThat( + new Logstash().getSystemIndexDescriptors(Settings.EMPTY) + .stream() + .map(SystemIndexDescriptor::getIndexPattern) + .collect(Collectors.toList()), + contains(".logstash") + ); + } +} diff --git a/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/DeletePipelineRequestTests.java b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/DeletePipelineRequestTests.java new file mode 100644 index 00000000000..a3682dd1d64 --- /dev/null +++ b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/DeletePipelineRequestTests.java @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class DeletePipelineRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return DeletePipelineRequest::new; + } + + @Override + protected DeletePipelineRequest createTestInstance() { + return new DeletePipelineRequest(randomAlphaOfLengthBetween(2, 10)); + } + + @Override + protected DeletePipelineRequest mutateInstance(DeletePipelineRequest instance) { + return new DeletePipelineRequest(instance.id() + randomAlphaOfLength(1)); + } +} diff --git a/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/DeletePipelineResponseTests.java b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/DeletePipelineResponseTests.java new file mode 100644 index 00000000000..b4df0e55144 --- /dev/null +++ b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/DeletePipelineResponseTests.java @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class DeletePipelineResponseTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return DeletePipelineResponse::new; + } + + @Override + protected DeletePipelineResponse createTestInstance() { + return new DeletePipelineResponse(randomBoolean()); + } + + @Override + protected DeletePipelineResponse mutateInstance(DeletePipelineResponse instance) { + // return a response with the opposite boolean value + return new DeletePipelineResponse(instance.isDeleted() == false); + } +} diff --git a/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/GetPipelineRequestTests.java b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/GetPipelineRequestTests.java new file mode 100644 index 00000000000..c3ff96fccea --- /dev/null +++ b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/GetPipelineRequestTests.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class GetPipelineRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return GetPipelineRequest::new; + } + + @Override + protected GetPipelineRequest createTestInstance() { + return new GetPipelineRequest(randomList(0, 50, () -> randomAlphaOfLengthBetween(2, 10))); + } + + @Override + protected GetPipelineRequest mutateInstance(GetPipelineRequest instance) { + List ids = new ArrayList<>(); + ids.addAll(instance.ids()); + if (randomBoolean() || ids.size() == 0) { + // append another ID + ids.add(randomAlphaOfLengthBetween(2, 10)); + } else { + // change ID strings + ids = ids.stream().map(id -> id + randomAlphaOfLength(1)).collect(Collectors.toList()); + } + return new GetPipelineRequest(ids); + } +} diff --git a/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/GetPipelineResponseTests.java b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/GetPipelineResponseTests.java new file mode 100644 index 00000000000..1c2d3573a5b --- /dev/null +++ b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/GetPipelineResponseTests.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.HashMap; +import java.util.Map; + +public class GetPipelineResponseTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return GetPipelineResponse::new; + } + + @Override + protected GetPipelineResponse createTestInstance() { + final int numPipelines = randomIntBetween(1, 10); + final Map map = new HashMap<>(numPipelines); + for (int i = 0; i < numPipelines; i++) { + final String name = randomAlphaOfLengthBetween(2, 10); + final BytesReference ref = new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 16))); + map.put(name, ref); + } + return new GetPipelineResponse(map); + } + + @Override + protected GetPipelineResponse mutateInstance(GetPipelineResponse instance) { + Map map = new HashMap<>(instance.pipelines().size() + 1); + map.putAll(instance.pipelines()); + map.put(randomAlphaOfLengthBetween(2, 10), new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 16)))); + return new GetPipelineResponse(map); + } +} diff --git a/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/PutPipelineRequestTests.java b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/PutPipelineRequestTests.java new file mode 100644 index 00000000000..e727ec0422f --- /dev/null +++ b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/PutPipelineRequestTests.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class PutPipelineRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return PutPipelineRequest::new; + } + + @Override + protected PutPipelineRequest createTestInstance() { + return new PutPipelineRequest(randomAlphaOfLength(2), randomAlphaOfLengthBetween(10, 100), randomFrom(XContentType.values())); + } + + @Override + protected PutPipelineRequest mutateInstance(PutPipelineRequest instance) { + return new PutPipelineRequest( + instance.id() + randomAlphaOfLength(1), + instance.source() + randomAlphaOfLength(1), + instance.xContentType() + ); + } +} diff --git a/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/PutPipelineResponseTests.java b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/PutPipelineResponseTests.java new file mode 100644 index 00000000000..6ac6eed557f --- /dev/null +++ b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/PutPipelineResponseTests.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class PutPipelineResponseTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return PutPipelineResponse::new; + } + + @Override + protected PutPipelineResponse createTestInstance() { + return new PutPipelineResponse(randomFrom(RestStatus.OK, RestStatus.CREATED)); + } + + @Override + protected PutPipelineResponse mutateInstance(PutPipelineResponse instance) { + if (instance.status() == RestStatus.OK) { + return new PutPipelineResponse(RestStatus.CREATED); + } + return new PutPipelineResponse(RestStatus.OK); + } +} diff --git a/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/TransportGetPipelineActionTests.java b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/TransportGetPipelineActionTests.java new file mode 100644 index 00000000000..5b617370216 --- /dev/null +++ b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/TransportGetPipelineActionTests.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.logstash.action; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.get.MultiGetItemResponse; +import org.elasticsearch.action.get.MultiGetResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.transport.TransportService; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportGetPipelineActionTests extends ESTestCase { + + /** + * Test that an error message is logged on a partial failure of + * a TransportGetPipelineAction. + */ + public void testGetPipelineMultipleIDsPartialFailure() throws Exception { + // Set up a log appender for detecting log messages + final MockLogAppender mockLogAppender = new MockLogAppender(); + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "message", + "org.elasticsearch.xpack.logstash.action.TransportGetPipelineAction", + Level.INFO, + "Could not retrieve logstash pipelines with ids: [2]" + ) + ); + mockLogAppender.start(); + final Logger logger = LogManager.getLogger(TransportGetPipelineAction.class); + + // Set up a MultiGetResponse + GetResponse mockResponse = mock(GetResponse.class); + when(mockResponse.getId()).thenReturn("1"); + when(mockResponse.getSourceAsBytesRef()).thenReturn(mock(BytesReference.class)); + when(mockResponse.isExists()).thenReturn(true); + MultiGetResponse.Failure failure = mock(MultiGetResponse.Failure.class); + when(failure.getId()).thenReturn("2"); + MultiGetResponse multiGetResponse = new MultiGetResponse( + new MultiGetItemResponse[] { new MultiGetItemResponse(mockResponse, null), new MultiGetItemResponse(null, failure) } + ); + + GetPipelineRequest request = new GetPipelineRequest(org.elasticsearch.common.collect.List.of("1", "2")); + + // Set up an ActionListener for the actual test conditions + ActionListener testActionListener = new ActionListener() { + @Override + public void onResponse(GetPipelineResponse getPipelineResponse) { + // check successful pipeline get + assertThat(getPipelineResponse, is(notNullValue())); + assertThat(getPipelineResponse.pipelines().size(), equalTo(1)); + + // check that failed pipeline get is logged + mockLogAppender.assertAllExpectationsMatched(); + } + + @Override + public void onFailure(Exception e) { + // do nothing + } + }; + + try (Client client = getMockClient(multiGetResponse)) { + Loggers.addAppender(logger, mockLogAppender); + TransportGetPipelineAction action = new TransportGetPipelineAction( + mock(TransportService.class), + mock(ActionFilters.class), + client + ); + action.doExecute(null, request, testActionListener); + } finally { + Loggers.removeAppender(logger, mockLogAppender); + mockLogAppender.stop(); + } + } + + private Client getMockClient(ActionResponse response) { + return new NoOpClient(getTestName()) { + @Override + @SuppressWarnings("unchecked") + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + listener.onResponse((Response) response); + } + }; + } +} diff --git a/x-pack/plugin/src/javaRestTest/java/org/elasticsearch/xpack/test/rest/LogstashSystemIndexIT.java b/x-pack/plugin/src/javaRestTest/java/org/elasticsearch/xpack/test/rest/LogstashSystemIndexIT.java new file mode 100644 index 00000000000..bd76b7039bc --- /dev/null +++ b/x-pack/plugin/src/javaRestTest/java/org/elasticsearch/xpack/test/rest/LogstashSystemIndexIT.java @@ -0,0 +1,186 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.test.rest; + +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.test.SecuritySettingsSourceField; +import org.elasticsearch.test.rest.ESRestTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; + +public class LogstashSystemIndexIT extends ESRestTestCase { + static final String BASIC_AUTH_VALUE = + basicAuthHeaderValue("x_pack_rest_user", SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING); + + @Override + protected Settings restClientSettings() { + return Settings.builder() + .put(ThreadContext.PREFIX + ".Authorization", BASIC_AUTH_VALUE) + .build(); + } + + public void testTemplateIsPut() throws Exception { + assertBusy( + () -> assertThat( + client().performRequest(new Request("HEAD", "/_template/.logstash-management")).getStatusLine().getStatusCode(), + is(200) + ) + ); + } + + public void testPipelineCRUD() throws Exception { + // put pipeline + final String pipelineJson = getPipelineJson(); + createPipeline("test_pipeline", pipelineJson); + + // get pipeline + Request getRequest = new Request("GET", "/_logstash/pipeline/test_pipeline"); + Response getResponse = client().performRequest(getRequest); + assertThat(getResponse.getStatusLine().getStatusCode(), is(200)); + assertThat(EntityUtils.toString(getResponse.getEntity()), containsString(pipelineJson)); + + // update + final String updatedJson = getPipelineJson("2020-03-09T15:42:35.229Z"); + Request putRequest = new Request("PUT", "/_logstash/pipeline/test_pipeline"); + putRequest.setJsonEntity(updatedJson); + Response putResponse = client().performRequest(putRequest); + assertThat(putResponse.getStatusLine().getStatusCode(), is(200)); + + getRequest = new Request("GET", "/_logstash/pipeline/test_pipeline"); + getResponse = client().performRequest(getRequest); + assertThat(getResponse.getStatusLine().getStatusCode(), is(200)); + assertThat(EntityUtils.toString(getResponse.getEntity()), containsString(updatedJson)); + + // delete + Request deleteRequest = new Request("DELETE", "/_logstash/pipeline/test_pipeline"); + Response deleteResponse = client().performRequest(deleteRequest); + assertThat(deleteResponse.getStatusLine().getStatusCode(), is(200)); + + // list is now empty + Request listAll = new Request("GET", "/_logstash/pipeline"); + Response listAllResponse = client().performRequest(listAll); + assertThat(listAllResponse.getStatusLine().getStatusCode(), is(200)); + assertThat(EntityUtils.toString(listAllResponse.getEntity()), is("{}")); + } + + public void testGetNonExistingPipeline() { + Request getRequest = new Request("GET", "/_logstash/pipeline/test_pipeline"); + ResponseException re = expectThrows(ResponseException.class, () -> client().performRequest(getRequest)); + Response getResponse = re.getResponse(); + assertThat(getResponse.getStatusLine().getStatusCode(), is(404)); + } + + public void testDeleteNonExistingPipeline() { + Request deleteRequest = new Request("DELETE", "/_logstash/pipeline/test_pipeline"); + ResponseException re = expectThrows(ResponseException.class, () -> client().performRequest(deleteRequest)); + Response getResponse = re.getResponse(); + assertThat(getResponse.getStatusLine().getStatusCode(), is(404)); + } + + public void testMultiplePipelines() throws IOException { + final int numPipelines = scaledRandomIntBetween(2, 2000); + final List ids = new ArrayList<>(numPipelines); + final String pipelineJson = getPipelineJson(); + for (int i = 0; i < numPipelines; i++) { + final String id = "id" + i; + ids.add(id); + createPipeline(id, pipelineJson); + } + + // test mget-like + final int numToGet = scaledRandomIntBetween(2, Math.min(100, numPipelines)); // limit number to avoid HTTP line length issues + final List mgetIds = randomSubsetOf(numToGet, ids); + final String path = "/_logstash/pipeline/" + Strings.collectionToCommaDelimitedString(mgetIds); + Request getRequest = new Request("GET", path); + Response getResponse = client().performRequest(getRequest); + assertThat(getResponse.getStatusLine().getStatusCode(), is(200)); + Map responseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(getResponse.getEntity()), + false + ); + + for (String id : mgetIds) { + assertTrue(responseMap.containsKey(id)); + } + + refreshAllIndices(); + + // list without any IDs + Request listAll = new Request("GET", "/_logstash/pipeline"); + Response listAllResponse = client().performRequest(listAll); + assertThat(listAllResponse.getStatusLine().getStatusCode(), is(200)); + Map listResponseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(listAllResponse.getEntity()), + false + ); + for (String id : ids) { + assertTrue(listResponseMap.containsKey(id)); + } + assertThat(listResponseMap.size(), is(ids.size())); + } + + private void createPipeline(String id, String json) throws IOException { + Request putRequest = new Request("PUT", "/_logstash/pipeline/" + id); + putRequest.setJsonEntity(json); + Response putResponse = client().performRequest(putRequest); + assertThat(putResponse.getStatusLine().getStatusCode(), is(201)); + } + + private String getPipelineJson() throws IOException { + return getPipelineJson("2020-03-09T15:42:30.229Z"); + } + + private String getPipelineJson(String date) throws IOException { + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + { + builder.field("description", "test pipeline"); + builder.field("last_modified", date); + builder.startObject("pipeline_metadata"); + { + builder.field("version", 1); + builder.field("type", "logstash_pipeline"); + } + builder.endObject(); + builder.field("username", "john.doe"); + builder.field("pipeline", "\"input\": {},\n \"filter\": {},\n \"output\": {}\n"); + builder.startObject("pipeline_settings"); + { + builder.field("pipeline.batch.delay", 50); + builder.field("pipeline.batch.size", 125); + builder.field("pipeline.workers", 1); + builder.field("queue.checkpoint.writes", 1024); + builder.field("queue.max_bytes", "1gb"); + builder.field("queue.type", "memory"); + } + builder.endObject(); + } + builder.endObject(); + return BytesReference.bytes(builder).utf8ToString(); + } + } +} diff --git a/x-pack/plugin/src/yamlRestTest/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java b/x-pack/plugin/src/yamlRestTest/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java index d2474cd9bf8..659ac47b0ed 100644 --- a/x-pack/plugin/src/yamlRestTest/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java +++ b/x-pack/plugin/src/yamlRestTest/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java @@ -56,7 +56,7 @@ import static org.hamcrest.Matchers.hasSize; // TODO: Remove this timeout increase once this test suite is broken up @TimeoutSuite(millis = 60 * TimeUnits.MINUTE) public class XPackRestIT extends ESClientYamlSuiteTestCase { - private static final String BASIC_AUTH_VALUE = + static final String BASIC_AUTH_VALUE = basicAuthHeaderValue("x_pack_rest_user", SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING); public XPackRestIT(ClientYamlTestCandidate testCandidate) {