Support for SQL.

Original Pull Request: #2949
Closes: #2683
This commit is contained in:
Aouichaoui Youssef 2024-08-06 20:32:26 +02:00 committed by GitHub
parent 03992ba722
commit 738ee54a25
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 1180 additions and 21 deletions

View File

@ -23,6 +23,8 @@ import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.msearch.MultiSearchResponseItem;
import co.elastic.clients.elasticsearch.core.search.ResponseBody;
import co.elastic.clients.elasticsearch.sql.ElasticsearchSqlClient;
import co.elastic.clients.elasticsearch.sql.QueryResponse;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.transport.Version;
@ -56,6 +58,7 @@ import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.core.script.Script;
import org.springframework.data.elasticsearch.core.sql.SqlResponse;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -74,6 +77,7 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
private static final Log LOGGER = LogFactory.getLog(ElasticsearchTemplate.class);
private final ElasticsearchClient client;
private final ElasticsearchSqlClient sqlClient;
private final RequestConverter requestConverter;
private final ResponseConverter responseConverter;
private final JsonpMapper jsonpMapper;
@ -85,6 +89,7 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
Assert.notNull(client, "client must not be null");
this.client = client;
this.sqlClient = client.sql();
this.jsonpMapper = client._transport().jsonpMapper();
requestConverter = new RequestConverter(elasticsearchConverter, jsonpMapper);
responseConverter = new ResponseConverter(jsonpMapper);
@ -97,6 +102,7 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
Assert.notNull(client, "client must not be null");
this.client = client;
this.sqlClient = client.sql();
this.jsonpMapper = client._transport().jsonpMapper();
requestConverter = new RequestConverter(elasticsearchConverter, jsonpMapper);
responseConverter = new ResponseConverter(jsonpMapper);
@ -656,6 +662,19 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
DeleteScriptRequest request = requestConverter.scriptDelete(name);
return execute(client -> client.deleteScript(request)).acknowledged();
}
@Override
public SqlResponse search(SqlQuery query) {
Assert.notNull(query, "Query must not be null.");
try {
QueryResponse response = sqlClient.query(requestConverter.sqlQueryRequest(query));
return responseConverter.sqlResponse(response);
} catch (IOException e) {
throw exceptionTranslator.translateException(e);
}
}
// endregion
// region client callback

View File

@ -69,6 +69,10 @@ public class ReactiveElasticsearchClient extends ApiClient<ElasticsearchTranspor
return new ReactiveElasticsearchIndicesClient(transport, transportOptions);
}
public ReactiveElasticsearchSqlClient sql() {
return new ReactiveElasticsearchSqlClient(transport, transportOptions);
}
// endregion
// region info

View File

@ -0,0 +1,72 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.elc;
import java.io.IOException;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import co.elastic.clients.ApiClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch.sql.QueryRequest;
import co.elastic.clients.elasticsearch.sql.QueryResponse;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.TransportOptions;
import co.elastic.clients.util.ObjectBuilder;
import reactor.core.publisher.Mono;
/**
* Reactive version of {@link co.elastic.clients.elasticsearch.sql.ElasticsearchSqlClient}.
*
* @author Aouichaoui Youssef
* @since 5.4
*/
public class ReactiveElasticsearchSqlClient extends ApiClient<ElasticsearchTransport, ReactiveElasticsearchSqlClient> {
public ReactiveElasticsearchSqlClient(ElasticsearchTransport transport, @Nullable TransportOptions transportOptions) {
super(transport, transportOptions);
}
@Override
public ReactiveElasticsearchSqlClient withTransportOptions(@Nullable TransportOptions transportOptions) {
return new ReactiveElasticsearchSqlClient(transport, transportOptions);
}
/**
* Executes a SQL request
*
* @param fn a function that initializes a builder to create the {@link QueryRequest}.
*/
public final Mono<QueryResponse> query(Function<QueryRequest.Builder, ObjectBuilder<QueryRequest>> fn)
throws IOException, ElasticsearchException {
return query(fn.apply(new QueryRequest.Builder()).build());
}
/**
* Executes a SQL request.
*/
public Mono<QueryResponse> query(QueryRequest query) {
return Mono.fromFuture(transport.performRequestAsync(query, QueryRequest._ENDPOINT, transportOptions));
}
/**
* Executes a SQL request.
*/
public Mono<QueryResponse> query() {
return Mono.fromFuture(
transport.performRequestAsync(new QueryRequest.Builder().build(), QueryRequest._ENDPOINT, transportOptions));
}
}

