Polishing

This commit is contained in:
Peter-Josef Meisch 2021-03-16 22:36:57 +01:00
parent 005d6a4d6f
commit 98a8d1a5ac
No known key found for this signature in database
GPG Key ID: DE108246970C7708
5 changed files with 50 additions and 56 deletions

View File

@ -22,7 +22,6 @@ import io.netty.handler.ssl.IdentityCipherSuiteFilter;
import io.netty.handler.ssl.JdkSslContext; import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler;
import org.elasticsearch.action.get.MultiGetItemResponse;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient;
@ -64,6 +63,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
@ -409,8 +409,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
@Override @Override
public Flux<SearchHit> searchTemplate(HttpHeaders headers, SearchTemplateRequest searchTemplateRequest) { public Flux<SearchHit> searchTemplate(HttpHeaders headers, SearchTemplateRequest searchTemplateRequest) {
return sendRequest(searchTemplateRequest, requestCreator.searchTemplate(), SearchTemplateResponse.class, headers) return sendRequest(searchTemplateRequest, requestCreator.searchTemplate(), SearchTemplateResponse.class, headers)
.map(r -> r.getResponse().getHits()) .map(response -> response.getResponse().getHits()).flatMap(Flux::fromIterable);
.flatMap(Flux::fromIterable);
} }
/* /*
@ -876,10 +875,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType()); String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
return response.body(BodyExtractors.toMono(byte[].class)) // return response.body(BodyExtractors.toMono(byte[].class)) //
.switchIfEmpty(Mono .switchIfEmpty(Mono.error(
.error(new ElasticsearchStatusException(String.format("%s request to %s returned error code %s and no body.", new ElasticsearchStatusException(String.format("%s request to %s returned error code %s and no body.",
request.getMethod(), request.getEndpoint(), statusCode), status)) request.getMethod(), request.getEndpoint(), statusCode), status)))
)
.map(bytes -> new String(bytes, StandardCharsets.UTF_8)) // .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
.flatMap(content -> contentOrError(content, mediaType, status)) .flatMap(content -> contentOrError(content, mediaType, status))
.flatMap(unused -> Mono .flatMap(unused -> Mono

View File

@ -391,7 +391,7 @@ public interface ReactiveElasticsearchClient {
* *
* @param consumer must not be {@literal null}. * @param consumer must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-template.html">Search Template * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-template.html">Search Template
* API on elastic.co</a> * API on elastic.co</a>
* @return the {@link Flux} emitting {@link SearchHit hits} one by one. * @return the {@link Flux} emitting {@link SearchHit hits} one by one.
*/ */
default Flux<SearchHit> searchTemplate(Consumer<SearchTemplateRequest> consumer) { default Flux<SearchHit> searchTemplate(Consumer<SearchTemplateRequest> consumer) {
@ -405,7 +405,7 @@ public interface ReactiveElasticsearchClient {
* *
* @param searchTemplateRequest must not be {@literal null}. * @param searchTemplateRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-template.html">Search Template * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-template.html">Search Template
* API on elastic.co</a> * API on elastic.co</a>
* @return the {@link Flux} emitting {@link SearchHit hits} one by one. * @return the {@link Flux} emitting {@link SearchHit hits} one by one.
*/ */
default Flux<SearchHit> searchTemplate(SearchTemplateRequest searchTemplateRequest) { default Flux<SearchHit> searchTemplate(SearchTemplateRequest searchTemplateRequest) {
@ -418,7 +418,7 @@ public interface ReactiveElasticsearchClient {
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param searchTemplateRequest must not be {@literal null}. * @param searchTemplateRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-template.html">Search Template * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-template.html">Search Template
* API on elastic.co</a> * API on elastic.co</a>
* @return the {@link Flux} emitting {@link SearchHit hits} one by one. * @return the {@link Flux} emitting {@link SearchHit hits} one by one.
*/ */
Flux<SearchHit> searchTemplate(HttpHeaders headers, SearchTemplateRequest searchTemplateRequest); Flux<SearchHit> searchTemplate(HttpHeaders headers, SearchTemplateRequest searchTemplateRequest);

View File

@ -416,7 +416,7 @@ public class RequestConverters {
SearchRequest searchRequest = templateRequest.getRequest(); SearchRequest searchRequest = templateRequest.getRequest();
String endpoint = new EndpointBuilder().addCommaSeparatedPathParts(templateRequest.getRequest().indices()) String endpoint = new EndpointBuilder().addCommaSeparatedPathParts(templateRequest.getRequest().indices())
.addPathPart("_search").addPathPart("template").build(); .addPathPart("_search").addPathPart("template").build();
Request request = new Request(HttpMethod.GET.name(), endpoint); Request request = new Request(HttpMethod.GET.name(), endpoint);
Params params = new Params(request); Params params = new Params(request);
@ -751,21 +751,21 @@ public class RequestConverters {
return request; return request;
} }
public static Request indexExists(org.elasticsearch.client.indices.GetIndexRequest getIndexRequest) { public static Request indexExists(org.elasticsearch.client.indices.GetIndexRequest getIndexRequest) {
// this can be called with no indices as argument by transport client, not via REST though // this can be called with no indices as argument by transport client, not via REST though
if (getIndexRequest.indices() == null || getIndexRequest.indices().length == 0) { if (getIndexRequest.indices() == null || getIndexRequest.indices().length == 0) {
throw new IllegalArgumentException("indices are mandatory"); throw new IllegalArgumentException("indices are mandatory");
} }
String endpoint = endpoint(getIndexRequest.indices(), ""); String endpoint = endpoint(getIndexRequest.indices(), "");
Request request = new Request(HttpMethod.HEAD.name(), endpoint); Request request = new Request(HttpMethod.HEAD.name(), endpoint);
Params params = new Params(request); Params params = new Params(request);
params.withLocal(getIndexRequest.local()); params.withLocal(getIndexRequest.local());
params.withHuman(getIndexRequest.humanReadable()); params.withHuman(getIndexRequest.humanReadable());
params.withIndicesOptions(getIndexRequest.indicesOptions()); params.withIndicesOptions(getIndexRequest.indicesOptions());
params.withIncludeDefaults(getIndexRequest.includeDefaults()); params.withIncludeDefaults(getIndexRequest.includeDefaults());
return request; return request;
} }
public static Request indexOpen(OpenIndexRequest openIndexRequest) { public static Request indexOpen(OpenIndexRequest openIndexRequest) {
String endpoint = RequestConverters.endpoint(openIndexRequest.indices(), "_open"); String endpoint = RequestConverters.endpoint(openIndexRequest.indices(), "_open");
@ -804,7 +804,7 @@ public class RequestConverters {
} }
public static Request indexCreate(org.elasticsearch.client.indices.CreateIndexRequest createIndexRequest) { public static Request indexCreate(org.elasticsearch.client.indices.CreateIndexRequest createIndexRequest) {
String endpoint = RequestConverters.endpoint(new String[]{createIndexRequest.index()}); String endpoint = RequestConverters.endpoint(new String[] { createIndexRequest.index() });
Request request = new Request(HttpMethod.PUT.name(), endpoint); Request request = new Request(HttpMethod.PUT.name(), endpoint);
Params parameters = new Params(request); Params parameters = new Params(request);
@ -846,17 +846,17 @@ public class RequestConverters {
return request; return request;
} }
public static Request putMapping(org.elasticsearch.client.indices.PutMappingRequest putMappingRequest) { public static Request putMapping(org.elasticsearch.client.indices.PutMappingRequest putMappingRequest) {
Request request = new Request(HttpMethod.PUT.name(), Request request = new Request(HttpMethod.PUT.name(),
RequestConverters.endpoint(putMappingRequest.indices(), "_mapping")); RequestConverters.endpoint(putMappingRequest.indices(), "_mapping"));
new RequestConverters.Params(request) // new RequestConverters.Params(request) //
.withTimeout(putMappingRequest.timeout()) // .withTimeout(putMappingRequest.timeout()) //
.withMasterTimeout(putMappingRequest.masterNodeTimeout()) // .withMasterTimeout(putMappingRequest.masterNodeTimeout()) //
.withIncludeTypeName(false); .withIncludeTypeName(false);
request.setEntity(RequestConverters.createEntity(putMappingRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE)); request.setEntity(RequestConverters.createEntity(putMappingRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE));
return request; return request;
} }
public static Request flushIndex(FlushRequest flushRequest) { public static Request flushIndex(FlushRequest flushRequest) {
String[] indices = flushRequest.indices() == null ? Strings.EMPTY_ARRAY : flushRequest.indices(); String[] indices = flushRequest.indices() == null ? Strings.EMPTY_ARRAY : flushRequest.indices();
@ -883,10 +883,10 @@ public class RequestConverters {
return request; return request;
} }
public static Request getMapping(org.elasticsearch.client.indices.GetMappingsRequest getMappingsRequest) { public static Request getMapping(org.elasticsearch.client.indices.GetMappingsRequest getMappingsRequest) {
String[] indices = getMappingsRequest.indices() == null ? Strings.EMPTY_ARRAY : getMappingsRequest.indices(); String[] indices = getMappingsRequest.indices() == null ? Strings.EMPTY_ARRAY : getMappingsRequest.indices();
Request request = new Request(HttpMethod.GET.name(), RequestConverters.endpoint(indices, "_mapping")); Request request = new Request(HttpMethod.GET.name(), RequestConverters.endpoint(indices, "_mapping"));
RequestConverters.Params parameters = new RequestConverters.Params(request); RequestConverters.Params parameters = new RequestConverters.Params(request);
parameters.withMasterTimeout(getMappingsRequest.masterNodeTimeout()); parameters.withMasterTimeout(getMappingsRequest.masterNodeTimeout());

View File

@ -120,7 +120,7 @@ public class NativeSearchQueryBuilder {
return this; return this;
} }
public NativeSearchQueryBuilder withSearchTemplate(SearchTemplateRequestBuilder searchTemplateBuilder){ public NativeSearchQueryBuilder withSearchTemplate(SearchTemplateRequestBuilder searchTemplateBuilder) {
this.searchTemplateBuilder = searchTemplateBuilder; this.searchTemplateBuilder = searchTemplateBuilder;
return this; return this;
} }

View File

@ -454,10 +454,7 @@ public class ReactiveElasticsearchClientIntegrationTests {
params.put("firstname", "inline"); params.put("firstname", "inline");
request.setScriptParams(params); request.setScriptParams(params);
client.searchTemplate(request) client.searchTemplate(request).as(StepVerifier::create).expectNextCount(1).verifyComplete();
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
} }
@Test // #1725 @Test // #1725
@ -471,14 +468,13 @@ public class ReactiveElasticsearchClientIntegrationTests {
testDoc.put("lastname", "template"); testDoc.put("lastname", "template");
add(testDoc).to(INDEX_I); add(testDoc).to(INDEX_I);
client.execute(c -> c.post() client.execute(c -> c.post().uri(builder -> builder.path("_scripts/searchbyfirstname").build())
.uri(builder -> builder.path("_scripts/searchbyfirstname").build()) .contentType(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON) .bodyValue(
.bodyValue( "{\"script\":{\"lang\":\"mustache\",\"source\":{\"query\":{\"match\":{\"firstname\":\"{{firstname}}\"}}}}}")
"{\"script\":{\"lang\":\"mustache\",\"source\":{\"query\":{\"match\":{\"firstname\":\"{{firstname}}\"}}}}}") .retrieve().bodyToMono(Void.class)).block();
.retrieve()
.bodyToMono(Void.class)) client.indices().refreshIndex(request -> request.indices(INDEX_I)).block();
.block();
SearchTemplateRequest request = new SearchTemplateRequest(new SearchRequest(INDEX_I)); SearchTemplateRequest request = new SearchTemplateRequest(new SearchRequest(INDEX_I));
request.setScriptType(ScriptType.STORED); request.setScriptType(ScriptType.STORED);
@ -487,10 +483,10 @@ public class ReactiveElasticsearchClientIntegrationTests {
params.put("firstname", "stored"); params.put("firstname", "stored");
request.setScriptParams(params); request.setScriptParams(params);
client.searchTemplate(request) client.searchTemplate(request) //
.as(StepVerifier::create) .as(StepVerifier::create) //
.expectNextCount(1) .expectNextCount(1) //
.verifyComplete(); .verifyComplete(); //
} }
@Test // DATAES-488 @Test // DATAES-488