SQL: Improve serialization of SQL processors (#45678)
Encapsulate the serialization/deserialization of SQL client classes. Make configuration specific parameters (such as ZoneId) generic just like the version and remove the need for consumer classes to manage them individually. This is not only consistent but also provides significant savings in the cursor. Fix #40216 (cherry picked from commit 5c844798045d7baa0d932289d2e3d1607ba6a9a4)
This commit is contained in:
parent
c6b30b8883
commit
0f51dd69cb
|
@ -194,7 +194,7 @@ public abstract class AbstractSqlQueryRequest extends AbstractSqlRequest impleme
|
|||
super(in);
|
||||
query = in.readString();
|
||||
params = in.readList(AbstractSqlQueryRequest::readSqlTypedParamValue);
|
||||
zoneId = ZoneId.of(in.readString());
|
||||
zoneId = in.readZoneId();
|
||||
fetchSize = in.readVInt();
|
||||
requestTimeout = in.readTimeValue();
|
||||
pageTimeout = in.readTimeValue();
|
||||
|
@ -218,7 +218,7 @@ public abstract class AbstractSqlQueryRequest extends AbstractSqlRequest impleme
|
|||
for (SqlTypedParamValue param: params) {
|
||||
writeSqlTypedParamValue(out, param);
|
||||
}
|
||||
out.writeString(zoneId.getId());
|
||||
out.writeZoneId(zoneId);
|
||||
out.writeVInt(fetchSize);
|
||||
out.writeTimeValue(requestTimeout);
|
||||
out.writeTimeValue(pageTimeout);
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.sql.common.io;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Base64;
|
||||
|
||||
/**
|
||||
* SQL-specific stream extension for {@link StreamInput} used for deserializing
|
||||
* SQL components, especially on the client-side.
|
||||
*/
|
||||
public class SqlStreamInput extends NamedWriteableAwareStreamInput {
|
||||
|
||||
private final ZoneId zoneId;
|
||||
|
||||
public SqlStreamInput(String base64encoded, NamedWriteableRegistry namedWriteableRegistry, Version version) throws IOException {
|
||||
this(Base64.getDecoder().decode(base64encoded), namedWriteableRegistry, version);
|
||||
}
|
||||
|
||||
public SqlStreamInput(byte[] input, NamedWriteableRegistry namedWriteableRegistry, Version version) throws IOException {
|
||||
super(StreamInput.wrap(input), namedWriteableRegistry);
|
||||
|
||||
// version check first
|
||||
Version ver = Version.readVersion(delegate);
|
||||
if (version.compareTo(ver) != 0) {
|
||||
throw new SqlIllegalArgumentException("Unsupported cursor version [{}], expected [{}]", ver, version);
|
||||
}
|
||||
delegate.setVersion(version);
|
||||
// configuration settings
|
||||
zoneId = delegate.readZoneId();
|
||||
}
|
||||
|
||||
public ZoneId zoneId() {
|
||||
return zoneId;
|
||||
}
|
||||
|
||||
public static SqlStreamInput asSqlStream(StreamInput in) {
|
||||
if (in instanceof SqlStreamInput) {
|
||||
return (SqlStreamInput) in;
|
||||
}
|
||||
throw new SqlIllegalArgumentException("Expected SQL cursor stream, received [{}]", in.getClass());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.sql.common.io;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Base64;
|
||||
|
||||
public class SqlStreamOutput extends OutputStreamStreamOutput {
|
||||
|
||||
private final ByteArrayOutputStream bytes;
|
||||
|
||||
public SqlStreamOutput(Version version, ZoneId zoneId) throws IOException {
|
||||
this(new ByteArrayOutputStream(), version, zoneId);
|
||||
}
|
||||
|
||||
private SqlStreamOutput(ByteArrayOutputStream bytes, Version version, ZoneId zoneId) throws IOException {
|
||||
super(Base64.getEncoder().wrap(new OutputStreamStreamOutput(bytes)));
|
||||
this.bytes = bytes;
|
||||
|
||||
Version.writeVersion(version, this);
|
||||
writeZoneId(zoneId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Should be called _after_ closing the stream - there are no guarantees otherwise.
|
||||
*/
|
||||
public String streamAsString() {
|
||||
// Base64 uses this encoding instead of UTF-8
|
||||
return new String(bytes.toByteArray(), StandardCharsets.ISO_8859_1);
|
||||
}
|
||||
}
|
|
@ -9,6 +9,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
|
||||
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
|
||||
import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
|
||||
import org.elasticsearch.xpack.sql.querydsl.container.GroupByRef.Property;
|
||||
import org.elasticsearch.xpack.sql.util.DateUtils;
|
||||
|
||||
|
@ -42,15 +43,15 @@ public class CompositeKeyExtractor implements BucketExtractor {
|
|||
CompositeKeyExtractor(StreamInput in) throws IOException {
|
||||
key = in.readString();
|
||||
property = in.readEnum(Property.class);
|
||||
zoneId = ZoneId.of(in.readString());
|
||||
isDateTimeBased = in.readBoolean();
|
||||
|
||||
zoneId = SqlStreamInput.asSqlStream(in).zoneId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(key);
|
||||
out.writeEnum(property);
|
||||
out.writeString(zoneId.getId());
|
||||
out.writeBoolean(isDateTimeBased);
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.index.mapper.IgnoredFieldMapper;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
|
||||
import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.geo.GeoShape;
|
||||
import org.elasticsearch.xpack.sql.type.DataType;
|
||||
import org.elasticsearch.xpack.sql.util.DateUtils;
|
||||
|
@ -95,11 +96,12 @@ public class FieldHitExtractor implements HitExtractor {
|
|||
}
|
||||
String esType = in.readOptionalString();
|
||||
dataType = esType != null ? DataType.fromTypeName(esType) : null;
|
||||
zoneId = ZoneId.of(in.readString());
|
||||
useDocValue = in.readBoolean();
|
||||
hitName = in.readOptionalString();
|
||||
arrayLeniency = in.readBoolean();
|
||||
path = sourcePath(fieldName, useDocValue, hitName);
|
||||
|
||||
zoneId = SqlStreamInput.asSqlStream(in).zoneId();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -114,7 +116,6 @@ public class FieldHitExtractor implements HitExtractor {
|
|||
out.writeOptionalString(fullFieldName);
|
||||
}
|
||||
out.writeOptionalString(dataType == null ? null : dataType.typeName);
|
||||
out.writeString(zoneId.getId());
|
||||
out.writeBoolean(useDocValue);
|
||||
out.writeOptionalString(hitName);
|
||||
out.writeBoolean(arrayLeniency);
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.elasticsearch.search.aggregations.metrics.InternalSum;
|
|||
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentileRanks;
|
||||
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles;
|
||||
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
|
||||
import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
|
||||
import org.elasticsearch.xpack.sql.querydsl.agg.Aggs;
|
||||
import org.elasticsearch.xpack.sql.util.DateUtils;
|
||||
|
||||
|
@ -55,7 +56,8 @@ public class MetricAggExtractor implements BucketExtractor {
|
|||
property = in.readString();
|
||||
innerKey = in.readOptionalString();
|
||||
isDateTimeBased = in.readBoolean();
|
||||
zoneId = ZoneId.of(in.readString());
|
||||
|
||||
zoneId = SqlStreamInput.asSqlStream(in).zoneId();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -64,7 +66,6 @@ public class MetricAggExtractor implements BucketExtractor {
|
|||
out.writeString(property);
|
||||
out.writeOptionalString(innerKey);
|
||||
out.writeBoolean(isDateTimeBased);
|
||||
out.writeString(zoneId.getId());
|
||||
}
|
||||
|
||||
String name() {
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
|
||||
import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
|
||||
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
|
||||
import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
|
||||
import org.elasticsearch.xpack.sql.type.DataType;
|
||||
import org.elasticsearch.xpack.sql.util.DateUtils;
|
||||
|
||||
|
@ -34,14 +35,13 @@ public class TopHitsAggExtractor implements BucketExtractor {
|
|||
TopHitsAggExtractor(StreamInput in) throws IOException {
|
||||
name = in.readString();
|
||||
fieldDataType = in.readEnum(DataType.class);
|
||||
zoneId = ZoneId.of(in.readString());
|
||||
zoneId = SqlStreamInput.asSqlStream(in).zoneId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(name);
|
||||
out.writeEnum(fieldDataType);
|
||||
out.writeString(zoneId.getId());
|
||||
}
|
||||
|
||||
String name() {
|
||||
|
|
|
@ -7,8 +7,8 @@
|
|||
package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
|
||||
import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
|
||||
import org.elasticsearch.xpack.sql.expression.gen.processor.Processor;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -24,12 +24,7 @@ public abstract class BaseDateTimeProcessor implements Processor {
|
|||
}
|
||||
|
||||
BaseDateTimeProcessor(StreamInput in) throws IOException {
|
||||
zoneId = ZoneId.of(in.readString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(zoneId.getId());
|
||||
zoneId = SqlStreamInput.asSqlStream(in).zoneId();
|
||||
}
|
||||
|
||||
ZoneId zoneId() {
|
||||
|
|
|
@ -63,7 +63,6 @@ public class DateTimeProcessor extends BaseDateTimeProcessor {
|
|||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeEnum(extractor);
|
||||
}
|
||||
|
||||
|
|
|
@ -57,7 +57,6 @@ public class NamedDateTimeProcessor extends BaseDateTimeProcessor {
|
|||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeEnum(extractor);
|
||||
}
|
||||
|
||||
|
|
|
@ -72,7 +72,6 @@ public class NonIsoDateTimeProcessor extends BaseDateTimeProcessor {
|
|||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeEnum(extractor);
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.ZoneId;
|
||||
|
@ -16,6 +17,10 @@ import java.util.Locale;
|
|||
import java.util.Objects;
|
||||
|
||||
public class QuarterProcessor extends BaseDateTimeProcessor {
|
||||
|
||||
public static final String NAME = "q";
|
||||
private static final DateTimeFormatter QUARTER_FORMAT = DateTimeFormatter.ofPattern("q", Locale.ROOT);
|
||||
|
||||
|
||||
public QuarterProcessor(ZoneId zoneId) {
|
||||
super(zoneId);
|
||||
|
@ -25,8 +30,8 @@ public class QuarterProcessor extends BaseDateTimeProcessor {
|
|||
super(in);
|
||||
}
|
||||
|
||||
public static final String NAME = "q";
|
||||
private static final DateTimeFormatter QUARTER_FORMAT = DateTimeFormatter.ofPattern("q", Locale.ROOT);
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
|
|
|
@ -110,7 +110,7 @@ public class RestSqlQueryAction extends BaseRestHandler {
|
|||
final String data = textFormat.format(request, response);
|
||||
|
||||
restResponse = new BytesRestResponse(RestStatus.OK, textFormat.contentType(request),
|
||||
data.getBytes(StandardCharsets.UTF_8));
|
||||
data.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
if (response.hasCursor()) {
|
||||
restResponse.addHeader("Cursor", response.cursor());
|
||||
|
|
|
@ -5,8 +5,8 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.plugin;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
|
||||
import org.elasticsearch.xpack.sql.action.BasicFormatter;
|
||||
|
@ -17,6 +17,7 @@ import org.elasticsearch.xpack.sql.session.Cursors;
|
|||
import org.elasticsearch.xpack.sql.util.DateUtils;
|
||||
import org.elasticsearch.xpack.sql.util.StringUtils;
|
||||
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
@ -43,10 +44,13 @@ enum TextFormat {
|
|||
String format(RestRequest request, SqlQueryResponse response) {
|
||||
BasicFormatter formatter = null;
|
||||
Cursor cursor = null;
|
||||
ZoneId zoneId = null;
|
||||
|
||||
// check if the cursor is already wrapped first
|
||||
if (response.hasCursor()) {
|
||||
cursor = Cursors.decodeFromString(response.cursor());
|
||||
Tuple<Cursor, ZoneId> tuple = Cursors.decodeFromStringWithZone(response.cursor());
|
||||
cursor = tuple.v1();
|
||||
zoneId = tuple.v2();
|
||||
if (cursor instanceof TextFormatterCursor) {
|
||||
formatter = ((TextFormatterCursor) cursor).getFormatter();
|
||||
}
|
||||
|
@ -58,7 +62,7 @@ enum TextFormat {
|
|||
formatter = new BasicFormatter(response.columns(), response.rows(), TEXT);
|
||||
// if there's a cursor, wrap the formatter in it
|
||||
if (cursor != null) {
|
||||
response.cursor(Cursors.encodeToString(Version.CURRENT, new TextFormatterCursor(cursor, formatter)));
|
||||
response.cursor(Cursors.encodeToString(new TextFormatterCursor(cursor, formatter), zoneId));
|
||||
}
|
||||
// format with header
|
||||
return formatter.formatWithHeader(response.columns(), response.rows());
|
||||
|
|
|
@ -5,7 +5,6 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.plugin;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
|
@ -81,7 +80,7 @@ public class TransportSqlQueryAction extends HandledTransportAction<SqlQueryRequ
|
|||
wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), listener::onFailure));
|
||||
} else {
|
||||
planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()),
|
||||
wrap(p -> listener.onResponse(createResponse(request.mode(), request.columnar(), null, p)),
|
||||
wrap(p -> listener.onResponse(createResponse(request, null, p)),
|
||||
listener::onFailure));
|
||||
}
|
||||
}
|
||||
|
@ -102,10 +101,10 @@ public class TransportSqlQueryAction extends HandledTransportAction<SqlQueryRequ
|
|||
}
|
||||
}
|
||||
columns = unmodifiableList(columns);
|
||||
return createResponse(request.mode(), request.columnar(), columns, page);
|
||||
return createResponse(request, columns, page);
|
||||
}
|
||||
|
||||
static SqlQueryResponse createResponse(Mode mode, boolean columnar, List<ColumnInfo> header, Page page) {
|
||||
static SqlQueryResponse createResponse(SqlQueryRequest request, List<ColumnInfo> header, Page page) {
|
||||
List<List<Object>> rows = new ArrayList<>();
|
||||
page.rowSet().forEachRow(rowView -> {
|
||||
List<Object> row = new ArrayList<>(rowView.columnCount());
|
||||
|
@ -114,9 +113,9 @@ public class TransportSqlQueryAction extends HandledTransportAction<SqlQueryRequ
|
|||
});
|
||||
|
||||
return new SqlQueryResponse(
|
||||
Cursors.encodeToString(Version.CURRENT, page.next()),
|
||||
mode,
|
||||
columnar,
|
||||
Cursors.encodeToString(page.next(), request.zoneId()),
|
||||
request.mode(),
|
||||
request.columnar(),
|
||||
header,
|
||||
rows);
|
||||
}
|
||||
|
|
|
@ -6,13 +6,12 @@
|
|||
package org.elasticsearch.xpack.sql.session;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteable;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
|
||||
import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
|
||||
import org.elasticsearch.xpack.sql.common.io.SqlStreamOutput;
|
||||
import org.elasticsearch.xpack.sql.execution.search.CompositeAggregationCursor;
|
||||
import org.elasticsearch.xpack.sql.execution.search.ScrollCursor;
|
||||
import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractors;
|
||||
|
@ -22,11 +21,9 @@ import org.elasticsearch.xpack.sql.expression.literal.Literals;
|
|||
import org.elasticsearch.xpack.sql.plugin.TextFormatterCursor;
|
||||
import org.elasticsearch.xpack.sql.util.StringUtils;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.io.IOException;
|
||||
import java.time.ZoneId;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Base64;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
@ -35,6 +32,7 @@ import java.util.List;
|
|||
public final class Cursors {
|
||||
|
||||
private static final NamedWriteableRegistry WRITEABLE_REGISTRY = new NamedWriteableRegistry(getNamedWriteables());
|
||||
private static final Version VERSION = Version.CURRENT;
|
||||
|
||||
private Cursors() {}
|
||||
|
||||
|
@ -65,17 +63,20 @@ public final class Cursors {
|
|||
/**
|
||||
* Write a {@linkplain Cursor} to a string for serialization across xcontent.
|
||||
*/
|
||||
public static String encodeToString(Version version, Cursor info) {
|
||||
public static String encodeToString(Cursor info, ZoneId zoneId) {
|
||||
return encodeToString(info, VERSION, zoneId);
|
||||
}
|
||||
|
||||
static String encodeToString(Cursor info, Version version, ZoneId zoneId) {
|
||||
if (info == Cursor.EMPTY) {
|
||||
return StringUtils.EMPTY;
|
||||
}
|
||||
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
|
||||
try (OutputStream base64 = Base64.getEncoder().wrap(os); StreamOutput out = new OutputStreamStreamOutput(base64)) {
|
||||
Version.writeVersion(version, out);
|
||||
out.writeNamedWriteable(info);
|
||||
}
|
||||
return os.toString(StandardCharsets.UTF_8.name());
|
||||
} catch (Exception ex) {
|
||||
try (SqlStreamOutput output = new SqlStreamOutput(version, zoneId)) {
|
||||
output.writeNamedWriteable(info);
|
||||
output.close();
|
||||
// return the string only after closing the resource
|
||||
return output.streamAsString();
|
||||
} catch (IOException ex) {
|
||||
throw new SqlIllegalArgumentException("Unexpected failure retrieving next page", ex);
|
||||
}
|
||||
}
|
||||
|
@ -84,22 +85,23 @@ public final class Cursors {
|
|||
/**
|
||||
* Read a {@linkplain Cursor} from a string.
|
||||
*/
|
||||
public static Cursor decodeFromString(String info) {
|
||||
if (info.isEmpty()) {
|
||||
return Cursor.EMPTY;
|
||||
public static Cursor decodeFromString(String base64) {
|
||||
return decodeFromStringWithZone(base64).v1();
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a {@linkplain Cursor} from a string.
|
||||
*/
|
||||
public static Tuple<Cursor, ZoneId> decodeFromStringWithZone(String base64) {
|
||||
if (base64.isEmpty()) {
|
||||
return new Tuple<>(Cursor.EMPTY, null);
|
||||
}
|
||||
byte[] bytes = info.getBytes(StandardCharsets.UTF_8);
|
||||
try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(Base64.getDecoder().decode(bytes)), WRITEABLE_REGISTRY)) {
|
||||
Version version = Version.readVersion(in);
|
||||
if (version.after(Version.CURRENT)) {
|
||||
throw new SqlIllegalArgumentException("Unsupported cursor version " + version);
|
||||
}
|
||||
in.setVersion(version);
|
||||
return in.readNamedWriteable(Cursor.class);
|
||||
} catch (SqlIllegalArgumentException ex) {
|
||||
throw ex;
|
||||
} catch (Exception ex) {
|
||||
try (SqlStreamInput in = new SqlStreamInput(base64, WRITEABLE_REGISTRY, VERSION)) {
|
||||
Cursor cursor = in.readNamedWriteable(Cursor.class);
|
||||
return new Tuple<>(cursor, in.zoneId());
|
||||
} catch (IOException ex) {
|
||||
throw new SqlIllegalArgumentException("Unexpected failure decoding cursor", ex);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.sql;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.test.AbstractWireTestCase;
|
||||
import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
|
||||
import org.elasticsearch.xpack.sql.common.io.SqlStreamOutput;
|
||||
import org.elasticsearch.xpack.sql.session.Cursors;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.ZoneId;
|
||||
|
||||
public abstract class AbstractSqlWireSerializingTestCase<T extends Writeable> extends AbstractWireTestCase<T> {
|
||||
|
||||
@Override
|
||||
protected T copyInstance(T instance, Version version) throws IOException {
|
||||
try (BytesStreamOutput output = new BytesStreamOutput()) {
|
||||
ZoneId zoneId = instanceZoneId(instance);
|
||||
SqlStreamOutput out = new SqlStreamOutput(version, zoneId);
|
||||
instance.writeTo(out);
|
||||
out.close();
|
||||
try (SqlStreamInput in = new SqlStreamInput(out.streamAsString(), getNamedWriteableRegistry(), version)) {
|
||||
return instanceReader().read(in);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected ZoneId instanceZoneId(T instance) {
|
||||
return randomSafeZone();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NamedWriteableRegistry getNamedWriteableRegistry() {
|
||||
return new NamedWriteableRegistry(Cursors.getNamedWriteables());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* We need to exclude SystemV/* time zones because they cannot be converted
|
||||
* back to DateTimeZone which we currently still need to do internally,
|
||||
* e.g. in bwc serialization and in the extract() method
|
||||
*/
|
||||
protected static ZoneId randomSafeZone() {
|
||||
return randomValueOtherThanMany(zi -> zi.getId().startsWith("SystemV"), () -> randomZone());
|
||||
}
|
||||
}
|
|
@ -5,39 +5,38 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.execution.search;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractor;
|
||||
import org.elasticsearch.xpack.sql.execution.search.extractor.CompositeKeyExtractorTests;
|
||||
import org.elasticsearch.xpack.sql.execution.search.extractor.ConstantExtractorTests;
|
||||
import org.elasticsearch.xpack.sql.execution.search.extractor.MetricAggExtractorTests;
|
||||
import org.elasticsearch.xpack.sql.session.Cursors;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.ZoneId;
|
||||
import java.util.ArrayList;
|
||||
import java.util.BitSet;
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class CompositeAggregationCursorTests extends AbstractWireSerializingTestCase<CompositeAggregationCursor> {
|
||||
public class CompositeAggregationCursorTests extends AbstractSqlWireSerializingTestCase<CompositeAggregationCursor> {
|
||||
public static CompositeAggregationCursor randomCompositeCursor() {
|
||||
int extractorsSize = between(1, 20);
|
||||
ZoneId id = randomSafeZone();
|
||||
List<BucketExtractor> extractors = new ArrayList<>(extractorsSize);
|
||||
for (int i = 0; i < extractorsSize; i++) {
|
||||
extractors.add(randomBucketExtractor());
|
||||
extractors.add(randomBucketExtractor(id));
|
||||
}
|
||||
|
||||
return new CompositeAggregationCursor(new byte[randomInt(256)], extractors, randomBitSet(extractorsSize),
|
||||
randomIntBetween(10, 1024), randomBoolean(), randomAlphaOfLength(5));
|
||||
}
|
||||
|
||||
static BucketExtractor randomBucketExtractor() {
|
||||
static BucketExtractor randomBucketExtractor(ZoneId zoneId) {
|
||||
List<Supplier<BucketExtractor>> options = new ArrayList<>();
|
||||
options.add(ConstantExtractorTests::randomConstantExtractor);
|
||||
options.add(MetricAggExtractorTests::randomMetricAggExtractor);
|
||||
options.add(CompositeKeyExtractorTests::randomCompositeKeyExtractor);
|
||||
options.add(() -> MetricAggExtractorTests.randomMetricAggExtractor(zoneId));
|
||||
options.add(() -> CompositeKeyExtractorTests.randomCompositeKeyExtractor(zoneId));
|
||||
return randomFrom(options).get();
|
||||
}
|
||||
|
||||
|
@ -50,11 +49,6 @@ public class CompositeAggregationCursorTests extends AbstractWireSerializingTest
|
|||
instance.indices());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NamedWriteableRegistry getNamedWriteableRegistry() {
|
||||
return new NamedWriteableRegistry(Cursors.getNamedWriteables());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CompositeAggregationCursor createTestInstance() {
|
||||
return randomCompositeCursor();
|
||||
|
@ -66,13 +60,17 @@ public class CompositeAggregationCursorTests extends AbstractWireSerializingTest
|
|||
}
|
||||
|
||||
@Override
|
||||
protected CompositeAggregationCursor copyInstance(CompositeAggregationCursor instance, Version version) throws IOException {
|
||||
/* Randomly choose between internal protocol round trip and String based
|
||||
* round trips used to toXContent. */
|
||||
if (randomBoolean()) {
|
||||
return super.copyInstance(instance, version);
|
||||
protected ZoneId instanceZoneId(CompositeAggregationCursor instance) {
|
||||
List<BucketExtractor> extractors = instance.extractors();
|
||||
for (BucketExtractor bucketExtractor : extractors) {
|
||||
ZoneId zoneId = MetricAggExtractorTests.extractZoneId(bucketExtractor);
|
||||
zoneId = zoneId == null ? CompositeKeyExtractorTests.extractZoneId(bucketExtractor) : zoneId;
|
||||
|
||||
if (zoneId != null) {
|
||||
return zoneId;
|
||||
}
|
||||
}
|
||||
return (CompositeAggregationCursor) Cursors.decodeFromString(Cursors.encodeToString(version, instance));
|
||||
return randomSafeZone();
|
||||
}
|
||||
|
||||
static BitSet randomBitSet(int size) {
|
||||
|
|
|
@ -8,10 +8,9 @@ package org.elasticsearch.xpack.sql.execution.search;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.execution.search.extractor.ComputingExtractorTests;
|
||||
import org.elasticsearch.xpack.sql.execution.search.extractor.ConstantExtractorTests;
|
||||
import org.elasticsearch.xpack.sql.execution.search.extractor.FieldHitExtractorTests;
|
||||
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor;
|
||||
import org.elasticsearch.xpack.sql.session.Cursors;
|
||||
|
||||
|
@ -20,7 +19,7 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class ScrollCursorTests extends AbstractWireSerializingTestCase<ScrollCursor> {
|
||||
public class ScrollCursorTests extends AbstractSqlWireSerializingTestCase<ScrollCursor> {
|
||||
public static ScrollCursor randomScrollCursor() {
|
||||
int extractorsSize = between(1, 20);
|
||||
List<HitExtractor> extractors = new ArrayList<>(extractorsSize);
|
||||
|
@ -37,7 +36,6 @@ public class ScrollCursorTests extends AbstractWireSerializingTestCase<ScrollCur
|
|||
options.add(() -> ComputingExtractorTests.randomComputingExtractor());
|
||||
}
|
||||
options.add(ConstantExtractorTests::randomConstantExtractor);
|
||||
options.add(FieldHitExtractorTests::randomFieldHitExtractor);
|
||||
return randomFrom(options).get();
|
||||
}
|
||||
|
||||
|
@ -70,6 +68,6 @@ public class ScrollCursorTests extends AbstractWireSerializingTestCase<ScrollCur
|
|||
if (randomBoolean()) {
|
||||
return super.copyInstance(instance, version);
|
||||
}
|
||||
return (ScrollCursor) Cursors.decodeFromString(Cursors.encodeToString(version, instance));
|
||||
return (ScrollCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,8 +8,8 @@ package org.elasticsearch.xpack.sql.execution.search.extractor;
|
|||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
|
||||
import org.elasticsearch.xpack.sql.querydsl.container.GroupByRef.Property;
|
||||
import org.elasticsearch.xpack.sql.util.DateUtils;
|
||||
|
@ -22,12 +22,16 @@ import static java.util.Collections.emptyMap;
|
|||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
|
||||
|
||||
public class CompositeKeyExtractorTests extends AbstractWireSerializingTestCase<CompositeKeyExtractor> {
|
||||
public class CompositeKeyExtractorTests extends AbstractSqlWireSerializingTestCase<CompositeKeyExtractor> {
|
||||
|
||||
public static CompositeKeyExtractor randomCompositeKeyExtractor() {
|
||||
return new CompositeKeyExtractor(randomAlphaOfLength(16), randomFrom(asList(Property.values())), randomSafeZone(), randomBoolean());
|
||||
}
|
||||
|
||||
public static CompositeKeyExtractor randomCompositeKeyExtractor(ZoneId zoneId) {
|
||||
return new CompositeKeyExtractor(randomAlphaOfLength(16), randomFrom(asList(Property.values())), zoneId, randomBoolean());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CompositeKeyExtractor createTestInstance() {
|
||||
return randomCompositeKeyExtractor();
|
||||
|
@ -38,6 +42,11 @@ public class CompositeKeyExtractorTests extends AbstractWireSerializingTestCase<
|
|||
return CompositeKeyExtractor::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ZoneId instanceZoneId(CompositeKeyExtractor instance) {
|
||||
return instance.zoneId();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CompositeKeyExtractor mutateInstance(CompositeKeyExtractor instance) {
|
||||
return new CompositeKeyExtractor(
|
||||
|
@ -79,12 +88,7 @@ public class CompositeKeyExtractorTests extends AbstractWireSerializingTestCase<
|
|||
assertEquals("Invalid date key returned: " + value, exception.getMessage());
|
||||
}
|
||||
|
||||
/**
|
||||
* We need to exclude SystemV/* time zones because they cannot be converted
|
||||
* back to DateTimeZone which we currently still need to do internally,
|
||||
* e.g. in bwc serialization and in the extract() method
|
||||
*/
|
||||
private static ZoneId randomSafeZone() {
|
||||
return randomValueOtherThanMany(zi -> zi.getId().startsWith("SystemV"), () -> randomZone());
|
||||
public static ZoneId extractZoneId(BucketExtractor extractor) {
|
||||
return extractor instanceof CompositeKeyExtractor ? ((CompositeKeyExtractor) extractor).zoneId() : null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,10 +9,10 @@ import org.elasticsearch.common.document.DocumentField;
|
|||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.CastProcessorTests;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.Processors;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeProcessorTests;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.math.BinaryMathProcessorTests;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.math.MathFunctionProcessorTests;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.math.MathProcessor;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.math.MathProcessor.MathOperation;
|
||||
|
@ -32,7 +32,7 @@ import static java.util.Collections.singletonMap;
|
|||
import static org.elasticsearch.xpack.sql.util.CollectionUtils.combine;
|
||||
import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
|
||||
|
||||
public class ComputingExtractorTests extends AbstractWireSerializingTestCase<ComputingExtractor> {
|
||||
public class ComputingExtractorTests extends AbstractSqlWireSerializingTestCase<ComputingExtractor> {
|
||||
public static ComputingExtractor randomComputingExtractor() {
|
||||
return new ComputingExtractor(randomProcessor(), randomAlphaOfLength(10));
|
||||
}
|
||||
|
@ -41,8 +41,8 @@ public class ComputingExtractorTests extends AbstractWireSerializingTestCase<Com
|
|||
List<Supplier<Processor>> options = new ArrayList<>();
|
||||
options.add(() -> ChainingProcessorTests.randomComposeProcessor());
|
||||
options.add(CastProcessorTests::randomCastProcessor);
|
||||
options.add(DateTimeProcessorTests::randomDateTimeProcessor);
|
||||
options.add(MathFunctionProcessorTests::randomMathFunctionProcessor);
|
||||
options.add(BinaryMathProcessorTests::randomProcessor);
|
||||
return randomFrom(options).get();
|
||||
}
|
||||
|
||||
|
|
|
@ -11,8 +11,8 @@ import org.elasticsearch.common.io.stream.Writeable.Reader;
|
|||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.SqlException;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.geo.GeoShape;
|
||||
import org.elasticsearch.xpack.sql.type.DataType;
|
||||
|
@ -37,7 +37,7 @@ import static java.util.Collections.singletonMap;
|
|||
import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class FieldHitExtractorTests extends AbstractWireSerializingTestCase<FieldHitExtractor> {
|
||||
public class FieldHitExtractorTests extends AbstractSqlWireSerializingTestCase<FieldHitExtractor> {
|
||||
|
||||
public static FieldHitExtractor randomFieldHitExtractor() {
|
||||
String hitName = randomAlphaOfLength(5);
|
||||
|
@ -55,6 +55,11 @@ public class FieldHitExtractorTests extends AbstractWireSerializingTestCase<Fiel
|
|||
return FieldHitExtractor::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ZoneId instanceZoneId(FieldHitExtractor instance) {
|
||||
return instance.zoneId();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected FieldHitExtractor mutateInstance(FieldHitExtractor instance) {
|
||||
return new FieldHitExtractor(
|
||||
|
|
|
@ -9,8 +9,8 @@ import org.elasticsearch.common.io.stream.Writeable.Reader;
|
|||
import org.elasticsearch.search.aggregations.Aggregation;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.SqlException;
|
||||
import org.elasticsearch.xpack.sql.util.DateUtils;
|
||||
|
||||
|
@ -22,13 +22,17 @@ import static java.util.Collections.emptyMap;
|
|||
import static java.util.Collections.singletonList;
|
||||
import static java.util.Collections.singletonMap;
|
||||
|
||||
public class MetricAggExtractorTests extends AbstractWireSerializingTestCase<MetricAggExtractor> {
|
||||
public class MetricAggExtractorTests extends AbstractSqlWireSerializingTestCase<MetricAggExtractor> {
|
||||
|
||||
public static MetricAggExtractor randomMetricAggExtractor() {
|
||||
return new MetricAggExtractor(randomAlphaOfLength(16), randomAlphaOfLength(16), randomAlphaOfLength(16),
|
||||
randomZone(), randomBoolean());
|
||||
}
|
||||
|
||||
public static MetricAggExtractor randomMetricAggExtractor(ZoneId zoneId) {
|
||||
return new MetricAggExtractor(randomAlphaOfLength(16), randomAlphaOfLength(16), randomAlphaOfLength(16), zoneId, randomBoolean());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MetricAggExtractor createTestInstance() {
|
||||
return randomMetricAggExtractor();
|
||||
|
@ -39,6 +43,11 @@ public class MetricAggExtractorTests extends AbstractWireSerializingTestCase<Met
|
|||
return MetricAggExtractor::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ZoneId instanceZoneId(MetricAggExtractor instance) {
|
||||
return instance.zoneId();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MetricAggExtractor mutateInstance(MetricAggExtractor instance) throws IOException {
|
||||
return new MetricAggExtractor(
|
||||
|
@ -113,4 +122,8 @@ public class MetricAggExtractorTests extends AbstractWireSerializingTestCase<Met
|
|||
Bucket bucket = new TestBucket(emptyMap(), 0, new Aggregations(singletonList(agg)));
|
||||
assertEquals(DateUtils.asDateTime((long) value , zoneId), extractor.extract(bucket));
|
||||
}
|
||||
|
||||
public static ZoneId extractZoneId(BucketExtractor extractor) {
|
||||
return extractor instanceof MetricAggExtractor ? ((MetricAggExtractor) extractor).zoneId() : null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,8 +14,8 @@ import org.elasticsearch.search.aggregations.Aggregation;
|
|||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
|
||||
import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.SqlException;
|
||||
import org.elasticsearch.xpack.sql.type.DataType;
|
||||
import org.elasticsearch.xpack.sql.util.DateUtils;
|
||||
|
@ -28,7 +28,7 @@ import static java.util.Collections.emptyMap;
|
|||
import static java.util.Collections.singletonList;
|
||||
import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
|
||||
|
||||
public class TopHitsAggExtractorTests extends AbstractWireSerializingTestCase<TopHitsAggExtractor> {
|
||||
public class TopHitsAggExtractorTests extends AbstractSqlWireSerializingTestCase<TopHitsAggExtractor> {
|
||||
|
||||
public static TopHitsAggExtractor randomTopHitsAggExtractor() {
|
||||
return new TopHitsAggExtractor(randomAlphaOfLength(16), randomFrom(DataType.values()), randomZone());
|
||||
|
@ -44,6 +44,11 @@ public class TopHitsAggExtractorTests extends AbstractWireSerializingTestCase<To
|
|||
return TopHitsAggExtractor::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ZoneId instanceZoneId(TopHitsAggExtractor instance) {
|
||||
return instance.zoneId();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TopHitsAggExtractor mutateInstance(TopHitsAggExtractor instance) {
|
||||
return new TopHitsAggExtractor(
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
|
||||
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeProcessor.DateTimeExtractor;
|
||||
|
||||
|
@ -17,7 +17,7 @@ import static org.elasticsearch.xpack.sql.expression.function.scalar.datetime.Da
|
|||
import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
public class DateTimeProcessorTests extends AbstractWireSerializingTestCase<DateTimeProcessor> {
|
||||
public class DateTimeProcessorTests extends AbstractSqlWireSerializingTestCase<DateTimeProcessor> {
|
||||
|
||||
public static DateTimeProcessor randomDateTimeProcessor() {
|
||||
return new DateTimeProcessor(randomFrom(DateTimeExtractor.values()), randomZone());
|
||||
|
@ -33,6 +33,11 @@ public class DateTimeProcessorTests extends AbstractWireSerializingTestCase<Date
|
|||
return DateTimeProcessor::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ZoneId instanceZoneId(DateTimeProcessor instance) {
|
||||
return instance.zoneId();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DateTimeProcessor mutateInstance(DateTimeProcessor instance) {
|
||||
DateTimeExtractor replaced = randomValueOtherThan(instance.extractor(), () -> randomFrom(DateTimeExtractor.values()));
|
||||
|
|
|
@ -8,7 +8,7 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
|
|||
import org.elasticsearch.bootstrap.JavaVersion;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.NamedDateTimeProcessor.NameExtractor;
|
||||
import org.junit.Assume;
|
||||
|
||||
|
@ -18,7 +18,7 @@ import java.time.ZoneId;
|
|||
import static org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeTestUtils.dateTime;
|
||||
import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
|
||||
|
||||
public class NamedDateTimeProcessorTests extends AbstractWireSerializingTestCase<NamedDateTimeProcessor> {
|
||||
public class NamedDateTimeProcessorTests extends AbstractSqlWireSerializingTestCase<NamedDateTimeProcessor> {
|
||||
|
||||
public static NamedDateTimeProcessor randomNamedDateTimeProcessor() {
|
||||
return new NamedDateTimeProcessor(randomFrom(NameExtractor.values()), UTC);
|
||||
|
@ -40,6 +40,11 @@ public class NamedDateTimeProcessorTests extends AbstractWireSerializingTestCase
|
|||
return new NamedDateTimeProcessor(replaced, UTC);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ZoneId instanceZoneId(NamedDateTimeProcessor instance) {
|
||||
return instance.zoneId();
|
||||
}
|
||||
|
||||
public void testValidDayNamesInUTC() {
|
||||
assumeJava9PlusAndCompatLocaleProviderSetting();
|
||||
NamedDateTimeProcessor proc = new NamedDateTimeProcessor(NameExtractor.DAY_NAME, UTC);
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
|
||||
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.NonIsoDateTimeProcessor.NonIsoDateTimeExtractor;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -15,7 +15,7 @@ import java.time.ZoneId;
|
|||
import static org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeTestUtils.dateTime;
|
||||
import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
|
||||
|
||||
public class NonIsoDateTimeProcessorTests extends AbstractWireSerializingTestCase<NonIsoDateTimeProcessor> {
|
||||
public class NonIsoDateTimeProcessorTests extends AbstractSqlWireSerializingTestCase<NonIsoDateTimeProcessor> {
|
||||
|
||||
|
||||
public static NonIsoDateTimeProcessor randomNonISODateTimeProcessor() {
|
||||
|
@ -38,6 +38,11 @@ public class NonIsoDateTimeProcessorTests extends AbstractWireSerializingTestCas
|
|||
return new NonIsoDateTimeProcessor(replaced, UTC);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ZoneId instanceZoneId(NonIsoDateTimeProcessor instance) {
|
||||
return instance.zoneId();
|
||||
}
|
||||
|
||||
public void testNonISOWeekOfYearInUTC() {
|
||||
NonIsoDateTimeProcessor proc = new NonIsoDateTimeProcessor(NonIsoDateTimeExtractor.WEEK_OF_YEAR, UTC);
|
||||
assertEquals(2, proc.process(dateTime(568372930000L))); //1988-01-05T09:22:10Z[UTC]
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
|
||||
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeProcessor.DateTimeExtractor;
|
||||
|
||||
import java.time.ZoneId;
|
||||
|
@ -14,7 +14,7 @@ import java.time.ZoneId;
|
|||
import static org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeTestUtils.time;
|
||||
import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
|
||||
|
||||
public class TimeProcessorTests extends AbstractWireSerializingTestCase<TimeProcessor> {
|
||||
public class TimeProcessorTests extends AbstractSqlWireSerializingTestCase<TimeProcessor> {
|
||||
|
||||
public static TimeProcessor randomTimeProcessor() {
|
||||
return new TimeProcessor(randomFrom(DateTimeExtractor.values()), randomZone());
|
||||
|
@ -30,6 +30,11 @@ public class TimeProcessorTests extends AbstractWireSerializingTestCase<TimeProc
|
|||
return TimeProcessor::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ZoneId instanceZoneId(TimeProcessor instance) {
|
||||
return instance.zoneId();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TimeProcessor mutateInstance(TimeProcessor instance) {
|
||||
DateTimeExtractor replaced = randomValueOtherThan(instance.extractor(), () -> randomFrom(DateTimeExtractor.values()));
|
||||
|
|
|
@ -7,16 +7,15 @@ package org.elasticsearch.xpack.sql.expression.gen.processor;
|
|||
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.Processors;
|
||||
import org.elasticsearch.xpack.sql.expression.gen.processor.ChainingProcessor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.elasticsearch.xpack.sql.execution.search.extractor.ComputingExtractorTests.randomProcessor;
|
||||
|
||||
public class ChainingProcessorTests extends AbstractWireSerializingTestCase<ChainingProcessor> {
|
||||
public class ChainingProcessorTests extends AbstractSqlWireSerializingTestCase<ChainingProcessor> {
|
||||
public static ChainingProcessor randomComposeProcessor() {
|
||||
return new ChainingProcessor(randomProcessor(), randomProcessor());
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.search.ClearScrollRequest;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.logging.LoggerMessageFormat;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.SqlException;
|
||||
import org.elasticsearch.xpack.sql.TestUtils;
|
||||
|
@ -21,6 +22,7 @@ import org.elasticsearch.xpack.sql.proto.ColumnInfo;
|
|||
import org.elasticsearch.xpack.sql.proto.Mode;
|
||||
import org.elasticsearch.xpack.sql.session.Cursor;
|
||||
import org.elasticsearch.xpack.sql.session.Cursors;
|
||||
import org.elasticsearch.xpack.sql.session.CursorsTestUtil;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -102,15 +104,14 @@ public class CursorTests extends ESTestCase {
|
|||
|
||||
public void testVersionHandling() {
|
||||
Cursor cursor = randomNonEmptyCursor();
|
||||
assertEquals(cursor, Cursors.decodeFromString(Cursors.encodeToString(Version.CURRENT, cursor)));
|
||||
assertEquals(cursor, Cursors.decodeFromString(Cursors.encodeToString(cursor, randomZone())));
|
||||
|
||||
Version nextMinorVersion = Version.fromId(Version.CURRENT.id + 10000);
|
||||
|
||||
String encodedWithWrongVersion = Cursors.encodeToString(nextMinorVersion, cursor);
|
||||
String encodedWithWrongVersion = CursorsTestUtil.encodeToString(cursor, nextMinorVersion, randomZone());
|
||||
SqlException exception = expectThrows(SqlException.class, () -> Cursors.decodeFromString(encodedWithWrongVersion));
|
||||
|
||||
assertEquals("Unsupported cursor version " + nextMinorVersion, exception.getMessage());
|
||||
assertEquals(LoggerMessageFormat.format("Unsupported cursor version [{}], expected [{}]", nextMinorVersion, Version.CURRENT),
|
||||
exception.getMessage());
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.sql.session;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
|
||||
import java.time.ZoneId;
|
||||
|
||||
public class CursorsTestUtil {
|
||||
|
||||
public static String encodeToString(Cursor info, Version version, ZoneId zoneId) {
|
||||
return Cursors.encodeToString(info, version, zoneId);
|
||||
}
|
||||
}
|
|
@ -9,8 +9,6 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.sql.session.Cursors;
|
||||
import org.elasticsearch.xpack.sql.session.ListCursor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -32,7 +30,7 @@ public class ListCursorTests extends AbstractWireSerializingTestCase<ListCursor>
|
|||
|
||||
@Override
|
||||
protected ListCursor mutateInstance(ListCursor instance) throws IOException {
|
||||
return new ListCursor(instance.data(),
|
||||
return new ListCursor(instance.data(),
|
||||
randomValueOtherThan(instance.pageSize(), () -> between(1, 20)),
|
||||
instance.columnCount());
|
||||
}
|
||||
|
@ -59,6 +57,6 @@ public class ListCursorTests extends AbstractWireSerializingTestCase<ListCursor>
|
|||
if (randomBoolean()) {
|
||||
return super.copyInstance(instance, version);
|
||||
}
|
||||
return (ListCursor) Cursors.decodeFromString(Cursors.encodeToString(version, instance));
|
||||
return (ListCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone()));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue