SQL: Break TextFormatter/Cursor dependency (#45613)

Improve the initialization and state passing of TextFormatter in CLI
and TEXT mode by leveraging the Page listener hook. Additionally
simplify the code inside RestSqlQueryAction.

(cherry picked from commit a56db2fa119cf9e8748723e19f1fc9f6a8afe5fc)
This commit is contained in:
Costin Leau 2019-08-16 17:49:25 +03:00 committed by Costin Leau
parent 96883dd028
commit 1cd58c8ea8
7 changed files with 81 additions and 80 deletions

View File

@ -86,6 +86,10 @@ public class SqlQueryResponse extends ActionResponse implements ToXContentObject
return cursor;
}
public boolean hasCursor() {
return StringUtils.EMPTY.equals(cursor) == false;
}
public long size() {
return rows.size();
}

View File

@ -7,7 +7,6 @@
package org.elasticsearch.xpack.sql.plugin;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.Version;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -24,8 +23,6 @@ import org.elasticsearch.xpack.sql.action.SqlQueryAction;
import org.elasticsearch.xpack.sql.action.SqlQueryRequest;
import org.elasticsearch.xpack.sql.action.SqlQueryResponse;
import org.elasticsearch.xpack.sql.proto.Protocol;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.Cursors;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -89,22 +86,9 @@ public class RestSqlQueryAction extends BaseRestHandler {
* which we turn into a 400 error.
*/
XContentType xContentType = accept == null ? XContentType.JSON : XContentType.fromMediaTypeOrFormat(accept);
if (xContentType != null) {
return channel -> client.execute(SqlQueryAction.INSTANCE, sqlRequest, new RestResponseListener<SqlQueryResponse>(channel) {
@Override
public RestResponse buildResponse(SqlQueryResponse response) throws Exception {
XContentBuilder builder = channel.newBuilder(request.getXContentType(), xContentType, true);
response.toXContent(builder, request);
return new BytesRestResponse(RestStatus.OK, builder);
}
});
}
TextFormat textFormat = xContentType == null ? TextFormat.fromMediaTypeOrFormat(accept) : null;
TextFormat textFormat = TextFormat.fromMediaTypeOrFormat(accept);
// if we reached this point, the format to be used can be one of TXT, CSV or TSV
// which won't work in a columnar fashion
if (sqlRequest.columnar()) {
if (xContentType == null && sqlRequest.columnar()) {
throw new IllegalArgumentException("Invalid use of [columnar] argument: cannot be used in combination with "
+ "txt, csv or tsv formats");
}
@ -113,19 +97,27 @@ public class RestSqlQueryAction extends BaseRestHandler {
return channel -> client.execute(SqlQueryAction.INSTANCE, sqlRequest, new RestResponseListener<SqlQueryResponse>(channel) {
@Override
public RestResponse buildResponse(SqlQueryResponse response) throws Exception {
Cursor cursor = Cursors.decodeFromString(sqlRequest.cursor());
final String data = textFormat.format(cursor, request, response);
RestResponse restResponse;
RestResponse restResponse = new BytesRestResponse(RestStatus.OK, textFormat.contentType(request),
data.getBytes(StandardCharsets.UTF_8));
Cursor responseCursor = textFormat.wrapCursor(cursor, response);
if (responseCursor != Cursor.EMPTY) {
restResponse.addHeader("Cursor", Cursors.encodeToString(Version.CURRENT, responseCursor));
// XContent branch
if (xContentType != null) {
XContentBuilder builder = channel.newBuilder(request.getXContentType(), xContentType, true);
response.toXContent(builder, request);
restResponse = new BytesRestResponse(RestStatus.OK, builder);
}
restResponse.addHeader("Took-nanos", Long.toString(System.nanoTime() - startNanos));
// TextFormat
else {
final String data = textFormat.format(request, response);
restResponse = new BytesRestResponse(RestStatus.OK, textFormat.contentType(request),
data.getBytes(StandardCharsets.UTF_8));
if (response.hasCursor()) {
restResponse.addHeader("Cursor", response.cursor());
}
}
restResponse.addHeader("Took-nanos", Long.toString(System.nanoTime() - startNanos));
return restResponse;
}
});

View File

@ -5,8 +5,10 @@
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.action.BasicFormatter;
import org.elasticsearch.xpack.sql.action.SqlQueryResponse;
import org.elasticsearch.xpack.sql.proto.ColumnInfo;
@ -26,9 +28,6 @@ import static org.elasticsearch.xpack.sql.action.BasicFormatter.FormatOption.TEX
/**
* Templating class for displaying SQL responses in text formats.
*/
// TODO are we sure toString is correct here? What about dates that come back as longs.
// Tracked by https://github.com/elastic/x-pack-elasticsearch/issues/3081
enum TextFormat {
/**
@ -41,22 +40,38 @@ enum TextFormat {
*/
PLAIN_TEXT() {
@Override
String format(Cursor cursor, RestRequest request, SqlQueryResponse response) {
final BasicFormatter formatter;
if (cursor instanceof TextFormatterCursor) {
formatter = ((TextFormatterCursor) cursor).getFormatter();
return formatter.formatWithoutHeader(response.rows());
} else {
String format(RestRequest request, SqlQueryResponse response) {
BasicFormatter formatter = null;
Cursor cursor = null;
// check if the cursor is already wrapped first
if (response.hasCursor()) {
cursor = Cursors.decodeFromString(response.cursor());
if (cursor instanceof TextFormatterCursor) {
formatter = ((TextFormatterCursor) cursor).getFormatter();
}
}
// if there are headers available, it means it's the first request
// so initialize the underlying formatter and wrap it in the cursor
if (response.columns() != null) {
formatter = new BasicFormatter(response.columns(), response.rows(), TEXT);
// if there's a cursor, wrap the formatter in it
if (cursor != null) {
response.cursor(Cursors.encodeToString(Version.CURRENT, new TextFormatterCursor(cursor, formatter)));
}
// format with header
return formatter.formatWithHeader(response.columns(), response.rows());
}
}
@Override
Cursor wrapCursor(Cursor oldCursor, SqlQueryResponse response) {
BasicFormatter formatter = (oldCursor instanceof TextFormatterCursor) ?
((TextFormatterCursor) oldCursor).getFormatter() : new BasicFormatter(response.columns(), response.rows(), TEXT);
return TextFormatterCursor.wrap(super.wrapCursor(oldCursor, response), formatter);
else {
// should be initialized (wrapped by the cursor)
if (formatter != null) {
// format without header
return formatter.formatWithoutHeader(response.rows());
}
}
// if this code is reached, it means it's a next page without cursor wrapping
throw new SqlIllegalArgumentException("Cannot find text formatter - this is likely a bug");
}
@Override
@ -219,12 +234,11 @@ enum TextFormat {
};
String format(Cursor cursor, RestRequest request, SqlQueryResponse response) {
String format(RestRequest request, SqlQueryResponse response) {
StringBuilder sb = new StringBuilder();
boolean header = hasHeader(request);
if (header && (cursor == null || cursor == Cursor.EMPTY)) {
// if the header is requested (and the column info is present - namely it's the first page) return the info
if (hasHeader(request) && response.columns() != null) {
row(sb, response.columns(), ColumnInfo::name);
}
@ -239,10 +253,6 @@ enum TextFormat {
return true;
}
Cursor wrapCursor(Cursor oldCursor, SqlQueryResponse response) {
return Cursors.decodeFromString(response.cursor());
}
static TextFormat fromMediaTypeOrFormat(String accept) {
for (TextFormat text : values()) {
String contentType = text.contentType();

View File

@ -17,6 +17,7 @@ import org.elasticsearch.xpack.sql.session.Cursor;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ActionListener.wrap;
/**
* The cursor that wraps all necessary information for textual representation of the result table
*/
@ -26,18 +27,7 @@ public class TextFormatterCursor implements Cursor {
private final Cursor delegate;
private final BasicFormatter formatter;
/**
* If the newCursor is empty, returns an empty cursor. Otherwise, creates a new
* TextFormatterCursor that wraps the newCursor.
*/
public static Cursor wrap(Cursor newCursor, BasicFormatter formatter) {
if (newCursor == EMPTY) {
return EMPTY;
}
return new TextFormatterCursor(newCursor, formatter);
}
private TextFormatterCursor(Cursor delegate, BasicFormatter formatter) {
TextFormatterCursor(Cursor delegate, BasicFormatter formatter) {
this.delegate = delegate;
this.formatter = formatter;
}
@ -59,7 +49,12 @@ public class TextFormatterCursor implements Cursor {
@Override
public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<Page> listener) {
delegate.nextPage(cfg, client, registry, listener);
// keep wrapping the text formatter
delegate.nextPage(cfg, client, registry,
wrap(p -> {
Cursor next = p.next();
listener.onResponse(next == Cursor.EMPTY ? p : new Page(p.rowSet(), new TextFormatterCursor(next, formatter)));
}, listener::onFailure));
}
@Override

View File

@ -20,6 +20,7 @@ import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors;
import org.elasticsearch.xpack.sql.expression.function.scalar.Processors;
import org.elasticsearch.xpack.sql.expression.literal.Literals;
import org.elasticsearch.xpack.sql.plugin.TextFormatterCursor;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
@ -66,7 +67,7 @@ public final class Cursors {
*/
public static String encodeToString(Version version, Cursor info) {
if (info == Cursor.EMPTY) {
return "";
return StringUtils.EMPTY;
}
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
try (OutputStream base64 = Base64.getEncoder().wrap(os); StreamOutput out = new OutputStreamStreamOutput(base64)) {

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.execution.search;
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@ -15,7 +15,8 @@ import org.elasticsearch.xpack.sql.SqlException;
import org.elasticsearch.xpack.sql.TestUtils;
import org.elasticsearch.xpack.sql.action.BasicFormatter;
import org.elasticsearch.xpack.sql.action.SqlQueryResponse;
import org.elasticsearch.xpack.sql.plugin.TextFormatterCursor;
import org.elasticsearch.xpack.sql.execution.search.ScrollCursor;
import org.elasticsearch.xpack.sql.execution.search.ScrollCursorTests;
import org.elasticsearch.xpack.sql.proto.ColumnInfo;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.session.Cursor;
@ -29,8 +30,6 @@ import java.util.List;
import java.util.function.Supplier;
import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
import static org.elasticsearch.xpack.sql.action.BasicFormatter.FormatOption.CLI;
import static org.elasticsearch.xpack.sql.action.BasicFormatter.FormatOption.TEXT;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@ -82,8 +81,8 @@ public class CursorTests extends ESTestCase {
() -> {
SqlQueryResponse response = createRandomSqlResponse();
if (response.columns() != null && response.rows() != null) {
return TextFormatterCursor.wrap(ScrollCursorTests.randomScrollCursor(),
new BasicFormatter(response.columns(), response.rows(), CLI));
return new TextFormatterCursor(ScrollCursorTests.randomScrollCursor(),
new BasicFormatter(response.columns(), response.rows(), BasicFormatter.FormatOption.CLI));
} else {
return ScrollCursorTests.randomScrollCursor();
}
@ -91,8 +90,8 @@ public class CursorTests extends ESTestCase {
() -> {
SqlQueryResponse response = createRandomSqlResponse();
if (response.columns() != null && response.rows() != null) {
return TextFormatterCursor.wrap(ScrollCursorTests.randomScrollCursor(),
new BasicFormatter(response.columns(), response.rows(), TEXT));
return new TextFormatterCursor(ScrollCursorTests.randomScrollCursor(),
new BasicFormatter(response.columns(), response.rows(), BasicFormatter.FormatOption.TEXT));
} else {
return ScrollCursorTests.randomScrollCursor();
}

View File

@ -76,17 +76,17 @@ public class TextFormatTests extends ESTestCase {
}
public void testCsvFormatWithEmptyData() {
String text = CSV.format(null, req(), emptyData());
String text = CSV.format(req(), emptyData());
assertEquals("name\r\n", text);
}
public void testTsvFormatWithEmptyData() {
String text = TSV.format(null, req(), emptyData());
String text = TSV.format(req(), emptyData());
assertEquals("name\n", text);
}
public void testCsvFormatWithRegularData() {
String text = CSV.format(null, req(), regularData());
String text = CSV.format(req(), regularData());
assertEquals("string,number\r\n" +
"Along The River Bank,708\r\n" +
"Mind Train,280\r\n",
@ -94,7 +94,7 @@ public class TextFormatTests extends ESTestCase {
}
public void testTsvFormatWithRegularData() {
String text = TSV.format(null, req(), regularData());
String text = TSV.format(req(), regularData());
assertEquals("string\tnumber\n" +
"Along The River Bank\t708\n" +
"Mind Train\t280\n",
@ -102,7 +102,7 @@ public class TextFormatTests extends ESTestCase {
}
public void testCsvFormatWithEscapedData() {
String text = CSV.format(null, req(), escapedData());
String text = CSV.format(req(), escapedData());
assertEquals("first,\"\"\"special\"\"\"\r\n" +
"normal,\"\"\"quo\"\"ted\"\",\n\"\r\n" +
"commas,\"a,b,c,\n,d,e,\t\n\"\r\n"
@ -110,7 +110,7 @@ public class TextFormatTests extends ESTestCase {
}
public void testTsvFormatWithEscapedData() {
String text = TSV.format(null, req(), escapedData());
String text = TSV.format(req(), escapedData());
assertEquals("first\t\"special\"\n" +
"normal\t\"quo\"ted\",\\n\n" +
"commas\ta,b,c,\\n,d,e,\\t\\n\n"