Fix SYS COLUMNS schema in ODBC mode (#59513) (#60418)

* Fix SYS COLUMNS schema in ODBC mode (#59513)

* Fix SYS COLUMNS schema in ODBC mode

This fixes a regression when certain ODBC-specific columns that need to
be of the short type were returned as the integer type.

This also fixes the stubbing for the *-indices SYS COLUMN commands.

(cherry picked from commit 96d89dc9b1fd731e736ef804a16bd05496c1dea6)

* Java8 fix: avoid diamond notation in test.

Qualify anonymous class in test.
This commit is contained in:
Bogdan Pintea 2020-07-29 21:19:32 +02:00 committed by GitHub
parent 4c771485f6
commit 30610d962a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 80 additions and 19 deletions

View File

@ -19,6 +19,7 @@ import org.elasticsearch.xpack.ql.util.StringUtils;
import org.elasticsearch.xpack.sql.plan.logical.command.Command;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.ListCursor;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlSession;
@ -133,7 +134,7 @@ public class SysColumns extends Command {
fillInRows(cluster, esIndex.name(), esIndex.mapping(), null, rows, columnMatcher, mode);
}
listener.onResponse(of(session, rows));
listener.onResponse(ListCursor.of(Rows.schema(output), rows, session.configuration().pageSize()));
}, listener::onFailure));
}
// otherwise use a merged mapping
@ -146,7 +147,7 @@ public class SysColumns extends Command {
fillInRows(cluster, indexName, esIndex.mapping(), null, rows, columnMatcher, mode);
}
listener.onResponse(of(session, rows));
listener.onResponse(ListCursor.of(Rows.schema(output), rows, session.configuration().pageSize()));
}, listener::onFailure));
}
}

View File

@ -21,14 +21,20 @@ import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier;
import org.elasticsearch.xpack.sql.parser.SqlParser;
import org.elasticsearch.xpack.sql.plan.logical.command.Command;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.Protocol;
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.stats.Metrics;
import org.elasticsearch.xpack.sql.util.DateUtils;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;
@ -449,15 +455,15 @@ public class SysColumnsTests extends ESTestCase {
public void testSysColumnsNoArg() throws Exception {
executeCommand("SYS COLUMNS", emptyList(), r -> {
assertEquals(13, r.size());
assertEquals(15, r.size());
assertEquals(CLUSTER_NAME, r.column(0));
// no index specified
assertEquals("", r.column(2));
assertEquals("test", r.column(2));
assertEquals("bool", r.column(3));
r.advanceRow();
assertEquals(CLUSTER_NAME, r.column(0));
// no index specified
assertEquals("", r.column(2));
assertEquals("test", r.column(2));
assertEquals("int", r.column(3));
}, mapping);
}
@ -501,33 +507,87 @@ public class SysColumnsTests extends ESTestCase {
}, mapping);
}
@SuppressWarnings({ "unchecked" })
private void executeCommand(String sql, List<SqlTypedParamValue> params, Consumer<SchemaRowSet> consumer, Map<String, EsField> mapping)
throws Exception {
Tuple<Command, SqlSession> tuple = sql(sql, params, mapping);
public void testSysColumnsTypesInOdbcMode() {
executeCommand("SYS COLUMNS", emptyList(), Mode.ODBC, SysColumnsTests::checkOdbcShortTypes, mapping);
executeCommand("SYS COLUMNS TABLE LIKE 'test'", emptyList(), Mode.ODBC, SysColumnsTests::checkOdbcShortTypes, mapping);
}
IndexResolver resolver = tuple.v2().indexResolver();
public void testSysColumnsPaginationInOdbcMode() {
assertEquals(15, executeCommandInOdbcModeAndCountRows("SYS COLUMNS"));
assertEquals(15, executeCommandInOdbcModeAndCountRows("SYS COLUMNS TABLE LIKE 'test'"));
}
EsIndex test = new EsIndex("test", mapping);
private int executeCommandInOdbcModeAndCountRows(String sql) {
final SqlConfiguration config = new SqlConfiguration(DateUtils.UTC, randomIntBetween(1, 15), Protocol.REQUEST_TIMEOUT,
Protocol.PAGE_TIMEOUT, null, Mode.ODBC, null, null, null, false, false);
Tuple<Command, SqlSession> tuple = sql(sql, emptyList(), config, mapping);
doAnswer(invocation -> {
((ActionListener<IndexResolution>) invocation.getArguments()[3]).onResponse(IndexResolution.valid(test));
return Void.TYPE;
}).when(resolver).resolveAsMergedMapping(any(), any(), anyBoolean(), any());
int[] rowCount = {0};
tuple.v1().execute(tuple.v2(), new ActionListener<Cursor.Page>() {
@Override
public void onResponse(Cursor.Page page) {
Cursor c = page.next();
rowCount[0] += page.rowSet().size();
if (c != Cursor.EMPTY) {
c.nextPage(config, null, null, this);
}
}
@Override
public void onFailure(Exception e) {
fail(e.getMessage());
}
});
return rowCount[0];
}
private void executeCommand(String sql, List<SqlTypedParamValue> params, Mode mode, Consumer<SchemaRowSet> consumer,
Map<String, EsField> mapping) {
final SqlConfiguration config = new SqlConfiguration(DateUtils.UTC, Protocol.FETCH_SIZE, Protocol.REQUEST_TIMEOUT,
Protocol.PAGE_TIMEOUT, null, mode, null, null, null, false, false);
Tuple<Command, SqlSession> tuple = sql(sql, params, config, mapping);
tuple.v1().execute(tuple.v2(), wrap(p -> consumer.accept((SchemaRowSet) p.rowSet()), ex -> fail(ex.getMessage())));
}
private Tuple<Command, SqlSession> sql(String sql, List<SqlTypedParamValue> params, Map<String, EsField> mapping) {
private void executeCommand(String sql, List<SqlTypedParamValue> params, Consumer<SchemaRowSet> consumer, Map<String, EsField> mapping)
throws Exception {
executeCommand(sql, params, Mode.PLAIN, consumer, mapping);
}
@SuppressWarnings({ "unchecked" })
private Tuple<Command, SqlSession> sql(String sql, List<SqlTypedParamValue> params, SqlConfiguration config,
Map<String, EsField> mapping) {
EsIndex test = new EsIndex("test", mapping);
Analyzer analyzer = new Analyzer(SqlTestUtils.TEST_CFG, new FunctionRegistry(), IndexResolution.valid(test),
new Verifier(new Metrics()));
Analyzer analyzer = new Analyzer(config, new FunctionRegistry(), IndexResolution.valid(test), new Verifier(new Metrics()));
Command cmd = (Command) analyzer.analyze(parser.createStatement(sql, params, UTC), true);
IndexResolver resolver = mock(IndexResolver.class);
when(resolver.clusterName()).thenReturn(CLUSTER_NAME);
doAnswer(invocation -> {
((ActionListener<IndexResolution>) invocation.getArguments()[3]).onResponse(IndexResolution.valid(test));
return Void.TYPE;
}).when(resolver).resolveAsMergedMapping(any(), any(), anyBoolean(), any());
doAnswer(invocation -> {
((ActionListener<List<EsIndex>>) invocation.getArguments()[3]).onResponse(singletonList(test));
return Void.TYPE;
}).when(resolver).resolveAsSeparateMappings(any(), any(), anyBoolean(), any());
SqlSession session = new SqlSession(SqlTestUtils.TEST_CFG, null, null, resolver, null, null, null, null, null);
SqlSession session = new SqlSession(config, null, null, resolver, null, null, null, null, null);
return new Tuple<>(cmd, session);
}
private Tuple<Command, SqlSession> sql(String sql, List<SqlTypedParamValue> params, Map<String, EsField> mapping) {
return sql(sql, params, SqlTestUtils.TEST_CFG, mapping);
}
private static void checkOdbcShortTypes(SchemaRowSet r) {
assertEquals(15, r.size());
// https://github.com/elastic/elasticsearch/issues/35376
// cols that need to be of short type: DATA_TYPE, DECIMAL_DIGITS, NUM_PREC_RADIX, NULLABLE, SQL_DATA_TYPE, SQL_DATETIME_SUB
List<Integer> cols = Arrays.asList(4, 8, 9, 10, 13, 14);
for (Integer i: cols) {
assertEquals("short", r.schema().get(i).type().name().toLowerCase(Locale.ROOT));
}
}
}