View File

@ -25,6 +25,8 @@ import co.elastic.clients.elasticsearch.core.search.ResponseBody;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.transport.Version;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import org.springframework.data.elasticsearch.core.query.SqlQuery;
import org.springframework.data.elasticsearch.core.sql.SqlResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
@ -88,6 +90,7 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
private static final Log LOGGER = LogFactory.getLog(ReactiveElasticsearchTemplate.class);
private final ReactiveElasticsearchClient client;
private final ReactiveElasticsearchSqlClient sqlClient;
private final RequestConverter requestConverter;
private final ResponseConverter responseConverter;
private final JsonpMapper jsonpMapper;
@ -99,6 +102,7 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
Assert.notNull(client, "client must not be null");
this.client = client;
this.sqlClient = client.sql();
this.jsonpMapper = client._transport().jsonpMapper();
requestConverter = new RequestConverter(converter, jsonpMapper);
responseConverter = new ResponseConverter(jsonpMapper);
@ -646,6 +650,14 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
return NativeQuery.builder().withIds(ids);
}
@Override
public Mono<SqlResponse> search(SqlQuery query) {
Assert.notNull(query, "Query must not be null.");
co.elastic.clients.elasticsearch.sql.QueryRequest request = requestConverter.sqlQueryRequest(query);
return sqlClient.query(request).onErrorMap(this::translateException).map(responseConverter::sqlResponse);
}
/**
* Callback interface to be used with {@link #execute(ReactiveElasticsearchTemplate.ClientCallback<>)} for operating
* directly on {@link ReactiveElasticsearchClient}.

View File

@ -68,6 +68,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -530,6 +531,22 @@ class RequestConverter extends AbstractQueryProcessor {
.of(gtr -> gtr.name(getTemplateRequest.getTemplateName()).flatSettings(true));
}
public co.elastic.clients.elasticsearch.sql.QueryRequest sqlQueryRequest(SqlQuery query) {
Assert.notNull(query, "Query must not be null.");
return co.elastic.clients.elasticsearch.sql.QueryRequest.of(sqb -> {
sqb.query(query.getQuery()).catalog(query.getCatalog()).columnar(query.getColumnar()).cursor(query.getCursor())
.fetchSize(query.getFetchSize()).fieldMultiValueLeniency(query.getFieldMultiValueLeniency())
.indexUsingFrozen(query.getIndexIncludeFrozen()).keepAlive(time(query.getKeepAlive()))
.keepOnCompletion(query.getKeepOnCompletion()).pageTimeout(time(query.getPageTimeout()))
.requestTimeout(time(query.getRequestTimeout()))
.waitForCompletionTimeout(time(query.getWaitForCompletionTimeout())).filter(getQuery(query.getFilter(), null))
.timeZone(Objects.toString(query.getTimeZone(), null)).format("json");
return sqb;
});
}
// endregion
// region documents

View File

@ -15,25 +15,9 @@
*/
package org.springframework.data.elasticsearch.client.elc;
import static org.springframework.data.elasticsearch.client.elc.JsonUtils.*;
import static org.springframework.data.elasticsearch.client.elc.TypeUtils.*;
import co.elastic.clients.elasticsearch._types.BulkIndexByScrollFailure;
import co.elastic.clients.elasticsearch._types.ErrorCause;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.cluster.ComponentTemplateSummary;
import co.elastic.clients.elasticsearch.cluster.GetComponentTemplateResponse;
import co.elastic.clients.elasticsearch.cluster.HealthResponse;
import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
import co.elastic.clients.elasticsearch.core.GetScriptResponse;
import co.elastic.clients.elasticsearch.core.UpdateByQueryResponse;
import co.elastic.clients.elasticsearch.core.mget.MultiGetError;
import co.elastic.clients.elasticsearch.core.mget.MultiGetResponseItem;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.elasticsearch.indices.get_index_template.IndexTemplateItem;
import co.elastic.clients.elasticsearch.indices.get_mapping.IndexMappingRecord;
import co.elastic.clients.json.JsonpMapper;
import static org.springframework.data.elasticsearch.client.elc.JsonUtils.toJson;
import static org.springframework.data.elasticsearch.client.elc.TypeUtils.removePrefixFromJson;
import static org.springframework.data.elasticsearch.client.elc.TypeUtils.typeMapping;
import java.util.ArrayList;
import java.util.HashMap;
@ -61,10 +45,41 @@ import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.core.script.Script;
import org.springframework.data.elasticsearch.core.sql.SqlResponse;
import org.springframework.data.elasticsearch.support.DefaultStringObjectMap;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import co.elastic.clients.elasticsearch._types.BulkIndexByScrollFailure;
import co.elastic.clients.elasticsearch._types.ErrorCause;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.cluster.ComponentTemplateSummary;
import co.elastic.clients.elasticsearch.cluster.GetComponentTemplateResponse;
import co.elastic.clients.elasticsearch.cluster.HealthResponse;
import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
import co.elastic.clients.elasticsearch.core.GetScriptResponse;
import co.elastic.clients.elasticsearch.core.UpdateByQueryResponse;
import co.elastic.clients.elasticsearch.core.mget.MultiGetError;
import co.elastic.clients.elasticsearch.core.mget.MultiGetResponseItem;
import co.elastic.clients.elasticsearch.indices.Alias;
import co.elastic.clients.elasticsearch.indices.AliasDefinition;
import co.elastic.clients.elasticsearch.indices.GetAliasResponse;
import co.elastic.clients.elasticsearch.indices.GetIndexResponse;
import co.elastic.clients.elasticsearch.indices.GetIndexTemplateResponse;
import co.elastic.clients.elasticsearch.indices.GetIndicesSettingsResponse;
import co.elastic.clients.elasticsearch.indices.GetMappingResponse;
import co.elastic.clients.elasticsearch.indices.GetTemplateResponse;
import co.elastic.clients.elasticsearch.indices.IndexSettings;
import co.elastic.clients.elasticsearch.indices.IndexState;
import co.elastic.clients.elasticsearch.indices.IndexTemplateSummary;
import co.elastic.clients.elasticsearch.indices.TemplateMapping;
import co.elastic.clients.elasticsearch.indices.get_index_template.IndexTemplateItem;
import co.elastic.clients.elasticsearch.indices.get_mapping.IndexMappingRecord;
import co.elastic.clients.elasticsearch.sql.QueryResponse;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.JsonpMapper;
/**
* Class to convert Elasticsearch responses into Spring Data Elasticsearch classes.
*
@ -536,6 +551,29 @@ class ResponseConverter {
}
// endregion
// region sql
public SqlResponse sqlResponse(QueryResponse response) {
SqlResponse.Builder builder = SqlResponse.builder();
builder.withRunning(Boolean.TRUE.equals(response.isRunning()))
.withPartial(Boolean.TRUE.equals(response.isPartial())).withCursor(response.cursor());
final List<SqlResponse.Column> columns = response.columns().stream()
.map(column -> new SqlResponse.Column(column.name(), column.type())).toList();
builder.withColumns(columns);
for (List<JsonData> rowValues : response.rows()) {
SqlResponse.Row.Builder rowBuilder = SqlResponse.Row.builder();
for (int idx = 0; idx < rowValues.size(); idx++) {
rowBuilder.withValue(columns.get(idx), rowValues.get(idx).toJson());
}
builder.withRow(rowBuilder.build());
}
return builder.build();
}
// end region
// region helper functions
private long timeToLong(Time time) {

View File

@ -20,6 +20,7 @@ import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverte
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.routing.RoutingResolver;
import org.springframework.data.elasticsearch.core.script.ScriptOperations;
import org.springframework.data.elasticsearch.core.sql.SqlOperations;
import org.springframework.lang.Nullable;
/**
@ -35,7 +36,7 @@ import org.springframework.lang.Nullable;
* @author Dmitriy Yakovlev
* @author Peter-Josef Meisch
*/
public interface ElasticsearchOperations extends DocumentOperations, SearchOperations, ScriptOperations {
public interface ElasticsearchOperations extends DocumentOperations, SearchOperations, ScriptOperations, SqlOperations {
/**
* get an {@link IndexOperations} that is bound to the given class

View File

@ -21,6 +21,7 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersiste
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.routing.RoutingResolver;
import org.springframework.data.elasticsearch.core.script.ReactiveScriptOperations;
import org.springframework.data.elasticsearch.core.sql.ReactiveSqlOperations;
import org.springframework.lang.Nullable;
/**
@ -31,7 +32,7 @@ import org.springframework.lang.Nullable;
* @since 3.2
*/
public interface ReactiveElasticsearchOperations
extends ReactiveDocumentOperations, ReactiveSearchOperations, ReactiveScriptOperations {
extends ReactiveDocumentOperations, ReactiveSearchOperations, ReactiveScriptOperations, ReactiveSqlOperations {
/**
* Get the {@link ElasticsearchConverter} used.

View File

@ -0,0 +1,433 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.query;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Defines an SQL request.
*
* @author Aouichaoui Youssef
* @see <a href= "https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-search-api.html">docs</a>
* @since 5.4
*/
public class SqlQuery {
/**
* If true, returns partial results if there are shard request timeouts or shard failures.
* <p>
* Default, this is set to {@code false}.
*/
@Nullable private final Boolean allowPartialSearchResults;
/**
* Default catalog/cluster for queries. If unspecified, the queries are executed on the data in the local cluster
* only.
*/
@Nullable private final String catalog;
/**
* If true, returns results in a columnar format.
* <p>
* Default, this is set to {@code false}.
*/
@Nullable private final Boolean columnar;
/**
* To retrieve a set of paginated results, ignore other request body parameters when specifying a cursor and using the
* {@link #columnar} and {@link #timeZone} parameters.
*/
@Nullable private final String cursor;
/**
* Maximum number of rows to return in the response.
* <p>
* Default, this is set to {@code 1000}.
*/
@Nullable private final Integer fetchSize;
/**
* If false, the API returns an error for fields containing array values.
* <p>
* Default, this is set to {@code false}.
*/
@Nullable private final Boolean fieldMultiValueLeniency;
/**
* Query that filter documents for the SQL search.
*/
@Nullable private final Query filter;
/**
* If true, the search can run on frozen indices.
* <p>
* Default, this is set to {@code false}.
*/
@Nullable private final Boolean indexIncludeFrozen;
/**
* Retention period for an async or saved synchronous search.
* <p>
* Default, this is set to {@code 5 days}.
*/
@Nullable private final Duration keepAlive;
/**
* If it is true, it will store synchronous searches when the {@link #waitForCompletionTimeout} parameter is
* specified.
*/
@Nullable private final Boolean keepOnCompletion;
/**
* Minimum retention period for the scroll cursor.
* <p>
* Default, this is set to {@code 45 seconds}.
*/
@Nullable private final Duration pageTimeout;
/**
* Timeout before the request fails.
* <p>
* Default, this is set to {@code 90 seconds}.
*/
@Nullable private final Duration requestTimeout;
/**
* Values for parameters in the query.
*/
@Nullable private final List<Object> params;
/**
* SQL query to run.
*/
private final String query;
/**
* Time zone ID for the search.
* <p>
* Default, this is set to {@code UTC}.
*/
@Nullable private final TimeZone timeZone;
/**
* Period to wait for complete results.
* <p>
* Default, this is set to no timeout.
*/
@Nullable private final Duration waitForCompletionTimeout;
private SqlQuery(Builder builder) {
this.allowPartialSearchResults = builder.allowPartialSearchResults;
this.catalog = builder.catalog;
this.columnar = builder.columnar;
this.cursor = builder.cursor;
this.fetchSize = builder.fetchSize;
this.fieldMultiValueLeniency = builder.fieldMultiValueLeniency;
this.filter = builder.filter;
this.indexIncludeFrozen = builder.indexIncludeFrozen;
this.keepAlive = builder.keepAlive;
this.keepOnCompletion = builder.keepOnCompletion;
this.pageTimeout = builder.pageTimeout;
this.requestTimeout = builder.requestTimeout;
this.params = builder.params;
this.query = builder.query;
this.timeZone = builder.timeZone;
this.waitForCompletionTimeout = builder.waitForCompletionTimeout;
}
@Nullable
public Boolean getAllowPartialSearchResults() {
return allowPartialSearchResults;
}
@Nullable
public String getCatalog() {
return catalog;
}
@Nullable
public Boolean getColumnar() {
return columnar;
}
@Nullable
public String getCursor() {
return cursor;
}
@Nullable
public Integer getFetchSize() {
return fetchSize;
}
@Nullable
public Boolean getFieldMultiValueLeniency() {
return fieldMultiValueLeniency;
}
@Nullable
public Query getFilter() {
return filter;
}
@Nullable
public Boolean getIndexIncludeFrozen() {
return indexIncludeFrozen;
}
@Nullable
public Duration getKeepAlive() {
return keepAlive;
}
@Nullable
public Boolean getKeepOnCompletion() {
return keepOnCompletion;
}
@Nullable
public Duration getPageTimeout() {
return pageTimeout;
}
@Nullable
public Duration getRequestTimeout() {
return requestTimeout;
}
@Nullable
public List<Object> getParams() {
return params;
}
public String getQuery() {
return query;
}
@Nullable
public TimeZone getTimeZone() {
return timeZone;
}
@Nullable
public Duration getWaitForCompletionTimeout() {
return waitForCompletionTimeout;
}
public static Builder builder(String query) {
return new Builder(query);
}
public static class Builder {
@Nullable private Boolean allowPartialSearchResults;
@Nullable private String catalog;
@Nullable private Boolean columnar;
@Nullable private String cursor;
@Nullable private Integer fetchSize;
@Nullable private Boolean fieldMultiValueLeniency;
@Nullable private Query filter;
@Nullable private Boolean indexIncludeFrozen;
@Nullable private Duration keepAlive;
@Nullable private Boolean keepOnCompletion;
@Nullable private Duration pageTimeout;
@Nullable private Duration requestTimeout;
@Nullable private List<Object> params;
private final String query;
@Nullable private TimeZone timeZone;
@Nullable private Duration waitForCompletionTimeout;
private Builder(String query) {
Assert.notNull(query, "query must not be null");
this.query = query;
}
/**
* If true, returns partial results if there are shard request timeouts or shard failures.
*/
public Builder withAllowPartialSearchResults(Boolean allowPartialSearchResults) {
this.allowPartialSearchResults = allowPartialSearchResults;
return this;
}
/**
* Default catalog/cluster for queries. If unspecified, the queries are executed on the data in the local cluster
* only.
*/
public Builder withCatalog(String catalog) {
this.catalog = catalog;
return this;
}
/**
* If true, returns results in a columnar format.
*/
public Builder withColumnar(Boolean columnar) {
this.columnar = columnar;
return this;
}
/**
* To retrieve a set of paginated results, ignore other request body parameters when specifying a cursor and using
* the {@link #columnar} and {@link #timeZone} parameters.
*/
public Builder withCursor(String cursor) {
this.cursor = cursor;
return this;
}
/**
* Maximum number of rows to return in the response.
*/
public Builder withFetchSize(Integer fetchSize) {
this.fetchSize = fetchSize;
return this;
}
/**
* If false, the API returns an error for fields containing array values.
*/
public Builder withFieldMultiValueLeniency(Boolean fieldMultiValueLeniency) {
this.fieldMultiValueLeniency = fieldMultiValueLeniency;
return this;
}
/**
* Query that filter documents for the SQL search.
*/
public Builder setFilter(Query filter) {
this.filter = filter;
return this;
}
/**
* If true, the search can run on frozen indices.
*/
public Builder withIndexIncludeFrozen(Boolean indexIncludeFrozen) {
this.indexIncludeFrozen = indexIncludeFrozen;
return this;
}
/**
* Retention period for an async or saved synchronous search.
*/
public Builder setKeepAlive(Duration keepAlive) {
this.keepAlive = keepAlive;
return this;
}
/**
* If it is true, it will store synchronous searches when the {@link #waitForCompletionTimeout} parameter is
* specified.
*/
public Builder withKeepOnCompletion(Boolean keepOnCompletion) {
this.keepOnCompletion = keepOnCompletion;
return this;
}
/**
* Minimum retention period for the scroll cursor.
*/
public Builder withPageTimeout(Duration pageTimeout) {
this.pageTimeout = pageTimeout;
return this;
}
/**
* Timeout before the request fails.
*/
public Builder withRequestTimeout(Duration requestTimeout) {
this.requestTimeout = requestTimeout;
return this;
}
/**
* Values for parameters in the query.
*/
public Builder withParams(List<Object> params) {
this.params = params;
return this;
}
/**
* Value for parameters in the query.
*/
public Builder withParam(Object param) {
if (this.params == null) {
this.params = new ArrayList<>();
}
this.params.add(param);
return this;
}
/**
* Time zone ID for the search.
*/
public Builder withTimeZone(TimeZone timeZone) {
this.timeZone = timeZone;
return this;
}
/**
* Period to wait for complete results.
*/
public Builder withWaitForCompletionTimeout(Duration waitForCompletionTimeout) {
this.waitForCompletionTimeout = waitForCompletionTimeout;
return this;
}
public SqlQuery build() {
return new SqlQuery(this);
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.sql;
import org.springframework.data.elasticsearch.core.query.SqlQuery;
import reactor.core.publisher.Mono;
/**
* The reactive version of operations for the
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-search-api.html">SQL search API</a>.
*
* @author Aouichaoui Youssef
* @since 5.4
*/
public interface ReactiveSqlOperations {
/**
* Execute the sql {@code query} against elasticsearch and return result as {@link SqlResponse}
*
* @param query the query to execute
* @return {@link SqlResponse} containing the list of found objects
*/
Mono<SqlResponse> search(SqlQuery query);
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.sql;
import org.springframework.data.elasticsearch.core.query.SqlQuery;
/**
* The operations for the
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-search-api.html">SQL search API</a>.
*
* @author Aouichaoui Youssef
* @since 5.4
*/
public interface SqlOperations {
/**
* Execute the sql {@code query} against elasticsearch and return result as {@link SqlResponse}
*
* @param query the query to execute
* @return {@link SqlResponse} containing the list of found objects
*/
SqlResponse search(SqlQuery query);
}

View File

@ -0,0 +1,217 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.sql;
import static java.util.Collections.unmodifiableList;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import jakarta.json.JsonValue;
/**
* Defines an SQL response.
*
* @author Aouichaoui Youssef
* @see <a href= "https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-search-api.html">docs</a>
* @since 5.4
*/
public class SqlResponse {
/**
* If {@code true}, the search is still running.
*/
private final boolean running;
/**
* If {@code true}, the response does not contain complete search results.
*/
private final boolean partial;
/**
* Cursor for the next set of paginated results.
*/
@Nullable private final String cursor;
/**
* Column headings for the search results.
*/
private final List<Column> columns;
/**
* Values for the search results.
*/
private final List<Row> rows;
private SqlResponse(Builder builder) {
this.running = builder.running;
this.partial = builder.partial;
this.cursor = builder.cursor;
this.columns = unmodifiableList(builder.columns);
this.rows = unmodifiableList(builder.rows);
}
public boolean isRunning() {
return running;
}
public boolean isPartial() {
return partial;
}
@Nullable
public String getCursor() {
return cursor;
}
public List<Column> getColumns() {
return columns;
}
public List<Row> getRows() {
return rows;
}
public static Builder builder() {
return new Builder();
}
public record Column(String name, String type) {
}
public static class Row implements Iterable<Map.Entry<Column, JsonValue>> {
private final Map<Column, JsonValue> row;
private Row(Builder builder) {
this.row = builder.row;
}
public static Builder builder() {
return new Builder();
}
@NonNull
@Override
public Iterator<Map.Entry<Column, JsonValue>> iterator() {
return row.entrySet().iterator();
}
@Nullable
public JsonValue get(Column column) {
return row.get(column);
}
public static class Builder {
private final Map<Column, JsonValue> row = new HashMap<>();
public Builder withValue(Column column, JsonValue value) {
this.row.put(column, value);
return this;
}
public Row build() {
return new Row(this);
}
}
}
public static class Builder {
private boolean running;
private boolean partial;
@Nullable private String cursor;
private final List<Column> columns = new ArrayList<>();
private final List<Row> rows = new ArrayList<>();
private Builder() {}
/**
* If {@code true}, the search is still running.
*/
public Builder withRunning(boolean running) {
this.running = running;
return this;
}
/**
* If {@code true}, the response does not contain complete search results.
*/
public Builder withPartial(boolean partial) {
this.partial = partial;
return this;
}
/**
* Cursor for the next set of paginated results.
*/
public Builder withCursor(@Nullable String cursor) {
this.cursor = cursor;
return this;
}
/**
* Column headings for the search results.
*/
public Builder withColumns(List<Column> columns) {
this.columns.addAll(columns);
return this;
}
/**
* Column heading for the search results.
*/
public Builder withColumn(Column column) {
this.columns.add(column);
return this;
}
/**
* Values for the search results.
*/
public Builder withRows(List<Row> rows) {
this.rows.addAll(rows);
return this;
}
/**
* Value for the search results.
*/
public Builder withRow(Row row) {
this.rows.add(row);
return this;
}
public SqlResponse build() {
return new SqlResponse(this);
}
}
}

View File

@ -0,0 +1,6 @@
/**
* Classes and interfaces to access to SQL API of Elasticsearch.
*/
@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields
package org.springframework.data.elasticsearch.core.sql;

View File

@ -0,0 +1,124 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.sql;
import static org.springframework.data.elasticsearch.core.IndexOperationsAdapter.blocking;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SqlQuery;
import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchTemplateConfiguration;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
import reactor.test.StepVerifier;
/**
* Testing the reactive querying using SQL syntax.
*
* @author Youssef Aouichaoui
*/
@SpringIntegrationTest
@ContextConfiguration(classes = { ReactiveSqlOperationsIntegrationTests.Config.class })
@DisplayName("Using Elasticsearch SQL Reactive Client")
public class ReactiveSqlOperationsIntegrationTests {
@Autowired ReactiveElasticsearchOperations operations;
@BeforeEach
void setUp() {
// create index
blocking(operations.indexOps(EntityForSQL.class)).createWithMapping();
// add data
operations
.saveAll(List.of(EntityForSQL.builder().withViews(3).build(), EntityForSQL.builder().withViews(0).build()),
EntityForSQL.class)
.blockLast();
}
@AfterEach
void tearDown() {
// delete index
blocking(operations.indexOps(EntityForSQL.class)).delete();
}
// begin configuration region
@Configuration
@Import({ ReactiveElasticsearchTemplateConfiguration.class })
static class Config {}
// end region
@Test // #2683
void when_search_with_an_sql_query() {
// Given
SqlQuery query = SqlQuery.builder("SELECT * FROM entity_for_sql WHERE views = 0").build();
// When
// Then
operations.search(query).as(StepVerifier::create).expectNextCount(1).verifyComplete();
}
// begin region
@Document(indexName = "entity_for_sql")
static class EntityForSQL {
@Id private String id;
private final Integer views;
public EntityForSQL(EntityForSQL.Builder builder) {
this.views = builder.views;
}
@Nullable
public String getId() {
return id;
}
public Integer getViews() {
return views;
}
public static EntityForSQL.Builder builder() {
return new EntityForSQL.Builder();
}
static class Builder {
private Integer views = 0;
public EntityForSQL.Builder withViews(Integer views) {
this.views = views;
return this;
}
public EntityForSQL build() {
return new EntityForSQL(this);
}
}
}
// end region
}

View File

@ -0,0 +1,143 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.sql;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.query.SqlQuery;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchTemplateConfiguration;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
/**
* Testing the querying using SQL syntax.
*
* @author Youssef Aouichaoui
*/
@SpringIntegrationTest
@ContextConfiguration(classes = { SqlOperationsIntegrationTests.Config.class })
@DisplayName("Using Elasticsearch SQL Client")
class SqlOperationsIntegrationTests {
@Autowired ElasticsearchOperations operations;
@Nullable IndexOperations indexOps;
@BeforeEach
void setUp() {
// create index
indexOps = operations.indexOps(EntityForSQL.class);
indexOps.createWithMapping();
// add data
operations.save(EntityForSQL.builder().withViews(3).build(), EntityForSQL.builder().withViews(0).build());
}
@AfterEach
void tearDown() {
// delete index
if (indexOps != null) {
indexOps.delete();
}
}
// begin configuration region
@Configuration
@Import({ ElasticsearchTemplateConfiguration.class })
static class Config {}
// end region
@Test // #2683
void when_search_with_an_sql_query() {
// Given
SqlQuery query = SqlQuery.builder("SELECT * FROM entity_for_sql WHERE views = 0").build();
// When
// Then
SqlResponse response = operations.search(query);
assertNotNull(response);
assertFalse(response.getRows().isEmpty());
assertEquals(1, response.getRows().size());
}
@Test // #2683
void when_search_with_an_sql_query_that_has_aggregated_column() {
// Given
SqlQuery query = SqlQuery.builder("SELECT SUM(views) AS TOTAL FROM entity_for_sql").build();
// When
// Then
SqlResponse response = operations.search(query);
assertThat(response.getColumns()).first().extracting(SqlResponse.Column::name).isEqualTo("TOTAL");
assertThat(response.getRows()).hasSize(1).first().extracting(row -> row.get(response.getColumns().get(0)))
.hasToString("3");
}
// begin region
@Document(indexName = "entity_for_sql")
static class EntityForSQL {
@Id private String id;
private final Integer views;
public EntityForSQL(Builder builder) {
this.views = builder.views;
}
@Nullable
public String getId() {
return id;
}
public Integer getViews() {
return views;
}
public static Builder builder() {
return new Builder();
}
static class Builder {
private Integer views = 0;
public Builder withViews(Integer views) {
this.views = views;
return this;
}
public EntityForSQL build() {
return new EntityForSQL(this);
}
}
}
// end region
}