Begin migrating SQL's next page (elastic/x-pack-elasticsearch#2271)

Scrolling was only implemented for the `SqlAction` (not jdbc or cli)
and it was implemented by keeping request state on the server. On
principle we try to avoid adding extra state to elasticsearch where
possible because it creates extra points of failure and tends to
have lots of hidden complexity.

This replaces the state on the server with serializing state to the
client. This looks to the user like a "next_page" key with fairly
opaque content. It actually consists of an identifier for the *kind*
of scroll, the scroll id, and a base64 string containing the field
extractors.

Right now this only implements scrolling for `SqlAction`. The plan
is to reuse the same implementation for jdbc and cli in a followup.

This also doesn't implement all of the required serialization.
Specifically it doesn't implement serialization of
`ProcessingHitExtractor` because I haven't implemented serialization
for *any* `ColumnProcessors`.

Original commit: elastic/x-pack-elasticsearch@a8567bc5ec
This commit is contained in:
Nik Everett 2017-08-28 08:46:49 -04:00 committed by GitHub
parent 928c750585
commit 972c56dafe
99 changed files with 2720 additions and 889 deletions

View File

@ -541,6 +541,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
entries.addAll(machineLearning.getNamedWriteables());
entries.addAll(licensing.getNamedWriteables());
entries.addAll(Security.getNamedWriteables());
entries.addAll(SqlPlugin.getNamedWriteables());
return entries;
}

View File

@ -9,15 +9,13 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import java.util.Map;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse.ColumnInfo;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
public class SqlActionIT extends AbstractSqlIntegTestCase {
public void testSqlAction() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test").get());
client().prepareBulk()
@ -27,27 +25,22 @@ public class SqlActionIT extends AbstractSqlIntegTestCase {
.get();
ensureYellow("test");
boolean columnOrder = randomBoolean();
String columns = columnOrder ? "data, count" : "count, data";
SqlResponse response = client().prepareExecute(SqlAction.INSTANCE).query("SELECT " + columns + " FROM test ORDER BY count").get();
boolean dataBeforeCount = randomBoolean();
String columns = dataBeforeCount ? "data, count" : "count, data";
SqlResponse response = client().prepareExecute(SqlAction.INSTANCE)
.query("SELECT " + columns + " FROM test ORDER BY count").get();
assertThat(response.size(), equalTo(2L));
assertThat(response.columns().keySet(), hasSize(2));
assertThat(response.columns().get("data"), equalTo("text"));
assertThat(response.columns().get("count"), equalTo("long"));
// Check that columns were returned in the requested order
assertThat(response.columns().keySet().iterator().next(), equalTo(columnOrder ? "data" : "count"));
assertThat(response.columns(), hasSize(2));
int dataIndex = dataBeforeCount ? 0 : 1;
int countIndex = dataBeforeCount ? 1 : 0;
assertEquals(new ColumnInfo("data", "text"), response.columns().get(dataIndex));
assertEquals(new ColumnInfo("count", "long"), response.columns().get(countIndex));
assertThat(response.rows(), hasSize(2));
assertThat(response.rows().get(0).get("data"), equalTo("bar"));
assertThat(response.rows().get(0).get("count"), equalTo(42L));
assertThat(response.rows().get(1).get("data"), equalTo("baz"));
assertThat(response.rows().get(1).get("count"), equalTo(43L));
// Check that columns within each row were returned in the requested order
for (Map<String, Object> row : response.rows()) {
assertThat(row.keySet().iterator().next(), equalTo(columnOrder ? "data" : "count"));
}
assertEquals("bar", response.rows().get(0).get(dataIndex));
assertEquals(42L, response.rows().get(0).get(countIndex));
assertEquals("baz", response.rows().get(1).get(dataIndex));
assertEquals(43L, response.rows().get(1).get(countIndex));
}
}

View File

@ -26,6 +26,7 @@ import java.util.Map;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
public class SqlMultinodeIT extends ESRestTestCase {
/**
@ -96,8 +97,8 @@ public class SqlMultinodeIT extends ESRestTestCase {
private void assertCount(RestClient client, int count) throws IOException {
Map<String, Object> expected = new HashMap<>();
expected.put("columns", singletonMap("COUNT(1)", singletonMap("type", "long")));
expected.put("rows", singletonList(singletonMap("COUNT(1)", count)));
expected.put("columns", singletonList(columnInfo("COUNT(1)", "long")));
expected.put("rows", singletonList(singletonList(count)));
expected.put("size", 1);
Map<String, Object> actual = responseToMap(client.performRequest("POST", "/_sql", emptyMap(),
@ -109,4 +110,12 @@ public class SqlMultinodeIT extends ESRestTestCase {
fail("Response does not match:\n" + message.toString());
}
}
/**
 * Builds the expected JSON representation of a single column: an
 * unmodifiable map with {@code name} and {@code type} entries, mirroring
 * what the SQL REST endpoint returns for each column.
 */
private Map<String, Object> columnInfo(String name, String type) {
    Map<String, Object> info = new HashMap<>();
    info.put("name", name);
    info.put("type", type);
    return unmodifiableMap(info);
}
}

View File

@ -22,8 +22,11 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsString;
@ -42,12 +45,55 @@ public class RestSqlIT extends ESRestTestCase {
new StringEntity(bulk.toString(), ContentType.APPLICATION_JSON));
Map<String, Object> expected = new HashMap<>();
expected.put("columns", singletonMap("test", singletonMap("type", "text")));
expected.put("rows", Arrays.asList(singletonMap("test", "test"), singletonMap("test", "test")));
expected.put("columns", singletonList(columnInfo("test", "text")));
expected.put("rows", Arrays.asList(singletonList("test"), singletonList("test")));
expected.put("size", 2);
assertResponse(expected, runSql("SELECT * FROM test"));
}
/**
 * Pages through a 20 document result set two rows at a time, checking
 * each page's rows and passing the returned {@code cursor} back to fetch
 * the next page, then verifies the page after the last row is empty.
 */
public void testNextPage() throws IOException {
StringBuilder bulk = new StringBuilder();
for (int i = 0; i < 20; i++) {
// NOCOMMIT we need number2 because we can't process the same column twice in two ways
bulk.append("{\"index\":{\"_id\":\"" + i + "\"}}\n");
bulk.append("{\"text\":\"text" + i + "\", \"number\":" + i + ", \"number2\": " + i + "}\n");
}
client().performRequest("POST", "/test/test/_bulk", singletonMap("refresh", "true"),
new StringEntity(bulk.toString(), ContentType.APPLICATION_JSON));
// NOCOMMIT we need tests for inner hits extractor and const extractor
String request = "{\"query\":\"SELECT text, number, SIN(number2) FROM test ORDER BY number\", \"fetch_size\":2}";
String cursor = null;
for (int i = 0; i < 20; i += 2) {
Map<String, Object> response;
// The first request runs the query; every later request only sends the cursor.
if (i == 0) {
response = runSql(new StringEntity(request, ContentType.APPLICATION_JSON));
} else {
response = runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"}", ContentType.APPLICATION_JSON));
}
Map<String, Object> expected = new HashMap<>();
// Column metadata is only expected on the first page.
if (i == 0) {
expected.put("columns", Arrays.asList(
columnInfo("text", "text"),
columnInfo("number", "long"),
columnInfo("SIN(number2)", "double")));
}
expected.put("rows", Arrays.asList(
Arrays.asList("text" + i, i, Math.sin(i)),
Arrays.asList("text" + (i + 1), i + 1, Math.sin(i + 1))));
expected.put("size", 2);
// Remove the cursor before comparing: its content is opaque, we only
// check that one was returned.
cursor = (String) response.remove("cursor");
assertResponse(expected, response);
assertNotNull(cursor);
}
// One more fetch past the last row: the scroll is exhausted so the
// response carries no rows.
Map<String, Object> expected = new HashMap<>();
expected.put("size", 0);
expected.put("rows", emptyList());
assertResponse(expected, runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"}", ContentType.APPLICATION_JSON)));
}
@AwaitsFix(bugUrl="https://github.com/elastic/x-pack-elasticsearch/issues/2074")
public void testTimeZone() throws IOException {
StringBuilder bulk = new StringBuilder();
@ -65,7 +111,7 @@ public class RestSqlIT extends ESRestTestCase {
// Default TimeZone is UTC
assertResponse(expected, runSql(
new StringEntity("{\"query\":\"SELECT DAY_OF_YEAR(test), COUNT(*) FROM test.test\"}", ContentType.APPLICATION_JSON)));
new StringEntity("{\"query\":\"SELECT DAY_OF_YEAR(test), COUNT(*) FROM test\"}", ContentType.APPLICATION_JSON)));
}
public void testMissingIndex() throws IOException {
@ -119,4 +165,11 @@ public class RestSqlIT extends ESRestTestCase {
fail("Response does not match:\n" + message.toString());
}
}
/**
 * Builds the expected JSON representation of a single column: an
 * unmodifiable map with {@code name} and {@code type} entries, mirroring
 * what the SQL REST endpoint returns for each column.
 */
private Map<String, Object> columnInfo(String name, String type) {
    Map<String, Object> info = new HashMap<>();
    info.put("name", name);
    info.put("type", type);
    return unmodifiableMap(info);
}
}

View File

@ -37,6 +37,7 @@ import java.util.Map;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.containsString;
@ -110,20 +111,13 @@ public class SqlSecurityIT extends ESRestTestCase {
public void testSqlWorksAsAdmin() throws Exception {
Map<String, Object> expected = new HashMap<>();
Map<String, Object> columns = new HashMap<>();
columns.put("a", singletonMap("type", "long"));
columns.put("b", singletonMap("type", "long"));
columns.put("c", singletonMap("type", "long"));
expected.put("columns", columns);
Map<String, Object> row1 = new HashMap<>();
row1.put("a", 1);
row1.put("b", 2);
row1.put("c", 3);
Map<String, Object> row2 = new HashMap<>();
row2.put("a", 4);
row2.put("b", 5);
row2.put("c", 6);
expected.put("rows", Arrays.asList(row1, row2));
expected.put("columns", Arrays.asList(
columnInfo("a", "long"),
columnInfo("b", "long"),
columnInfo("c", "long")));
expected.put("rows", Arrays.asList(
Arrays.asList(1, 2, 3),
Arrays.asList(4, 5, 6)));
expected.put("size", 2);
assertResponse(expected, runSql("SELECT * FROM test ORDER BY a", null));
assertAuditForSqlGranted("test_admin", "test");
@ -316,4 +310,12 @@ public class SqlSecurityIT extends ESRestTestCase {
throw e;
}
}
/**
 * Builds the expected JSON representation of a single column: an
 * unmodifiable map with {@code name} and {@code type} entries, mirroring
 * what the SQL REST endpoint returns for each column.
 */
private Map<String, Object> columnInfo(String name, String type) {
    Map<String, Object> info = new HashMap<>();
    info.put("name", name);
    info.put("type", type);
    return unmodifiableMap(info);
}
}

View File

@ -7,22 +7,22 @@ package org.elasticsearch.xpack.sql.execution;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
import org.elasticsearch.xpack.sql.analysis.catalog.Catalog;
import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog;
import org.elasticsearch.xpack.sql.execution.search.Scroller.SearchHitsActionListener;
import org.elasticsearch.xpack.sql.expression.function.DefaultFunctionRegistry;
import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.sql.optimizer.Optimizer;
import org.elasticsearch.xpack.sql.parser.SqlParser;
import org.elasticsearch.xpack.sql.planner.Planner;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.session.SqlSettings;
import java.io.IOException;
import java.util.function.Supplier;
import java.util.List;
public class PlanExecutor extends AbstractLifecycleComponent {
// NOCOMMIT prefer not to use AbstractLifecycleComponent because the reasons for its tradeoffs is lost to the mists of time
@ -66,6 +66,10 @@ public class PlanExecutor extends AbstractLifecycleComponent {
session.executable(sql).execute(session, listener);
}
/**
 * Fetches the next page of results for a previously started query by
 * delegating to the {@link Cursor} deserialized from the client request,
 * notifying {@code listener} with the resulting {@link RowSetCursor}.
 */
public void nextPage(Cursor cursor, ActionListener<RowSetCursor> listener) {
cursor.nextPage(client, listener);
}
@Override
protected void doStart() {
//no-op

View File

@ -8,7 +8,10 @@ package org.elasticsearch.xpack.sql.execution.search;
import java.util.Arrays;
import java.util.List;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.xpack.sql.session.AbstractRowSetCursor;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.type.Schema;
//
@ -130,4 +133,9 @@ class AggsRowSetCursor extends AbstractRowSetCursor {
public int size() {
return size;
}
@Override
public Cursor nextPageCursor() {
// Aggregation results never produce a follow-up page here, so the
// empty cursor is returned. NOTE(review): presumably aggs are fully
// materialized in the first response — confirm once agg paging exists.
return Cursor.EMPTY;
}
}

View File

@ -5,21 +5,62 @@
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchHit;
class ConstantExtractor implements HitExtractor {
import java.io.IOException;
import java.util.Objects;
/**
* Returns a constant for every search hit against which it is run.
*/
class ConstantExtractor implements HitExtractor {
static final String NAME = "c";
private final Object constant;
ConstantExtractor(Object constant) {
this.constant = constant;
}
ConstantExtractor(StreamInput in) throws IOException {
constant = in.readGenericValue();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeGenericValue(constant);
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public Object get(SearchHit hit) {
return constant;
}
@Override
public String innerHitName() {
return null;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
ConstantExtractor other = (ConstantExtractor) obj;
return Objects.equals(constant, other.constant);
}
@Override
public int hashCode() {
return Objects.hashCode(constant);
}
@Override
public String toString() {
return "^" + constant;

View File

@ -6,23 +6,68 @@
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchHit;
import java.io.IOException;
/**
* Extracts field values from {@link SearchHit#field(String)}.
*/
class DocValueExtractor implements HitExtractor {
static final String NAME = "f";
private final String fieldName;
DocValueExtractor(String name) {
this.fieldName = name;
}
DocValueExtractor(StreamInput in) throws IOException {
fieldName = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(fieldName);
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public Object get(SearchHit hit) {
// NOCOMMIT we should think about what to do with multi-valued fields.
DocumentField field = hit.field(fieldName);
return field != null ? field.getValue() : null;
}
@Override
public String innerHitName() {
return null;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
DocValueExtractor other = (DocValueExtractor) obj;
return fieldName.equals(other.fieldName);
}
@Override
public int hashCode() {
return fieldName.hashCode();
}
@Override
public String toString() {
return fieldName;
/* % kind of looks like two 0s with a column separator between
* them so it makes me think of columnar storage which doc
* values are. */
return "%" + fieldName;
}
}

View File

@ -5,8 +5,43 @@
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.sql.expression.function.scalar.ColumnProcessor;
interface HitExtractor {
import java.util.ArrayList;
import java.util.List;
/**
* Extracts a columns value from a {@link SearchHit}.
*/
public interface HitExtractor extends NamedWriteable {
/**
* All of the named writeables needed to deserialize the instances
* of {@linkplain HitExtractor}.
*/
static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
entries.add(new Entry(HitExtractor.class, ConstantExtractor.NAME, ConstantExtractor::new));
entries.add(new Entry(HitExtractor.class, DocValueExtractor.NAME, DocValueExtractor::new));
entries.add(new Entry(HitExtractor.class, InnerHitExtractor.NAME, InnerHitExtractor::new));
entries.add(new Entry(HitExtractor.class, SourceExtractor.NAME, SourceExtractor::new));
entries.add(new Entry(HitExtractor.class, ProcessingHitExtractor.NAME, ProcessingHitExtractor::new));
entries.addAll(ColumnProcessor.getNamedWriteables());
return entries;
}
/**
* Extract the value from a hit.
*/
Object get(SearchHit hit);
/**
* Name of the inner hit needed by this extractor if it needs one, {@code null} otherwise.
*/
@Nullable
String innerHitName();
}

View File

@ -7,12 +7,17 @@ package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.sql.execution.ExecutionException;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
class InnerHitExtractor implements HitExtractor {
static final String NAME = "i";
private final String hitName, fieldName;
private final boolean useDocValue;
private final String[] tree;
@ -24,6 +29,25 @@ class InnerHitExtractor implements HitExtractor {
this.tree = useDocValue ? Strings.EMPTY_ARRAY : Strings.tokenizeToStringArray(name, ".");
}
InnerHitExtractor(StreamInput in) throws IOException {
hitName = in.readString();
fieldName = in.readString();
useDocValue = in.readBoolean();
tree = useDocValue ? Strings.EMPTY_ARRAY : Strings.tokenizeToStringArray(fieldName, ".");
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(hitName);
out.writeString(fieldName);
out.writeBoolean(useDocValue);
}
@SuppressWarnings("unchecked")
@Override
public Object get(SearchHit hit) {
@ -52,7 +76,16 @@ class InnerHitExtractor implements HitExtractor {
}
}
public String parent() {
@Override
public String innerHitName() {
return hitName;
}
String fieldName() {
return fieldName;
}
public String hitName() {
return hitName;
}
@ -60,4 +93,20 @@ class InnerHitExtractor implements HitExtractor {
public String toString() {
return fieldName + "@" + hitName;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
InnerHitExtractor other = (InnerHitExtractor) obj;
return fieldName.equals(other.fieldName)
&& hitName.equals(other.hitName)
&& useDocValue == other.useDocValue;
}
@Override
public int hashCode() {
return Objects.hash(hitName, fieldName, useDocValue);
}
}

View File

@ -5,12 +5,17 @@
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.sql.expression.function.scalar.ColumnProcessor;
class ProcessingHitExtractor implements HitExtractor {
import java.io.IOException;
import java.util.Objects;
final HitExtractor delegate;
class ProcessingHitExtractor implements HitExtractor {
static final String NAME = "p";
private final HitExtractor delegate;
private final ColumnProcessor processor;
ProcessingHitExtractor(HitExtractor delegate, ColumnProcessor processor) {
@ -18,8 +23,57 @@ class ProcessingHitExtractor implements HitExtractor {
this.processor = processor;
}
ProcessingHitExtractor(StreamInput in) throws IOException {
delegate = in.readNamedWriteable(HitExtractor.class);
processor = in.readNamedWriteable(ColumnProcessor.class);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(delegate);
out.writeNamedWriteable(processor);
}
@Override
public String getWriteableName() {
return NAME;
}
HitExtractor delegate() {
return delegate;
}
ColumnProcessor processor() {
return processor;
}
@Override
public Object get(SearchHit hit) {
return processor.apply(delegate.get(hit));
}
@Override
public String innerHitName() {
return delegate.innerHitName();
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
ProcessingHitExtractor other = (ProcessingHitExtractor) obj;
return delegate.equals(other.delegate)
&& processor.equals(other.processor);
}
@Override
public int hashCode() {
return Objects.hash(delegate, processor);
}
@Override
public String toString() {
return processor + "(" + delegate + ")";
}
}

View File

@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.Schema;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Objects;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
/**
 * A {@link Cursor} over an Elasticsearch scroll. Serializes to the client
 * as the scroll id, a {@code :} separator, and a base64 encoded list of
 * {@link HitExtractor}s, so no per-request state has to live on the server.
 */
public class ScrollCursor implements Cursor {
    public static final String NAME = "s";
    /**
     * {@link NamedWriteableRegistry} used to resolve the {@link #extractors}.
     */
    private static final NamedWriteableRegistry REGISTRY = new NamedWriteableRegistry(HitExtractor.getNamedWriteables());

    private final String scrollId;
    private final List<HitExtractor> extractors;

    public ScrollCursor(String scrollId, List<HitExtractor> extractors) {
        this.scrollId = scrollId;
        this.extractors = extractors;
    }

    /**
     * Reads the cursor from the transport layer.
     */
    public ScrollCursor(StreamInput in) throws IOException {
        scrollId = in.readString();
        extractors = in.readNamedWriteableList(HitExtractor.class);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(scrollId);
        out.writeNamedWriteableList(extractors);
    }

    /**
     * Parses the textual form of the cursor handed back by the client:
     * everything up to the first {@code :} is the scroll id, the remainder
     * is the base64 encoded extractor list.
     */
    public ScrollCursor(java.io.Reader reader) throws IOException {
        // StringBuilder instead of StringBuffer: this is a local, no
        // synchronization is needed.
        StringBuilder scrollId = new StringBuilder();
        int c;
        while ((c = reader.read()) != -1 && c != ':') {
            scrollId.append((char) c);
        }
        this.scrollId = scrollId.toString();
        if (c == -1) {
            // Hit EOF before the separator so the cursor can't be valid.
            throw new IllegalArgumentException("invalid cursor");
        }
        // Adapt the Reader to an InputStream so the base64 decoder can
        // consume it; base64 text is ASCII so char==byte here.
        try (StreamInput delegate = new InputStreamStreamInput(Base64.getDecoder().wrap(new InputStream() {
            @Override
            public int read() throws IOException {
                int c = reader.read();
                if (c < -1 || c > 0xffff) {
                    throw new IllegalArgumentException("invalid cursor [" + Integer.toHexString(c) + "]");
                }
                return c;
            }
        })); StreamInput in = new NamedWriteableAwareStreamInput(delegate, REGISTRY)) {
            extractors = in.readNamedWriteableList(HitExtractor.class);
        }
    }

    /**
     * Writes the textual form of the cursor that is returned to the client.
     */
    @Override
    public void writeTo(java.io.Writer writer) throws IOException {
        writer.write(scrollId);
        writer.write(':');
        try (StreamOutput out = new OutputStreamStreamOutput(Base64.getEncoder().wrap(new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                writer.write(b);
            }
        }))) {
            out.writeNamedWriteableList(extractors);
        }
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    @Override
    public void nextPage(Client client, ActionListener<RowSetCursor> listener) {
        // Fake the schema for now. We'll try to remove the need later.
        List<String> names = new ArrayList<>(extractors.size());
        List<DataType> dataTypes = new ArrayList<>(extractors.size());
        for (int i = 0; i < extractors.size(); i++) {
            names.add("dummy");
            dataTypes.add(null);
        }
        // NOCOMMIT make schema properly nullable for the second page
        Schema schema = new Schema(names, dataTypes);
        // NOCOMMIT add keep alive to the settings and pass it here
        /* Or something. The trouble is that settings is for *starting*
         * queries, but maybe we should actually have two sets of settings,
         * one for things that are only valid when going to the next page
         * and one that is valid for starting queries.
         */
        SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(timeValueSeconds(90));
        client.searchScroll(request, ActionListener.wrap((SearchResponse response) -> {
            int limitHits = -1; // NOCOMMIT do a thing with this
            listener.onResponse(new SearchHitRowSetCursor(schema, extractors, response.getHits().getHits(),
                    limitHits, response.getScrollId(), null));
        }, listener::onFailure));
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        ScrollCursor other = (ScrollCursor) obj;
        return Objects.equals(scrollId, other.scrollId)
                && Objects.equals(extractors, other.extractors);
    }

    @Override
    public int hashCode() {
        return Objects.hash(scrollId, extractors);
    }

    @Override
    public String toString() {
        // Fixed typo: was "scoll".
        return "cursor for scroll [" + scrollId + "]";
    }
}

View File

@ -53,7 +53,7 @@ public class Scroller {
private final Client client;
public Scroller(Client client, SqlSettings settings) {
// TODO: use better defaults (maybe use the sql settings)?
// NOCOMMIT the scroll time should be available in the request somehow. Rest is going to fail badly unless they set it.
this(client, TimeValue.timeValueSeconds(90), TimeValue.timeValueSeconds(45), settings.pageSize());
}
@ -253,7 +253,7 @@ public class Scroller {
}
}
abstract static class SearchHitsActionListener extends ScrollerActionListener {
public abstract static class SearchHitsActionListener extends ScrollerActionListener {
final int limit;
int docsRead;

View File

@ -10,6 +10,7 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.session.AbstractRowSetCursor;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.type.Schema;
@ -25,7 +26,6 @@ import java.util.function.Consumer;
// and eventually carries that over to the top level
public class SearchHitRowSetCursor extends AbstractRowSetCursor {
private final SearchHit[] hits;
private final String scrollId;
private final List<HitExtractor> extractors;
@ -46,12 +46,11 @@ public class SearchHitRowSetCursor extends AbstractRowSetCursor {
this.scrollId = scrollId;
this.extractors = exts;
String innerH = null;
String innerHit = null;
for (HitExtractor ex : exts) {
InnerHitExtractor ie = getInnerHitExtractor(ex);
if (ie != null) {
innerH = ie.parent();
innerHits.add(innerH);
innerHit = ex.innerHitName();
if (innerHit != null) {
innerHits.add(innerHit);
}
}
@ -77,13 +76,13 @@ public class SearchHitRowSetCursor extends AbstractRowSetCursor {
}
size = limitHits < 0 ? sz : Math.min(sz, limitHits);
indexPerLevel = new int[maxDepth + 1];
innerHit = innerH;
this.innerHit = innerHit;
}
@Override
protected Object getColumn(int column) {
HitExtractor e = extractors.get(column);
int extractorLevel = isInnerHitExtractor(e) ? 1 : 0;
int extractorLevel = e.innerHitName() == null ? 0 : 1;
SearchHit hit = null;
SearchHit[] sh = hits;
@ -99,20 +98,6 @@ public class SearchHitRowSetCursor extends AbstractRowSetCursor {
return e.get(hit);
}
private boolean isInnerHitExtractor(HitExtractor he) {
return getInnerHitExtractor(he) != null;
}
private InnerHitExtractor getInnerHitExtractor(HitExtractor he) {
if (he instanceof ProcessingHitExtractor) {
return getInnerHitExtractor(((ProcessingHitExtractor) he).delegate);
}
if (he instanceof InnerHitExtractor) {
return (InnerHitExtractor) he;
}
return null;
}
@Override
protected boolean doHasCurrent() {
return row < size();
@ -166,4 +151,18 @@ public class SearchHitRowSetCursor extends AbstractRowSetCursor {
public String scrollId() {
return scrollId;
}
/**
 * Cursor the client can use to fetch the next page, or
 * {@code Cursor.EMPTY} when there are no more pages.
 */
@Override
public Cursor nextPageCursor() {
if (scrollId == null) {
/* SearchResponse can contain a null scroll when you start a
* scroll but all results fit in the first page. */
return Cursor.EMPTY;
}
if (hits.length == 0) {
// An empty page means the scroll is exhausted.
// NOCOMMIT handle limit
return Cursor.EMPTY;
}
return new ScrollCursor(scrollId, extractors);
}
}

View File

@ -5,25 +5,66 @@
*/
package org.elasticsearch.xpack.sql.execution.search;
import java.io.IOException;
import java.util.Map;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchHit;
class SourceExtractor implements HitExtractor {
public static final String NAME = "s";
private final String fieldName;
SourceExtractor(String name) {
this.fieldName = name;
}
SourceExtractor(StreamInput in) throws IOException {
fieldName = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(fieldName);
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public Object get(SearchHit hit) {
Map<String, Object> source = hit.getSourceAsMap();
// NOCOMMIT I think this will not work with dotted field names (objects or actual dots in the names)
// confusingly, I think this is actually handled by InnerHitExtractor. This needs investigating or renaming
return source != null ? source.get(fieldName) : null;
}
@Override
public String innerHitName() {
return null;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
SourceExtractor other = (SourceExtractor) obj;
return fieldName.equals(other.fieldName);
}
@Override
public int hashCode() {
return fieldName.hashCode();
}
@Override
public String toString() {
return fieldName;
/* # is sometimes known as the "hash" sign which reminds
* me of a hash table lookup. */
return "#" + fieldName;
}
}

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.sql.expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.DataTypeConvertion;
import org.elasticsearch.xpack.sql.type.DataTypeConversion;
public abstract class BinaryOperator extends BinaryExpression {
@ -26,7 +26,7 @@ public abstract class BinaryOperator extends BinaryExpression {
if (!l.same(r)) {
return new TypeResolution("Different types (%s and %s) used in '%s'", l.sqlName(), r.sqlName(), symbol());
}
if (!DataTypeConvertion.canConvert(accepted, left().dataType())) {
if (!DataTypeConversion.canConvert(accepted, left().dataType())) {
return new TypeResolution("'%s' requires type %s not %s", symbol(), accepted.sqlName(), l.sqlName());
}
else {

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.sql.expression;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.DataTypeConvertion;
import org.elasticsearch.xpack.sql.type.DataTypeConversion;
import org.elasticsearch.xpack.sql.type.DataTypes;
import java.util.ArrayList;
@ -15,9 +15,10 @@ import java.util.List;
public abstract class Foldables {
@SuppressWarnings("unchecked")
public static <T> T valueOf(Expression e, DataType to) {
if (e.foldable()) {
return DataTypeConvertion.convert(e.fold(), e.dataType(), to);
return (T) DataTypeConversion.conversionFor(e.dataType(), to).convert(e.fold());
}
throw new SqlIllegalArgumentException("Cannot determine value for %s", e);
}
@ -46,14 +47,9 @@ public abstract class Foldables {
}
public static <T> List<T> valuesOf(List<Expression> list, DataType to) {
List<T> l = new ArrayList<>();
List<T> l = new ArrayList<>(list.size());
for (Expression e : list) {
if (e.foldable()) {
l.add(DataTypeConvertion.convert(e.fold(), e.dataType(), to));
}
else {
throw new SqlIllegalArgumentException("Cannot determine value for %s", e);
}
l.add(valueOf(e, to));
}
return l;
}
@ -61,4 +57,4 @@ public abstract class Foldables {
public static List<Double> doubleValuesOf(List<Expression> list) {
return valuesOf(list, DataTypes.DOUBLE);
}
}
}

View File

@ -5,16 +5,17 @@
*/
package org.elasticsearch.xpack.sql.expression.function;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.xpack.sql.expression.Alias;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.expression.NamedExpression;
import org.elasticsearch.xpack.sql.expression.function.aggregate.AggregateFunction;
import org.elasticsearch.xpack.sql.expression.function.scalar.ColumnProcessor;
import org.elasticsearch.xpack.sql.expression.function.scalar.ComposeProcessor;
import org.elasticsearch.xpack.sql.expression.function.scalar.ScalarFunction;
import java.util.ArrayList;
import java.util.List;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
@ -86,7 +87,7 @@ public abstract class Functions {
if (e instanceof ScalarFunction) {
ScalarFunction sf = (ScalarFunction) e;
// A(B(C)) is applied backwards first C then B then A, the last function first
proc = sf.asProcessor().andThen(proc);
proc = proc == null ? sf.asProcessor() : new ComposeProcessor(sf.asProcessor(), proc);
}
else {
return proc;

View File

@ -5,8 +5,6 @@
*/
package org.elasticsearch.xpack.sql.expression.function.scalar;
import java.util.Objects;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.expression.FieldAttribute;
import org.elasticsearch.xpack.sql.expression.function.aggregate.AggregateFunctionAttribute;
@ -14,7 +12,9 @@ import org.elasticsearch.xpack.sql.expression.function.scalar.script.Params;
import org.elasticsearch.xpack.sql.expression.function.scalar.script.ScriptTemplate;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.DataTypeConvertion;
import org.elasticsearch.xpack.sql.type.DataTypeConversion;
import java.util.Objects;
import static org.elasticsearch.xpack.sql.expression.function.scalar.script.ParamsBuilder.paramsBuilder;
import static org.elasticsearch.xpack.sql.expression.function.scalar.script.ScriptTemplate.formatTemplate;
@ -43,12 +43,12 @@ public class Cast extends ScalarFunction {
@Override
public boolean nullable() {
return argument().nullable() || DataTypeConvertion.nullable(from(), to());
return argument().nullable() || DataTypeConversion.nullable(from(), to());
}
@Override
protected TypeResolution resolveType() {
return DataTypeConvertion.canConvert(from(), to()) ?
return DataTypeConversion.canConvert(from(), to()) ?
TypeResolution.TYPE_RESOLVED :
new TypeResolution("Cannot cast %s to %s", from(), to());
}
@ -78,7 +78,7 @@ public class Cast extends ScalarFunction {
@Override
public ColumnProcessor asProcessor() {
return c -> DataTypeConvertion.convert(c, from(), to());
return new CastProcessor(DataTypeConversion.conversionFor(from(), to()));
}
@Override

View File

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.scalar;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.type.DataTypeConversion.Conversion;
import java.io.IOException;
/**
 * A {@linkplain ColumnProcessor} that casts a value from one data type
 * to another by delegating to a {@link Conversion}.
 */
public class CastProcessor implements ColumnProcessor {
    /** Key under which this processor is registered as a {@code NamedWriteable}. */
    public static final String NAME = "c";

    private final Conversion conversion;

    CastProcessor(Conversion conversion) {
        this.conversion = conversion;
    }

    /** Read ctor, used when deserializing the processor from a stream. */
    CastProcessor(StreamInput in) throws IOException {
        // Conversion is an enum, so it round-trips through readEnum/writeEnum.
        conversion = in.readEnum(Conversion.class);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeEnum(conversion);
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    @Override
    public Object apply(Object r) {
        return conversion.convert(r);
    }

    // Visible for tests.
    Conversion converter() {
        return conversion;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        CastProcessor other = (CastProcessor) obj;
        return conversion.equals(other.conversion);
    }

    @Override
    public int hashCode() {
        return conversion.hashCode();
    }

    @Override
    public String toString() {
        return conversion.toString();
    }
}

View File

@ -5,12 +5,28 @@
*/
package org.elasticsearch.xpack.sql.expression.function.scalar;
@FunctionalInterface
public interface ColumnProcessor {
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import java.util.ArrayList;
import java.util.List;
public interface ColumnProcessor extends NamedWriteable {
/**
* All of the named writeables needed to deserialize the instances
* of {@linkplain ColumnProcessor}.
*/
static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
entries.add(new NamedWriteableRegistry.Entry(ColumnProcessor.class, CastProcessor.NAME, CastProcessor::new));
entries.add(new NamedWriteableRegistry.Entry(ColumnProcessor.class, ComposeProcessor.NAME, ComposeProcessor::new));
entries.add(new NamedWriteableRegistry.Entry(ColumnProcessor.class, DateTimeProcessor.NAME, DateTimeProcessor::new));
entries.add(new NamedWriteableRegistry.Entry(ColumnProcessor.class,
MathFunctionProcessor.NAME, MathFunctionProcessor::new));
entries.add(new NamedWriteableRegistry.Entry(ColumnProcessor.class,
MatrixFieldProcessor.NAME, MatrixFieldProcessor::new));
return entries;
}
Object apply(Object r);
default ColumnProcessor andThen(ColumnProcessor after) {
return after != null ? r -> after.apply(apply(r)) : this;
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.scalar;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Objects;
/**
 * A {@linkplain ColumnProcessor} that composes the results of two
 * {@linkplain ColumnProcessor}s: {@code first} runs on the raw value,
 * then {@code second} runs on {@code first}'s result.
 */
public class ComposeProcessor implements ColumnProcessor {
    static final String NAME = ".";

    private final ColumnProcessor first;
    private final ColumnProcessor second;

    public ComposeProcessor(ColumnProcessor first, ColumnProcessor second) {
        this.first = first;
        this.second = second;
    }

    /** Read ctor, used when deserializing the processor from a stream. */
    public ComposeProcessor(StreamInput in) throws IOException {
        first = in.readNamedWriteable(ColumnProcessor.class);
        second = in.readNamedWriteable(ColumnProcessor.class);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // Write in the same order the read ctor consumes them.
        out.writeNamedWriteable(first);
        out.writeNamedWriteable(second);
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    @Override
    public Object apply(Object r) {
        return second.apply(first.apply(r));
    }

    // Visible for tests.
    ColumnProcessor first() {
        return first;
    }

    // Visible for tests.
    ColumnProcessor second() {
        return second;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        ComposeProcessor other = (ComposeProcessor) obj;
        return first.equals(other.first)
                && second.equals(other.second);
    }

    @Override
    public int hashCode() {
        return Objects.hash(first, second);
    }

    @Override
    public String toString() {
        // borrow Haskell's notation for function composition
        return "(" + second + " . " + first + ")";
    }
}

View File

@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.scalar;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeExtractor;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableDateTime;
import java.io.IOException;
/**
 * A {@linkplain ColumnProcessor} that pulls a single calendar field
 * (year, month, day of week, ...) out of a date value using a
 * {@link DateTimeExtractor}.
 */
public class DateTimeProcessor implements ColumnProcessor {
    public static final String NAME = "d";

    private final DateTimeExtractor extractor;

    public DateTimeProcessor(DateTimeExtractor extractor) {
        this.extractor = extractor;
    }

    /** Read ctor, used when deserializing the processor from a stream. */
    DateTimeProcessor(StreamInput in) throws IOException {
        extractor = in.readEnum(DateTimeExtractor.class);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeEnum(extractor);
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    // Visible for tests.
    DateTimeExtractor extractor() {
        return extractor;
    }

    @Override
    public Object apply(Object l) {
        // most dates are returned as long
        ReadableDateTime dt = l instanceof Long
                ? new DateTime(((Long) l).longValue(), DateTimeZone.UTC)
                : (ReadableDateTime) l;
        return extractor.extract(dt);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        return extractor == ((DateTimeProcessor) obj).extractor;
    }

    @Override
    public int hashCode() {
        return extractor.hashCode();
    }

    @Override
    public String toString() {
        return extractor.toString();
    }
}

View File

@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.scalar;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.expression.function.scalar.math.MathProcessor;
import java.io.IOException;
/**
 * A {@linkplain ColumnProcessor} that applies a {@link MathProcessor}
 * to its input.
 */
public class MathFunctionProcessor implements ColumnProcessor {
    public static final String NAME = "m";

    private final MathProcessor processor;

    public MathFunctionProcessor(MathProcessor processor) {
        this.processor = processor;
    }

    /** Read ctor, used when deserializing the processor from a stream. */
    MathFunctionProcessor(StreamInput in) throws IOException {
        // MathProcessor is an enum, so it round-trips through readEnum/writeEnum.
        processor = in.readEnum(MathProcessor.class);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeEnum(processor);
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    @Override
    public Object apply(Object r) {
        return processor.apply(r);
    }

    // Visible for tests.
    MathProcessor processor() {
        return processor;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        MathFunctionProcessor other = (MathFunctionProcessor) obj;
        // Enum identity comparison is safe here.
        return processor == other.processor;
    }

    @Override
    public int hashCode() {
        return processor.hashCode();
    }

    @Override
    public String toString() {
        return processor.toString();
    }
}

View File

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.scalar;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Map;
/**
 * A {@linkplain ColumnProcessor} that looks up a single key when the
 * incoming value is a {@link Map}, passing any other value through
 * unchanged. (Presumably used for multi-valued agg results such as
 * matrix stats — confirm with callers.)
 *
 * Fix: {@code toString()} was missing {@code @Override}, inconsistent
 * with every other processor in this package.
 */
public class MatrixFieldProcessor implements ColumnProcessor {
    public static final String NAME = "mat";

    private final String key;

    public MatrixFieldProcessor(String key) {
        this.key = key;
    }

    /** Read ctor, used when deserializing the processor from a stream. */
    MatrixFieldProcessor(StreamInput in) throws IOException {
        key = in.readString();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(key);
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    // Visible for tests.
    String key() {
        return key;
    }

    @Override
    public Object apply(Object r) {
        // Non-map values are returned as-is rather than failing.
        return r instanceof Map ? ((Map<?, ?>) r).get(key) : r;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        MatrixFieldProcessor other = (MatrixFieldProcessor) obj;
        return key.equals(other.key);
    }

    @Override
    public int hashCode() {
        return key.hashCode();
    }

    @Override
    public String toString() {
        return "[" + key + "]";
    }
}

View File

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.joda.time.DateTimeFieldType;
import org.joda.time.ReadableDateTime;
/**
 * Extracts portions of {@link ReadableDateTime}s. Note that the position in the enum is used for serialization.
 */
public enum DateTimeExtractor {
    // NOTE: per the class javadoc the ordinal is what's serialized,
    // so never reorder or remove constants — only append.
    DAY_OF_MONTH(DateTimeFieldType.dayOfMonth()),
    DAY_OF_WEEK(DateTimeFieldType.dayOfWeek()),
    DAY_OF_YEAR(DateTimeFieldType.dayOfYear()),
    HOUR_OF_DAY(DateTimeFieldType.hourOfDay()),
    MINUTE_OF_DAY(DateTimeFieldType.minuteOfDay()),
    MINUTE_OF_HOUR(DateTimeFieldType.minuteOfHour()),
    MONTH_OF_YEAR(DateTimeFieldType.monthOfYear()),
    SECOND_OF_MINUTE(DateTimeFieldType.secondOfMinute()),
    WEEK_OF_YEAR(DateTimeFieldType.weekOfWeekyear()),
    YEAR(DateTimeFieldType.year());

    /** The Joda field this constant reads from a date. */
    private final DateTimeFieldType field;

    DateTimeExtractor(DateTimeFieldType field) {
        this.field = field;
    }

    /** Read this constant's calendar field from {@code dt} as an int. */
    public int extract(ReadableDateTime dt) {
        return dt.get(field);
    }
}

View File

@ -11,14 +11,13 @@ import org.elasticsearch.xpack.sql.expression.FieldAttribute;
import org.elasticsearch.xpack.sql.expression.function.aggregate.AggregateFunctionAttribute;
import org.elasticsearch.xpack.sql.expression.function.aware.TimeZoneAware;
import org.elasticsearch.xpack.sql.expression.function.scalar.ColumnProcessor;
import org.elasticsearch.xpack.sql.expression.function.scalar.DateTimeProcessor;
import org.elasticsearch.xpack.sql.expression.function.scalar.ScalarFunction;
import org.elasticsearch.xpack.sql.expression.function.scalar.script.ScriptTemplate;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.DataTypes;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableDateTime;
import java.time.temporal.ChronoField;
import java.util.Locale;
@ -86,27 +85,17 @@ public abstract class DateTimeFunction extends ScalarFunction implements TimeZon
}
@Override
public ColumnProcessor asProcessor() {
return l -> {
ReadableDateTime dt = null;
// most dates are returned as long
if (l instanceof Long) {
dt = new DateTime((Long) l, DateTimeZone.UTC);
}
else {
dt = (ReadableDateTime) l;
}
return Integer.valueOf(extract(dt));
};
public final ColumnProcessor asProcessor() {
return new DateTimeProcessor(extractor());
}
protected abstract DateTimeExtractor extractor();
@Override
public DataType dataType() {
return DataTypes.INTEGER;
}
protected abstract int extract(ReadableDateTime dt);
// used for aggregration (date histogram)
public abstract String interval();

View File

@ -8,12 +8,10 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableDateTime;
import java.time.temporal.ChronoField;
public class DayOfMonth extends DateTimeFunction {
public DayOfMonth(Location location, Expression argument, DateTimeZone timeZone) {
super(location, argument, timeZone);
}
@ -28,13 +26,13 @@ public class DayOfMonth extends DateTimeFunction {
return "day";
}
@Override
protected int extract(ReadableDateTime dt) {
return dt.getDayOfMonth();
}
@Override
protected ChronoField chronoField() {
return ChronoField.DAY_OF_MONTH;
}
@Override
protected DateTimeExtractor extractor() {
return DateTimeExtractor.DAY_OF_MONTH;
}
}

View File

@ -8,12 +8,10 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableDateTime;
import java.time.temporal.ChronoField;
public class DayOfWeek extends DateTimeFunction {
public DayOfWeek(Location location, Expression argument, DateTimeZone timeZone) {
super(location, argument, timeZone);
}
@ -28,13 +26,13 @@ public class DayOfWeek extends DateTimeFunction {
return "day";
}
@Override
protected int extract(ReadableDateTime dt) {
return dt.getDayOfWeek();
}
@Override
protected ChronoField chronoField() {
return ChronoField.DAY_OF_WEEK;
}
@Override
protected DateTimeExtractor extractor() {
return DateTimeExtractor.DAY_OF_WEEK;
}
}

View File

@ -8,12 +8,10 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableDateTime;
import java.time.temporal.ChronoField;
public class DayOfYear extends DateTimeFunction {
public DayOfYear(Location location, Expression argument, DateTimeZone timeZone) {
super(location, argument, timeZone);
}
@ -28,13 +26,13 @@ public class DayOfYear extends DateTimeFunction {
return "day";
}
@Override
protected int extract(ReadableDateTime dt) {
return dt.getDayOfYear();
}
@Override
protected ChronoField chronoField() {
return ChronoField.DAY_OF_YEAR;
}
@Override
protected DateTimeExtractor extractor() {
return DateTimeExtractor.DAY_OF_YEAR;
}
}

View File

@ -8,12 +8,10 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableDateTime;
import java.time.temporal.ChronoField;
public class HourOfDay extends DateTimeFunction {
public HourOfDay(Location location, Expression argument, DateTimeZone timeZone) {
super(location, argument, timeZone);
}
@ -28,13 +26,13 @@ public class HourOfDay extends DateTimeFunction {
return "hour";
}
@Override
protected int extract(ReadableDateTime dt) {
return dt.getHourOfDay();
}
@Override
protected ChronoField chronoField() {
return ChronoField.HOUR_OF_DAY;
}
@Override
protected DateTimeExtractor extractor() {
return DateTimeExtractor.HOUR_OF_DAY;
}
}

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableDateTime;
import java.time.temporal.ChronoField;
@ -28,13 +27,13 @@ public class MinuteOfDay extends DateTimeFunction {
return "minute";
}
@Override
protected int extract(ReadableDateTime dt) {
return dt.getMinuteOfDay();
}
@Override
protected ChronoField chronoField() {
return ChronoField.MINUTE_OF_DAY;
}
@Override
protected DateTimeExtractor extractor() {
return DateTimeExtractor.MINUTE_OF_DAY;
}
}

View File

@ -8,12 +8,10 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableDateTime;
import java.time.temporal.ChronoField;
public class MinuteOfHour extends DateTimeFunction {
public MinuteOfHour(Location location, Expression argument, DateTimeZone timeZone) {
super(location, argument, timeZone);
}
@ -28,13 +26,13 @@ public class MinuteOfHour extends DateTimeFunction {
return "minute";
}
@Override
protected int extract(ReadableDateTime dt) {
return dt.getMinuteOfHour();
}
@Override
protected ChronoField chronoField() {
return ChronoField.MINUTE_OF_HOUR;
}
@Override
protected DateTimeExtractor extractor() {
return DateTimeExtractor.MINUTE_OF_HOUR;
}
}

View File

@ -8,12 +8,10 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableDateTime;
import java.time.temporal.ChronoField;
public class MonthOfYear extends DateTimeFunction {
public MonthOfYear(Location location, Expression argument, DateTimeZone timeZone) {
super(location, argument, timeZone);
}
@ -28,13 +26,13 @@ public class MonthOfYear extends DateTimeFunction {
return "month";
}
@Override
protected int extract(ReadableDateTime dt) {
return dt.getMonthOfYear();
}
@Override
protected ChronoField chronoField() {
return ChronoField.MONTH_OF_YEAR;
}
@Override
protected DateTimeExtractor extractor() {
return DateTimeExtractor.MONTH_OF_YEAR;
}
}

View File

@ -8,12 +8,10 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableDateTime;
import java.time.temporal.ChronoField;
public class SecondOfMinute extends DateTimeFunction {
public SecondOfMinute(Location location, Expression argument, DateTimeZone timeZone) {
super(location, argument, timeZone);
}
@ -28,13 +26,13 @@ public class SecondOfMinute extends DateTimeFunction {
return "second";
}
@Override
protected int extract(ReadableDateTime dt) {
return dt.getSecondOfMinute();
}
@Override
protected ChronoField chronoField() {
return ChronoField.SECOND_OF_MINUTE;
}
@Override
protected DateTimeExtractor extractor() {
return DateTimeExtractor.SECOND_OF_MINUTE;
}
}

View File

@ -8,12 +8,10 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableDateTime;
import java.time.temporal.ChronoField;
public class WeekOfWeekYear extends DateTimeFunction {
public WeekOfWeekYear(Location location, Expression argument, DateTimeZone timeZone) {
super(location, argument, timeZone);
}
@ -28,13 +26,13 @@ public class WeekOfWeekYear extends DateTimeFunction {
return "week";
}
@Override
protected int extract(ReadableDateTime dt) {
return dt.getWeekOfWeekyear();
}
@Override
protected ChronoField chronoField() {
return ChronoField.ALIGNED_WEEK_OF_YEAR; // NOCOMMIT is this right?
}
@Override
protected DateTimeExtractor extractor() {
return DateTimeExtractor.WEEK_OF_YEAR;
}
}

View File

@ -8,12 +8,10 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableDateTime;
import java.time.temporal.ChronoField;
public class Year extends DateTimeFunction {
public Year(Location location, Expression argument, DateTimeZone timeZone) {
super(location, argument, timeZone);
}
@ -28,11 +26,6 @@ public class Year extends DateTimeFunction {
return "year";
}
@Override
protected int extract(ReadableDateTime dt) {
return dt.getYear();
}
@Override
public Expression orderBy() {
return argument();
@ -42,4 +35,9 @@ public class Year extends DateTimeFunction {
protected ChronoField chronoField() {
return ChronoField.YEAR;
}
@Override
protected DateTimeExtractor extractor() {
return DateTimeExtractor.YEAR;
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class ACos extends MathFunction {
public ACos(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.acos(d);
protected MathProcessor processor() {
return MathProcessor.ACOS;
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class ASin extends MathFunction {
public ASin(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.asin(d);
protected MathProcessor processor() {
return MathProcessor.ASIN;
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class ATan extends MathFunction {
public ATan(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.atan(d);
protected MathProcessor processor() {
return MathProcessor.ATAN;
}
}

View File

@ -6,37 +6,21 @@
package org.elasticsearch.xpack.sql.expression.function.scalar.math;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.expression.function.scalar.ColumnProcessor;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.DataType;
public class Abs extends MathFunction {
public Abs(Location location, Expression argument) {
super(location, argument);
}
@Override
public ColumnProcessor asProcessor() {
return l -> {
if (l instanceof Float) {
return Math.abs(((Float) l).floatValue());
}
if (l instanceof Double) {
return Math.abs(((Double) l).doubleValue());
}
long lo = ((Number) l).longValue();
return lo >= 0 ? lo : lo == Long.MIN_VALUE ? Long.MAX_VALUE : -lo;
};
protected MathProcessor processor() {
return MathProcessor.ABS;
}
@Override
public DataType dataType() {
return argument().dataType();
}
@Override
protected Object math(double d) {
throw new UnsupportedOperationException("unused");
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class Cbrt extends MathFunction {
public Cbrt(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.cbrt(d);
protected MathProcessor processor() {
return MathProcessor.CBRT;
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class Ceil extends MathFunction {
public Ceil(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.ceil(d);
protected MathProcessor processor() {
return MathProcessor.CEIL;
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class Cos extends MathFunction {
public Cos(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.cos(d);
protected MathProcessor processor() {
return MathProcessor.COS;
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class Cosh extends MathFunction {
public Cosh(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.cosh(d);
protected MathProcessor processor() {
return MathProcessor.COSH;
}
}

View File

@ -9,7 +9,6 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class Degrees extends MathFunction {
public Degrees(Location location, Expression argument) {
super(location, argument);
}
@ -20,7 +19,7 @@ public class Degrees extends MathFunction {
}
@Override
protected Double math(double d) {
return Math.toDegrees(d);
protected MathProcessor processor() {
return MathProcessor.DEGREES;
}
}

View File

@ -6,14 +6,11 @@
package org.elasticsearch.xpack.sql.expression.function.scalar.math;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.expression.function.scalar.ColumnProcessor;
import org.elasticsearch.xpack.sql.expression.function.scalar.script.ScriptTemplate;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.util.StringUtils;
public class E extends MathFunction {
public E(Location location) {
super(location);
}
@ -27,18 +24,13 @@ public class E extends MathFunction {
return Math.E;
}
@Override
public ColumnProcessor asProcessor() {
return l -> Math.E;
}
@Override
protected Object math(double d) {
throw new SqlIllegalArgumentException("unused");
}
@Override
protected ScriptTemplate asScript() {
return new ScriptTemplate(StringUtils.EMPTY);
}
@Override
protected MathProcessor processor() {
return MathProcessor.E;
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class Exp extends MathFunction {
public Exp(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.exp(d);
protected MathProcessor processor() {
return MathProcessor.EXP;
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class Expm1 extends MathFunction {
public Expm1(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.expm1(d);
protected MathProcessor processor() {
return MathProcessor.EXPM1;
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class Floor extends MathFunction {
public Floor(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.floor(d);
protected MathProcessor processor() {
return MathProcessor.FLOOR;
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class Log extends MathFunction {
public Log(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.log(d);
protected MathProcessor processor() {
return MathProcessor.LOG;
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class Log10 extends MathFunction {
public Log10(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.log10(d);
protected MathProcessor processor() {
return MathProcessor.LOG10;
}
}

View File

@ -5,12 +5,11 @@
*/
package org.elasticsearch.xpack.sql.expression.function.scalar.math;
import java.util.Locale;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.expression.FieldAttribute;
import org.elasticsearch.xpack.sql.expression.function.aggregate.AggregateFunctionAttribute;
import org.elasticsearch.xpack.sql.expression.function.scalar.ColumnProcessor;
import org.elasticsearch.xpack.sql.expression.function.scalar.MathFunctionProcessor;
import org.elasticsearch.xpack.sql.expression.function.scalar.ScalarFunction;
import org.elasticsearch.xpack.sql.expression.function.scalar.ScalarFunctionAttribute;
import org.elasticsearch.xpack.sql.expression.function.scalar.script.ScriptTemplate;
@ -18,8 +17,9 @@ import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.DataTypes;
import static java.lang.String.format;
import java.util.Locale;
import static java.lang.String.format;
import static org.elasticsearch.xpack.sql.expression.function.scalar.script.ParamsBuilder.paramsBuilder;
import static org.elasticsearch.xpack.sql.expression.function.scalar.script.ScriptTemplate.formatTemplate;
@ -79,12 +79,9 @@ public abstract class MathFunction extends ScalarFunction {
}
@Override
public ColumnProcessor asProcessor() {
return l -> {
double d = ((Number) l).doubleValue();
return math(d);
};
public final ColumnProcessor asProcessor() {
return new MathFunctionProcessor(processor());
}
protected abstract Object math(double d);
protected abstract MathProcessor processor();
}

View File

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.scalar.math;
import java.util.function.DoubleFunction;
import java.util.function.Function;
/**
* Applies a math function. Note that the order of the enum constants is used for serialization.
*/
/**
 * Applies a math function to a column value. The order of the enum
 * constants is significant: ordinals are used for serialization, so new
 * entries must only be appended, never inserted or reordered.
 */
public enum MathProcessor {
    ABS(value -> {
        if (value instanceof Float) {
            return Math.abs(((Float) value).floatValue());
        }
        if (value instanceof Double) {
            return Math.abs(((Double) value).doubleValue());
        }
        long l = ((Number) value).longValue();
        if (l >= 0) {
            return l;
        }
        // -Long.MIN_VALUE overflows, so saturate at Long.MAX_VALUE instead.
        return l == Long.MIN_VALUE ? Long.MAX_VALUE : -l;
    }),
    ACOS(onDouble(Math::acos)),
    ASIN(onDouble(Math::asin)),
    ATAN(onDouble(Math::atan)),
    CBRT(onDouble(Math::cbrt)),
    CEIL(onDouble(Math::ceil)),
    COS(onDouble(Math::cos)),
    COSH(onDouble(Math::cosh)),
    DEGREES(onDouble(Math::toDegrees)),
    E(constant(Math.E)),
    EXP(onDouble(Math::exp)),
    EXPM1(onDouble(Math::expm1)),
    FLOOR(onDouble(Math::floor)),
    LOG(onDouble(Math::log)),
    LOG10(onDouble(Math::log10)),
    PI(constant(Math.PI)),
    RADIANS(onDouble(Math::toRadians)),
    ROUND(onDouble(Math::round)),
    SIN(onDouble(Math::sin)),
    SINH(onDouble(Math::sinh)),
    SQRT(onDouble(Math::sqrt)),
    TAN(onDouble(Math::tan));

    /** The transformation this constant performs on a single value. */
    private final Function<Object, Object> function;

    MathProcessor(Function<Object, Object> function) {
        this.function = function;
    }

    /** Adapts a {@code double}-based math function to accept any boxed {@link Number}. */
    private static Function<Object, Object> onDouble(DoubleFunction<Object> function) {
        return value -> function.apply(((Number) value).doubleValue());
    }

    /** Builds a function that ignores its input and always yields {@code v}. */
    private static Function<Object, Object> constant(double v) {
        return ignored -> v;
    }

    /** Applies this processor to a single value. */
    public final Object apply(Object l) {
        return function.apply(l);
    }
}

View File

@ -6,14 +6,11 @@
package org.elasticsearch.xpack.sql.expression.function.scalar.math;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.expression.function.scalar.ColumnProcessor;
import org.elasticsearch.xpack.sql.expression.function.scalar.script.ScriptTemplate;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.util.StringUtils;
public class Pi extends MathFunction {
public Pi(Location location) {
super(location);
}
@ -27,18 +24,13 @@ public class Pi extends MathFunction {
return Math.PI;
}
@Override
public ColumnProcessor asProcessor() {
return l -> Math.PI;
}
@Override
protected Object math(double d) {
throw new SqlIllegalArgumentException("unused");
}
@Override
protected ScriptTemplate asScript() {
return new ScriptTemplate(StringUtils.EMPTY);
}
@Override
protected MathProcessor processor() {
return MathProcessor.PI;
}
}

View File

@ -9,7 +9,6 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class Radians extends MathFunction {
public Radians(Location location, Expression argument) {
super(location, argument);
}
@ -20,7 +19,7 @@ public class Radians extends MathFunction {
}
@Override
protected Double math(double d) {
return Math.toRadians(d);
protected MathProcessor processor() {
return MathProcessor.RADIANS;
}
}

View File

@ -11,18 +11,17 @@ import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.DataTypes;
public class Round extends MathFunction {
public Round(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Long math(double d) {
return Long.valueOf(Math.round(d));
}
@Override
public DataType dataType() {
return DataTypes.LONG;
}
@Override
protected MathProcessor processor() {
return MathProcessor.ROUND;
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class Sin extends MathFunction {
public Sin(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.sin(d);
protected MathProcessor processor() {
return MathProcessor.SIN;
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class Sinh extends MathFunction {
public Sinh(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.sinh(d);
protected MathProcessor processor() {
return MathProcessor.SINH;
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class Sqrt extends MathFunction {
public Sqrt(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.sqrt(d);
protected MathProcessor processor() {
return MathProcessor.SQRT;
}
}

View File

@ -9,13 +9,12 @@ import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
public class Tan extends MathFunction {
public Tan(Location location, Expression argument) {
super(location, argument);
}
@Override
protected Double math(double d) {
return Math.tan(d);
protected MathProcessor processor() {
return MathProcessor.TAN;
}
}

View File

@ -22,6 +22,8 @@ import org.elasticsearch.xpack.sql.expression.function.aggregate.CompoundNumeric
import org.elasticsearch.xpack.sql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.sql.expression.function.aggregate.InnerAggregate;
import org.elasticsearch.xpack.sql.expression.function.scalar.ColumnProcessor;
import org.elasticsearch.xpack.sql.expression.function.scalar.ComposeProcessor;
import org.elasticsearch.xpack.sql.expression.function.scalar.MatrixFieldProcessor;
import org.elasticsearch.xpack.sql.expression.function.scalar.ScalarFunction;
import org.elasticsearch.xpack.sql.expression.function.scalar.ScalarFunctionAttribute;
import org.elasticsearch.xpack.sql.plan.physical.AggregateExec;
@ -370,7 +372,7 @@ class QueryFolder extends RuleExecutor<PhysicalPlan> {
String aggPath = AggPath.metricValue(cAggPath, ia.innerId());
// FIXME: concern leak - hack around MatrixAgg which is not generalized (afaik)
if (ia.innerKey() != null) {
proc = QueryTranslator.matrixFieldExtractor(ia.innerKey()).andThen(proc);
proc = new ComposeProcessor(new MatrixFieldProcessor(QueryTranslator.nameOf(ia.innerKey())), proc);
}
return queryC.addAggRef(aggPath, proc);

View File

@ -31,7 +31,6 @@ import org.elasticsearch.xpack.sql.expression.function.aggregate.PercentileRanks
import org.elasticsearch.xpack.sql.expression.function.aggregate.Percentiles;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Stats;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Sum;
import org.elasticsearch.xpack.sql.expression.function.scalar.ColumnProcessor;
import org.elasticsearch.xpack.sql.expression.function.scalar.ScalarFunctionAttribute;
import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeFunction;
import org.elasticsearch.xpack.sql.expression.function.scalar.script.Params;
@ -391,12 +390,6 @@ abstract class QueryTranslator {
throw new SqlIllegalArgumentException("Cannot determine id for %s", e);
}
@SuppressWarnings("rawtypes")
static ColumnProcessor matrixFieldExtractor(Expression exp) {
String key = nameOf(exp);
return r -> r instanceof Map ? ((Map) r).get(key) : r;
}
static String dateFormat(Expression e) {
if (e instanceof DateTimeFunction) {
return ((DateTimeFunction) e).dateTimeFormat();

View File

@ -12,6 +12,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
@ -32,6 +33,7 @@ import org.elasticsearch.xpack.sql.plugin.jdbc.action.TransportJdbcAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.TransportSqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.rest.RestSqlAction;
import org.elasticsearch.xpack.sql.session.Cursor;
import java.util.Arrays;
import java.util.Collection;
@ -39,6 +41,9 @@ import java.util.List;
import java.util.function.Supplier;
public class SqlPlugin implements ActionPlugin {
public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Cursor.getNamedWriteables();
}
private final SqlLicenseChecker sqlLicenseChecker;

View File

@ -8,9 +8,12 @@ package org.elasticsearch.xpack.sql.plugin.sql.action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.joda.time.DateTimeZone;
import java.io.IOException;
@ -19,30 +22,36 @@ import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class SqlRequest extends ActionRequest implements CompositeIndicesRequest {
public static final ParseField CURSOR = new ParseField("cursor");
public static final ObjectParser<SqlRequest, Void> PARSER = new ObjectParser<>("sql/query", SqlRequest::new);
static {
PARSER.declareString(SqlRequest::query, new ParseField("query"));
PARSER.declareString((request, zoneId) -> request.timeZone(DateTimeZone.forID(zoneId)), new ParseField("time_zone"));
PARSER.declareInt(SqlRequest::fetchSize, new ParseField("fetch_size"));
PARSER.declareString((request, nextPage) -> request.cursor(Cursor.decodeFromString(nextPage)), CURSOR);
}
public static final DateTimeZone DEFAULT_TIME_ZONE = DateTimeZone.UTC;
// initialized on the first request
private String query;
public static final int DEFAULT_FETCH_SIZE = 1000;
private String query = "";
private DateTimeZone timeZone = DEFAULT_TIME_ZONE;
// initialized after the plan has been translated
private String sessionId;
private Cursor cursor = Cursor.EMPTY;
private int fetchSize = DEFAULT_FETCH_SIZE;
public SqlRequest() {}
public SqlRequest(String query, DateTimeZone timeZone, String sessionId) {
public SqlRequest(String query, DateTimeZone timeZone, Cursor nextPageInfo) {
this.query = query;
this.timeZone = timeZone;
this.sessionId = sessionId;
this.cursor = nextPageInfo;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (!Strings.hasText(query)) {
validationException = addValidationError("sql query is missing", validationException);
}
if (timeZone == null) {
validationException = addValidationError("timezone is missing", validationException);
if ((false == Strings.hasText(query)) && cursor == Cursor.EMPTY) {
validationException = addValidationError("one of [query] or [cursor] is required", validationException);
}
return validationException;
}
@ -51,35 +60,71 @@ public class SqlRequest extends ActionRequest implements CompositeIndicesRequest
return query;
}
public String sessionId() {
return sessionId;
/**
 * Sets the SQL query text to execute. May not be null; an empty string is
 * permitted here because request-level validation separately requires
 * either a non-blank query or a non-empty cursor.
 */
public SqlRequest query(String query) {
if (query == null) {
throw new IllegalArgumentException("query may not be null.");
}
this.query = query;
return this;
}
public DateTimeZone timeZone() {
return timeZone;
}
public SqlRequest query(String query) {
this.query = query;
return this;
}
public SqlRequest sessionId(String sessionId) {
this.sessionId = sessionId;
return this;
}
/**
 * Sets the time zone used when executing the query. May not be null.
 */
public SqlRequest timeZone(DateTimeZone timeZone) {
    // BUG FIX: the guard previously tested {@code query == null}, so a null
    // time zone was never rejected; validate the actual parameter instead.
    if (timeZone == null) {
        throw new IllegalArgumentException("time zone may not be null.");
    }
    this.timeZone = timeZone;
    return this;
}
/**
* The key that must be sent back to SQL to access the next page of
* results.
*/
public Cursor cursor() {
return cursor;
}
/**
* The key that must be sent back to SQL to access the next page of
* results.
*/
public SqlRequest cursor(Cursor cursor) {
if (cursor == null) {
throw new IllegalArgumentException("cursor may not be null.");
}
this.cursor = cursor;
return this;
}
/**
* Hint about how many results to fetch at once.
*/
public int fetchSize() {
return fetchSize;
}
/**
* Hint about how many results to fetch at once.
*/
public SqlRequest fetchSize(int fetchSize) {
if (fetchSize <= 0) {
throw new IllegalArgumentException("fetch_size must be more than 0");
}
this.fetchSize = fetchSize;
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
query = in.readString();
timeZone = DateTimeZone.forID(in.readString());
sessionId = in.readOptionalString();
cursor = in.readNamedWriteable(Cursor.class);
fetchSize = in.readVInt();
}
@Override
@ -87,12 +132,13 @@ public class SqlRequest extends ActionRequest implements CompositeIndicesRequest
super.writeTo(out);
out.writeString(query);
out.writeString(timeZone.getID());
out.writeOptionalString(sessionId);
out.writeNamedWriteable(cursor);
out.writeVInt(fetchSize);
}
@Override
public int hashCode() {
return Objects.hash(query, sessionId);
return Objects.hash(query, timeZone, cursor);
}
@Override
@ -107,11 +153,13 @@ public class SqlRequest extends ActionRequest implements CompositeIndicesRequest
SqlRequest other = (SqlRequest) obj;
return Objects.equals(query, other.query)
&& Objects.equals(sessionId, other.sessionId);
&& Objects.equals(timeZone, other.timeZone)
&& Objects.equals(cursor, other.cursor)
&& fetchSize == other.fetchSize;
}
@Override
public String getDescription() {
return "SQL [" + query + "/" + sessionId + "]";
return "SQL [" + query + "]";
}
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.sql.plugin.sql.action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.joda.time.DateTimeZone;
import static org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest.DEFAULT_TIME_ZONE;
@ -14,11 +15,11 @@ import static org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest.DEFAULT_T
public class SqlRequestBuilder extends ActionRequestBuilder<SqlRequest, SqlResponse, SqlRequestBuilder> {
public SqlRequestBuilder(ElasticsearchClient client, SqlAction action) {
this(client, action, null, DEFAULT_TIME_ZONE, null);
this(client, action, "", DEFAULT_TIME_ZONE, Cursor.EMPTY);
}
public SqlRequestBuilder(ElasticsearchClient client, SqlAction action, String query, DateTimeZone timeZone, String sessionId) {
super(client, action, new SqlRequest(query, timeZone, sessionId));
public SqlRequestBuilder(ElasticsearchClient client, SqlAction action, String query, DateTimeZone timeZone, Cursor nextPageInfo) {
super(client, action, new SqlRequest(query, timeZone, nextPageInfo));
}
public SqlRequestBuilder query(String query) {
@ -26,8 +27,8 @@ public class SqlRequestBuilder extends ActionRequestBuilder<SqlRequest, SqlRespo
return this;
}
public SqlRequestBuilder sessionId(String sessionId) {
request.sessionId(sessionId);
public SqlRequestBuilder nextPageKey(Cursor nextPageInfo) {
request.cursor(nextPageInfo);
return this;
}

View File

@ -5,65 +5,142 @@
*/
package org.elasticsearch.xpack.sql.plugin.sql.action;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.sql.session.Cursor;
public class SqlResponse extends ActionResponse {
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
private String sessionId;
import static java.util.Collections.unmodifiableList;
public class SqlResponse extends ActionResponse implements ToXContentObject {
private Cursor cursor;
private long size;
// NOCOMMIT: we probably need to add more info about columns, but that's all we use for now
// NOCOMMIT: order of elements is important, so we might want to replace this with lists and
// reflect this in generated JSON as well
private Map<String, String> columns;
private List<Map<String, Object>> rows;
private int columnCount;
private List<ColumnInfo> columns;
private List<List<Object>> rows;
public SqlResponse() {
}
public SqlResponse(String sessionId, long size, Map<String, String> columns, List<Map<String, Object>> rows) {
this.sessionId = sessionId;
public SqlResponse(Cursor cursor, long size, int columnCount, @Nullable List<ColumnInfo> columns, List<List<Object>> rows) {
this.cursor = cursor;
this.size = size;
this.columnCount = columnCount;
this.columns = columns;
this.rows = rows;
}
/**
* The key that must be sent back to SQL to access the next page of
* results. If {@link BytesReference#length()} is {@code 0} then
* there is no next page.
*/
public Cursor nextPageInfo() {
return cursor;
}
public long size() {
return size;
}
public Map<String, String> columns() {
public List<ColumnInfo> columns() {
return columns;
}
public List<Map<String, Object>> rows() {
public List<List<Object>> rows() {
return rows;
}
@Override
public void readFrom(StreamInput in) throws IOException {
sessionId = in.readOptionalString();
cursor = in.readNamedWriteable(Cursor.class);
size = in.readVLong();
columns = in.readMap(StreamInput::readString, StreamInput::readString);
rows = in.readList(StreamInput::readMap);
columnCount = in.readVInt();
if (in.readBoolean()) {
List<ColumnInfo> columns = new ArrayList<>(columnCount);
for (int c = 0; c < columnCount; c++) {
columns.add(new ColumnInfo(in));
}
this.columns = unmodifiableList(columns);
} else {
this.columns = null;
}
int rowCount = in.readVInt();
List<List<Object>> rows = new ArrayList<>(rowCount);
for (int r = 0; r < rowCount; r++) {
List<Object> row = new ArrayList<>(columnCount);
for (int c = 0; c < columnCount; c++) {
row.add(in.readGenericValue());
}
rows.add(unmodifiableList(row));
}
this.rows = unmodifiableList(rows);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(sessionId);
out.writeNamedWriteable(cursor);
out.writeVLong(size);
out.writeMap(columns, StreamOutput::writeString, StreamOutput::writeString);
out.writeVInt(rows.size());
for (Map<String, Object> row : rows) {
out.writeMap(row);
out.writeVInt(columnCount);
if (columns == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
assert columns.size() == columnCount;
for (ColumnInfo column : columns) {
column.writeTo(out);
}
}
out.writeVInt(rows.size());
for (List<Object> row : rows) {
assert row.size() == columnCount;
for (Object value : row) {
out.writeGenericValue(value);
}
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field("size", size());
if (columns != null) {
builder.startArray("columns"); {
for (ColumnInfo column : columns) {
column.toXContent(builder, params);
}
}
builder.endArray();
}
builder.startArray("rows");
for (List<Object> row : rows()) {
builder.startArray();
for (Object value : row) {
builder.value(value);
}
builder.endArray();
}
builder.endArray();
if (cursor != Cursor.EMPTY) {
builder.field(SqlRequest.CURSOR.getPreferredName(), Cursor.encodeToString(cursor));
}
}
return builder.endObject();
}
@Override
@ -72,13 +149,79 @@ public class SqlResponse extends ActionResponse {
if (o == null || getClass() != o.getClass()) return false;
SqlResponse that = (SqlResponse) o;
return size == that.size &&
Objects.equals(sessionId, that.sessionId) &&
Objects.equals(cursor, that.cursor) &&
Objects.equals(columns, that.columns) &&
Objects.equals(rows, that.rows);
}
@Override
public int hashCode() {
return Objects.hash(sessionId, size, columns, rows);
return Objects.hash(cursor, size, columns, rows);
}
@Override
public String toString() {
return Strings.toString(this);
}
/**
 * Metadata describing a single column of a {@link SqlResponse}: its name
 * and its Elasticsearch type name. Serializable both over the transport
 * layer ({@link Writeable}) and as JSON ({@link ToXContentObject}).
 */
public static final class ColumnInfo implements Writeable, ToXContentObject {
// NOCOMMIT: we probably need to add more info about columns, but that's all we use for now
// Column name as reported to the client.
private final String name;
// Elasticsearch type name for this column (as produced by esName()).
private final String type;
public ColumnInfo(String name, String type) {
this.name = name;
this.type = type;
}
// Deserialization constructor; reads fields in the exact order writeTo writes them.
ColumnInfo(StreamInput in) throws IOException {
name = in.readString();
type = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeString(type);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("name", name);
builder.field("type", type);
return builder.endObject();
}
/**
* Name of the column.
*/
public String name() {
return name;
}
/**
* Elasticsearch type name of the column.
*/
public String type() {
return type;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
ColumnInfo other = (ColumnInfo) obj;
return name.equals(other.name)
&& type.equals(other.type);
}
@Override
public int hashCode() {
return Objects.hash(name, type);
}
@Override
public String toString() {
return Strings.toString(this);
}
}
}

View File

@ -9,38 +9,24 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.SqlLicenseChecker;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse.ColumnInfo;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.SqlSettings;
import org.elasticsearch.xpack.sql.type.Schema;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.elasticsearch.xpack.sql.util.ActionUtils.chain;
import static java.util.Collections.unmodifiableList;
public class TransportSqlAction extends HandledTransportAction<SqlRequest, SqlResponse> {
//TODO: externalize timeout
private final Cache<String, RowSetCursor> SESSIONS = CacheBuilder.<String, RowSetCursor>builder()
.setMaximumWeight(1024)
.setExpireAfterAccess(TimeValue.timeValueMinutes(10))
.setExpireAfterWrite(TimeValue.timeValueMinutes(10))
.build();
private final Supplier<String> ephemeralId;
private final PlanExecutor planExecutor;
private final SqlLicenseChecker sqlLicenseChecker;
@ -54,64 +40,44 @@ public class TransportSqlAction extends HandledTransportAction<SqlRequest, SqlRe
this.planExecutor = planExecutor;
this.sqlLicenseChecker = sqlLicenseChecker;
ephemeralId = () -> transportService.getLocalNode().getEphemeralId();
}
@Override
protected void doExecute(SqlRequest request, ActionListener<SqlResponse> listener) {
sqlLicenseChecker.checkIfSqlAllowed();
String sessionId = request.sessionId();
String query = request.query();
try {
if (sessionId == null) {
if (!Strings.hasText(query)) {
listener.onFailure(new SqlIllegalArgumentException("No query is given and request not part of a session"));
return;
}
// NOCOMMIT this should be linked up
// SqlSettings sqlCfg = new SqlSettings(Settings.builder()
// .put(SqlSettings.PAGE_SIZE, req.fetchSize)
// .put(SqlSettings.TIMEZONE_ID, request.timeZone().getID()).build());
planExecutor.sql(query, chain(listener, c -> {
String id = generateId();
SESSIONS.put(id, c);
return createResponse(id, c);
}));
} else {
RowSetCursor cursor = SESSIONS.get(sessionId);
if (cursor == null) {
listener.onFailure(new SqlIllegalArgumentException("SQL session cannot be found"));
} else {
cursor.nextSet(chain(listener, c -> createResponse(sessionId, cursor)));
}
}
} catch (Exception ex) {
listener.onFailure(ex);
if (request.cursor() == Cursor.EMPTY) {
SqlSettings sqlSettings = new SqlSettings(Settings.builder()
.put(SqlSettings.PAGE_SIZE, request.fetchSize())
.put(SqlSettings.TIMEZONE_ID, request.timeZone().getID()).build());
planExecutor.sql(sqlSettings, request.query(),
ActionListener.wrap(cursor -> listener.onResponse(createResponse(true, cursor)), listener::onFailure));
} else {
planExecutor.nextPage(request.cursor(),
ActionListener.wrap(cursor -> listener.onResponse(createResponse(false, cursor)), listener::onFailure));
}
}
private String generateId() {
return ephemeralId.get() + "-" + UUIDs.base64UUID();
}
static SqlResponse createResponse(boolean includeColumnMetadata, RowSetCursor cursor) {
List<ColumnInfo> columns = null;
if (includeColumnMetadata) {
columns = new ArrayList<>(cursor.schema().types().size());
for (Schema.Entry entry : cursor.schema()) {
columns.add(new ColumnInfo(entry.name(), entry.type().esName()));
}
columns = unmodifiableList(columns);
}
static SqlResponse createResponse(String sessionId, RowSetCursor cursor) {
Map<String, String> columns = new LinkedHashMap<>(cursor.schema().types().size());
cursor.schema().forEach(entry -> {
columns.put(entry.name(), entry.type().esName());
List<List<Object>> rows = new ArrayList<>();
cursor.forEachRow(rowView -> {
List<Object> row = new ArrayList<>(rowView.rowSize());
rowView.forEachColumn(row::add);
rows.add(unmodifiableList(row));
});
List<Map<String, Object>> rows = new ArrayList<>();
cursor.forEachRow(objects -> {
Map<String, Object> row = new LinkedHashMap<>(objects.rowSize());
objects.forEachColumn((o, entry) -> row.put(entry.name(), o));
rows.add(row);
});
return new SqlResponse(
sessionId,
cursor.nextPageCursor(),
cursor.size(),
cursor.rowSize(),
columns,
rows);
}

View File

@ -1,59 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin.sql.rest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import org.elasticsearch.xpack.sql.util.ThrowableBiConsumer;
import org.elasticsearch.xpack.sql.util.ThrowableConsumer;
import java.util.Map;
import static org.elasticsearch.rest.RestStatus.OK;
/**
 * {@link RestBuilderListener} that renders a {@link SqlResponse} as the
 * JSON body of a 200 OK REST response.
 */
class CursorRestResponseListener extends RestBuilderListener<SqlResponse> {
CursorRestResponseListener(RestChannel channel) {
super(channel);
}
@Override
public RestResponse buildResponse(SqlResponse response, XContentBuilder builder) throws Exception {
return new BytesRestResponse(OK, createResponse(response, builder));
}
// Writes the response as {"size": ..., "columns": {name: {"type": ...}}, "rows": [...]}.
private static XContentBuilder createResponse(SqlResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
// header
builder.field("size", response.size());
// NOCOMMIT: that should be a list since order is important
builder.startObject("columns");
ThrowableBiConsumer<String, String> buildSchema = (f, t) -> builder.startObject(f).field("type", t).endObject();
response.columns().forEach(buildSchema);
builder.endObject();
// payload
builder.startArray("rows");
ThrowableBiConsumer<String, Object> eachColumn = builder::field;
// NOCOMMIT: that should be a list since order is important
ThrowableConsumer<Map<String, Object>> eachRow = r -> { builder.startObject(); r.forEach(eachColumn); builder.endObject(); };
response.rows().forEach(eachRow);
builder.endArray();
builder.endObject();
builder.close();
return builder;
}
}

View File

@ -5,29 +5,23 @@
*/
package org.elasticsearch.xpack.sql.plugin.sql.rest;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest;
import org.joda.time.DateTimeZone;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.OK;
public class RestSqlAction extends BaseRestHandler {
public RestSqlAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(GET, "/_sql", this);
@ -36,61 +30,18 @@ public class RestSqlAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
Payload p;
try {
p = Payload.from(request);
} catch (IOException ex) {
return channel -> error(channel, ex);
SqlRequest sqlRequest;
try (XContentParser parser = request.contentOrSourceParamParser()) {
sqlRequest = SqlRequest.PARSER.apply(parser, null);
}
return channel -> client.executeLocally(SqlAction.INSTANCE, new SqlRequest(p.query, p.timeZone, null),
new CursorRestResponseListener(channel));
}
private void error(RestChannel channel, Exception ex) {
logger.debug("failed to parse sql request", ex);
BytesRestResponse response = null;
try {
response = new BytesRestResponse(channel, ex);
} catch (IOException e) {
response = new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, ExceptionsHelper.stackTrace(e));
}
channel.sendResponse(response);
return channel -> client.executeLocally(
SqlAction.INSTANCE, sqlRequest, new RestToXContentListener<SqlResponse>(channel));
}
@Override
public String getName() {
return "sql_action";
}
static class Payload {
static final ObjectParser<Payload, Void> PARSER = new ObjectParser<>("sql/query");
static {
PARSER.declareString(Payload::setQuery, new ParseField("query"));
PARSER.declareString(Payload::setTimeZone, new ParseField("time_zone"));
}
String query;
DateTimeZone timeZone = SqlRequest.DEFAULT_TIME_ZONE;
static Payload from(RestRequest request) throws IOException {
Payload payload = new Payload();
try (XContentParser parser = request.contentParser()) {
Payload.PARSER.parse(parser, payload, null);
}
return payload;
}
public void setQuery(String query) {
this.query = query;
}
public void setTimeZone(String timeZone) {
this.timeZone = DateTimeZone.forID(timeZone);
}
}
}

View File

@ -24,4 +24,9 @@ public class ProcessingRef implements Reference {
public Reference ref() {
return ref;
}
@Override
public String toString() {
return processor + "(" + ref + ")";
}
}

View File

@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.xpack.sql.execution.search.HitExtractor;
import org.elasticsearch.xpack.sql.execution.search.ScrollCursor;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
/**
 * Information required to access the next page of a response. Implementations
 * are {@link NamedWriteable}s so they can be serialized, and additionally
 * support a compact string encoding (see {@link #encodeToString(Cursor)}):
 * the cursor's single-character writeable name followed by its body.
 */
public interface Cursor extends NamedWriteable {
    /** Sentinel cursor meaning "there is no next page". */
    Cursor EMPTY = EmptyCursor.INSTANCE;

    /**
     * Request the next page of data.
     */
    void nextPage(Client client, ActionListener<RowSetCursor> listener);

    /**
     * Write the {@linkplain Cursor} to a String for serialization over xcontent.
     */
    void writeTo(java.io.Writer writer) throws IOException;

    /**
     * The {@link NamedWriteable}s required to deserialize {@link Cursor}s.
     */
    static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
        List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
        entries.addAll(HitExtractor.getNamedWriteables());
        entries.add(new NamedWriteableRegistry.Entry(Cursor.class, EmptyCursor.NAME, in -> EMPTY));
        entries.add(new NamedWriteableRegistry.Entry(Cursor.class, ScrollCursor.NAME, ScrollCursor::new));
        return entries;
    }

    /**
     * Write a {@linkplain Cursor} to a string for serialization across xcontent.
     * The result is the cursor's writeable name (one character) immediately
     * followed by its string-serialized body.
     */
    static String encodeToString(Cursor info) {
        StringWriter writer = new StringWriter();
        try {
            writer.write(info.getWriteableName());
            info.writeTo(writer);
        } catch (IOException e) {
            throw new RuntimeException("unexpected failure converting next page info to a string", e);
        }
        return writer.toString();
    }

    /**
     * Read a {@linkplain Cursor} from a string produced by
     * {@link #encodeToString(Cursor)}.
     */
    static Cursor decodeFromString(String info) {
        // TODO version compatibility
        /* We need to encode minimum version across the cluster and use that
         * to handle changes to this protocol across versions. */
        String name = info.substring(0, 1);
        try (java.io.Reader reader = new FastStringReader(info)) {
            // Skip the one-character type discriminator; the rest is the body.
            reader.skip(1);
            switch (name) {
            case EmptyCursor.NAME:
                throw new RuntimeException("empty cursor shouldn't be encoded to a string");
            case ScrollCursor.NAME:
                return new ScrollCursor(reader);
            default:
                throw new RuntimeException("unknown cursor type [" + name + "]");
            }
        } catch (IOException e) {
            // Fixed typo in the error message ("deconding" -> "decoding").
            throw new RuntimeException("unexpected failure decoding cursor", e);
        }
    }
}

View File

@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
 * The {@link Cursor} used when there are no more pages to fetch. A single
 * shared instance is exposed through {@link Cursor#EMPTY}; it can never be
 * asked for another page and must never be serialized to a string.
 */
class EmptyCursor implements Cursor {
    static final String NAME = "0";
    static final EmptyCursor INSTANCE = new EmptyCursor();

    private EmptyCursor() {
        // Only one instance allowed
    }

    @Override
    public void nextPage(Client client, ActionListener<RowSetCursor> listener) {
        // There is nothing after the empty cursor, so asking is a caller bug.
        throw new IllegalArgumentException("there is no next page");
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // Nothing to write
    }

    @Override
    public void writeTo(java.io.Writer writer) throws IOException {
        // The string encoding is only for cursors that point at a next page.
        throw new IOException("no next page should not be converted to or from a string");
    }

    @Override
    public boolean equals(Object obj) {
        // Identity comparison suffices for a singleton.
        return obj == this;
    }

    @Override
    public int hashCode() {
        return 27;
    }

    @Override
    public String toString() {
        return "no next page";
    }
}

View File

@ -37,4 +37,9 @@ class EmptyRowSetCursor extends AbstractRowSetCursor {
public int size() {
return 0;
}
@Override
public Cursor nextPageCursor() {
return Cursor.EMPTY;
}
}

View File

@ -5,10 +5,10 @@
*/
package org.elasticsearch.xpack.sql.session;
import java.util.List;
import org.elasticsearch.xpack.sql.type.Schema;
import java.util.List;
class ListRowSetCursor extends AbstractRowSetCursor {
private final List<List<?>> list;
@ -47,4 +47,9 @@ class ListRowSetCursor extends AbstractRowSetCursor {
public int size() {
return list.size();
}
@Override
public Cursor nextPageCursor() {
return Cursor.EMPTY;
}
}

View File

@ -5,6 +5,9 @@
*/
package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import java.util.function.Consumer;
/**
@ -21,10 +24,15 @@ public interface RowSet extends RowView {
boolean advanceRow();
int size();
int size(); // NOCOMMIT why do we have this? It looks like the count of the rows in this chunk.
void reset();
/**
* Return the key used by {@link PlanExecutor#nextPage(Cursor, ActionListener)} to fetch the next page.
*/
Cursor nextPageCursor();
default void forEachRow(Consumer<? super RowView> action) {
for (boolean hasRows = hasCurrentRow(); hasRows; hasRows = advanceRow()) {
action.accept(this);

View File

@ -5,12 +5,12 @@
*/
package org.elasticsearch.xpack.sql.session;
import java.util.Objects;
import java.util.function.Consumer;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.SqlException;
import java.util.Objects;
import java.util.function.Consumer;
public interface RowSetCursor extends RowSet {
boolean hasNextSet();

View File

@ -37,14 +37,10 @@ public interface RowView extends Iterable<Object> {
}
default void forEachColumn(Consumer<? super Object> action) {
forEachColumn((c, e) -> action.accept(c));
}
default void forEachColumn(BiConsumer<? super Object, Schema.Entry> action) {
Objects.requireNonNull(action);
int rowSize = rowSize();
for (int i = 0; i < rowSize; i++) {
action.accept(column(i), schema().get(i));
action.accept(column(i));
}
}

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.xpack.sql.type.Schema;
class SingletonRowSet extends AbstractRowSetCursor {
class SingletonRowSet extends AbstractRowSetCursor { // NOCOMMIT is it worth keeping this when we have ListRowSet?
private final Object[] values;
@ -40,4 +40,9 @@ class SingletonRowSet extends AbstractRowSetCursor {
public int size() {
return 1;
}
@Override
public Cursor nextPageCursor() {
return Cursor.EMPTY;
}
}

View File

@ -22,6 +22,7 @@ public class SqlSettings {
private final Settings cfg;
public SqlSettings(Settings cfg) {
// NOCOMMIT investigate taking the arguments we need instead of Settings
this.cfg = cfg;
}

View File

@ -0,0 +1,327 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.type;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import java.util.function.DoubleFunction;
import java.util.function.Function;
import java.util.function.LongFunction;
/**
 * Conversions from one data type to another.
 *
 * <p>NOTE(review): the nested {@link Conversion} enum is serialized by its
 * position (ordinal), so the order of its constants must never change. That
 * is why this block is documented rather than restructured.
 */
public abstract class DataTypeConversion {

    // ISO date-time formatter (no millis) pinned to UTC; used both to print
    // dates as strings and to parse strings into epoch millis.
    private static final DateTimeFormatter UTC_DATE_FORMATTER = ISODateTimeFormat.dateTimeNoMillis().withZoneUTC();

    // A conversion's result is nullable only when converting *from* the null type.
    public static boolean nullable(DataType from, DataType to) {
        return from instanceof NullType;
    }

    /**
     * Is there a supported conversion from {@code from} to {@code to}?
     * Mirrors the cases handled by {@link #conversionFor(DataType, DataType)}.
     */
    public static boolean canConvert(DataType from, DataType to) { // TODO it'd be cleaner and more right to fetch the conversion
        // only primitives are supported so far
        if (from.isComplex() || to.isComplex()) {
            return false;
        }
        if (from.getClass() == to.getClass()) {
            return true;
        }
        if (from instanceof NullType) {
            return true;
        }
        // anything can be converted to String
        if (to instanceof StringType) {
            return true;
        }
        // also anything can be converted into a bool
        if (to instanceof BooleanType) {
            return true;
        }
        // numeric conversion
        if ((from instanceof StringType || from instanceof BooleanType || from instanceof DateType || from.isNumeric()) && to.isNumeric()) {
            return true;
        }
        // date conversion
        if ((from instanceof DateType || from instanceof StringType || from.isNumeric()) && to instanceof DateType) {
            return true;
        }
        return false;
    }

    /**
     * Get the conversion from one type to another.
     */
    public static Conversion conversionFor(DataType from, DataType to) {
        if (to instanceof StringType) {
            return conversionToString(from);
        }
        if (to instanceof LongType) {
            return conversionToLong(from);
        }
        if (to instanceof IntegerType) {
            return conversionToInt(from);
        }
        if (to instanceof ShortType) {
            return conversionToShort(from);
        }
        if (to instanceof ByteType) {
            return conversionToByte(from);
        }
        if (to instanceof FloatType) {
            return conversionToFloat(from);
        }
        if (to instanceof DoubleType) {
            return conversionToDouble(from);
        }
        if (to instanceof DateType) {
            return conversionToDate(from);
        }
        if (to instanceof BooleanType) {
            return conversionToBoolean(from);
        }
        throw new SqlIllegalArgumentException("cannot convert from [" + from + "] to [" + to + "]");
    }

    // Dates get the UTC ISO formatting; everything else goes through String.valueOf.
    private static Conversion conversionToString(DataType from) {
        if (from instanceof DateType) {
            return Conversion.DATE_TO_STRING;
        }
        return Conversion.OTHER_TO_STRING;
    }

    private static Conversion conversionToLong(DataType from) {
        if (from.isRational()) {
            return Conversion.RATIONAL_TO_LONG;
        }
        if (from.isInteger()) {
            return Conversion.INTEGER_TO_LONG;
        }
        if (from instanceof BooleanType) {
            return Conversion.BOOL_TO_INT; // We emit an int here which is ok because of Java's casting rules
        }
        if (from instanceof StringType) {
            return Conversion.STRING_TO_LONG;
        }
        throw new SqlIllegalArgumentException("cannot convert from [" + from + "] to [Long]");
    }

    private static Conversion conversionToInt(DataType from) {
        if (from.isRational()) {
            return Conversion.RATIONAL_TO_INT;
        }
        if (from.isInteger()) {
            return Conversion.INTEGER_TO_INT;
        }
        if (from instanceof BooleanType) {
            return Conversion.BOOL_TO_INT;
        }
        if (from instanceof StringType) {
            return Conversion.STRING_TO_INT;
        }
        throw new SqlIllegalArgumentException("cannot convert from [" + from + "] to [Integer]");
    }

    // NOTE(review): from here down the messages say "cannot convert [" while the
    // Long/Integer variants above say "cannot convert from [" — inconsistent phrasing.
    private static Conversion conversionToShort(DataType from) {
        if (from.isRational()) {
            return Conversion.RATIONAL_TO_SHORT;
        }
        if (from.isInteger()) {
            return Conversion.INTEGER_TO_SHORT;
        }
        if (from instanceof BooleanType) {
            return Conversion.BOOL_TO_SHORT;
        }
        if (from instanceof StringType) {
            return Conversion.STRING_TO_SHORT;
        }
        throw new SqlIllegalArgumentException("cannot convert [" + from + "] to [Short]");
    }

    private static Conversion conversionToByte(DataType from) {
        if (from.isRational()) {
            return Conversion.RATIONAL_TO_BYTE;
        }
        if (from.isInteger()) {
            return Conversion.INTEGER_TO_BYTE;
        }
        if (from instanceof BooleanType) {
            return Conversion.BOOL_TO_BYTE;
        }
        if (from instanceof StringType) {
            return Conversion.STRING_TO_BYTE;
        }
        throw new SqlIllegalArgumentException("cannot convert [" + from + "] to [Byte]");
    }

    private static Conversion conversionToFloat(DataType from) {
        if (from.isRational()) {
            return Conversion.RATIONAL_TO_FLOAT;
        }
        if (from.isInteger()) {
            return Conversion.INTEGER_TO_FLOAT;
        }
        if (from instanceof BooleanType) {
            return Conversion.BOOL_TO_FLOAT;
        }
        if (from instanceof StringType) {
            return Conversion.STRING_TO_FLOAT;
        }
        throw new SqlIllegalArgumentException("cannot convert [" + from + "] to [Float]");
    }

    private static Conversion conversionToDouble(DataType from) {
        if (from.isRational()) {
            return Conversion.RATIONAL_TO_DOUBLE;
        }
        if (from.isInteger()) {
            return Conversion.INTEGER_TO_DOUBLE;
        }
        if (from instanceof BooleanType) {
            return Conversion.BOOL_TO_DOUBLE;
        }
        if (from instanceof StringType) {
            return Conversion.STRING_TO_DOUBLE;
        }
        throw new SqlIllegalArgumentException("cannot convert [" + from + "] to [Double]");
    }

    // Dates are represented as epoch millis (long), so numeric sources reuse
    // the *_TO_LONG conversions and strings are parsed with the UTC formatter.
    private static Conversion conversionToDate(DataType from) {
        if (from.isRational()) {
            return Conversion.RATIONAL_TO_LONG;
        }
        if (from.isInteger()) {
            return Conversion.INTEGER_TO_LONG;
        }
        if (from instanceof BooleanType) {
            return Conversion.BOOL_TO_INT; // We emit an int here which is ok because of Java's casting rules
        }
        if (from instanceof StringType) {
            return Conversion.STRING_TO_DATE;
        }
        throw new SqlIllegalArgumentException("cannot convert [" + from + "] to [Date]");
    }

    private static Conversion conversionToBoolean(DataType from) {
        if (from.isNumeric()) {
            return Conversion.NUMERIC_TO_BOOLEAN;
        }
        if (from instanceof StringType) {
            return Conversion.STRING_TO_BOOLEAN;
        }
        throw new SqlIllegalArgumentException("cannot convert [" + from + "] to [Boolean]");
    }

    // Range-checked narrowing casts. NOTE(review): the "%d" placeholder is paired
    // with a String argument (Long.toString(x)) — confirm the exception's
    // formatting convention actually substitutes it.
    private static byte safeToByte(long x) {
        if (x > Byte.MAX_VALUE || x < Byte.MIN_VALUE) {
            throw new SqlIllegalArgumentException("Numeric %d out of byte range", Long.toString(x));
        }
        return (byte) x;
    }

    private static short safeToShort(long x) {
        if (x > Short.MAX_VALUE || x < Short.MIN_VALUE) {
            throw new SqlIllegalArgumentException("Numeric %d out of short range", Long.toString(x));
        }
        return (short) x;
    }

    private static int safeToInt(long x) {
        if (x > Integer.MAX_VALUE || x < Integer.MIN_VALUE) {
            // NOCOMMIT should these instead be regular IllegalArgumentExceptions so we throw a 400 error? Or something else?
            throw new SqlIllegalArgumentException("numeric %d out of int range", Long.toString(x));
        }
        return (int) x;
    }

    // Rounds (not truncates) the double before range-checking it as a long.
    private static long safeToLong(double x) {
        if (x > Long.MAX_VALUE || x < Long.MIN_VALUE) {
            throw new SqlIllegalArgumentException("[" + x + "] out of [Long] range");
        }
        return Math.round(x);
    }

    /**
     * Reference to a data type conversion that can be serialized. Note that the position in the enum
     * is important because it is used for serialization.
     */
    public enum Conversion {
        DATE_TO_STRING(fromLong(UTC_DATE_FORMATTER::print)),
        OTHER_TO_STRING(String::valueOf),
        RATIONAL_TO_LONG(fromDouble(DataTypeConversion::safeToLong)),
        INTEGER_TO_LONG(fromLong(value -> value)),
        STRING_TO_LONG(fromString(Long::valueOf, "Long")),
        RATIONAL_TO_INT(fromDouble(value -> safeToInt(safeToLong(value)))),
        INTEGER_TO_INT(fromLong(DataTypeConversion::safeToInt)),
        BOOL_TO_INT(fromBool(value -> value ? 1 : 0)),
        STRING_TO_INT(fromString(Integer::valueOf, "Int")),
        RATIONAL_TO_SHORT(fromDouble(value -> safeToShort(safeToLong(value)))),
        INTEGER_TO_SHORT(fromLong(DataTypeConversion::safeToShort)),
        BOOL_TO_SHORT(fromBool(value -> value ? (short) 1 : (short) 0)),
        STRING_TO_SHORT(fromString(Short::valueOf, "Short")),
        RATIONAL_TO_BYTE(fromDouble(value -> safeToByte(safeToLong(value)))),
        INTEGER_TO_BYTE(fromLong(DataTypeConversion::safeToByte)),
        BOOL_TO_BYTE(fromBool(value -> value ? (byte) 1 : (byte) 0)),
        STRING_TO_BYTE(fromString(Byte::valueOf, "Byte")),
        // TODO floating point conversions are lossy but conversions to integer conversions are not. Are we ok with that?
        RATIONAL_TO_FLOAT(fromDouble(value -> (float) value)),
        INTEGER_TO_FLOAT(fromLong(value -> (float) value)),
        BOOL_TO_FLOAT(fromBool(value -> value ? 1f : 0f)),
        STRING_TO_FLOAT(fromString(Float::valueOf, "Float")),
        RATIONAL_TO_DOUBLE(fromDouble(value -> value)),
        INTEGER_TO_DOUBLE(fromLong(Double::valueOf)),
        BOOL_TO_DOUBLE(fromBool(value -> value ? 1d: 0d)),
        STRING_TO_DOUBLE(fromString(Double::valueOf, "Double")),
        STRING_TO_DATE(fromString(UTC_DATE_FORMATTER::parseMillis, "Date")),
        NUMERIC_TO_BOOLEAN(fromLong(value -> value != 0)),
        // NOTE(review): Booleans::isBoolean tests whether the string *looks like*
        // a boolean; it does not parse its value.
        STRING_TO_BOOLEAN(fromString(Booleans::isBoolean, "Boolean")), // NOCOMMIT probably wrong
        ;

        // The actual value-to-value transformation; null handling lives in convert().
        private final Function<Object, Object> converter;

        Conversion(Function<Object, Object> converter) {
            this.converter = converter;
        }

        // Adapts a double-consuming converter to accept any Number.
        private static Function<Object, Object> fromDouble(DoubleFunction<Object> converter) {
            return (Object l) -> converter.apply(((Number) l).doubleValue());
        }

        // Adapts a long-consuming converter to accept any Number.
        private static Function<Object, Object> fromLong(LongFunction<Object> converter) {
            return (Object l) -> converter.apply(((Number) l).longValue());
        }

        // Adapts a String-consuming converter, translating parse failures into
        // SqlIllegalArgumentException with a "cannot cast" message.
        private static Function<Object, Object> fromString(Function<String, Object> converter, String to) {
            return (Object value) -> {
                try {
                    return converter.apply(value.toString());
                } catch (NumberFormatException e) {
                    throw new SqlIllegalArgumentException("cannot cast [" + value + "] to [" + to + "]", e);
                } catch (IllegalArgumentException e) {
                    throw new SqlIllegalArgumentException("cannot cast [" + value + "] to [" + to + "]: " + e.getMessage(), e);
                }
            };
        }

        // Adapts a Boolean-consuming converter.
        private static Function<Object, Object> fromBool(Function<Boolean, Object> converter) {
            return (Object l) -> converter.apply(((Boolean) l));
        }

        /** Convert a value, propagating {@code null} unchanged. */
        public Object convert(Object l) {
            if (l == null) {
                return null;
            }
            return converter.apply(l);
        }
    }
}

View File

@ -1,285 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.type;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
// utilities around DataTypes and SQL
// NOTE(review): the class name is misspelled ("Convertion"). This legacy helper
// duplicates the conversions modeled by DataTypeConversion's serializable enum.
public abstract class DataTypeConvertion {

    // NOTE(review): not final, and withZoneUTC() is commented out, so despite
    // the name this formatter uses the default time zone — confirm intended.
    private static DateTimeFormatter UTC_DATE_FORMATTER = ISODateTimeFormat.dateTimeNoMillis();//.withZoneUTC();

    // Is there a supported conversion from `from` to `to`? Mirrors the cases
    // handled by convert() below.
    public static boolean canConvert(DataType from, DataType to) {
        // only primitives are supported so far
        if (from.isComplex() || to.isComplex()) {
            return false;
        }
        if (from.getClass() == to.getClass()) {
            return true;
        }
        if (from instanceof NullType) {
            return true;
        }
        // anything can be converted to String
        if (to instanceof StringType) {
            return true;
        }
        // also anything can be converted into a bool
        if (to instanceof BooleanType) {
            return true;
        }
        // numeric conversion
        if ((from instanceof StringType || from instanceof BooleanType || from instanceof DateType || from.isNumeric()) && to.isNumeric()) {
            return true;
        }
        // date conversion
        if ((from instanceof DateType || from instanceof StringType || from.isNumeric()) && to instanceof DateType) {
            return true;
        }
        return false;
    }

    // Dispatch on the target type; returns null when `to` is none of the
    // handled types. The unchecked cast trusts the caller to request a T
    // compatible with the target DataType.
    @SuppressWarnings("unchecked")
    public static <T> T convert(Object value, DataType from, DataType to) {
        if (value == null) {
            return null;
        }
        Object result = null;
        if (to instanceof StringType) {
            result = toString(value, from);
        }
        else if (to instanceof LongType) {
            result = toLong(value, from);
        }
        else if (to instanceof IntegerType) {
            result = toInt(value, from);
        }
        else if (to instanceof ShortType) {
            result = toShort(value, from);
        }
        else if (to instanceof ByteType) {
            result = toByte(value, from);
        }
        else if (to instanceof FloatType) {
            result = toFloat(value, from);
        }
        else if (to instanceof DoubleType) {
            result = toDouble(value, from);
        }
        else if (to instanceof DateType) {
            result = toDate(value, from);
        }
        else if (to instanceof BooleanType) {
            result = toBoolean(value, from);
        }
        return (T) result;
    }

    // NOTE(review): unlike its siblings this ignores `from` and dispatches on
    // the runtime type, and Booleans.isBoolean only reports whether the string
    // is parseable as a boolean — it does not return the parsed value.
    private static Boolean toBoolean(Object value, DataType from) {
        if (value instanceof Number) {
            return ((Number) value).longValue() != 0;
        }
        if (value instanceof String) {
            return Booleans.isBoolean(value.toString());
        }
        throw new SqlIllegalArgumentException("Does not know how to convert object %s to Boolean", value);
    }

    // Dates (stored as epoch millis) are formatted; everything else stringified.
    private static String toString(Object value, DataType from) {
        if (from instanceof DateType) {
            // TODO: maybe detect the cast and return the source instead
            return UTC_DATE_FORMATTER.print((Long) value);
        }
        return String.valueOf(value);
    }

    private static Byte toByte(Object value, DataType from) {
        if (from.isRational()) {
            return safeToByte(safeToLong(((Number) value).doubleValue()));
        }
        if (from.isInteger()) {
            return safeToByte(((Number) value).longValue());
        }
        if (from instanceof BooleanType) {
            return Byte.valueOf(((Boolean) value).booleanValue() ? (byte) 1 : (byte) 0);
        }
        if (from instanceof StringType) {
            try {
                return Byte.parseByte(String.valueOf(value));
            } catch (NumberFormatException nfe) {
                throw new SqlIllegalArgumentException("Cannot cast %s to Byte", value);
            }
        }
        throw new SqlIllegalArgumentException("Does not know how to convert object %s to Byte", value);
    }

    private static Short toShort(Object value, DataType from) {
        if (from.isRational()) {
            return safeToShort(safeToLong(((Number) value).doubleValue()));
        }
        if (from.isInteger()) {
            return safeToShort(((Number) value).longValue());
        }
        if (from instanceof BooleanType) {
            return Short.valueOf(((Boolean) value).booleanValue() ? (short) 1 : (short) 0);
        }
        if (from instanceof StringType) {
            try {
                return Short.parseShort(String.valueOf(value));
            } catch (NumberFormatException nfe) {
                throw new SqlIllegalArgumentException("Cannot cast %s to Short", value);
            }
        }
        throw new SqlIllegalArgumentException("Does not know how to convert object %s to Short", value);
    }

    private static Integer toInt(Object value, DataType from) {
        if (from.isRational()) {
            return safeToInt(safeToLong(((Number) value).doubleValue()));
        }
        if (from.isInteger()) {
            return safeToInt(((Number) value).longValue());
        }
        if (from instanceof BooleanType) {
            return Integer.valueOf(((Boolean) value).booleanValue() ? 1 : 0);
        }
        if (from instanceof StringType) {
            try {
                return Integer.parseInt(String.valueOf(value));
            } catch (NumberFormatException nfe) {
                throw new SqlIllegalArgumentException("Cannot cast %s to Integer", value);
            }
        }
        throw new SqlIllegalArgumentException("Does not know how to convert object %s to Integer", value);
    }

    private static Long toLong(Object value, DataType from) {
        if (from.isRational()) {
            return safeToLong(((Number) value).doubleValue());
        }
        if (from.isInteger()) {
            return Long.valueOf(((Number) value).longValue());
        }
        if (from instanceof BooleanType) {
            return Long.valueOf(((Boolean) value).booleanValue() ? 1 : 0);
        }
        if (from instanceof StringType) {
            try {
                return Long.parseLong(String.valueOf(value));
            } catch (NumberFormatException nfe) {
                throw new SqlIllegalArgumentException("Cannot cast %s to Long", value);
            }
        }
        throw new SqlIllegalArgumentException("Does not know how to convert object %s to Long", value);
    }

    private static Float toFloat(Object value, DataType from) {
        if (from.isRational()) {
            // NOTE(review): deprecated boxing constructor; Float.valueOf preferred.
            return new Float(((Number) value).doubleValue());
        }
        if (from.isInteger()) {
            return Float.valueOf(((Number) value).longValue());
        }
        if (from instanceof BooleanType) {
            return Float.valueOf(((Boolean) value).booleanValue() ? 1 : 0);
        }
        if (from instanceof StringType) {
            try {
                return Float.parseFloat(String.valueOf(value));
            } catch (NumberFormatException nfe) {
                throw new SqlIllegalArgumentException("Cannot cast %s to Float", value);
            }
        }
        throw new SqlIllegalArgumentException("Does not know how to convert object %s to Float", value);
    }

    private static Double toDouble(Object value, DataType from) {
        if (from.isRational()) {
            // NOTE(review): deprecated boxing constructor; Double.valueOf preferred.
            return new Double(((Number) value).doubleValue());
        }
        if (from.isInteger()) {
            return Double.valueOf(((Number) value).longValue());
        }
        if (from instanceof BooleanType) {
            return Double.valueOf(((Boolean) value).booleanValue() ? 1 : 0);
        }
        if (from instanceof StringType) {
            try {
                return Double.parseDouble(String.valueOf(value));
            } catch (NumberFormatException nfe) {
                throw new SqlIllegalArgumentException("Cannot cast %s to Double", value);
            }
        }
        throw new SqlIllegalArgumentException("Does not know how to convert object %s to Double", value);
    }

    // dates are returned in long format (for UTC)
    private static Long toDate(Object value, DataType from) {
        if (from.isRational()) {
            return safeToLong(((Number) value).doubleValue());
        }
        if (from.isInteger()) {
            return Long.valueOf(((Number) value).longValue());
        }
        if (from instanceof BooleanType) {
            return Long.valueOf(((Boolean) value).booleanValue() ? 1 : 0);
        }
        if (from instanceof StringType) {
            try {
                return UTC_DATE_FORMATTER.parseMillis(String.valueOf(value));
            } catch (IllegalArgumentException iae) {
                throw new SqlIllegalArgumentException("Cannot cast %s to Date", value);
            }
        }
        // NOTE(review): copy-paste bug — the message should say "to Date".
        throw new SqlIllegalArgumentException("Does not know how to convert object %s to Double", value);
    }

    // Range-checked narrowing casts; out-of-range values raise
    // SqlIllegalArgumentException rather than silently wrapping.
    private static byte safeToByte(long x) {
        if (x > Byte.MAX_VALUE || x < Byte.MIN_VALUE) {
            throw new SqlIllegalArgumentException("Numeric %d out of byte range", Long.toString(x));
        }
        return (byte) x;
    }

    private static short safeToShort(long x) {
        if (x > Short.MAX_VALUE || x < Short.MIN_VALUE) {
            throw new SqlIllegalArgumentException("Numeric %d out of short range", Long.toString(x));
        }
        return (short) x;
    }

    private static int safeToInt(long x) {
        if (x > Integer.MAX_VALUE || x < Integer.MIN_VALUE) {
            throw new SqlIllegalArgumentException("Numeric %d out of int range", Long.toString(x));
        }
        return (int) x;
    }

    // Rounds (not truncates) the double before range-checking it as a long.
    private static long safeToLong(double x) {
        if (x > Long.MAX_VALUE || x < Long.MIN_VALUE) {
            throw new SqlIllegalArgumentException("Numeric %d out of long range", Double.toString(x));
        }
        return Math.round(x);
    }

    // Only conversions originating from the null type are nullable.
    public static boolean nullable(DataType from, DataType to) {
        if (from instanceof NullType) {
            return true;
        }
        return false;
    }
}

View File

@ -25,7 +25,7 @@ public class Schema implements Iterable<Entry> {
DataType type();
}
private static class DefaultEntry implements Entry {
static class DefaultEntry implements Entry {
private final String name;
private final DataType type;

View File

@ -1,23 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.util;
import java.util.function.BiConsumer;
/**
 * A {@link BiConsumer}-compatible functional interface whose implementation
 * may throw checked exceptions; any exception escaping {@link #acceptThrows}
 * is rethrown wrapped in a {@link WrappingException}.
 */
public interface ThrowableBiConsumer<T, U> extends BiConsumer<T, U> {
    // NOCOMMIT replace with CheckedBiConsumer

    /** The throwing variant that implementations provide. */
    void acceptThrows(T t, U u) throws Exception;

    @Override
    default void accept(T first, U second) {
        try {
            acceptThrows(first, second);
        } catch (Exception e) {
            throw new WrappingException(e);
        }
    }
}

View File

@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.io.IOException;
import java.util.function.Supplier;
/**
 * Wire serialization round-trip and behavior tests for {@code ConstantExtractor}.
 */
public class ConstantExtractorTests extends AbstractWireSerializingTestCase<ConstantExtractor> {
    /** Build an extractor around a randomly chosen supported constant. */
    static ConstantExtractor randomConstantExtractor() {
        return new ConstantExtractor(randomValidConstant());
    }

    // ints, doubles and short strings stand in for the constant types we expect
    private static Object randomValidConstant() {
        @SuppressWarnings("unchecked")
        Supplier<Object> valueSupplier = randomFrom(
            () -> randomInt(),
            () -> randomDouble(),
            () -> randomAlphaOfLengthBetween(1, 140));
        return valueSupplier.get();
    }

    @Override
    protected ConstantExtractor createTestInstance() {
        return randomConstantExtractor();
    }

    @Override
    protected Reader<ConstantExtractor> instanceReader() {
        return ConstantExtractor::new;
    }

    @Override
    protected ConstantExtractor mutateInstance(ConstantExtractor instance) throws IOException {
        // Appending to the string form of the constant guarantees inequality.
        return new ConstantExtractor(instance.get(null) + "mutated");
    }

    // The extractor ignores the hit entirely (null is fine) and always returns
    // the very same constant instance.
    public void testGet() {
        Object expected = randomValidConstant();
        int times = between(1, 1000);
        for (int i = 0; i < times; i++) {
            assertSame(expected, new ConstantExtractor(expected).get(null));
        }
    }

    public void testToString() {
        assertEquals("^foo", new ConstantExtractor("foo").toString());
        assertEquals("^42", new ConstantExtractor("42").toString());
    }
}

View File

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static java.util.Collections.singletonMap;
/**
 * Wire serialization round-trip and behavior tests for {@code DocValueExtractor}.
 */
public class DocValueExtractorTests extends AbstractWireSerializingTestCase<DocValueExtractor> {
    /** Build an extractor for a random field name. */
    static DocValueExtractor randomDocValueExtractor() {
        return new DocValueExtractor(randomAlphaOfLength(5));
    }

    @Override
    protected DocValueExtractor createTestInstance() {
        return randomDocValueExtractor();
    }

    @Override
    protected Reader<DocValueExtractor> instanceReader() {
        return DocValueExtractor::new;
    }

    @Override
    protected DocValueExtractor mutateInstance(DocValueExtractor instance) throws IOException {
        // toString() is "%" + fieldName (see testToString), so substring(1)
        // recovers the field name; appending guarantees a non-equal instance.
        return new DocValueExtractor(instance.toString().substring(1) + "mutated");
    }

    // The extractor should return the *first* doc value for the field even when
    // the field carries more than one value.
    public void testGet() {
        String fieldName = randomAlphaOfLength(5);
        DocValueExtractor extractor = new DocValueExtractor(fieldName);
        int times = between(1, 1000);
        for (int i = 0; i < times; i++) {
            List<Object> documentFieldValues = new ArrayList<>();
            documentFieldValues.add(new Object());
            if (randomBoolean()) {
                documentFieldValues.add(new Object());
            }
            SearchHit hit = new SearchHit(1);
            DocumentField field = new DocumentField(fieldName, documentFieldValues);
            hit.fields(singletonMap(fieldName, field));
            assertEquals(documentFieldValues.get(0), extractor.get(hit));
        }
    }

    public void testToString() {
        assertEquals("%incoming_links", new DocValueExtractor("incoming_links").toString());
    }
}

View File

@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.io.IOException;
/**
 * Wire serialization round-trip and behavior tests for {@code InnerHitExtractor}.
 */
public class InnerHitExtractorTests extends AbstractWireSerializingTestCase<InnerHitExtractor> {
    /** Build an extractor with random hit name, field name and doc-value flag. */
    static InnerHitExtractor randomInnerHitExtractor() {
        return new InnerHitExtractor(randomAlphaOfLength(5), randomAlphaOfLength(5), randomBoolean());
    }

    @Override
    protected InnerHitExtractor createTestInstance() {
        return randomInnerHitExtractor();
    }

    @Override
    protected Reader<InnerHitExtractor> instanceReader() {
        return InnerHitExtractor::new;
    }

    @Override
    protected InnerHitExtractor mutateInstance(InnerHitExtractor instance) throws IOException {
        // Fixed misspelled suffix ("mustated" -> "mutated"). Changing the hit
        // name alone is enough to make the instance non-equal, which is all the
        // serialization test framework requires of a mutation.
        return new InnerHitExtractor(instance.hitName() + "mutated", instance.fieldName(), true);
    }

    public void testGet() throws IOException {
        // NOCOMMIT implement after we're sure of the InnerHitExtractor's implementation
    }

    public void testToString() {
        assertEquals("field@hit", new InnerHitExtractor("hit", "field", true).toString());
    }
}

View File

@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.sql.expression.function.scalar.CastProcessorTests;
import org.elasticsearch.xpack.sql.expression.function.scalar.ColumnProcessor;
import org.elasticsearch.xpack.sql.expression.function.scalar.ComposeProcessorTests;
import org.elasticsearch.xpack.sql.expression.function.scalar.DateTimeProcessorTests;
import org.elasticsearch.xpack.sql.expression.function.scalar.MathFunctionProcessor;
import org.elasticsearch.xpack.sql.expression.function.scalar.MathFunctionProcessorTests;
import org.elasticsearch.xpack.sql.expression.function.scalar.MatrixFieldProcessorTests;
import org.elasticsearch.xpack.sql.expression.function.scalar.math.MathProcessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
/** Serialization and behavior tests for {@code ProcessingHitExtractor}. */
public class ProcessingHitExtractorTests extends AbstractWireSerializingTestCase<ProcessingHitExtractor> {
    public static ProcessingHitExtractor randomProcessingHitExtractor(int depth) {
        return new ProcessingHitExtractor(ScrollCursorTests.randomHitExtractor(depth + 1), randomColumnProcessor(0));
    }

    /** Picks a random processor implementation, bounding recursive composition by depth. */
    public static ColumnProcessor randomColumnProcessor(int depth) {
        List<Supplier<ColumnProcessor>> choices = new ArrayList<>();
        if (depth < 5) {
            // ComposeProcessor recurses back into this method, so cap the nesting.
            choices.add(() -> ComposeProcessorTests.randomComposeProcessor(depth));
        }
        choices.add(CastProcessorTests::randomCastProcessor);
        choices.add(DateTimeProcessorTests::randomDateTimeProcessor);
        choices.add(MathFunctionProcessorTests::randomMathFunctionProcessor);
        choices.add(MatrixFieldProcessorTests::randomMatrixFieldProcessor);
        return randomFrom(choices).get();
    }

    @Override
    protected NamedWriteableRegistry getNamedWriteableRegistry() {
        // Extractors are NamedWriteables; register them so deserialization works.
        return new NamedWriteableRegistry(HitExtractor.getNamedWriteables());
    }

    @Override
    protected ProcessingHitExtractor createTestInstance() {
        return randomProcessingHitExtractor(0);
    }

    @Override
    protected Reader<ProcessingHitExtractor> instanceReader() {
        return ProcessingHitExtractor::new;
    }

    @Override
    protected ProcessingHitExtractor mutateInstance(ProcessingHitExtractor instance) throws IOException {
        @SuppressWarnings("unchecked")
        Supplier<ProcessingHitExtractor> mutation = randomFrom(
                // Replace the wrapped extractor, keep the processor.
                () -> new ProcessingHitExtractor(
                        randomValueOtherThan(instance.delegate(), () -> ScrollCursorTests.randomHitExtractor(0)),
                        instance.processor()),
                // Keep the wrapped extractor, replace the processor.
                () -> new ProcessingHitExtractor(
                        instance.delegate(),
                        randomValueOtherThan(instance.processor(), () -> randomColumnProcessor(0))));
        return mutation.get();
    }

    /** LOG over a doc value should yield the natural log of the stored number. */
    public void testGet() {
        String field = randomAlphaOfLength(5);
        ProcessingHitExtractor extractor = new ProcessingHitExtractor(
                new DocValueExtractor(field), new MathFunctionProcessor(MathProcessor.LOG));
        int rounds = between(1, 1000);
        for (int round = 0; round < rounds; round++) {
            double input = randomDouble();
            SearchHit hit = new SearchHit(1);
            hit.fields(singletonMap(field, new DocumentField(field, singletonList(input))));
            assertEquals(Math.log(input), extractor.get(hit));
        }
    }
}

View File

@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
/** Round-trip tests for {@code ScrollCursor}, over both the wire and the String form. */
public class ScrollCursorTests extends AbstractWireSerializingTestCase<ScrollCursor> {
    public static ScrollCursor randomScrollCursor() {
        int extractorCount = between(1, 20);
        List<HitExtractor> extractors = new ArrayList<>(extractorCount);
        while (extractors.size() < extractorCount) {
            extractors.add(randomHitExtractor(0));
        }
        return new ScrollCursor(randomAlphaOfLength(5), extractors);
    }

    /** Random extractor of any kind, bounding recursive wrapping by depth. */
    static HitExtractor randomHitExtractor(int depth) {
        List<Supplier<HitExtractor>> choices = new ArrayList<>();
        if (depth < 5) {
            // ProcessingHitExtractor wraps another extractor, so cap the nesting.
            choices.add(() -> ProcessingHitExtractorTests.randomProcessingHitExtractor(depth));
        }
        choices.add(ConstantExtractorTests::randomConstantExtractor);
        choices.add(DocValueExtractorTests::randomDocValueExtractor);
        choices.add(InnerHitExtractorTests::randomInnerHitExtractor);
        choices.add(SourceExtractorTests::randomSourceExtractor);
        return randomFrom(choices).get();
    }

    @Override
    protected NamedWriteableRegistry getNamedWriteableRegistry() {
        // Extractors are NamedWriteables; register them so deserialization works.
        return new NamedWriteableRegistry(HitExtractor.getNamedWriteables());
    }

    @Override
    protected ScrollCursor createTestInstance() {
        return randomScrollCursor();
    }

    @Override
    protected Reader<ScrollCursor> instanceReader() {
        return ScrollCursor::new;
    }

    @Override
    protected ScrollCursor copyInstance(ScrollCursor instance, Version version) throws IOException {
        /* Randomly chose between internal protocol round trip and String based
         * round trips used to toXContent. */
        if (randomBoolean()) {
            return super.copyInstance(instance, version);
        }
        // See comment in NextPageInfo#decodeFromString about versioning
        assertEquals(Version.CURRENT, version);
        try (StringWriter output = new StringWriter()) {
            instance.writeTo(output);
            try (java.io.Reader in = new FastStringReader(output.toString())) {
                return new ScrollCursor(in);
            }
        }
    }
}

View File

@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.io.IOException;
import java.util.function.Supplier;
/** Serialization and behavior tests for {@code SourceExtractor}. */
public class SourceExtractorTests extends AbstractWireSerializingTestCase<SourceExtractor> {
    static SourceExtractor randomSourceExtractor() {
        return new SourceExtractor(randomAlphaOfLength(5));
    }

    @Override
    protected SourceExtractor createTestInstance() {
        return randomSourceExtractor();
    }

    @Override
    protected Reader<SourceExtractor> instanceReader() {
        return SourceExtractor::new;
    }

    @Override
    protected SourceExtractor mutateInstance(SourceExtractor instance) throws IOException {
        // toString() is "#fieldName" (see testToString); strip the marker and
        // append a suffix to guarantee a non-equal instance.
        return new SourceExtractor(instance.toString().substring(1) + "mutated");
    }

    public void testGet() throws IOException {
        String fieldName = randomAlphaOfLength(5);
        SourceExtractor extractor = new SourceExtractor(fieldName);
        int rounds = between(1, 1000);
        for (int round = 0; round < rounds; round++) {
            /* We use values that are parsed from json as "equal" to make the
             * test simpler. */
            @SuppressWarnings("unchecked")
            Supplier<Object> valueSupplier = randomFrom(
                    () -> randomAlphaOfLength(5),
                    () -> randomInt(),
                    () -> randomDouble());
            Object expected = valueSupplier.get();
            XContentBuilder source = JsonXContent.contentBuilder();
            source.startObject();
            source.field(fieldName, expected);
            if (randomBoolean()) {
                // Unrelated keys in the source must not affect extraction.
                source.field(fieldName + "_random_junk", expected + "_random_junk");
            }
            source.endObject();
            SearchHit hit = new SearchHit(1);
            hit.sourceRef(source.bytes());
            assertEquals(expected, extractor.get(hit));
        }
    }

    public void testToString() {
        assertEquals("#name", new SourceExtractor("name").toString());
    }
}

View File

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.scalar;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.type.DataTypeConversion.Conversion;
import java.io.IOException;
/** Serialization and behavior tests for {@code CastProcessor}. */
public class CastProcessorTests extends AbstractWireSerializingTestCase<CastProcessor> {
    public static CastProcessor randomCastProcessor() {
        return new CastProcessor(randomFrom(Conversion.values()));
    }

    @Override
    protected CastProcessor createTestInstance() {
        return randomCastProcessor();
    }

    @Override
    protected Reader<CastProcessor> instanceReader() {
        return CastProcessor::new;
    }

    @Override
    protected CastProcessor mutateInstance(CastProcessor instance) throws IOException {
        Conversion differentConversion = randomValueOtherThan(instance.converter(), () -> randomFrom(Conversion.values()));
        return new CastProcessor(differentConversion);
    }

    public void testApply() {
        {
            // String to int: parses valid integers, rejects everything else.
            CastProcessor proc = new CastProcessor(Conversion.STRING_TO_INT);
            assertEquals(null, proc.apply(null));
            assertEquals(1, proc.apply("1"));
            Exception e = expectThrows(SqlIllegalArgumentException.class, () -> proc.apply("1.2"));
            assertEquals("cannot cast [1.2] to [Int]", e.getMessage());
        }
        {
            // Boolean to int: true -> 1, false -> 0.
            CastProcessor proc = new CastProcessor(Conversion.BOOL_TO_INT);
            assertEquals(null, proc.apply(null));
            assertEquals(1, proc.apply(true));
            assertEquals(0, proc.apply(false));
        }
    }
}

View File

@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.scalar;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.io.IOException;
import java.util.function.Supplier;
import static org.elasticsearch.xpack.sql.execution.search.ProcessingHitExtractorTests.randomColumnProcessor;
/** Serialization tests for {@code ComposeProcessor}. */
public class ComposeProcessorTests extends AbstractWireSerializingTestCase<ComposeProcessor> {
    public static ComposeProcessor randomComposeProcessor(int depth) {
        // depth + 1 bounds recursion; see randomColumnProcessor.
        return new ComposeProcessor(randomColumnProcessor(depth + 1), randomColumnProcessor(depth + 1));
    }

    @Override
    protected NamedWriteableRegistry getNamedWriteableRegistry() {
        // Processors are NamedWriteables; register them so deserialization works.
        return new NamedWriteableRegistry(ColumnProcessor.getNamedWriteables());
    }

    @Override
    protected ComposeProcessor createTestInstance() {
        return randomComposeProcessor(0);
    }

    @Override
    protected Reader<ComposeProcessor> instanceReader() {
        return ComposeProcessor::new;
    }

    @Override
    protected ComposeProcessor mutateInstance(ComposeProcessor instance) throws IOException {
        @SuppressWarnings("unchecked")
        Supplier<ComposeProcessor> mutation = randomFrom(
                // Replace the second stage.
                () -> new ComposeProcessor(
                        instance.first(), randomValueOtherThan(instance.second(), () -> randomColumnProcessor(0))),
                // Replace the first stage.
                () -> new ComposeProcessor(
                        randomValueOtherThan(instance.first(), () -> randomColumnProcessor(0)), instance.second()),
                // Swap the two stages.
                () -> new ComposeProcessor(instance.second(), instance.first()));
        return mutation.get();
    }
}

View File

@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.scalar;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeExtractor;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
/** Serialization and behavior tests for {@code DateTimeProcessor}. */
public class DateTimeProcessorTests extends AbstractWireSerializingTestCase<DateTimeProcessor> {
    public static DateTimeProcessor randomDateTimeProcessor() {
        return new DateTimeProcessor(randomFrom(DateTimeExtractor.values()));
    }

    @Override
    protected DateTimeProcessor createTestInstance() {
        return randomDateTimeProcessor();
    }

    @Override
    protected Reader<DateTimeProcessor> instanceReader() {
        return DateTimeProcessor::new;
    }

    @Override
    protected DateTimeProcessor mutateInstance(DateTimeProcessor instance) throws IOException {
        return new DateTimeProcessor(randomValueOtherThan(instance.extractor(), () -> randomFrom(DateTimeExtractor.values())));
    }

    public void testApply() {
        // The month/day arguments below were written as 01/02 — Java octal
        // literals. The values are the same (1 and 2) but the octal prefix is
        // misleading and linter-flagged, so they are now plain decimal.
        DateTimeProcessor proc = new DateTimeProcessor(DateTimeExtractor.YEAR);
        assertEquals(1970, proc.apply(0L));
        assertEquals(1970, proc.apply(new DateTime(0L, DateTimeZone.UTC)));
        assertEquals(2017, proc.apply(new DateTime(2017, 1, 2, 10, 10, DateTimeZone.UTC)));

        proc = new DateTimeProcessor(DateTimeExtractor.DAY_OF_MONTH);
        assertEquals(1, proc.apply(0L));
        assertEquals(1, proc.apply(new DateTime(0L, DateTimeZone.UTC)));
        assertEquals(2, proc.apply(new DateTime(2017, 1, 2, 10, 10, DateTimeZone.UTC)));
        assertEquals(31, proc.apply(new DateTime(2017, 1, 31, 10, 10, DateTimeZone.UTC)));
    }
}

View File

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.scalar;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.sql.expression.function.scalar.math.MathProcessor;
import java.io.IOException;
/** Serialization and behavior tests for {@code MathFunctionProcessor}. */
public class MathFunctionProcessorTests extends AbstractWireSerializingTestCase<MathFunctionProcessor> {
    public static MathFunctionProcessor randomMathFunctionProcessor() {
        return new MathFunctionProcessor(randomFrom(MathProcessor.values()));
    }

    @Override
    protected MathFunctionProcessor createTestInstance() {
        return randomMathFunctionProcessor();
    }

    @Override
    protected Reader<MathFunctionProcessor> instanceReader() {
        return MathFunctionProcessor::new;
    }

    @Override
    protected MathFunctionProcessor mutateInstance(MathFunctionProcessor instance) throws IOException {
        MathProcessor differentProcessor = randomValueOtherThan(instance.processor(), () -> randomFrom(MathProcessor.values()));
        return new MathFunctionProcessor(differentProcessor);
    }

    public void testApply() {
        // E is a constant function: the input is ignored entirely.
        MathFunctionProcessor eProc = new MathFunctionProcessor(MathProcessor.E);
        assertEquals(Math.E, eProc.apply(null));
        assertEquals(Math.E, eProc.apply("cat"));
        assertEquals(Math.E, eProc.apply(Math.PI));

        // SQRT accepts any numeric input.
        MathFunctionProcessor sqrtProc = new MathFunctionProcessor(MathProcessor.SQRT);
        assertEquals(2.0, (double) sqrtProc.apply(4), 0);
        assertEquals(3.0, (double) sqrtProc.apply(9d), 0);
        assertEquals(1.77, (double) sqrtProc.apply(3.14), 0.01);
    }
}

View File

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.scalar;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.io.IOException;
import static java.util.Collections.singletonMap;
/** Serialization and behavior tests for {@code MatrixFieldProcessor}. */
public class MatrixFieldProcessorTests extends AbstractWireSerializingTestCase<MatrixFieldProcessor> {
    public static MatrixFieldProcessor randomMatrixFieldProcessor() {
        return new MatrixFieldProcessor(randomAlphaOfLength(5));
    }

    @Override
    protected MatrixFieldProcessor createTestInstance() {
        return randomMatrixFieldProcessor();
    }

    @Override
    protected Reader<MatrixFieldProcessor> instanceReader() {
        return MatrixFieldProcessor::new;
    }

    @Override
    protected MatrixFieldProcessor mutateInstance(MatrixFieldProcessor instance) throws IOException {
        String differentKey = randomValueOtherThan(instance.key(), () -> randomAlphaOfLength(5));
        return new MatrixFieldProcessor(differentKey);
    }

    public void testApply() {
        MatrixFieldProcessor proc = new MatrixFieldProcessor("test");
        assertEquals(null, proc.apply(null));
        // Non-map values pass straight through.
        assertEquals("cat", proc.apply("cat"));
        // Maps are looked up by the configured key; missing key yields null.
        assertEquals(null, proc.apply(singletonMap("foo", "cat")));
        assertEquals("cat", proc.apply(singletonMap("test", "cat")));
    }
}

View File

@ -5,18 +5,42 @@
*/
package org.elasticsearch.xpack.sql.plugin.sql.action;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils.MutateFunction;
import org.elasticsearch.xpack.sql.plugin.SqlPlugin;
import static org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponseTests.randomCursor;
public class SqlRequestTests extends AbstractStreamableTestCase<SqlRequest> {
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(SqlPlugin.getNamedWriteables());
}
@Override
protected SqlRequest createTestInstance() {
return new SqlRequest(randomAlphaOfLength(10), randomDateTimeZone(), randomBoolean() ? randomAlphaOfLength(10) : null);
return new SqlRequest(randomAlphaOfLength(10), randomDateTimeZone(), randomCursor())
.fetchSize(between(1, Integer.MAX_VALUE));
}
@Override
protected SqlRequest createBlankInstance() {
return new SqlRequest();
}
}
@Override
@SuppressWarnings("unchecked")
protected MutateFunction<SqlRequest> getMutateFunction() {
return randomFrom(
request -> getCopyFunction().copy(request)
.query(randomValueOtherThan(request.query(), () -> randomAlphaOfLength(5))),
request -> getCopyFunction().copy(request)
.timeZone(randomValueOtherThan(request.timeZone(), ESTestCase::randomDateTimeZone)),
request -> getCopyFunction().copy(request)
.cursor(randomValueOtherThan(request.cursor(), SqlResponseTests::randomCursor)),
request -> getCopyFunction().copy(request)
.fetchSize(randomValueOtherThan(request.fetchSize(), () -> between(1, Integer.MAX_VALUE))));
}
}

View File

@ -5,47 +5,56 @@
*/
package org.elasticsearch.xpack.sql.plugin.sql.action;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.sql.execution.search.ScrollCursorTests;
import org.elasticsearch.xpack.sql.plugin.SqlPlugin;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse.ColumnInfo;
import org.elasticsearch.xpack.sql.session.Cursor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SqlResponseTests extends AbstractStreamableTestCase<SqlResponse> {
static Cursor randomCursor() {
return randomBoolean() ? Cursor.EMPTY : ScrollCursorTests.randomScrollCursor();
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(SqlPlugin.getNamedWriteables());
}
@Override
protected SqlResponse createTestInstance() {
Map<String, String> columns;
List<Map<String, Object>> rows;
int columnCount = between(1, 10);
List<ColumnInfo> columns = null;
if (randomBoolean()) {
columns = Collections.emptyMap();
} else {
int size = randomIntBetween(1, 10);
columns = new HashMap<>(size);
for (int i = 0; i < size; i++) {
columns.put(randomAlphaOfLength(10), randomAlphaOfLength(10));
columns = new ArrayList<>(columnCount);
for (int i = 0; i < columnCount; i++) {
columns.add(new ColumnInfo(randomAlphaOfLength(10), randomAlphaOfLength(10)));
}
}
List<List<Object>> rows;
if (randomBoolean()) {
rows = Collections.emptyList();
} else {
int size = randomIntBetween(1, 10);
rows = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Map<String, Object> row = new HashMap<>(size);
for (int j = 0; i < size; i++) {
row.put(randomAlphaOfLength(10), randomBoolean() ? randomAlphaOfLength(10) : randomInt());
int rowCount = between(1, 10);
rows = new ArrayList<>(rowCount);
for (int r = 0; r < rowCount; r++) {
List<Object> row = new ArrayList<>(rowCount);
for (int c = 0; c < columnCount; c++) {
row.add(randomBoolean() ? randomAlphaOfLength(10) : randomInt());
}
rows.add(row);
}
}
return new SqlResponse(randomAlphaOfLength(10), randomNonNegativeLong(), columns, rows);
return new SqlResponse(randomCursor(), randomNonNegativeLong(), columnCount, columns, rows);
}
@Override
@ -53,4 +62,5 @@ public class SqlResponseTests extends AbstractStreamableTestCase<SqlResponse> {
return new SqlResponse();
}
// NOCOMMIT add tests for toXcontent
}

View File

@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.type;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.type.DataTypeConversion.Conversion;
/** Unit tests for {@code DataTypeConversion}'s type-to-type conversion lookup. */
public class DataTypeConversionTests extends ESTestCase {
// Doubles and dates render to their string forms; null always maps to null.
public void testConversionToString() {
Conversion conversion = DataTypeConversion.conversionFor(new DoubleType(true), KeywordType.DEFAULT);
assertNull(conversion.convert(null));
assertEquals("10.0", conversion.convert(10.0));
// Dates render as ISO-8601 UTC strings.
conversion = DataTypeConversion.conversionFor(new DateType(true), KeywordType.DEFAULT);
assertNull(conversion.convert(null));
assertEquals("1970-01-01T00:00:00Z", conversion.convert(0));
}
/**
 * Test conversion to a date or long. These are almost the same.
 */
public void testConversionToLongOrDate() {
DataType to = randomBoolean() ? new LongType(true) : new DateType(true);
{
// Doubles round half-up and out-of-range values are rejected.
Conversion conversion = DataTypeConversion.conversionFor(new DoubleType(true), to);
assertNull(conversion.convert(null));
assertEquals(10L, conversion.convert(10.0));
assertEquals(10L, conversion.convert(10.1));
assertEquals(11L, conversion.convert(10.6));
Exception e = expectThrows(SqlIllegalArgumentException.class, () -> conversion.convert(Double.MAX_VALUE));
assertEquals("[" + Double.MAX_VALUE + "] out of [Long] range", e.getMessage());
}
{
// Integers widen losslessly.
Conversion conversion = DataTypeConversion.conversionFor(new IntegerType(true), to);
assertNull(conversion.convert(null));
assertEquals(10L, conversion.convert(10));
assertEquals(-134L, conversion.convert(-134));
}
{
// NOTE(review): these expectations are boxed Integers while the blocks
// above expect Longs — confirm BOOL_TO_* really yields ints here.
Conversion conversion = DataTypeConversion.conversionFor(new BooleanType(true), to);
assertNull(conversion.convert(null));
assertEquals(1, conversion.convert(true));
assertEquals(0, conversion.convert(false));
}
// String conversion differs between the two targets, so branch on the type.
Conversion conversion = DataTypeConversion.conversionFor(KeywordType.DEFAULT, to);
assertNull(conversion.convert(null));
if (to instanceof LongType) {
assertEquals(1L, conversion.convert("1"));
assertEquals(0L, conversion.convert("-0"));
Exception e = expectThrows(SqlIllegalArgumentException.class, () -> conversion.convert("0xff"));
assertEquals("cannot cast [0xff] to [Long]", e.getMessage());
} else {
// TODO we'd like to be able to optionally parse millis here I think....
// Dates parse ISO-8601 strings, including explicit offsets.
assertEquals(1000L, conversion.convert("1970-01-01T00:00:01Z"));
assertEquals(1483228800000L, conversion.convert("2017-01-01T00:00:00Z"));
assertEquals(18000000L, conversion.convert("1970-01-01T00:00:00-05:00"));
Exception e = expectThrows(SqlIllegalArgumentException.class, () -> conversion.convert("0xff"));
assertEquals("cannot cast [0xff] to [Date]: Invalid format: \"0xff\" is malformed at \"xff\"", e.getMessage());
}
}
// Floats, ints, booleans, and numeric strings all convert to double.
public void testConversionToDouble() {
{
Conversion conversion = DataTypeConversion.conversionFor(new FloatType(true), new DoubleType(true));
assertNull(conversion.convert(null));
assertEquals(10.0, (double) conversion.convert(10.0f), 0.00001);
assertEquals(10.1, (double) conversion.convert(10.1f), 0.00001);
assertEquals(10.6, (double) conversion.convert(10.6f), 0.00001);
}
{
Conversion conversion = DataTypeConversion.conversionFor(new IntegerType(true), new DoubleType(true));
assertNull(conversion.convert(null));
assertEquals(10.0, (double) conversion.convert(10), 0.00001);
assertEquals(-134.0, (double) conversion.convert(-134), 0.00001);
}
{
// true -> 1.0, false -> 0.0
Conversion conversion = DataTypeConversion.conversionFor(new BooleanType(true), new DoubleType(true));
assertNull(conversion.convert(null));
assertEquals(1.0, (double) conversion.convert(true), 0);
assertEquals(0.0, (double) conversion.convert(false), 0);
}
{
// Strings parse as decimal doubles; non-numeric strings are rejected.
Conversion conversion = DataTypeConversion.conversionFor(KeywordType.DEFAULT, new DoubleType(true));
assertNull(conversion.convert(null));
assertEquals(1.0, (double) conversion.convert("1"), 0);
assertEquals(0.0, (double) conversion.convert("-0"), 0);
assertEquals(12.776, (double) conversion.convert("12.776"), 0.00001);
Exception e = expectThrows(SqlIllegalArgumentException.class, () -> conversion.convert("0xff"));
assertEquals("cannot cast [0xff] to [Double]", e.getMessage());
}
}
}