Add logstash system index APIs (#53350) (#62347)

We want Logstash indices to be system indices, but the Logstash
service still needs to be able to manage its indices. This PR
adds dedicated system index APIs to the Logstash plugin so that
Logstash can manage its pipelines without direct access to the
underlying indices.

* Add logstash module with dedicated logstash APIs
* Merge with x-pack plugin
* Add system index access allowance
* Break out serialization tests into distinct classes
* Log failures for partial multiget failure
* Move LogstashSystemIndexIT to javaRestTest task
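
Usage sketch (illustrative, not part of the commit): the REST handlers below register PUT, GET, and DELETE under /_logstash/pipeline/{id}, plus GET /_logstash/pipeline to list all pipelines. The snippet shows how these endpoints might be exercised with the low-level Java REST client; the host, pipeline id, document body, and the LogstashPipelineApiExample class name are assumptions, and the body fields mirror the Pipeline parser and LogstashSystemIndexIT later in this diff.

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class LogstashPipelineApiExample {
    public static void main(String[] args) throws Exception {
        // Assumed local, unsecured cluster; adjust host/port/credentials for a real deployment.
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // Create a pipeline document; the fields mirror the Pipeline parser added in this PR.
            Request put = new Request("PUT", "/_logstash/pipeline/example_pipeline");
            put.setJsonEntity(
                "{"
                    + "\"last_modified\": \"2020-03-09T15:42:30.229Z\","
                    + "\"pipeline_metadata\": {\"version\": 1, \"type\": \"logstash_pipeline\"},"
                    + "\"username\": \"john.doe\","
                    + "\"pipeline\": \"input {} filter {} output {}\","
                    + "\"pipeline_settings\": {\"pipeline.workers\": 1}"
                    + "}"
            );
            Response putResponse = client.performRequest(put); // expect 201 Created on first write

            // Fetch it back; GET /_logstash/pipeline with no id lists all pipelines.
            Response getResponse = client.performRequest(
                new Request("GET", "/_logstash/pipeline/example_pipeline")
            );

            // Delete it; a missing id results in a 404 response.
            Response deleteResponse = client.performRequest(
                new Request("DELETE", "/_logstash/pipeline/example_pipeline")
            );
        }
    }
}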

Co-authored-by: William Brafford <william.brafford@elastic.co>

Co-authored-by: Jay Modi <jaymode@users.noreply.github.com>
William Brafford 2020-09-15 12:42:14 -04:00 committed by GitHub
parent ffbc64bd10
commit af64e46065
27 changed files with 1508 additions and 2 deletions

Logstash.java

@@ -7,21 +7,40 @@ package org.elasticsearch.xpack.logstash;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import org.elasticsearch.xpack.logstash.action.DeletePipelineAction;
import org.elasticsearch.xpack.logstash.action.GetPipelineAction;
import org.elasticsearch.xpack.logstash.action.PutPipelineAction;
import org.elasticsearch.xpack.logstash.action.TransportDeletePipelineAction;
import org.elasticsearch.xpack.logstash.action.TransportGetPipelineAction;
import org.elasticsearch.xpack.logstash.action.TransportPutPipelineAction;
import org.elasticsearch.xpack.logstash.rest.RestDeletePipelineAction;
import org.elasticsearch.xpack.logstash.rest.RestGetPipelineAction;
import org.elasticsearch.xpack.logstash.rest.RestPutPipelineAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
/**
@@ -29,7 +48,7 @@ import java.util.function.UnaryOperator;
*/
public class Logstash extends Plugin implements SystemIndexPlugin {
-private static final String LOGSTASH_CONCRETE_INDEX_NAME = ".logstash";
+public static final String LOGSTASH_CONCRETE_INDEX_NAME = ".logstash";
private static final String LOGSTASH_TEMPLATE_FILE_NAME = "logstash-management";
private static final String LOGSTASH_INDEX_TEMPLATE_NAME = ".logstash-management";
private static final String OLD_LOGSTASH_INDEX_NAME = "logstash-index-template";
@@ -43,6 +62,32 @@ public class Logstash extends Plugin implements SystemIndexPlugin {
return modules;
}
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return org.elasticsearch.common.collect.List.of(
new ActionHandler<>(PutPipelineAction.INSTANCE, TransportPutPipelineAction.class),
new ActionHandler<>(GetPipelineAction.INSTANCE, TransportGetPipelineAction.class),
new ActionHandler<>(DeletePipelineAction.INSTANCE, TransportDeletePipelineAction.class)
);
}
@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return org.elasticsearch.common.collect.List.of(
new RestPutPipelineAction(),
new RestGetPipelineAction(),
new RestDeletePipelineAction()
);
}
public UnaryOperator<Map<String, IndexTemplateMetadata>> getIndexTemplateMetadataUpgrader() {
return templates -> {
templates.keySet().removeIf(OLD_LOGSTASH_INDEX_NAME::equals);

Pipeline.java

@@ -0,0 +1,102 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
public class Pipeline {
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<Pipeline, String> PARSER = new ConstructingObjectParser<>(
"pipeline",
true,
(objects, id) -> {
Iterator<Object> iterator = Arrays.asList(objects).iterator();
return new Pipeline(
id,
(Instant) iterator.next(),
(Map<String, Object>) iterator.next(),
(String) iterator.next(),
(String) iterator.next(),
(Map<String, Object>) iterator.next()
);
}
);
public static final ParseField LAST_MODIFIED = new ParseField("last_modified");
public static final ParseField PIPELINE_METADATA = new ParseField("pipeline_metadata");
public static final ParseField USERNAME = new ParseField("username");
public static final ParseField PIPELINE = new ParseField("pipeline");
public static final ParseField PIPELINE_SETTINGS = new ParseField("pipeline_settings");
static {
PARSER.declareField(constructorArg(), (parser, s) -> {
final String instantISOString = parser.text();
return Instant.parse(instantISOString);
}, LAST_MODIFIED, ValueType.STRING);
PARSER.declareObject(constructorArg(), (parser, s) -> parser.map(), PIPELINE_METADATA);
PARSER.declareString(constructorArg(), USERNAME);
PARSER.declareString(constructorArg(), PIPELINE);
PARSER.declareObject(constructorArg(), (parser, s) -> parser.map(), PIPELINE_SETTINGS);
}
private final String id;
private final Instant lastModified;
private final Map<String, Object> pipelineMetadata;
private final String username;
private final String pipeline;
private final Map<String, Object> pipelineSettings;
public Pipeline(
String id,
Instant lastModified,
Map<String, Object> pipelineMetadata,
String username,
String pipeline,
Map<String, Object> pipelineSettings
) {
this.id = id;
this.lastModified = lastModified;
this.pipelineMetadata = pipelineMetadata;
this.username = username;
this.pipeline = pipeline;
this.pipelineSettings = pipelineSettings;
}
public String getId() {
return id;
}
public Instant getLastModified() {
return lastModified;
}
public Map<String, Object> getPipelineMetadata() {
return pipelineMetadata;
}
public String getUsername() {
return username;
}
public String getPipeline() {
return pipeline;
}
public Map<String, Object> getPipelineSettings() {
return pipelineSettings;
}
}
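
A minimal parsing sketch (illustrative, not part of the commit): this runs a pipeline document through Pipeline.PARSER, the same validation step RestPutPipelineAction performs before indexing. The JSON body, the pipeline id, and the PipelineParsingExample class name are assumptions patterned on LogstashSystemIndexIT later in this diff, and the XContent setup assumes the 7.x APIs used elsewhere in this change.

import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.logstash.Pipeline;

public class PipelineParsingExample {
    public static void main(String[] args) throws Exception {
        // Example document with the five fields the parser declares as constructor arguments.
        final String json = "{"
            + "\"last_modified\": \"2020-03-09T15:42:30.229Z\","
            + "\"pipeline_metadata\": {\"version\": 1, \"type\": \"logstash_pipeline\"},"
            + "\"username\": \"john.doe\","
            + "\"pipeline\": \"input {} filter {} output {}\","
            + "\"pipeline_settings\": {\"pipeline.workers\": 1}"
            + "}";
        // The pipeline id is supplied as the parser context, matching how RestPutPipelineAction calls PARSER.
        try (XContentParser parser = XContentType.JSON.xContent()
                .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json)) {
            Pipeline pipeline = Pipeline.PARSER.apply(parser, "example_pipeline");
            System.out.println(pipeline.getId() + " last modified at " + pipeline.getLastModified());
        }
    }
}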

DeletePipelineAction.java

@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.action.ActionType;
public class DeletePipelineAction extends ActionType<DeletePipelineResponse> {
public static final String NAME = "cluster:admin/logstash/pipeline/delete";
public static final DeletePipelineAction INSTANCE = new DeletePipelineAction();
private DeletePipelineAction() {
super(NAME, DeletePipelineResponse::new);
}
}

DeletePipelineRequest.java

@@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Objects;
public class DeletePipelineRequest extends ActionRequest {
private final String id;
public DeletePipelineRequest(String id) {
this.id = Objects.requireNonNull(id);
}
public DeletePipelineRequest(StreamInput in) throws IOException {
super(in);
this.id = in.readString();
}
public String id() {
return id;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeletePipelineRequest that = (DeletePipelineRequest) o;
return Objects.equals(id, that.id);
}
@Override
public int hashCode() {
return Objects.hash(id);
}
}

DeletePipelineResponse.java

@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Objects;
public class DeletePipelineResponse extends ActionResponse {
private final boolean deleted;
public DeletePipelineResponse(boolean deleted) {
this.deleted = deleted;
}
public DeletePipelineResponse(StreamInput in) throws IOException {
super(in);
this.deleted = in.readBoolean();
}
public boolean isDeleted() {
return deleted;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(deleted);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeletePipelineResponse that = (DeletePipelineResponse) o;
return deleted == that.deleted;
}
@Override
public int hashCode() {
return Objects.hash(deleted);
}
}

GetPipelineAction.java

@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.action.ActionType;
public class GetPipelineAction extends ActionType<GetPipelineResponse> {
public static final String NAME = "cluster:admin/logstash/pipeline/get";
public static final GetPipelineAction INSTANCE = new GetPipelineAction();
private GetPipelineAction() {
super(NAME, GetPipelineResponse::new);
}
}

GetPipelineRequest.java

@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
public class GetPipelineRequest extends ActionRequest {
private final List<String> ids;
public GetPipelineRequest(List<String> ids) {
this.ids = Objects.requireNonNull(ids);
}
public GetPipelineRequest(StreamInput in) throws IOException {
super(in);
ids = in.readStringList();
}
public List<String> ids() {
return ids;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringCollection(ids);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GetPipelineRequest that = (GetPipelineRequest) o;
return Objects.equals(ids, that.ids);
}
@Override
public int hashCode() {
return Objects.hash(ids);
}
}

GetPipelineResponse.java

@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
public class GetPipelineResponse extends ActionResponse implements ToXContentObject {
private final Map<String, BytesReference> pipelines;
public GetPipelineResponse(Map<String, BytesReference> pipelines) {
this.pipelines = pipelines;
}
public GetPipelineResponse(StreamInput in) throws IOException {
super(in);
this.pipelines = in.readMap(StreamInput::readString, StreamInput::readBytesReference);
}
public Map<String, BytesReference> pipelines() {
return pipelines;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(pipelines, StreamOutput::writeString, StreamOutput::writeBytesReference);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
for (Entry<String, BytesReference> entry : pipelines.entrySet()) {
builder.rawField(entry.getKey(), entry.getValue().streamInput());
}
return builder.endObject();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GetPipelineResponse that = (GetPipelineResponse) o;
return Objects.equals(pipelines, that.pipelines);
}
@Override
public int hashCode() {
return Objects.hash(pipelines);
}
}

PutPipelineAction.java

@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.action.ActionType;
public class PutPipelineAction extends ActionType<PutPipelineResponse> {
public static final String NAME = "cluster:admin/logstash/pipeline/put";
public static final PutPipelineAction INSTANCE = new PutPipelineAction();
private PutPipelineAction() {
super(NAME, PutPipelineResponse::new);
}
}

PutPipelineRequest.java

@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.Objects;
public class PutPipelineRequest extends ActionRequest {
private final String id;
private final String source;
private final XContentType xContentType;
public PutPipelineRequest(String id, String source, XContentType xContentType) {
this.id = id;
this.source = Objects.requireNonNull(source);
this.xContentType = Objects.requireNonNull(xContentType);
}
public PutPipelineRequest(StreamInput in) throws IOException {
super(in);
this.id = in.readString();
this.source = in.readString();
this.xContentType = in.readEnum(XContentType.class);
}
public String id() {
return id;
}
public String source() {
return source;
}
public XContentType xContentType() {
return xContentType;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeString(source);
out.writeEnum(xContentType);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PutPipelineRequest that = (PutPipelineRequest) o;
return Objects.equals(id, that.id) && Objects.equals(source, that.source) && xContentType == that.xContentType;
}
@Override
public int hashCode() {
return Objects.hash(id, source, xContentType);
}
}

PutPipelineResponse.java

@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Objects;
public class PutPipelineResponse extends ActionResponse {
private final RestStatus status;
public PutPipelineResponse(RestStatus status) {
this.status = Objects.requireNonNull(status);
}
public PutPipelineResponse(StreamInput in) throws IOException {
super(in);
this.status = in.readEnum(RestStatus.class);
}
public RestStatus status() {
return status;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeEnum(status);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PutPipelineResponse that = (PutPipelineResponse) o;
return status == that.status;
}
@Override
public int hashCode() {
return Objects.hash(status);
}
}

TransportDeletePipelineAction.java

@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse.Result;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.logstash.Logstash;
public class TransportDeletePipelineAction extends HandledTransportAction<DeletePipelineRequest, DeletePipelineResponse> {
private final Client client;
@Inject
public TransportDeletePipelineAction(TransportService transportService, ActionFilters actionFilters, Client client) {
super(DeletePipelineAction.NAME, transportService, actionFilters, DeletePipelineRequest::new);
this.client = client;
}
@Override
protected void doExecute(Task task, DeletePipelineRequest request, ActionListener<DeletePipelineResponse> listener) {
client.prepareDelete()
.setIndex(Logstash.LOGSTASH_CONCRETE_INDEX_NAME)
.setId(request.id())
.execute(
ActionListener.wrap(
deleteResponse -> listener.onResponse(new DeletePipelineResponse(deleteResponse.getResult() == Result.DELETED)),
listener::onFailure
)
);
}
}

TransportGetPipelineAction.java

@@ -0,0 +1,164 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.logstash.Logstash;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public class TransportGetPipelineAction extends HandledTransportAction<GetPipelineRequest, GetPipelineResponse> {
private static final Logger logger = LogManager.getLogger(TransportGetPipelineAction.class);
private final Client client;
@Inject
public TransportGetPipelineAction(TransportService transportService, ActionFilters actionFilters, Client client) {
super(GetPipelineAction.NAME, transportService, actionFilters, GetPipelineRequest::new);
this.client = client;
}
@Override
protected void doExecute(Task task, GetPipelineRequest request, ActionListener<GetPipelineResponse> listener) {
if (request.ids().isEmpty()) {
client.prepareSearch(Logstash.LOGSTASH_CONCRETE_INDEX_NAME)
.setSource(
SearchSourceBuilder.searchSource()
.fetchSource(true)
.query(QueryBuilders.matchAllQuery())
.size(1000)
.trackTotalHits(true)
)
.setScroll(TimeValue.timeValueMinutes(1L))
.execute(ActionListener.wrap(searchResponse -> {
final int numHits = Math.toIntExact(searchResponse.getHits().getTotalHits().value);
final Map<String, BytesReference> pipelineSources = new HashMap<>(numHits);
final Consumer<SearchResponse> clearScroll = (response) -> {
if (response != null && response.getScrollId() != null) {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(response.getScrollId());
client.clearScroll(
clearScrollRequest,
ActionListener.wrap(
(r) -> {},
e -> logger.warn(
new ParameterizedMessage("clear scroll failed for scroll id [{}]", response.getScrollId()),
e
)
)
);
}
};
handleSearchResponse(searchResponse, pipelineSources, clearScroll, listener);
}, listener::onFailure));
} else if (request.ids().size() == 1) {
client.prepareGet(Logstash.LOGSTASH_CONCRETE_INDEX_NAME, "_doc", request.ids().get(0))
.setFetchSource(true)
.execute(ActionListener.wrap(response -> {
if (response.isExists()) {
listener.onResponse(
new GetPipelineResponse(
org.elasticsearch.common.collect.Map.of(response.getId(), response.getSourceAsBytesRef())
)
);
} else {
listener.onResponse(new GetPipelineResponse(org.elasticsearch.common.collect.Map.of()));
}
}, listener::onFailure));
} else {
MultiGetRequestBuilder requestBuilder = client.prepareMultiGet();
for (String id : request.ids()) {
requestBuilder.add(Logstash.LOGSTASH_CONCRETE_INDEX_NAME, "_doc", id);
}
requestBuilder.execute(ActionListener.wrap(mGetResponse -> {
logFailures(mGetResponse);
listener.onResponse(
new GetPipelineResponse(
Arrays.stream(mGetResponse.getResponses())
.filter(itemResponse -> itemResponse.isFailed() == false)
.filter(itemResponse -> itemResponse.getResponse().isExists())
.map(MultiGetItemResponse::getResponse)
.collect(Collectors.toMap(GetResponse::getId, GetResponse::getSourceAsBytesRef))
)
);
}, listener::onFailure));
}
}
private void handleSearchResponse(
SearchResponse searchResponse,
Map<String, BytesReference> pipelineSources,
Consumer<SearchResponse> clearScroll,
ActionListener<GetPipelineResponse> listener
) {
for (SearchHit hit : searchResponse.getHits().getHits()) {
pipelineSources.put(hit.getId(), hit.getSourceRef());
}
if (pipelineSources.size() > searchResponse.getHits().getTotalHits().value) {
clearScroll.accept(searchResponse);
listener.onFailure(
new IllegalStateException(
"scrolling returned more hits ["
+ pipelineSources.size()
+ "] than expected ["
+ searchResponse.getHits().getTotalHits().value
+ "] so bailing out to prevent unbounded "
+ "memory consumption."
)
);
} else if (pipelineSources.size() == searchResponse.getHits().getTotalHits().value) {
clearScroll.accept(searchResponse);
listener.onResponse(new GetPipelineResponse(pipelineSources));
} else {
client.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(1L))
.execute(
ActionListener.wrap(
searchResponse1 -> handleSearchResponse(searchResponse1, pipelineSources, clearScroll, listener),
listener::onFailure
)
);
}
}
private void logFailures(MultiGetResponse multiGetResponse) {
List<String> ids = Arrays.stream(multiGetResponse.getResponses())
.filter(MultiGetItemResponse::isFailed)
.filter(itemResponse -> itemResponse.getFailure() != null)
.map(itemResponse -> itemResponse.getFailure().getId())
.collect(Collectors.toList());
if (ids.isEmpty() == false) {
logger.info("Could not retrieve logstash pipelines with ids: {}", ids);
}
}
}

TransportPutPipelineAction.java

@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.logstash.Logstash;
public class TransportPutPipelineAction extends HandledTransportAction<PutPipelineRequest, PutPipelineResponse> {
private final Client client;
@Inject
public TransportPutPipelineAction(TransportService transportService, ActionFilters actionFilters, Client client) {
super(PutPipelineAction.NAME, transportService, actionFilters, PutPipelineRequest::new);
this.client = client;
}
@Override
protected void doExecute(Task task, PutPipelineRequest request, ActionListener<PutPipelineResponse> listener) {
client.prepareIndex()
.setIndex(Logstash.LOGSTASH_CONCRETE_INDEX_NAME)
.setId(request.id())
.setSource(request.source(), request.xContentType())
.execute(
ActionListener.wrap(
indexResponse -> listener.onResponse(new PutPipelineResponse(indexResponse.status())),
listener::onFailure
)
);
}
}

RestDeletePipelineAction.java

@@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestRequest.Method;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestActionListener;
import org.elasticsearch.xpack.logstash.action.DeletePipelineAction;
import org.elasticsearch.xpack.logstash.action.DeletePipelineRequest;
import org.elasticsearch.xpack.logstash.action.DeletePipelineResponse;
import java.io.IOException;
import java.util.List;
public class RestDeletePipelineAction extends BaseRestHandler {
@Override
public String getName() {
return "logstash_delete_pipeline";
}
@Override
public List<Route> routes() {
return org.elasticsearch.common.collect.List.of(new Route(Method.DELETE, "/_logstash/pipeline/{id}"));
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
final String id = request.param("id");
return restChannel -> client.execute(
DeletePipelineAction.INSTANCE,
new DeletePipelineRequest(id),
new RestActionListener<DeletePipelineResponse>(restChannel) {
@Override
protected void processResponse(DeletePipelineResponse deletePipelineResponse) {
final RestStatus status = deletePipelineResponse.isDeleted() ? RestStatus.OK : RestStatus.NOT_FOUND;
channel.sendResponse(new BytesRestResponse(status, XContentType.JSON.mediaType(), BytesArray.EMPTY));
}
}
);
}
}

RestGetPipelineAction.java

@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestRequest.Method;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.logstash.action.GetPipelineAction;
import org.elasticsearch.xpack.logstash.action.GetPipelineRequest;
import org.elasticsearch.xpack.logstash.action.GetPipelineResponse;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public class RestGetPipelineAction extends BaseRestHandler {
@Override
public String getName() {
return "logstash_get_pipeline";
}
@Override
public List<Route> routes() {
return org.elasticsearch.common.collect.List.of(
new Route(Method.GET, "/_logstash/pipeline"),
new Route(Method.GET, "/_logstash/pipeline/{id}")
);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
final List<String> ids = Arrays.asList(request.paramAsStringArray("id", Strings.EMPTY_ARRAY));
return restChannel -> client.execute(
GetPipelineAction.INSTANCE,
new GetPipelineRequest(ids),
new RestToXContentListener<GetPipelineResponse>(restChannel) {
@Override
protected RestStatus getStatus(GetPipelineResponse response) {
if (response.pipelines().isEmpty() && ids.isEmpty() == false) {
return RestStatus.NOT_FOUND;
}
return RestStatus.OK;
}
}
);
}
}

RestPutPipelineAction.java

@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestRequest.Method;
import org.elasticsearch.rest.action.RestActionListener;
import org.elasticsearch.xpack.logstash.Pipeline;
import org.elasticsearch.xpack.logstash.action.PutPipelineAction;
import org.elasticsearch.xpack.logstash.action.PutPipelineRequest;
import org.elasticsearch.xpack.logstash.action.PutPipelineResponse;
import java.io.IOException;
import java.util.List;
public class RestPutPipelineAction extends BaseRestHandler {
@Override
public String getName() {
return "logstash_put_pipeline";
}
@Override
public List<Route> routes() {
return org.elasticsearch.common.collect.List.of(new Route(Method.PUT, "/_logstash/pipeline/{id}"));
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
final String id = request.param("id");
try (XContentParser parser = request.contentParser()) {
// parse pipeline for validation
Pipeline.PARSER.apply(parser, id);
}
return restChannel -> {
final String content = request.content().utf8ToString();
client.execute(
PutPipelineAction.INSTANCE,
new PutPipelineRequest(id, content, request.getXContentType()),
new RestActionListener<PutPipelineResponse>(restChannel) {
@Override
protected void processResponse(PutPipelineResponse putPipelineResponse) throws Exception {
channel.sendResponse(
new BytesRestResponse(putPipelineResponse.status(), XContentType.JSON.mediaType(), BytesArray.EMPTY)
);
}
}
);
};
}
}

LogstashPluginTests.java

@@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.test.ESTestCase;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.contains;
public class LogstashPluginTests extends ESTestCase {
public void testSystemIndices() {
assertThat(
new Logstash().getSystemIndexDescriptors(Settings.EMPTY)
.stream()
.map(SystemIndexDescriptor::getIndexPattern)
.collect(Collectors.toList()),
contains(".logstash")
);
}
}

DeletePipelineRequestTests.java

@@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
public class DeletePipelineRequestTests extends AbstractWireSerializingTestCase<DeletePipelineRequest> {
@Override
protected Writeable.Reader<DeletePipelineRequest> instanceReader() {
return DeletePipelineRequest::new;
}
@Override
protected DeletePipelineRequest createTestInstance() {
return new DeletePipelineRequest(randomAlphaOfLengthBetween(2, 10));
}
@Override
protected DeletePipelineRequest mutateInstance(DeletePipelineRequest instance) {
return new DeletePipelineRequest(instance.id() + randomAlphaOfLength(1));
}
}

DeletePipelineResponseTests.java

@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
public class DeletePipelineResponseTests extends AbstractWireSerializingTestCase<DeletePipelineResponse> {
@Override
protected Writeable.Reader<DeletePipelineResponse> instanceReader() {
return DeletePipelineResponse::new;
}
@Override
protected DeletePipelineResponse createTestInstance() {
return new DeletePipelineResponse(randomBoolean());
}
@Override
protected DeletePipelineResponse mutateInstance(DeletePipelineResponse instance) {
// return a response with the opposite boolean value
return new DeletePipelineResponse(instance.isDeleted() == false);
}
}

GetPipelineRequestTests.java

@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class GetPipelineRequestTests extends AbstractWireSerializingTestCase<GetPipelineRequest> {
@Override
protected Writeable.Reader<GetPipelineRequest> instanceReader() {
return GetPipelineRequest::new;
}
@Override
protected GetPipelineRequest createTestInstance() {
return new GetPipelineRequest(randomList(0, 50, () -> randomAlphaOfLengthBetween(2, 10)));
}
@Override
protected GetPipelineRequest mutateInstance(GetPipelineRequest instance) {
List<String> ids = new ArrayList<>();
ids.addAll(instance.ids());
if (randomBoolean() || ids.size() == 0) {
// append another ID
ids.add(randomAlphaOfLengthBetween(2, 10));
} else {
// change ID strings
ids = ids.stream().map(id -> id + randomAlphaOfLength(1)).collect(Collectors.toList());
}
return new GetPipelineRequest(ids);
}
}

GetPipelineResponseTests.java

@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.HashMap;
import java.util.Map;
public class GetPipelineResponseTests extends AbstractWireSerializingTestCase<GetPipelineResponse> {
@Override
protected Writeable.Reader<GetPipelineResponse> instanceReader() {
return GetPipelineResponse::new;
}
@Override
protected GetPipelineResponse createTestInstance() {
final int numPipelines = randomIntBetween(1, 10);
final Map<String, BytesReference> map = new HashMap<>(numPipelines);
for (int i = 0; i < numPipelines; i++) {
final String name = randomAlphaOfLengthBetween(2, 10);
final BytesReference ref = new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 16)));
map.put(name, ref);
}
return new GetPipelineResponse(map);
}
@Override
protected GetPipelineResponse mutateInstance(GetPipelineResponse instance) {
Map<String, BytesReference> map = new HashMap<>(instance.pipelines().size() + 1);
map.putAll(instance.pipelines());
map.put(randomAlphaOfLengthBetween(2, 10), new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 16))));
return new GetPipelineResponse(map);
}
}

PutPipelineRequestTests.java

@@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
public class PutPipelineRequestTests extends AbstractWireSerializingTestCase<PutPipelineRequest> {
@Override
protected Writeable.Reader<PutPipelineRequest> instanceReader() {
return PutPipelineRequest::new;
}
@Override
protected PutPipelineRequest createTestInstance() {
return new PutPipelineRequest(randomAlphaOfLength(2), randomAlphaOfLengthBetween(10, 100), randomFrom(XContentType.values()));
}
@Override
protected PutPipelineRequest mutateInstance(PutPipelineRequest instance) {
return new PutPipelineRequest(
instance.id() + randomAlphaOfLength(1),
instance.source() + randomAlphaOfLength(1),
instance.xContentType()
);
}
}

PutPipelineResponseTests.java

@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
public class PutPipelineResponseTests extends AbstractWireSerializingTestCase<PutPipelineResponse> {
@Override
protected Writeable.Reader<PutPipelineResponse> instanceReader() {
return PutPipelineResponse::new;
}
@Override
protected PutPipelineResponse createTestInstance() {
return new PutPipelineResponse(randomFrom(RestStatus.OK, RestStatus.CREATED));
}
@Override
protected PutPipelineResponse mutateInstance(PutPipelineResponse instance) {
if (instance.status() == RestStatus.OK) {
return new PutPipelineResponse(RestStatus.CREATED);
}
return new PutPipelineResponse(RestStatus.OK);
}
}

TransportGetPipelineActionTests.java

@@ -0,0 +1,112 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.logstash.action;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.transport.TransportService;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TransportGetPipelineActionTests extends ESTestCase {
/**
* Test that an error message is logged on a partial failure of
* a TransportGetPipelineAction.
*/
public void testGetPipelineMultipleIDsPartialFailure() throws Exception {
// Set up a log appender for detecting log messages
final MockLogAppender mockLogAppender = new MockLogAppender();
mockLogAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"message",
"org.elasticsearch.xpack.logstash.action.TransportGetPipelineAction",
Level.INFO,
"Could not retrieve logstash pipelines with ids: [2]"
)
);
mockLogAppender.start();
final Logger logger = LogManager.getLogger(TransportGetPipelineAction.class);
// Set up a MultiGetResponse
GetResponse mockResponse = mock(GetResponse.class);
when(mockResponse.getId()).thenReturn("1");
when(mockResponse.getSourceAsBytesRef()).thenReturn(mock(BytesReference.class));
when(mockResponse.isExists()).thenReturn(true);
MultiGetResponse.Failure failure = mock(MultiGetResponse.Failure.class);
when(failure.getId()).thenReturn("2");
MultiGetResponse multiGetResponse = new MultiGetResponse(
new MultiGetItemResponse[] { new MultiGetItemResponse(mockResponse, null), new MultiGetItemResponse(null, failure) }
);
GetPipelineRequest request = new GetPipelineRequest(org.elasticsearch.common.collect.List.of("1", "2"));
// Set up an ActionListener for the actual test conditions
ActionListener<GetPipelineResponse> testActionListener = new ActionListener<GetPipelineResponse>() {
@Override
public void onResponse(GetPipelineResponse getPipelineResponse) {
// check successful pipeline get
assertThat(getPipelineResponse, is(notNullValue()));
assertThat(getPipelineResponse.pipelines().size(), equalTo(1));
// check that failed pipeline get is logged
mockLogAppender.assertAllExpectationsMatched();
}
@Override
public void onFailure(Exception e) {
// do nothing
}
};
try (Client client = getMockClient(multiGetResponse)) {
Loggers.addAppender(logger, mockLogAppender);
TransportGetPipelineAction action = new TransportGetPipelineAction(
mock(TransportService.class),
mock(ActionFilters.class),
client
);
action.doExecute(null, request, testActionListener);
} finally {
Loggers.removeAppender(logger, mockLogAppender);
mockLogAppender.stop();
}
}
private Client getMockClient(ActionResponse response) {
return new NoOpClient(getTestName()) {
@Override
@SuppressWarnings("unchecked")
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
listener.onResponse((Response) response);
}
};
}
}

LogstashSystemIndexIT.java

@@ -0,0 +1,186 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.test.rest;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.test.rest.ESRestTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
public class LogstashSystemIndexIT extends ESRestTestCase {
static final String BASIC_AUTH_VALUE =
basicAuthHeaderValue("x_pack_rest_user", SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
@Override
protected Settings restClientSettings() {
return Settings.builder()
.put(ThreadContext.PREFIX + ".Authorization", BASIC_AUTH_VALUE)
.build();
}
public void testTemplateIsPut() throws Exception {
assertBusy(
() -> assertThat(
client().performRequest(new Request("HEAD", "/_template/.logstash-management")).getStatusLine().getStatusCode(),
is(200)
)
);
}
public void testPipelineCRUD() throws Exception {
// put pipeline
final String pipelineJson = getPipelineJson();
createPipeline("test_pipeline", pipelineJson);
// get pipeline
Request getRequest = new Request("GET", "/_logstash/pipeline/test_pipeline");
Response getResponse = client().performRequest(getRequest);
assertThat(getResponse.getStatusLine().getStatusCode(), is(200));
assertThat(EntityUtils.toString(getResponse.getEntity()), containsString(pipelineJson));
// update
final String updatedJson = getPipelineJson("2020-03-09T15:42:35.229Z");
Request putRequest = new Request("PUT", "/_logstash/pipeline/test_pipeline");
putRequest.setJsonEntity(updatedJson);
Response putResponse = client().performRequest(putRequest);
assertThat(putResponse.getStatusLine().getStatusCode(), is(200));
getRequest = new Request("GET", "/_logstash/pipeline/test_pipeline");
getResponse = client().performRequest(getRequest);
assertThat(getResponse.getStatusLine().getStatusCode(), is(200));
assertThat(EntityUtils.toString(getResponse.getEntity()), containsString(updatedJson));
// delete
Request deleteRequest = new Request("DELETE", "/_logstash/pipeline/test_pipeline");
Response deleteResponse = client().performRequest(deleteRequest);
assertThat(deleteResponse.getStatusLine().getStatusCode(), is(200));
// list is now empty
Request listAll = new Request("GET", "/_logstash/pipeline");
Response listAllResponse = client().performRequest(listAll);
assertThat(listAllResponse.getStatusLine().getStatusCode(), is(200));
assertThat(EntityUtils.toString(listAllResponse.getEntity()), is("{}"));
}
public void testGetNonExistingPipeline() {
Request getRequest = new Request("GET", "/_logstash/pipeline/test_pipeline");
ResponseException re = expectThrows(ResponseException.class, () -> client().performRequest(getRequest));
Response getResponse = re.getResponse();
assertThat(getResponse.getStatusLine().getStatusCode(), is(404));
}
public void testDeleteNonExistingPipeline() {
Request deleteRequest = new Request("DELETE", "/_logstash/pipeline/test_pipeline");
ResponseException re = expectThrows(ResponseException.class, () -> client().performRequest(deleteRequest));
Response getResponse = re.getResponse();
assertThat(getResponse.getStatusLine().getStatusCode(), is(404));
}
public void testMultiplePipelines() throws IOException {
final int numPipelines = scaledRandomIntBetween(2, 2000);
final List<String> ids = new ArrayList<>(numPipelines);
final String pipelineJson = getPipelineJson();
for (int i = 0; i < numPipelines; i++) {
final String id = "id" + i;
ids.add(id);
createPipeline(id, pipelineJson);
}
// test mget-like
final int numToGet = scaledRandomIntBetween(2, Math.min(100, numPipelines)); // limit number to avoid HTTP line length issues
final List<String> mgetIds = randomSubsetOf(numToGet, ids);
final String path = "/_logstash/pipeline/" + Strings.collectionToCommaDelimitedString(mgetIds);
Request getRequest = new Request("GET", path);
Response getResponse = client().performRequest(getRequest);
assertThat(getResponse.getStatusLine().getStatusCode(), is(200));
Map<String, Object> responseMap = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(getResponse.getEntity()),
false
);
for (String id : mgetIds) {
assertTrue(responseMap.containsKey(id));
}
refreshAllIndices();
// list without any IDs
Request listAll = new Request("GET", "/_logstash/pipeline");
Response listAllResponse = client().performRequest(listAll);
assertThat(listAllResponse.getStatusLine().getStatusCode(), is(200));
Map<String, Object> listResponseMap = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(listAllResponse.getEntity()),
false
);
for (String id : ids) {
assertTrue(listResponseMap.containsKey(id));
}
assertThat(listResponseMap.size(), is(ids.size()));
}
private void createPipeline(String id, String json) throws IOException {
Request putRequest = new Request("PUT", "/_logstash/pipeline/" + id);
putRequest.setJsonEntity(json);
Response putResponse = client().performRequest(putRequest);
assertThat(putResponse.getStatusLine().getStatusCode(), is(201));
}
private String getPipelineJson() throws IOException {
return getPipelineJson("2020-03-09T15:42:30.229Z");
}
private String getPipelineJson(String date) throws IOException {
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
{
builder.field("description", "test pipeline");
builder.field("last_modified", date);
builder.startObject("pipeline_metadata");
{
builder.field("version", 1);
builder.field("type", "logstash_pipeline");
}
builder.endObject();
builder.field("username", "john.doe");
builder.field("pipeline", "\"input\": {},\n \"filter\": {},\n \"output\": {}\n");
builder.startObject("pipeline_settings");
{
builder.field("pipeline.batch.delay", 50);
builder.field("pipeline.batch.size", 125);
builder.field("pipeline.workers", 1);
builder.field("queue.checkpoint.writes", 1024);
builder.field("queue.max_bytes", "1gb");
builder.field("queue.type", "memory");
}
builder.endObject();
}
builder.endObject();
return BytesReference.bytes(builder).utf8ToString();
}
}
}

XPackRestIT.java

@@ -56,7 +56,7 @@ import static org.hamcrest.Matchers.hasSize;
// TODO: Remove this timeout increase once this test suite is broken up
@TimeoutSuite(millis = 60 * TimeUnits.MINUTE)
public class XPackRestIT extends ESClientYamlSuiteTestCase {
-private static final String BASIC_AUTH_VALUE =
+static final String BASIC_AUTH_VALUE =
basicAuthHeaderValue("x_pack_rest_user", SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
public XPackRestIT(ClientYamlTestCandidate testCandidate) {