mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Streamline the put and delete pipelines responses with the index and delete response in core.
This commit is contained in:
parent
6062c4eac9
commit
a2cda4e3f2
@ -22,16 +22,23 @@ package org.elasticsearch.action.delete;
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.StatusToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
|
||||
|
||||
/**
|
||||
* The response of the delete action.
|
||||
*
|
||||
* @see org.elasticsearch.action.delete.DeleteRequest
|
||||
* @see org.elasticsearch.client.Client#delete(DeleteRequest)
|
||||
*/
|
||||
public class DeleteResponse extends ActionWriteResponse {
|
||||
public class DeleteResponse extends ActionWriteResponse implements StatusToXContent {
|
||||
|
||||
private String index;
|
||||
private String id;
|
||||
@ -105,4 +112,33 @@ public class DeleteResponse extends ActionWriteResponse {
|
||||
out.writeLong(version);
|
||||
out.writeBoolean(found);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestStatus status() {
|
||||
RestStatus status = getShardInfo().status();
|
||||
if (isFound() == false) {
|
||||
status = NOT_FOUND;
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
ActionWriteResponse.ShardInfo shardInfo = getShardInfo();
|
||||
builder.field(Fields.FOUND, found)
|
||||
.field(Fields._INDEX, index)
|
||||
.field(Fields._TYPE, type)
|
||||
.field(Fields._ID, id)
|
||||
.field(Fields._VERSION, version)
|
||||
.value(shardInfo);
|
||||
return builder;
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString FOUND = new XContentBuilderString("found");
|
||||
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
|
||||
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
|
||||
static final XContentBuilderString _ID = new XContentBuilderString("_id");
|
||||
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
|
||||
}
|
||||
}
|
||||
|
@ -22,16 +22,23 @@ package org.elasticsearch.action.index;
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.StatusToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.rest.RestStatus.CREATED;
|
||||
|
||||
/**
|
||||
* A response of an index operation,
|
||||
*
|
||||
* @see org.elasticsearch.action.index.IndexRequest
|
||||
* @see org.elasticsearch.client.Client#index(IndexRequest)
|
||||
*/
|
||||
public class IndexResponse extends ActionWriteResponse {
|
||||
public class IndexResponse extends ActionWriteResponse implements StatusToXContent {
|
||||
|
||||
private String index;
|
||||
private String id;
|
||||
@ -106,6 +113,27 @@ public class IndexResponse extends ActionWriteResponse {
|
||||
out.writeBoolean(created);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestStatus status() {
|
||||
RestStatus status = getShardInfo().status();
|
||||
if (created) {
|
||||
status = CREATED;
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
ActionWriteResponse.ShardInfo shardInfo = getShardInfo();
|
||||
builder.field(Fields._INDEX, index)
|
||||
.field(Fields._TYPE, type)
|
||||
.field(Fields._ID, id)
|
||||
.field(Fields._VERSION, version);
|
||||
shardInfo.toXContent(builder, params);
|
||||
builder.field(Fields.CREATED, created);
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
@ -118,4 +146,12 @@ public class IndexResponse extends ActionWriteResponse {
|
||||
builder.append(",shards=").append(getShardInfo());
|
||||
return builder.append("]").toString();
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
|
||||
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
|
||||
static final XContentBuilderString _ID = new XContentBuilderString("_id");
|
||||
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
|
||||
static final XContentBuilderString CREATED = new XContentBuilderString("created");
|
||||
}
|
||||
}
|
||||
|
@ -19,22 +19,20 @@
|
||||
|
||||
package org.elasticsearch.rest.action.delete;
|
||||
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.rest.*;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestChannel;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.support.RestActions;
|
||||
import org.elasticsearch.rest.action.support.RestBuilderListener;
|
||||
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
|
||||
|
||||
import static org.elasticsearch.rest.RestRequest.Method.DELETE;
|
||||
import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
|
||||
|
||||
/**
|
||||
*
|
||||
@ -62,31 +60,7 @@ public class RestDeleteAction extends BaseRestHandler {
|
||||
deleteRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
|
||||
}
|
||||
|
||||
client.delete(deleteRequest, new RestBuilderListener<DeleteResponse>(channel) {
|
||||
@Override
|
||||
public RestResponse buildResponse(DeleteResponse result, XContentBuilder builder) throws Exception {
|
||||
ActionWriteResponse.ShardInfo shardInfo = result.getShardInfo();
|
||||
builder.startObject().field(Fields.FOUND, result.isFound())
|
||||
.field(Fields._INDEX, result.getIndex())
|
||||
.field(Fields._TYPE, result.getType())
|
||||
.field(Fields._ID, result.getId())
|
||||
.field(Fields._VERSION, result.getVersion())
|
||||
.value(shardInfo)
|
||||
.endObject();
|
||||
RestStatus status = shardInfo.status();
|
||||
if (!result.isFound()) {
|
||||
status = NOT_FOUND;
|
||||
}
|
||||
return new BytesRestResponse(status, builder);
|
||||
}
|
||||
});
|
||||
client.delete(deleteRequest, new RestStatusToXContentListener<>(channel));
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString FOUND = new XContentBuilderString("found");
|
||||
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
|
||||
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
|
||||
static final XContentBuilderString _ID = new XContentBuilderString("_id");
|
||||
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
|
||||
}
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.rest.*;
|
||||
import org.elasticsearch.rest.action.support.RestActions;
|
||||
import org.elasticsearch.rest.action.support.RestBuilderListener;
|
||||
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@ -99,33 +100,7 @@ public class RestIndexAction extends BaseRestHandler {
|
||||
if (consistencyLevel != null) {
|
||||
indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
|
||||
}
|
||||
client.index(indexRequest, new RestBuilderListener<IndexResponse>(channel) {
|
||||
@Override
|
||||
public RestResponse buildResponse(IndexResponse response, XContentBuilder builder) throws Exception {
|
||||
builder.startObject();
|
||||
ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo();
|
||||
builder.field(Fields._INDEX, response.getIndex())
|
||||
.field(Fields._TYPE, response.getType())
|
||||
.field(Fields._ID, response.getId())
|
||||
.field(Fields._VERSION, response.getVersion());
|
||||
shardInfo.toXContent(builder, request);
|
||||
builder.field(Fields.CREATED, response.isCreated());
|
||||
builder.endObject();
|
||||
RestStatus status = shardInfo.status();
|
||||
if (response.isCreated()) {
|
||||
status = CREATED;
|
||||
}
|
||||
return new BytesRestResponse(status, builder);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
|
||||
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
|
||||
static final XContentBuilderString _ID = new XContentBuilderString("_id");
|
||||
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
|
||||
static final XContentBuilderString CREATED = new XContentBuilderString("created");
|
||||
client.index(indexRequest, new RestStatusToXContentListener<>(channel));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -20,18 +20,15 @@
|
||||
package org.elasticsearch.plugin.ingest.rest;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
|
||||
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest;
|
||||
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
|
||||
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequest;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestChannel;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.support.RestToXContentListener;
|
||||
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
|
||||
|
||||
public class RestDeletePipelineAction extends BaseRestHandler {
|
||||
|
||||
@ -45,6 +42,6 @@ public class RestDeletePipelineAction extends BaseRestHandler {
|
||||
protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception {
|
||||
DeletePipelineRequest request = new DeletePipelineRequest();
|
||||
request.id(restRequest.param("id"));
|
||||
client.execute(DeletePipelineAction.INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
client.execute(DeletePipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel));
|
||||
}
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestChannel;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.support.RestToXContentListener;
|
||||
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
|
||||
|
||||
public class RestPutPipelineAction extends BaseRestHandler {
|
||||
|
||||
@ -45,6 +45,6 @@ public class RestPutPipelineAction extends BaseRestHandler {
|
||||
if (restRequest.hasContent()) {
|
||||
request.source(restRequest.content());
|
||||
}
|
||||
client.execute(PutPipelineAction.INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
client.execute(PutPipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel));
|
||||
}
|
||||
}
|
||||
|
@ -20,9 +20,10 @@
|
||||
package org.elasticsearch.plugin.ingest.transport.delete;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public class DeletePipelineAction extends Action<DeletePipelineRequest, DeletePipelineResponse, DeletePipelineRequestBuilder> {
|
||||
public class DeletePipelineAction extends Action<DeletePipelineRequest, DeleteResponse, DeletePipelineRequestBuilder> {
|
||||
|
||||
public static final DeletePipelineAction INSTANCE = new DeletePipelineAction();
|
||||
public static final String NAME = "cluster:admin/ingest/pipeline/delete";
|
||||
@ -37,7 +38,7 @@ public class DeletePipelineAction extends Action<DeletePipelineRequest, DeletePi
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeletePipelineResponse newResponse() {
|
||||
return new DeletePipelineResponse();
|
||||
public DeleteResponse newResponse() {
|
||||
return new DeleteResponse();
|
||||
}
|
||||
}
|
||||
|
@ -20,9 +20,10 @@
|
||||
package org.elasticsearch.plugin.ingest.transport.delete;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public class DeletePipelineRequestBuilder extends ActionRequestBuilder<DeletePipelineRequest, DeletePipelineResponse, DeletePipelineRequestBuilder> {
|
||||
public class DeletePipelineRequestBuilder extends ActionRequestBuilder<DeletePipelineRequest, DeleteResponse, DeletePipelineRequestBuilder> {
|
||||
|
||||
public DeletePipelineRequestBuilder(ElasticsearchClient client, DeletePipelineAction action) {
|
||||
super(client, action, new DeletePipelineRequest());
|
||||
|
@ -1,80 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest.transport.delete;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
public class DeletePipelineResponse extends ActionResponse implements ToXContent {
|
||||
|
||||
private String id;
|
||||
private boolean found;
|
||||
|
||||
DeletePipelineResponse() {
|
||||
}
|
||||
|
||||
public DeletePipelineResponse(String id, boolean found) {
|
||||
this.id = id;
|
||||
this.found = found;
|
||||
}
|
||||
|
||||
public String id() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public boolean found() {
|
||||
return found;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
this.id = in.readString();
|
||||
this.found = in.readBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(id);
|
||||
out.writeBoolean(found);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field(Fields.ID, id);
|
||||
builder.field(Fields.FOUND, found);
|
||||
return builder;
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString ID = new XContentBuilderString("_id");
|
||||
static final XContentBuilderString FOUND = new XContentBuilderString("_found");
|
||||
}
|
||||
}
|
@ -20,9 +20,7 @@
|
||||
package org.elasticsearch.plugin.ingest.transport.delete;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.delete.TransportDeleteAction;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
@ -32,7 +30,7 @@ import org.elasticsearch.plugin.ingest.PipelineStore;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
public class DeletePipelineTransportAction extends HandledTransportAction<DeletePipelineRequest, DeletePipelineResponse> {
|
||||
public class DeletePipelineTransportAction extends HandledTransportAction<DeletePipelineRequest, DeleteResponse> {
|
||||
|
||||
private final PipelineStore pipelineStore;
|
||||
|
||||
@ -43,17 +41,7 @@ public class DeletePipelineTransportAction extends HandledTransportAction<Delete
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(DeletePipelineRequest request, ActionListener<DeletePipelineResponse> listener) {
|
||||
pipelineStore.delete(request, new ActionListener<DeleteResponse>() {
|
||||
@Override
|
||||
public void onResponse(DeleteResponse deleteResponse) {
|
||||
listener.onResponse(new DeletePipelineResponse(deleteResponse.getId(), deleteResponse.isFound()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
protected void doExecute(DeletePipelineRequest request, ActionListener<DeleteResponse> listener) {
|
||||
pipelineStore.delete(request, listener);
|
||||
}
|
||||
}
|
||||
|
@ -20,9 +20,10 @@
|
||||
package org.elasticsearch.plugin.ingest.transport.put;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public class PutPipelineAction extends Action<PutPipelineRequest, PutPipelineResponse, PutPipelineRequestBuilder> {
|
||||
public class PutPipelineAction extends Action<PutPipelineRequest, IndexResponse, PutPipelineRequestBuilder> {
|
||||
|
||||
public static final PutPipelineAction INSTANCE = new PutPipelineAction();
|
||||
public static final String NAME = "cluster:admin/ingest/pipeline/put";
|
||||
@ -37,7 +38,7 @@ public class PutPipelineAction extends Action<PutPipelineRequest, PutPipelineRes
|
||||
}
|
||||
|
||||
@Override
|
||||
public PutPipelineResponse newResponse() {
|
||||
return new PutPipelineResponse();
|
||||
public IndexResponse newResponse() {
|
||||
return new IndexResponse();
|
||||
}
|
||||
}
|
||||
|
@ -20,10 +20,11 @@
|
||||
package org.elasticsearch.plugin.ingest.transport.put;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
||||
public class PutPipelineRequestBuilder extends ActionRequestBuilder<PutPipelineRequest, PutPipelineResponse, PutPipelineRequestBuilder> {
|
||||
public class PutPipelineRequestBuilder extends ActionRequestBuilder<PutPipelineRequest, IndexResponse, PutPipelineRequestBuilder> {
|
||||
|
||||
public PutPipelineRequestBuilder(ElasticsearchClient client, PutPipelineAction action) {
|
||||
super(client, action, new PutPipelineRequest());
|
||||
|
@ -1,79 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest.transport.put;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class PutPipelineResponse extends ActionResponse implements ToXContent {
|
||||
|
||||
private String id;
|
||||
private long version;
|
||||
|
||||
public String id() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public PutPipelineResponse id(String id) {
|
||||
this.id = id;
|
||||
return this;
|
||||
}
|
||||
|
||||
public long version() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public PutPipelineResponse version(long version) {
|
||||
this.version = version;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(id);
|
||||
out.writeLong(version);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
id = in.readString();
|
||||
version = in.readLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field(Fields.ID, id);
|
||||
builder.field(Fields.VERSION, version);
|
||||
return builder;
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString ID = new XContentBuilderString("_id");
|
||||
static final XContentBuilderString VERSION = new XContentBuilderString("_version");
|
||||
}
|
||||
}
|
@ -36,7 +36,7 @@ import org.elasticsearch.transport.TransportService;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
public class PutPipelineTransportAction extends HandledTransportAction<PutPipelineRequest, PutPipelineResponse> {
|
||||
public class PutPipelineTransportAction extends HandledTransportAction<PutPipelineRequest, IndexResponse> {
|
||||
|
||||
private final PipelineStore pipelineStore;
|
||||
|
||||
@ -47,20 +47,7 @@ public class PutPipelineTransportAction extends HandledTransportAction<PutPipeli
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(PutPipelineRequest request, ActionListener<PutPipelineResponse> listener) {
|
||||
pipelineStore.put(request, new ActionListener<IndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(IndexResponse indexResponse) {
|
||||
PutPipelineResponse response = new PutPipelineResponse();
|
||||
response.id(indexResponse.getId());
|
||||
response.version(indexResponse.getVersion());
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
protected void doExecute(PutPipelineRequest request, ActionListener<IndexResponse> listener) {
|
||||
pipelineStore.put(request, listener);
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
@ -31,7 +32,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
||||
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
|
||||
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequestBuilder;
|
||||
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineResponse;
|
||||
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
|
||||
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequestBuilder;
|
||||
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineResponse;
|
||||
@ -199,16 +199,13 @@ public class IngestClientIT extends ESIntegTestCase {
|
||||
.endArray()
|
||||
.endObject().bytes())
|
||||
.get();
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
GetPipelineResponse response = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
|
||||
.setIds("_id")
|
||||
.get();
|
||||
assertThat(response.isFound(), is(true));
|
||||
assertThat(response.pipelines().size(), equalTo(1));
|
||||
assertThat(response.pipelines().get(0).getId(), equalTo("_id"));
|
||||
}
|
||||
assertBusy(() -> {
|
||||
GetPipelineResponse response = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
|
||||
.setIds("_id")
|
||||
.get();
|
||||
assertThat(response.isFound(), is(true));
|
||||
assertThat(response.pipelines().size(), equalTo(1));
|
||||
assertThat(response.pipelines().get(0).getId(), equalTo("_id"));
|
||||
});
|
||||
|
||||
createIndex("test");
|
||||
@ -224,45 +221,36 @@ public class IngestClientIT extends ESIntegTestCase {
|
||||
.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id")
|
||||
.get();
|
||||
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
Map<String, Object> doc = client().prepareGet("test", "type", "1")
|
||||
.get().getSourceAsMap();
|
||||
assertThat(doc.get("val"), equalTo(123.42));
|
||||
assertThat(doc.get("status"), equalTo(400));
|
||||
assertThat(doc.get("msg"), equalTo("foo"));
|
||||
}
|
||||
assertBusy(() -> {
|
||||
Map<String, Object> doc = client().prepareGet("test", "type", "1")
|
||||
.get().getSourceAsMap();
|
||||
assertThat(doc.get("val"), equalTo(123.42));
|
||||
assertThat(doc.get("status"), equalTo(400));
|
||||
assertThat(doc.get("msg"), equalTo("foo"));
|
||||
});
|
||||
|
||||
client().prepareBulk().add(
|
||||
client().prepareIndex("test", "type", "2").setSource("field1", "123.42 400 <foo>")
|
||||
).putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id").get();
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
Map<String, Object> doc = client().prepareGet("test", "type", "2").get().getSourceAsMap();
|
||||
assertThat(doc.get("val"), equalTo(123.42));
|
||||
assertThat(doc.get("status"), equalTo(400));
|
||||
assertThat(doc.get("msg"), equalTo("foo"));
|
||||
}
|
||||
assertBusy(() -> {
|
||||
Map<String, Object> doc = client().prepareGet("test", "type", "2").get().getSourceAsMap();
|
||||
assertThat(doc.get("val"), equalTo(123.42));
|
||||
assertThat(doc.get("status"), equalTo(400));
|
||||
assertThat(doc.get("msg"), equalTo("foo"));
|
||||
});
|
||||
|
||||
DeletePipelineResponse response = new DeletePipelineRequestBuilder(client(), DeletePipelineAction.INSTANCE)
|
||||
DeleteResponse response = new DeletePipelineRequestBuilder(client(), DeletePipelineAction.INSTANCE)
|
||||
.setId("_id")
|
||||
.get();
|
||||
assertThat(response.found(), is(true));
|
||||
assertThat(response.id(), equalTo("_id"));
|
||||
assertThat(response.isFound(), is(true));
|
||||
assertThat(response.getId(), equalTo("_id"));
|
||||
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
GetPipelineResponse response = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
|
||||
.setIds("_id")
|
||||
.get();
|
||||
assertThat(response.isFound(), is(false));
|
||||
assertThat(response.pipelines().size(), equalTo(0));
|
||||
}
|
||||
assertBusy(() -> {
|
||||
GetPipelineResponse response1 = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
|
||||
.setIds("_id")
|
||||
.get();
|
||||
assertThat(response1.isFound(), is(false));
|
||||
assertThat(response1.pipelines().size(), equalTo(0));
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -20,6 +20,9 @@
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { _index: ".ingest" }
|
||||
- match: { _type: "pipeline" }
|
||||
- match: { _version: 1 }
|
||||
- match: { _id: "my_pipeline" }
|
||||
|
||||
# Simulate a Thread.sleep(), because pipeline are updated in the background
|
||||
@ -39,8 +42,11 @@
|
||||
- do:
|
||||
ingest.delete_pipeline:
|
||||
id: "my_pipeline"
|
||||
- match: { _index: ".ingest" }
|
||||
- match: { _type: "pipeline" }
|
||||
- match: { _version: 2 }
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { _found: true }
|
||||
- match: { found: true }
|
||||
|
||||
# Simulate a Thread.sleep(), because pipeline are updated in the background
|
||||
- do:
|
||||
|
Loading…
x
Reference in New Issue
Block a user