Backport changes from stand-alone repo:

use Joda ReadableTime interface
add support for GZIP
add Ansi colors in the CLI plus logo
remove net-client dependency from proto libs (avoids classpath leaks)

Original commit: elastic/x-pack-elasticsearch@0ce7c9690d
This commit is contained in:
Costin Leau 2017-06-29 00:22:32 +03:00
parent 2cd7f1bfe3
commit a9a2d14193
50 changed files with 890 additions and 382 deletions

View File

@ -10,7 +10,7 @@ import org.elasticsearch.xpack.sql.expression.function.scalar.ColumnsProcessor;
class ProcessingHitExtractor implements HitExtractor {
private final HitExtractor delegate;
final HitExtractor delegate;
private final ColumnsProcessor processor;
ProcessingHitExtractor(HitExtractor delegate, ColumnsProcessor processor) {

View File

@ -5,12 +5,6 @@
*/
package org.elasticsearch.xpack.sql.execution.search;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
@ -19,6 +13,12 @@ import org.elasticsearch.xpack.sql.session.AbstractRowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.type.Schema;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
//
// Since the results might contain nested docs, the iteration is similar to that of Aggregation
// namely it discovers the nested docs and then, for iteration, increments the deepest level first
@ -48,9 +48,9 @@ public class SearchHitRowSetCursor extends AbstractRowSetCursor {
String innerH = null;
for (HitExtractor ex : exts) {
if (ex instanceof InnerHitExtractor) {
innerH = ((InnerHitExtractor) ex).parent();
innerHits.add(innerH);
InnerHitExtractor ie = getInnerHitExtractor(ex);
if (ie != null) {
innerH = ie.parent();
}
}
@ -82,7 +82,7 @@ public class SearchHitRowSetCursor extends AbstractRowSetCursor {
@Override
protected Object getColumn(int column) {
HitExtractor e = extractors.get(column);
int extractorLevel = e instanceof InnerHitExtractor ? 1 : 0;
int extractorLevel = isInnerHitExtractor(e) ? 1 : 0;
SearchHit hit = null;
SearchHit[] sh = hits;
@ -98,6 +98,20 @@ public class SearchHitRowSetCursor extends AbstractRowSetCursor {
return e.get(hit);
}
private boolean isInnerHitExtractor(HitExtractor he) {
return getInnerHitExtractor(he) != null;
}
private InnerHitExtractor getInnerHitExtractor(HitExtractor he) {
if (he instanceof ProcessingHitExtractor) {
return getInnerHitExtractor(((ProcessingHitExtractor) he).delegate);
}
if (he instanceof InnerHitExtractor) {
return (InnerHitExtractor) he;
}
return null;
}
@Override
protected boolean doHasCurrent() {
return row < size();

View File

@ -5,8 +5,6 @@
*/
package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import java.util.Locale;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.expression.Expressions;
import org.elasticsearch.xpack.sql.expression.FieldAttribute;
@ -19,9 +17,11 @@ import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.DataTypes;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableDateTime;
import java.util.Locale;
import static java.lang.String.format;
import static org.elasticsearch.xpack.sql.expression.function.scalar.script.ParamsBuilder.paramsBuilder;
import static org.elasticsearch.xpack.sql.expression.function.scalar.script.ScriptTemplate.formatTemplate;
@ -66,14 +66,14 @@ public abstract class DateTimeFunction extends ScalarFunction {
@Override
public ColumnsProcessor asProcessor() {
return l -> {
DateTime dt = null;
ReadableDateTime dt = null;
// most dates are returned as long
if (l instanceof Long) {
dt = new DateTime((Long) l, DateTimeZone.UTC);
}
// but date histogram returns the keys already as DateTime on UTC
else {
dt = (DateTime) l;
dt = (ReadableDateTime) l;
}
return Integer.valueOf(extract(dt));
};
@ -84,7 +84,7 @@ public abstract class DateTimeFunction extends ScalarFunction {
return DataTypes.INTEGER;
}
protected abstract int extract(DateTime dt);
protected abstract int extract(ReadableDateTime dt);
// used for aggregration (date histogram)
public abstract String interval();

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTime;
import org.joda.time.ReadableDateTime;
public class DayOfMonth extends DateTimeFunction {
@ -26,7 +26,7 @@ public class DayOfMonth extends DateTimeFunction {
}
@Override
protected int extract(DateTime dt) {
protected int extract(ReadableDateTime dt) {
return dt.getDayOfMonth();
}
}

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTime;
import org.joda.time.ReadableDateTime;
public class DayOfWeek extends DateTimeFunction {
@ -26,7 +26,7 @@ public class DayOfWeek extends DateTimeFunction {
}
@Override
protected int extract(DateTime dt) {
protected int extract(ReadableDateTime dt) {
return dt.getDayOfWeek();
}
}

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTime;
import org.joda.time.ReadableDateTime;
public class DayOfYear extends DateTimeFunction {
@ -26,7 +26,7 @@ public class DayOfYear extends DateTimeFunction {
}
@Override
protected int extract(DateTime dt) {
protected int extract(ReadableDateTime dt) {
return dt.getDayOfYear();
}
}

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTime;
import org.joda.time.ReadableDateTime;
public class HourOfDay extends DateTimeFunction {
@ -26,7 +26,7 @@ public class HourOfDay extends DateTimeFunction {
}
@Override
protected int extract(DateTime dt) {
protected int extract(ReadableDateTime dt) {
return dt.getHourOfDay();
}
}

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTime;
import org.joda.time.ReadableDateTime;
public class MinuteOfDay extends DateTimeFunction {
@ -26,7 +26,7 @@ public class MinuteOfDay extends DateTimeFunction {
}
@Override
protected int extract(DateTime dt) {
protected int extract(ReadableDateTime dt) {
return dt.getMinuteOfDay();
}
}

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTime;
import org.joda.time.ReadableDateTime;
public class MinuteOfHour extends DateTimeFunction {
@ -26,7 +26,7 @@ public class MinuteOfHour extends DateTimeFunction {
}
@Override
protected int extract(DateTime dt) {
protected int extract(ReadableDateTime dt) {
return dt.getMinuteOfHour();
}
}

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTime;
import org.joda.time.ReadableDateTime;
public class MonthOfYear extends DateTimeFunction {
@ -26,7 +26,7 @@ public class MonthOfYear extends DateTimeFunction {
}
@Override
protected int extract(DateTime dt) {
protected int extract(ReadableDateTime dt) {
return dt.getMonthOfYear();
}
}

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTime;
import org.joda.time.ReadableDateTime;
public class SecondOfMinute extends DateTimeFunction {
@ -26,7 +26,7 @@ public class SecondOfMinute extends DateTimeFunction {
}
@Override
protected int extract(DateTime dt) {
protected int extract(ReadableDateTime dt) {
return dt.getSecondOfMinute();
}
}

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTime;
import org.joda.time.ReadableDateTime;
public class WeekOfWeekYear extends DateTimeFunction {
@ -26,7 +26,7 @@ public class WeekOfWeekYear extends DateTimeFunction {
}
@Override
protected int extract(DateTime dt) {
protected int extract(ReadableDateTime dt) {
return dt.getWeekOfWeekyear();
}
}

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.joda.time.DateTime;
import org.joda.time.ReadableDateTime;
public class Year extends DateTimeFunction {
@ -26,7 +26,7 @@ public class Year extends DateTimeFunction {
}
@Override
protected int extract(DateTime dt) {
protected int extract(ReadableDateTime dt) {
return dt.getYear();
}

View File

@ -26,10 +26,10 @@ import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction;
import org.elasticsearch.xpack.sql.plugin.cli.action.TransportCliAction;
import org.elasticsearch.xpack.sql.plugin.cli.http.HttpCliAction;
import org.elasticsearch.xpack.sql.plugin.cli.http.CliHttpHandler;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.TransportJdbcAction;
import org.elasticsearch.xpack.sql.plugin.jdbc.http.HttpJdbcAction;
import org.elasticsearch.xpack.sql.plugin.jdbc.http.JdbcHttpHandler;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.TransportSqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.rest.RestSqlAction;
@ -56,8 +56,8 @@ public class SqlPlugin extends Plugin implements ActionPlugin {
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<DiscoveryNodes> nodesInCluster) {
return Arrays.asList(new RestSqlAction(settings, restController),
new HttpCliAction(settings, restController),
new HttpJdbcAction(settings, restController));
new CliHttpHandler(settings, restController),
new JdbcHttpHandler(settings, restController));
}
@Override

View File

@ -34,10 +34,9 @@ public class TransportCliAction extends HandledTransportAction<CliRequest, CliRe
super(settings, CliAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, CliRequest::new);
// lazy init of the resolver
// NOCOMMIT indexNameExpressionResolver should be available some other way
// NOCOMMIT indexNameExpressionResolver should be available some other way; so should the localNode name :)
((EsCatalog) planExecutor.catalog()).setIndexNameExpressionResolver(indexNameExpressionResolver);
this.cliServer = new CliServer(planExecutor, clusterService.getClusterName().value(), () -> clusterService.localNode().getName(),
Version.CURRENT, Build.CURRENT);
this.cliServer = new CliServer(planExecutor, clusterService.getClusterName().value(), () -> clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT);
}
@Override

View File

@ -17,10 +17,10 @@ import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.ProtoUtils;
import org.elasticsearch.xpack.sql.net.client.util.StringUtils;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliRequest;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliResponse;
import org.elasticsearch.xpack.sql.util.StringUtils;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.rest.BytesRestResponse.TEXT_CONTENT_TYPE;
@ -29,9 +29,9 @@ import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.elasticsearch.rest.RestStatus.OK;
public class HttpCliAction extends BaseRestHandler {
public class CliHttpHandler extends BaseRestHandler {
public HttpCliAction(Settings settings, RestController controller) {
public CliHttpHandler(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(POST, "/_cli", this);
}

View File

@ -5,16 +5,26 @@
*/
package org.elasticsearch.xpack.sql.plugin.cli.http;
import java.io.DataOutputStream;
import java.io.IOException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.xpack.sql.SqlException;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.cli.net.protocol.ProtoUtils;
import org.elasticsearch.xpack.sql.cli.net.protocol.Response;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import static org.elasticsearch.xpack.sql.util.StringUtils.EMPTY;
public abstract class CliServerProtoUtils {
public static BytesReference write(Response response) throws IOException {
@ -35,4 +45,29 @@ public abstract class CliServerProtoUtils {
return array.bytes();
}
}
public static Response exception(Throwable cause, Action action) {
String message = EMPTY;
String cs = EMPTY;
if (cause != null) {
if (Strings.hasText(cause.getMessage())) {
message = cause.getMessage();
}
cs = cause.getClass().getName();
}
if (expectedException(cause)) {
return new ExceptionResponse(action, message, cs);
}
else {
// TODO: might want to 'massage' this
StringWriter sw = new StringWriter();
cause.printStackTrace(new PrintWriter(sw));
return new ErrorResponse(action, message, cs, sw.toString());
}
}
private static boolean expectedException(Throwable cause) {
return (cause instanceof SqlException || cause instanceof ResourceNotFoundException);
}
}

View File

@ -36,7 +36,7 @@ public abstract class CliUtils { // TODO made public so it could be shared with
AtomicBoolean firstRun = new AtomicBoolean(true);
// take a look at the first values
cursor.forEachSet(rowSet -> {
for (boolean shouldRun = rowSet.hasCurrent(); shouldRun; shouldRun = rowSet.advance()) {
for (boolean shouldRun = rowSet.hasCurrentRow(); shouldRun; shouldRun = rowSet.advanceRow()) {
for (int column = 0; column < rowSet.rowSize(); column++) {
if (column > 0) {
sb.append("|");

View File

@ -6,30 +6,25 @@
package org.elasticsearch.xpack.sql.plugin.cli.server;
import org.elasticsearch.Build;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.SqlException;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.cli.net.protocol.Request;
import org.elasticsearch.xpack.sql.cli.net.protocol.Response;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.execution.search.SearchHitRowSetCursor;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.net.client.util.StringUtils;
import org.elasticsearch.xpack.sql.plugin.cli.http.CliServerProtoUtils;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.function.Supplier;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.xpack.sql.net.client.util.StringUtils.EMPTY;
import static org.elasticsearch.xpack.sql.util.StringUtils.EMPTY;
public class CliServer {
@ -51,8 +46,11 @@ public class CliServer {
else if (req instanceof CommandRequest) {
command((CommandRequest) req, listener);
}
else {
listener.onResponse(new ExceptionResponse(req.action, "Invalid requested", null));
}
} catch (Exception ex) {
listener.onResponse(exception(ex, req.action));
listener.onResponse(CliServerProtoUtils.exception(ex, req.action));
}
}
@ -73,35 +71,11 @@ public class CliServer {
listener.onResponse(new CommandResponse(start, stop, requestId, c));
},
ex -> exception(ex, req.action)));
ex -> listener.onResponse(CliServerProtoUtils.exception(ex, req.action))));
}
public void queryPage(QueryPageRequest req, ActionListener<Response> listener) {
throw new UnsupportedOperationException();
}
private static Response exception(Throwable cause, Action action) {
String message = EMPTY;
String cs = EMPTY;
if (cause != null) {
if (StringUtils.hasText(cause.getMessage())) {
message = cause.getMessage();
}
cs = cause.getClass().getName();
}
if (expectedException(cause)) {
return new ExceptionResponse(action, message, cs);
}
else {
// TODO: might want to 'massage' this
StringWriter sw = new StringWriter();
cause.printStackTrace(new PrintWriter(sw));
return new ErrorResponse(action, message, cs, sw.toString());
}
}
private static boolean expectedException(Throwable cause) {
return (cause instanceof SqlException || cause instanceof ResourceNotFoundException);
}
}

View File

@ -14,11 +14,11 @@ import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ProtoUtils;
import org.elasticsearch.xpack.sql.net.client.util.StringUtils;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcRequest;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcResponse;
import org.elasticsearch.xpack.sql.plugin.jdbc.server.JdbcServerProtoUtils;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.io.DataInputStream;
import java.io.IOException;
@ -30,9 +30,9 @@ import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.elasticsearch.rest.RestStatus.OK;
public class HttpJdbcAction extends BaseRestHandler { // NOCOMMIT these are call RestJdbcAction even if it isn't REST.
public class JdbcHttpHandler extends BaseRestHandler { // NOCOMMIT these are call RestJdbcAction even if it isn't REST.
public HttpJdbcAction(Settings settings, RestController controller) {
public JdbcHttpHandler(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(POST, "/_jdbc", this);
}

View File

@ -6,16 +6,13 @@
package org.elasticsearch.xpack.sql.plugin.jdbc.server;
import org.elasticsearch.Build;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.analysis.AnalysisException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.xpack.sql.analysis.catalog.EsType;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.execution.search.SearchHitRowSetCursor;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ColumnInfo;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnInfo;
@ -23,30 +20,24 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.SqlExceptionType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Request;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Response;
import org.elasticsearch.xpack.sql.net.client.util.StringUtils;
import org.elasticsearch.xpack.sql.parser.ParsingException;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.xpack.sql.net.client.util.StringUtils.EMPTY;
import static org.elasticsearch.xpack.sql.util.StringUtils.EMPTY;
public class JdbcServer {
@ -75,35 +66,37 @@ public class JdbcServer {
queryInit((QueryInitRequest) req, listener);
}
} catch (Exception ex) {
listener.onResponse(exception(ex, req.action));
listener.onResponse(JdbcServerProtoUtils.exception(ex, req.action));
}
}
public InfoResponse info(InfoRequest req) {
return infoResponse.get();
}
public MetaTableResponse metaTable(MetaTableRequest req) {
Collection<EsType> types = executor.catalog().listTypes(req.index, req.type);
String indexPattern = Strings.hasText(req.index) ? StringUtils.jdbcToEsPattern(req.index) : "*";
Collection<EsType> types = executor.catalog().listTypes(indexPattern, req.type);
return new MetaTableResponse(types.stream()
.map(t -> t.index() + "." + t.name())
.collect(toList()));
}
public MetaColumnResponse metaColumn(MetaColumnRequest req) {
Collection<EsType> types = executor.catalog().listTypes(req.index, req.type);
String indexPattern = Strings.hasText(req.index) ? StringUtils.jdbcToEsPattern(req.index) : "*";
Pattern pat = null;
if (StringUtils.hasText(req.column)) {
pat = Pattern.compile(req.column);
}
Collection<EsType> types = executor.catalog().listTypes(indexPattern, req.type);
Pattern columnMatcher = Strings.hasText(req.column) ? StringUtils.likeRegex(req.column) : null;
List<MetaColumnInfo> resp = new ArrayList<>();
for (EsType type : types) {
int pos = 0;
for (Entry<String, DataType> entry : type.mapping().entrySet()) {
pos++;
if (pat == null || pat.matcher(entry.getKey()).matches()) {
if (columnMatcher == null || columnMatcher.matcher(entry.getKey()).matches()) {
String name = entry.getKey();
String table = type.index() + "." + type.name();
int tp = entry.getValue().sqlType().getVendorTypeNumber().intValue();
@ -116,64 +109,26 @@ public class JdbcServer {
return new MetaColumnResponse(resp);
}
public void queryInit(QueryInitRequest req, ActionListener<Response> listener) {
final long start = System.currentTimeMillis();
executor.sql(req.query, wrap(
c -> {
long stop = System.currentTimeMillis();
String requestId = EMPTY;
if (c.hasNextSet() && c instanceof SearchHitRowSetCursor) {
requestId = StringUtils.nullAsEmpty(((SearchHitRowSetCursor) c).scrollId());
}
List<ColumnInfo> list = c.schema().stream()
.map(e -> new ColumnInfo(e.name(), e.type().sqlType().getVendorTypeNumber().intValue(), EMPTY, EMPTY, EMPTY, EMPTY))
.collect(toList());
listener.onResponse(new QueryInitResponse(start, stop, requestId, list, c));
},
ex -> exception(ex, req.action)));
executor.sql(req.query, wrap(c -> {
long stop = System.currentTimeMillis();
String requestId = EMPTY;
if (c.hasNextSet() && c instanceof SearchHitRowSetCursor) {
requestId = StringUtils.nullAsEmpty(((SearchHitRowSetCursor) c).scrollId());
}
List<ColumnInfo> list = c.schema().stream()
.map(e -> new ColumnInfo(e.name(), e.type().sqlType().getVendorTypeNumber().intValue(), EMPTY, EMPTY, EMPTY, EMPTY))
.collect(toList());
listener.onResponse(new QueryInitResponse(start, stop, requestId, list, c));
}, ex -> listener.onResponse(JdbcServerProtoUtils.exception(ex, req.action))));
}
public void queryPage(QueryPageRequest req, ActionListener<Response> listener) {
throw new UnsupportedOperationException();
}
private static Response exception(Throwable cause, Action action) {
SqlExceptionType sqlExceptionType = sqlExceptionType(cause);
String message = EMPTY;
String cs = EMPTY;
if (cause != null) {
if (StringUtils.hasText(cause.getMessage())) {
message = cause.getMessage();
}
cs = cause.getClass().getName();
}
if (sqlExceptionType != null) {
return new ExceptionResponse(action, message, cs, sqlExceptionType);
}
else {
// TODO: might want to 'massage' this
StringWriter sw = new StringWriter();
cause.printStackTrace(new PrintWriter(sw));
return new ErrorResponse(action, message, cs, sw.toString());
}
}
private static SqlExceptionType sqlExceptionType(Throwable cause) {
if (cause instanceof AnalysisException || cause instanceof ResourceNotFoundException) {
return SqlExceptionType.DATA;
}
if (cause instanceof ParsingException) {
return SqlExceptionType.SYNTAX;
}
if (cause instanceof TimeoutException) {
return SqlExceptionType.TIMEOUT;
}
return null;
}
}

View File

@ -8,15 +8,30 @@ package org.elasticsearch.xpack.sql.plugin.jdbc.server;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.sql.Types;
import java.util.concurrent.TimeoutException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.xpack.sql.analysis.AnalysisException;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.DataResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.SqlExceptionType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.ProtoUtils;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Response;
import org.elasticsearch.xpack.sql.parser.ParsingException;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.joda.time.ReadableDateTime;
import static org.elasticsearch.xpack.sql.util.StringUtils.EMPTY;
public abstract class JdbcServerProtoUtils {
@ -45,10 +60,52 @@ public abstract class JdbcServerProtoUtils {
.toArray();
// unroll forEach manually to avoid a Consumer + try/catch for each value...
for (boolean hasRows = rowSet.hasCurrent(); hasRows; hasRows = rowSet.advance()) {
for (boolean hasRows = rowSet.hasCurrentRow(); hasRows; hasRows = rowSet.advanceRow()) {
for (int i = 0; i < rowSet.rowSize(); i++) {
ProtoUtils.writeValue(out, rowSet.column(i), jdbcTypes[i]);
Object value = rowSet.column(i);
// unpack Joda classes on the server-side to not 'pollute' the common project and thus the client
if (jdbcTypes[i] == Types.TIMESTAMP_WITH_TIMEZONE && value instanceof ReadableDateTime) {
value = ((ReadableDateTime) value).getMillis();
}
ProtoUtils.writeValue(out, value, jdbcTypes[i]);
}
}
}
public static Response exception(Throwable cause, Action action) {
SqlExceptionType sqlExceptionType = sqlExceptionType(cause);
String message = EMPTY;
String cs = EMPTY;
if (cause != null) {
if (Strings.hasText(cause.getMessage())) {
message = cause.getMessage();
}
cs = cause.getClass().getName();
}
if (sqlExceptionType != null) {
return new ExceptionResponse(action, message, cs, sqlExceptionType);
}
else {
// TODO: might want to 'massage' this
StringWriter sw = new StringWriter();
cause.printStackTrace(new PrintWriter(sw));
return new ErrorResponse(action, message, cs, sw.toString());
}
}
private static SqlExceptionType sqlExceptionType(Throwable cause) {
if (cause instanceof AnalysisException || cause instanceof ResourceNotFoundException) {
return SqlExceptionType.DATA;
}
if (cause instanceof ParsingException) {
return SqlExceptionType.SYNTAX;
}
if (cause instanceof TimeoutException) {
return SqlExceptionType.TIMEOUT;
}
return null;
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.querydsl.agg;
import org.elasticsearch.common.Strings;
import org.elasticsearch.xpack.sql.util.StringUtils;
// utility class around constructing and parsing ES specific aggregation path
// see org.elasticsearch.search.aggregations.support.AggregationPath
public abstract class AggPath {
private static final char PATH_DELIMITER_CHAR = '>';
private static final String PATH_DELIMITER = String.valueOf(PATH_DELIMITER_CHAR);
private static final String VALUE_DELIMITER = ".";
private static final String PATH_BUCKET_VALUE = "._key";
private static final String PATH_BUCKET_COUNT = "._count";
private static final String PATH_BUCKET_VALUE_FORMATTED = "._key_as_string";
private static final String PATH_DEFAULT_VALUE = ".value";
public static String bucketCount(String aggPath) {
return aggPath + PATH_BUCKET_COUNT;
}
public static String bucketValue(String aggPath) {
return aggPath + PATH_BUCKET_VALUE;
}
public static boolean isBucketValueFormatted(String path) {
return path.endsWith(PATH_BUCKET_VALUE_FORMATTED);
}
public static String bucketValueWithoutFormat(String path) {
return path.substring(0, path.length() - PATH_BUCKET_VALUE_FORMATTED.length());
}
public static String metricValue(String aggPath) {
return aggPath + PATH_DEFAULT_VALUE;
}
public static String metricValue(String aggPath, String valueName) {
return aggPath + VALUE_DELIMITER + valueName;
}
public static String path(String parent, String child) {
return (Strings.hasLength(parent) ? parent + PATH_DELIMITER : StringUtils.EMPTY) + child;
}
//
// The depth indicates the level of an agg excluding the root agg (because all aggs in SQL require a group). However all other bucket aggs are counted.
// Since the path does not indicate the type of agg used, to differentiate between metric properties and bucket properties, the bucket value is considered.
// This is needed since one might refer to the keys or count of a bucket path.
// As the opposite side there are metric aggs which have the same level as their parent (their nesting is an ES implementation detail)
// Default examples:
//
// agg1 = 0 ; agg1 = default/root group
// agg1>agg2._count = 1 ; ._count indicates agg2 is a bucket agg and thus it counted - agg1 (default group), depth=0, agg2 (bucketed), depth=1
// agg1>agg2>agg3.value = 1 ; agg3.value indicates a metric bucket thus only agg1 and agg2 are counted -> depth=2. In other words, agg3.value has the same depth as agg2._count
// agg1>agg2>agg3._count = 2 ; ._count indicates agg3 is a bucket agg, so count it for depth -> depth = 2
// agg1>agg2>agg3.sum = 1 ; .sum indicates agg3 is a metric agg, only agg1 and agg2 are bucket and with agg1 being the default group -> depth = 1
public static int depth(String path) {
int depth = countCharIn(path, PATH_DELIMITER_CHAR);
// a metric value always has .foo while a bucket prop with ._foo
int dot = path.lastIndexOf(".");
if (depth > 0 && dot > 0) {
String prop = path.substring(dot + 1);
if (!prop.startsWith("_")) {
return Math.max(0, depth - 1);
}
}
return depth;
}
private static int countCharIn(CharSequence sequence, char c) {
int count = 0;
for (int i = 0; i < sequence.length(); i++) {
if (c == sequence.charAt(i)) {
count++;
}
}
return count;
}
}

View File

@ -30,19 +30,19 @@ public abstract class AbstractRowSetCursor implements RowSetCursor {
public Object column(int index) {
Assert.isTrue(index >= 0, "Invalid index %d; needs to be positive", index);
Assert.isTrue(index < rowSize(), "Invalid index %d for row of size %d", index, rowSize());
Assert.isTrue(hasCurrent(), "RowSet contains no (more) entries; use hasCurrent() to check its status");
Assert.isTrue(hasCurrentRow(), "RowSet contains no (more) entries; use hasCurrent() to check its status");
return getColumn(index);
}
protected abstract Object getColumn(int column);
@Override
public boolean hasCurrent() {
public boolean hasCurrentRow() {
return terminated ? false : doHasCurrent();
}
@Override
public boolean advance() {
public boolean advanceRow() {
if (terminated) {
return false;
}
@ -94,7 +94,7 @@ public abstract class AbstractRowSetCursor implements RowSetCursor {
public String toString() {
StringBuilder sb = new StringBuilder();
if (hasCurrent()) {
if (hasCurrentRow()) {
for (int column = 0; column < size; column++) {
if (column > 0) {
sb.append("|");

View File

@ -17,16 +17,16 @@ import java.util.function.Consumer;
*/
public interface RowSet extends RowView {
boolean hasCurrent();
boolean hasCurrentRow();
boolean advance();
boolean advanceRow();
int size();
void reset();
default void forEachRow(Consumer<? super RowView> action) {
for (boolean hasRows = hasCurrent(); hasRows; hasRows = advance()) {
for (boolean hasRows = hasCurrentRow(); hasRows; hasRows = advanceRow()) {
action.accept(this);
}
}

View File

@ -5,12 +5,13 @@
*/
package org.elasticsearch.xpack.sql.util;
import org.elasticsearch.common.Strings;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import org.elasticsearch.common.Strings;
import java.util.regex.Pattern;
import static java.util.stream.Collectors.joining;
@ -58,7 +59,6 @@ public abstract class StringUtils {
return strings.stream().collect(joining("."));
}
//CamelCase to camel_case
public static String camelCaseToUnderscore(String string) {
if (!Strings.hasText(string)) {
return EMPTY;
@ -69,17 +69,92 @@ public abstract class StringUtils {
boolean previousCharWasUp = false;
for (int i = 0; i < s.length(); i++) {
char ch = s.charAt(i);
if (Character.isUpperCase(ch)) {
if (i > 0 && !previousCharWasUp) {
sb.append("_");
if (Character.isAlphabetic(ch)) {
if (Character.isUpperCase(ch)) {
if (i > 0 && !previousCharWasUp) {
sb.append("_");
}
previousCharWasUp = true;
}
else {
previousCharWasUp = (ch == '_');
}
previousCharWasUp = true;
}
else {
previousCharWasUp = (ch == '_');
previousCharWasUp = true;
}
sb.append(ch);
}
return sb.toString().toUpperCase(Locale.ROOT);
}
public static String nullAsEmpty(String string) {
return string == null ? EMPTY : string;
}
// % -> *
// _ -> .
// consider \ as an escaping char
public static String sqlToJavaPattern(CharSequence sqlPattern, char escapeChar, boolean shouldEscape) {
StringBuilder regex = new StringBuilder(sqlPattern.length() + 4);
boolean escaped = false;
regex.append('^');
for (int i = 0; i < sqlPattern.length(); i++) {
char curr = sqlPattern.charAt(i);
if (shouldEscape && !escaped && (curr == escapeChar)) {
escaped = true;
}
else {
switch (curr) {
case '%':
regex.append(escaped ? "%" : ".*");
escaped = false;
break;
case '_':
regex.append(escaped ? "_" : ".");
escaped = false;
break;
default:
// escape special regex characters
switch (curr) {
case '\\':
case '^':
case '$':
case '.':
case '*':
case '?':
case '+':
case '|':
case '(':
case ')':
case '[':
case ']':
case '{':
case '}':
regex.append('\\');
}
regex.append(curr);
escaped = false;
}
}
}
regex.append('$');
return regex.toString();
}
//TODO: likely this needs to be changed to probably its own indexNameResolver
public static String jdbcToEsPattern(String sqlPattern) {
return Strings.hasText(sqlPattern) ? sqlPattern.replace('%', '*').replace('_', '*') : EMPTY;
}
public static String sqlToJavaPattern(CharSequence sqlPattern) {
return sqlToJavaPattern(sqlPattern, '\\', true);
}
public static Pattern likeRegex(String likePattern) {
return Pattern.compile(sqlToJavaPattern(likePattern, '\\', true));
}
}

View File

@ -3,11 +3,5 @@ apply plugin: 'elasticsearch.build'
description = 'Request and response objects shared by the cli and ' +
'its backend in :x-pack-elasticsearch:plugin'
dependencies {
compile project(':x-pack-elasticsearch:sql-clients:net-client')
}
dependencyLicenses {
mapping from: /net-client.*/, to: 'elasticsearch'
ignoreSha 'net-client'
}

View File

@ -10,18 +10,48 @@ import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Properties;
public class InfoRequest extends Request {
public InfoRequest() {
private static final String EMPTY = "";
public final String jvmVersion, jvmVendor, jvmClassPath, osName, osVersion;
public InfoRequest(Properties props) {
super(Action.INFO);
jvmVersion = props.getProperty("java.version", EMPTY);
jvmVendor = props.getProperty("java.vendor", EMPTY);
jvmClassPath = props.getProperty("java.class.path", EMPTY);
osName = props.getProperty("os.name", EMPTY);
osVersion = props.getProperty("os.version", EMPTY);
}
public InfoRequest(String jvmVersion, String jvmVendor, String jvmClassPath, String osName, String osVersion) {
super(Action.INFO);
this.jvmVersion = jvmVersion;
this.jvmVendor = jvmVendor;
this.jvmClassPath = jvmClassPath;
this.osName = osName;
this.osVersion = osVersion;
}
@Override
public void encode(DataOutput out) throws IOException {
out.writeInt(action.value());
out.writeUTF(jvmVersion);
out.writeUTF(jvmVendor);
out.writeUTF(jvmClassPath);
out.writeUTF(osName);
out.writeUTF(osVersion);
}
public static InfoRequest decode(DataInput in) throws IOException {
return new InfoRequest();
String jvmVersion = in.readUTF();
String jvmVendor = in.readUTF();
String jvmClassPath = in.readUTF();
String osName = in.readUTF();
String osVersion = in.readUTF();
return new InfoRequest(jvmVersion, jvmVendor, jvmClassPath, osName, osVersion);
}
}

View File

@ -5,9 +5,11 @@
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import org.elasticsearch.xpack.sql.net.client.util.ObjectUtils;
import static java.util.stream.Collectors.toMap;
//
// Basic tabular messaging for the CLI
@ -15,7 +17,6 @@ import org.elasticsearch.xpack.sql.net.client.util.ObjectUtils;
// The protocol is very similar (a subset) to the JDBC driver
//
// To simplify things, the protocol is NOT meant to be backwards compatible.
//NOCOMMIT the protocol kind of should be backwards compatible though....
//
public interface Proto {
@ -55,7 +56,8 @@ public interface Proto {
// stacktrace - string - exception stacktrace (should be massaged)
ERROR (0xF000000);
private static final Map<Integer, Status> MAP = ObjectUtils.mapEnum(Status.class, Status::value);
private static final Map<Integer, Status> MAP = Arrays.stream(Status.class.getEnumConstants())
.collect(toMap(Status::value, Function.identity()));
private final int value;
@ -134,7 +136,8 @@ public interface Proto {
COMMAND(0x10);
private static final Map<Integer, Action> MAP = ObjectUtils.mapEnum(Action.class, Action::value);
private static final Map<Integer, Action> MAP = Arrays.stream(Action.class.getEnumConstants())
.collect(toMap(Action::value, Function.identity()));
private final int value;

View File

@ -4,13 +4,16 @@ apply plugin: 'application'
description = 'Command line interface to Elasticsearch that speaks SQL'
dependencies {
compile "org.jline:jline:3.3.0"
compile "org.jline:jline:3.3.1"
compile project(':x-pack-elasticsearch:sql-clients:net-client')
compile project(':x-pack-elasticsearch:sql-clients:cli-proto')
testCompile project(":x-pack-elasticsearch:transport-client")
testCompile project(path: ':x-pack-elasticsearch:plugin', configuration: 'testArtifacts')
testCompile project(':x-pack-elasticsearch:sql-clients:test-utils')
runtime "org.fusesource.jansi:jansi:1.16"
runtime "org.elasticsearch:jna:4.4.0"
}
dependencyLicenses {

View File

@ -5,11 +5,14 @@
*/
package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.xpack.sql.cli.net.client.HttpCliClient;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.net.client.SuppressForbidden;
import org.elasticsearch.xpack.sql.net.client.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.Locale;
import java.util.Properties;
import org.elasticsearch.xpack.sql.cli.net.client.CliHttpClient;
import org.elasticsearch.xpack.sql.net.client.util.IOUtils;
import org.jline.keymap.BindingReader;
import org.jline.reader.EndOfFileException;
import org.jline.reader.LineReader;
@ -17,19 +20,15 @@ import org.jline.reader.LineReaderBuilder;
import org.jline.reader.UserInterruptException;
import org.jline.terminal.Terminal;
import org.jline.terminal.TerminalBuilder;
import org.jline.utils.AttributedString;
import org.jline.utils.AttributedStringBuilder;
import org.jline.utils.InfoCmp.Capability;
import java.awt.Desktop;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Locale;
import java.util.Properties;
import static java.lang.String.format;
import static org.jline.utils.AttributedStyle.BOLD;
import static org.jline.utils.AttributedStyle.BRIGHT;
import static org.jline.utils.AttributedStyle.DEFAULT;
import static org.jline.utils.AttributedStyle.RED;
import static org.jline.utils.AttributedStyle.YELLOW;
public class Cli {
@ -37,7 +36,7 @@ public class Cli {
private final BindingReader bindingReader;
private final Keys keys;
private final CliConfiguration cfg;
private final HttpCliClient cliClient;
private final CliHttpClient cliClient;
public static void main(String... args) throws Exception {
try (Terminal term = TerminalBuilder.builder().build()) {
@ -51,8 +50,8 @@ public class Cli {
bindingReader = new BindingReader(term.reader());
keys = new Keys(term);
cfg = new CliConfiguration("localhost:9200", new Properties());
cliClient = new HttpCliClient(cfg);
cfg = new CliConfiguration("localhost:9200/_cli", new Properties());
cliClient = new CliHttpClient(cfg);
}
@ -64,13 +63,16 @@ public class Cli {
.completer(Completers.INSTANCE)
.build();
String DEFAULT_PROMPT = "sql> ";
String MULTI_LINE_PROMPT = " | ";
String prompt = null;
String DEFAULT_PROMPT = new AttributedString("sql> ", DEFAULT.foreground(YELLOW)).toAnsi(term);
String MULTI_LINE_PROMPT = new AttributedString(" | ", DEFAULT.foreground(YELLOW)).toAnsi(term);
StringBuilder multiLine = new StringBuilder();
String prompt = DEFAULT_PROMPT;
prompt = DEFAULT_PROMPT;
out.flush();
printLogo(out);
while (true) {
String line = null;
@ -103,26 +105,61 @@ public class Cli {
line = multiLine.toString().trim();
multiLine.setLength(0);
}
//
// local commands
//
// special case to handle exit
if (isExit(line)) {
out.println("Bye!");
out.println(new AttributedString("Bye!", DEFAULT.foreground(BRIGHT)).toAnsi(term));
out.flush();
return;
}
if (isClear(line)) {
term.puts(Capability.clear_screen);
}
else if (isServerInfo(line)) {
executeServerInfo(out);
else if (isLogo(line)) {
printLogo(out);
}
else {
executeCommand(line, out);
try {
if (isServerInfo(line)) {
executeServerInfo(out);
}
else {
executeCommand(line, out);
}
} catch (RuntimeException ex) {
AttributedStringBuilder asb = new AttributedStringBuilder();
asb.append("Communication error [", BOLD.foreground(RED));
asb.append(ex.getMessage(), DEFAULT.boldOff().italic().foreground(YELLOW));
asb.append("]", BOLD.underlineOff().foreground(RED));
out.println(asb.toAnsi(term));
}
out.println();
}
out.flush();
}
}
private static String logo() {
try (InputStream io = Cli.class.getResourceAsStream("logo.txt")) {
if (io != null) {
return IOUtils.asBytes(io).toString();
}
} catch (IOException io) {
}
return "Could not load logo...";
}
private void printLogo(PrintWriter out) {
term.puts(Capability.clear_screen);
out.println(logo());
out.println();
}
private static boolean isClear(String line) {
line = line.toLowerCase(Locale.ROOT);
return (line.equals("cls"));
@ -130,12 +167,16 @@ public class Cli {
private boolean isServerInfo(String line) {
line = line.toLowerCase(Locale.ROOT);
return (line.equals("connect"));
return (line.equals("info"));
}
private boolean isLogo(String line) {
line = line.toLowerCase(Locale.ROOT);
return (line.equals("logo"));
}
private void executeServerInfo(PrintWriter out) {
InfoResponse info = cliClient.serverInfo();
out.println(format(Locale.ROOT, "Node:%s, Cluster:%s, Version:%s", info.node, info.cluster, info.versionString));
out.println(ResponseToString.toAnsi(cliClient.serverInfo()).toAnsi(term));
}
private static boolean isExit(String line) {
@ -144,29 +185,6 @@ public class Cli {
}
protected void executeCommand(String line, PrintWriter out) throws IOException {
// remove trailing
CommandResponse cmd = cliClient.command(line, null);
String result = StringUtils.EMPTY;
if (cmd.data != null) {
result = cmd.data.toString();
// TODO: handle graphviz
}
out.println(result);
}
private String displayGraphviz(String str) throws IOException {
// save the content to a temp file
Path dotTempFile = Files.createTempFile(Paths.get(System.getProperty("java.io.tmpdir")), "sql-gv", ".dot2img");
Files.write(dotTempFile, str.getBytes(StandardCharsets.UTF_8));
// run graphviz on it (dot needs to be on the file path)
open(dotTempFile);
return "";
}
@SuppressForbidden(reason="The desktop API needs File instead of Path")
private void open(Path path) throws IOException {
Desktop desktop = Desktop.getDesktop();
desktop.open(path.toFile());
out.print(ResponseToString.toAnsi(cliClient.command(line, null)).toAnsi(term));
}
}

View File

@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli;
import java.awt.Desktop;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Response;
import org.jline.utils.AttributedStringBuilder;
import static org.jline.utils.AttributedStyle.BOLD;
import static org.jline.utils.AttributedStyle.BRIGHT;
import static org.jline.utils.AttributedStyle.CYAN;
import static org.jline.utils.AttributedStyle.DEFAULT;
import static org.jline.utils.AttributedStyle.RED;
import static org.jline.utils.AttributedStyle.WHITE;
abstract class ResponseToString {
static AttributedStringBuilder toAnsi(Response response) {
AttributedStringBuilder sb = new AttributedStringBuilder();
if (response instanceof CommandResponse) {
CommandResponse cmd = (CommandResponse) response;
if (cmd.data != null) {
String data = cmd.data.toString();
if (data.startsWith("digraph ")) {
displayGraphviz(data);
}
else {
sb.append(data, DEFAULT.foreground(WHITE));
}
}
}
else if (response instanceof ExceptionResponse) {
ExceptionResponse ex = (ExceptionResponse) response;
sb.append(ex.message, BOLD.foreground(CYAN));
}
else if (response instanceof InfoResponse) {
InfoResponse info = (InfoResponse) response;
sb.append("Node:", DEFAULT.foreground(BRIGHT));
sb.append(info.node, DEFAULT.foreground(WHITE));
sb.append(" Cluster:", DEFAULT.foreground(BRIGHT));
sb.append(info.cluster, DEFAULT.foreground(WHITE));
sb.append(" Version:", DEFAULT.foreground(BRIGHT));
sb.append(info.versionString, DEFAULT.foreground(WHITE));
}
else if (response instanceof ErrorResponse) {
ErrorResponse error = (ErrorResponse) response;
sb.append("Server error:", BOLD.foreground(RED));
sb.append(error.message, DEFAULT.italic().foreground(RED));
}
else {
sb.append("Invalid response received from server...", BOLD.foreground(RED));
}
return sb;
}
private static void displayGraphviz(String str) {
try {
// save the content to a temp file
Path dotTempFile = Files.createTempFile("sql-gv", ".dot2img");
Files.write(dotTempFile, str.getBytes(StandardCharsets.UTF_8));
// run graphviz on it (dot needs to be on the file path)
Desktop desktop = Desktop.getDesktop();
File f = dotTempFile.toFile();
desktop.open(f);
f.deleteOnExit();
} catch (IOException ex) {
// nope
}
}
}

View File

@ -5,50 +5,47 @@
*/
package org.elasticsearch.xpack.sql.cli.net.client;
import org.elasticsearch.xpack.sql.cli.CliConfiguration;
import org.elasticsearch.xpack.sql.cli.CliException;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.cli.net.protocol.ProtoUtils;
import org.elasticsearch.xpack.sql.cli.net.protocol.Response;
import org.elasticsearch.xpack.sql.net.client.util.Bytes;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import org.elasticsearch.xpack.sql.cli.CliConfiguration;
import org.elasticsearch.xpack.sql.cli.CliException;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ProtoUtils;
import org.elasticsearch.xpack.sql.cli.net.protocol.Response;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.net.client.util.Bytes;
public class HttpCliClient implements AutoCloseable {
@FunctionalInterface
interface DataInputFunction<R> {
R apply(DataInput in) throws IOException;
}
public class CliHttpClient implements AutoCloseable {
private final HttpClient http;
private final CliConfiguration cfg;
public HttpCliClient(CliConfiguration cfg) {
public CliHttpClient(CliConfiguration cfg) {
http = new HttpClient(cfg);
this.cfg = cfg;
}
public InfoResponse serverInfo() {
Bytes ba = http.put(out -> ProtoUtils.write(out, new InfoRequest()));
public Response serverInfo() {
Bytes ba = http.put(out -> ProtoUtils.write(out, new InfoRequest(System.getProperties())));
return doIO(ba, in -> readResponse(in, Action.INFO));
}
public CommandResponse command(String command, String requestId) {
public Response command(String command, String requestId) {
Bytes ba = http.put(out -> ProtoUtils.write(out, new CommandRequest(command)));
return doIO(ba, in -> {
CommandResponse response = readResponse(in, Action.COMMAND);
Response response = readResponse(in, Action.COMMAND);
// read data
String result = in.readUTF();
return new CommandResponse(response.serverTimeQueryReceived, response.serverTimeResponseSent, response.requestId, result);
if (response instanceof CommandResponse) {
String result = in.readUTF();
CommandResponse cr = (CommandResponse) response;
return new CommandResponse(cr.serverTimeQueryReceived, cr.serverTimeResponseSent, cr.requestId, result);
}
return response;
});
}
@ -74,22 +71,15 @@ public class HttpCliClient implements AutoCloseable {
throw new CliException("Expected response for %s, found %s", expected, action);
}
Response response = ProtoUtils.readResponse(in, header);
if (response instanceof ExceptionResponse) {
ExceptionResponse ex = (ExceptionResponse) response;
throw new CliException(ex.message);
}
if (response instanceof ErrorResponse) {
ErrorResponse error = (ErrorResponse) response;
throw new CliException("%s", error.stack);
}
if (response instanceof Response) {
return (R) response;
}
throw new CliException("Invalid response status %08X", header);
return (R) ProtoUtils.readResponse(in, header);
}
public void close() {}
}
@FunctionalInterface
private interface DataInputFunction<R> {
R apply(DataInput in) throws IOException;
}
}

View File

@ -5,21 +5,20 @@
*/
package org.elasticsearch.xpack.sql.cli.net.client;
import java.net.MalformedURLException;
import java.net.URL;
import org.elasticsearch.xpack.sql.cli.CliConfiguration;
import org.elasticsearch.xpack.sql.cli.CliException;
import org.elasticsearch.xpack.sql.net.client.ClientException;
import org.elasticsearch.xpack.sql.net.client.DataOutputConsumer;
import org.elasticsearch.xpack.sql.net.client.jre.JreHttpUrlConnection;
import org.elasticsearch.xpack.sql.net.client.util.Bytes;
public class HttpClient {
import java.net.MalformedURLException;
import java.net.URL;
class HttpClient {
private final CliConfiguration cfg;
public HttpClient(CliConfiguration cfg) {
HttpClient(CliConfiguration cfg) {
this.cfg = cfg;
}
@ -32,25 +31,17 @@ public class HttpClient {
}
boolean head(String path) {
try {
return JreHttpUrlConnection.http(url(path), cfg, JreHttpUrlConnection::head);
} catch (ClientException ex) {
throw new ClientException(ex, "Transport failure");
}
return JreHttpUrlConnection.http(url(path), cfg, JreHttpUrlConnection::head);
}
Bytes put(DataOutputConsumer os) {
return put("sql/", os);
return put("", os);
}
Bytes put(String path, DataOutputConsumer os) {
try {
return JreHttpUrlConnection.http(url(path), cfg, con -> {
return con.put(os);
});
} catch (ClientException ex) {
throw new CliException(ex, "Transport failure");
}
return JreHttpUrlConnection.http(url(path), cfg, con -> {
return con.put(os);
});
}
void close() {}

View File

@ -0,0 +1,25 @@
.sssssss.` .sssssss.
.:sXXXXXXXXXXo` `ohXXXXXXXXXho.
.yXXXXXXXXXXXXXXo` `oXXXXXXXXXXXXXXX-
.XXXXXXXXXXXXXXXXXXo` `oXXXXXXXXXXXXXXXXXX.
.XXXXXXXXXXXXXXXXXXXXo. .oXXXXXXXXXXXXXXXXXXXXh
.XXXXXXXXXXXXXXXXXXXXXXo``oXXXXXXXXXXXXXXXXXXXXXXy
`yXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX.
`oXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXo`
`oXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXo`
`oXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXo`
`oXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXo`
`oXXXXXXXXXXXXXXXXXXXXXXXXXXXXo`
.XXXXXXXXXXXXXXXXXXXXXXXXXo`
.oXXXXXXXXXXXXXXXXXXXXXXXXo`
`oXXXXXXXXXXXXXXXXXXXXXXXXo` `odo`
`oXXXXXXXXXXXXXXXXXXXXXXXXo` `oXXXXXo`
`oXXXXXXXXXXXXXXXXXXXXXXXXo` `oXXXXXXXXXo`
`oXXXXXXXXXXXXXXXXXXXXXXXXo` `oXXXXXXXXXXXXXo`
`yXXXXXXXXXXXXXXXXXXXXXXXo` oXXXXXXXXXXXXXXXXX.
.XXXXXXXXXXXXXXXXXXXXXXo` `oXXXXXXXXXXXXXXXXXXXy
.XXXXXXXXXXXXXXXXXXXXo` /XXXXXXXXXXXXXXXXXXXXX
.XXXXXXXXXXXXXXXXXXo` `oXXXXXXXXXXXXXXXXXX-
-XXXXXXXXXXXXXXXo` `oXXXXXXXXXXXXXXXo`
.oXXXXXXXXXXXo` `oXXXXXXXXXXXo.
`.sshXXyso` SQL `.sshXhss.`

View File

@ -11,12 +11,16 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.elasticsearch.xpack.sql.cli.CliConfiguration;
import org.elasticsearch.xpack.sql.cli.integration.server.CliHttpServer;
import org.elasticsearch.xpack.sql.cli.net.client.HttpCliClient;
import org.elasticsearch.xpack.sql.cli.net.client.CliHttpClient;
import org.elasticsearch.xpack.sql.cli.net.protocol.CommandResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Response;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import static org.hamcrest.Matchers.is;
@ -30,7 +34,7 @@ public class ProtoTests extends ESTestCase {
private static Client esClient;
private static CliHttpServer server;
private static HttpCliClient client;
private static CliHttpClient client;
@BeforeClass
public static void setupServer() throws Exception {
@ -44,7 +48,7 @@ public class ProtoTests extends ESTestCase {
if (client == null) {
CliConfiguration ci = new CliConfiguration(server.url(), new Properties());
client = new HttpCliClient(ci);
client = new CliHttpClient(ci);
}
}
@ -67,7 +71,7 @@ public class ProtoTests extends ESTestCase {
}
public void testInfoAction() throws Exception {
InfoResponse esInfo = client.serverInfo();
InfoResponse esInfo = (InfoResponse) client.serverInfo();
assertThat(esInfo, notNullValue());
assertThat(esInfo.cluster, is("elasticsearch"));
assertThat(esInfo.node, not(isEmptyOrNullString()));
@ -78,8 +82,64 @@ public class ProtoTests extends ESTestCase {
}
public void testBasicQuery() throws Exception {
CommandResponse command = client.command("SHOW TABLES", null);
// System.out.println(command.data);
// NOCOMMIT test this
CommandResponse command = (CommandResponse) client.command("SHOW TABLES", null);
System.out.println(command.data);
}
public void testDemoQuery() throws Exception {
CommandResponse command = (CommandResponse) client.command("SELECT first_name f, last_name l, YEAR(dep.from_date) start FROM emp.emp WHERE dep.dept_name = 'Production' AND tenure > 30 ORDER BY start", null);
System.out.println(command.data);
}
public void testDemo() throws Exception {
// add math functions
// add statistical function + explanation on optimization
List<String> commands = Arrays.asList(
"SHOW TABLES",
"DESCRIBE emp.emp",
"SELECT * FROM emp.emp",
"SELECT * FROM emp.emp LIMIT 5",
"SELECT last_name l, first_name f FROM emp.emp WHERE gender = 'F' LIMIT 5",
"SELECT last_name l, first_name f FROM emp.emp WHERE tenure > 30 ORDER BY salary LIMIT 5",
"SELECT * FROM emp.emp WHERE MATCH(first_name, 'Mary')",
"SELECT * FROM emp.emp WHERE MATCH('first_name,last_name', 'Morton', 'type=best_fields;default_operator=OR')",
"SELECT * FROM emp.emp WHERE QUERY('Elvis Alain')",
"SHOW FUNCTIONS",
"SHOW FUNCTIONS LIKE 'L%'",
"SELECT LOG(salary) FROM emp.emp",
"SELECT salary s, LOG(salary) m FROM emp.emp LIMIT 5",
"SELECT salary s, EXP(LOG(salary)) m FROM emp.emp LIMIT 5",
"SELECT salary s, ROUND(EXP(LOG(salary))) m FROM emp.emp LIMIT 5",
"SELECT salary s, ROUND(EXP(LOG(salary))) m FROM emp.emp ORDER BY ROUND(LOG(emp_no)) LIMIT 5",
"SHOW FUNCTIONS LIKE '%DAY%'",
"SELECT year(birth_date) year, last_name l, first_name f FROM emp.emp WHERE year(birth_date) <=1960 AND tenure < 25 ORDER BY year LIMIT 5",
"SELECT COUNT(*) FROM emp.emp",
"SELECT COUNT(*) FROM emp.emp WHERE emp_no >= 10010",
"EXPLAIN (PLAN EXECUTABLE) SELECT COUNT(*) FROM emp.emp",
"SELECT tenure, COUNT(*) count, MIN(salary) min, AVG(salary) avg, MAX(salary) max FROM emp.emp GROUP BY tenure",
"SELECT YEAR(birth_date) born, COUNT(*) count, MIN(salary) min, AVG(salary) avg, MAX(salary) max FROM emp.emp GROUP BY born",
"SELECT tenure, gender, COUNT(tenure) count, AVG(salary) avg FROM emp.emp GROUP BY tenure, gender HAVING avg > 50000",
"SELECT gender, tenure, AVG(salary) avg FROM emp.emp GROUP BY gender, tenure HAVING avg > 50000 ORDER BY tenure DESC",
// nested docs
"DESCRIBE emp.emp",
"SELECT dep FROM emp.emp",
"SELECT dep.dept_name, first_name, last_name FROM emp.emp WHERE emp_no = 10020",
"SELECT first_name f, last_name l, dep.from_date FROM emp.emp WHERE dep.dept_name = 'Production' ORDER BY dep.from_date",
"SELECT first_name f, last_name l, YEAR(dep.from_date) start FROM emp.emp WHERE dep.dept_name = 'Production' AND tenure > 30 ORDER BY start"
);
for (String c : commands) {
Response command = (Response) client.command(c, null);
String msg = "";
if (command instanceof ExceptionResponse) {
msg = ((ExceptionResponse) command).message;
}
if (command instanceof CommandResponse) {
msg = ((CommandResponse) command).data.toString();
}
System.out.println(msg);
}
}
}

View File

@ -34,6 +34,6 @@ class CliProtoHandler extends ProtoHandler<Response> {
@Override
protected void handle(HttpExchange http, DataInput in) throws IOException {
Request req = ProtoUtils.readRequest(in);
server.handle(req, wrap(resp -> sendHttpResponse(http, resp), ex -> error(http, ex)));
server.handle(req, wrap(resp -> sendHttpResponse(http, resp), ex -> fail(http, ex)));
}
}

View File

@ -4,11 +4,9 @@ description = 'Request and response objects shared by the jdbc driver and ' +
'its backend in :x-pack-elasticsearch:plugin'
dependencies {
compile project(":x-pack-elasticsearch:sql-clients:net-client")
testCompile project(':x-pack-elasticsearch:sql-clients:test-utils')
}
dependencyLicenses {
mapping from: /net-client.*/, to: 'elasticsearch'
ignoreSha 'net-client'
}

View File

@ -10,18 +10,48 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.Action;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Properties;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.StringUtils.EMPTY;
public class InfoRequest extends Request {
public InfoRequest() {
public final String jvmVersion, jvmVendor, jvmClassPath, osName, osVersion;
public InfoRequest(Properties props) {
super(Action.INFO);
jvmVersion = props.getProperty("java.version", EMPTY);
jvmVendor = props.getProperty("java.vendor", EMPTY);
jvmClassPath = props.getProperty("java.class.path", EMPTY);
osName = props.getProperty("os.name", EMPTY);
osVersion = props.getProperty("os.version", EMPTY);
}
public InfoRequest(String jvmVersion, String jvmVendor, String jvmClassPath, String osName, String osVersion) {
super(Action.INFO);
this.jvmVersion = jvmVersion;
this.jvmVendor = jvmVendor;
this.jvmClassPath = jvmClassPath;
this.osName = osName;
this.osVersion = osVersion;
}
@Override
public void encode(DataOutput out) throws IOException {
out.writeInt(action.value());
out.writeUTF(jvmVersion);
out.writeUTF(jvmVendor);
out.writeUTF(jvmClassPath);
out.writeUTF(osName);
out.writeUTF(osVersion);
}
public static InfoRequest decode(DataInput in) throws IOException {
return new InfoRequest();
String jvmVersion = in.readUTF();
String jvmVendor = in.readUTF();
String jvmClassPath = in.readUTF();
String osName = in.readUTF();
String osVersion = in.readUTF();
return new InfoRequest(jvmVersion, jvmVendor, jvmClassPath, osName, osVersion);
}
}

View File

@ -5,17 +5,14 @@
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.Action;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Locale;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.net.client.util.StringUtils;
import static java.lang.String.format;
import static org.elasticsearch.xpack.sql.net.client.util.StringUtils.nullAsEmpty;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.StringUtils.nullAsEmpty;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.StringUtils.splitToIndexAndType;
public class MetaColumnRequest extends Request {
@ -28,7 +25,7 @@ public class MetaColumnRequest extends Request {
this.tablePattern = nullAsEmpty(tablePattern);
this.columnPattern = nullAsEmpty(columnPattern);
String[] split = StringUtils.splitToIndexAndType(tablePattern);
String[] split = splitToIndexAndType(tablePattern);
this.index = split[0];
this.type = split[1];

View File

@ -5,15 +5,15 @@
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.Action;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Locale;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.net.client.util.StringUtils;
import static java.lang.String.format;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.StringUtils.splitToIndexAndType;
public class MetaTableRequest extends Request {
@ -25,7 +25,7 @@ public class MetaTableRequest extends Request {
super(Action.META_TABLE);
this.pattern = pattern;
String[] split = StringUtils.splitToIndexAndType(pattern);
String[] split = splitToIndexAndType(pattern);
this.index = split[0];
this.type = split[1];

View File

@ -11,13 +11,14 @@ import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLSyntaxErrorException;
import java.sql.SQLTimeoutException;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import javax.sql.rowset.serial.SerialException;
import org.elasticsearch.xpack.sql.net.client.util.ObjectUtils;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toMap;
//
// Basic tabular messaging for the JDBC driver
@ -28,7 +29,6 @@ import static java.util.Collections.emptyMap;
// The proto is based around a simple, single request-response model.
// Note the field order is _important_.
// To simplify things, the protocol is not meant to be backwards compatible.
// NOCOMMIT the protocol kind of should be backwards compatible though....
//
public interface Proto {
@ -68,7 +68,8 @@ public interface Proto {
// stacktrace - string - exception stacktrace (should be massaged)
ERROR (0xF000000);
private static final Map<Integer, Status> MAP = ObjectUtils.mapEnum(Status.class, Status::value);
private static final Map<Integer, Status> MAP = Arrays.stream(Status.class.getEnumConstants())
.collect(toMap(Status::value, Function.identity()));
private final int value;
@ -109,7 +110,8 @@ public interface Proto {
TIMEOUT (0x400);
private static final Map<Integer, SqlExceptionType> MAP = ObjectUtils.mapEnum(SqlExceptionType.class, SqlExceptionType::value);
private static final Map<Integer, SqlExceptionType> MAP = Arrays.stream(SqlExceptionType.class.getEnumConstants())
.collect(toMap(SqlExceptionType::value, Function.identity()));
private final int value;
@ -276,7 +278,8 @@ public interface Proto {
QUERY_CLOSE(0x19);
private static final Map<Integer, Action> MAP = ObjectUtils.mapEnum(Action.class, Action::value);
private static final Map<Integer, Action> MAP = Arrays.stream(Action.class.getEnumConstants())
.collect(toMap(Action::value, Function.identity()));
private final int value;

View File

@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.StringTokenizer;
abstract class StringUtils {
static final String EMPTY = "";
static String nullAsEmpty(String string) {
return string == null ? EMPTY : string;
}
static boolean hasText(CharSequence sequence) {
if (!hasLength(sequence)) {
return false;
}
int length = sequence.length();
for (int i = 0; i < length; i++) {
if (!Character.isWhitespace(sequence.charAt(i))) {
return true;
}
}
return false;
}
static boolean hasLength(CharSequence sequence) {
return (sequence != null && sequence.length() > 0);
}
static String[] splitToIndexAndType(String pattern) {
List<String> tokens = tokenize(pattern, ".");
String[] results = new String[2];
if (tokens.size() == 2) {
results[0] = tokens.get(0);
results[1] = tokens.get(1);
}
else {
results[0] = nullAsEmpty(pattern);
results[1] = EMPTY;
}
return results;
}
static List<String> tokenize(String string, String delimiters) {
return tokenize(string, delimiters, true, true);
}
static List<String> tokenize(String string, String delimiters, boolean trimTokens, boolean ignoreEmptyTokens) {
if (!hasText(string)) {
return Collections.emptyList();
}
StringTokenizer st = new StringTokenizer(string, delimiters);
List<String> tokens = new ArrayList<String>();
while (st.hasMoreTokens()) {
String token = st.nextToken();
if (trimTokens) {
token = token.trim();
}
if (!ignoreEmptyTokens || token.length() > 0) {
tokens.add(token);
}
}
return tokens;
}
}

View File

@ -28,12 +28,12 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.xpack.sql.jdbc.debug.Debug;
import org.elasticsearch.xpack.sql.jdbc.net.client.HttpJdbcClient;
import org.elasticsearch.xpack.sql.jdbc.net.client.JdbcHttpClient;
import org.elasticsearch.xpack.sql.net.client.util.StringUtils;
public class JdbcConnection implements Connection, JdbcWrapper {
final HttpJdbcClient client;
final JdbcHttpClient client;
private boolean closed = false;
private String catalog;
@ -46,7 +46,7 @@ public class JdbcConnection implements Connection, JdbcWrapper {
public JdbcConnection(JdbcConfiguration connectionInfo) {
info = connectionInfo;
client = new HttpJdbcClient(connectionInfo);
client = new JdbcHttpClient(connectionInfo);
url = connectionInfo.asUrl().toExternalForm();
userName = connectionInfo.userName();

View File

@ -13,14 +13,14 @@ import org.elasticsearch.xpack.sql.net.client.util.StringUtils;
class DefaultCursor implements Cursor {
private final HttpJdbcClient client;
private final JdbcHttpClient client;
private final RequestMeta meta;
private final Page page;
private int row = -1;
private String requestId;
DefaultCursor(HttpJdbcClient client, String scrollId, Page page, RequestMeta meta) {
DefaultCursor(JdbcHttpClient client, String scrollId, Page page, RequestMeta meta) {
this.client = client;
this.meta = meta;
this.requestId = simplifyScrollId(scrollId);

View File

@ -27,6 +27,7 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.Response;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.TimeoutInfo;
import org.elasticsearch.xpack.sql.jdbc.util.BytesArray;
import org.elasticsearch.xpack.sql.jdbc.util.FastByteArrayInputStream;
import org.elasticsearch.xpack.sql.net.client.util.StringUtils;
import java.io.Closeable;
import java.io.DataInput;
@ -37,7 +38,7 @@ import java.sql.SQLException;
import java.time.Instant;
import java.util.List;
public class HttpJdbcClient implements Closeable {
public class JdbcHttpClient implements Closeable {
@FunctionalInterface
interface DataInputFunction<R> {
R apply(DataInput in) throws IOException, SQLException;
@ -47,7 +48,7 @@ public class HttpJdbcClient implements Closeable {
private final JdbcConfiguration conCfg;
private InfoResponse serverInfo;
public HttpJdbcClient(JdbcConfiguration conCfg) {
public JdbcHttpClient(JdbcConfiguration conCfg) {
http = new HttpClient(conCfg);
this.conCfg = conCfg;
}
@ -57,7 +58,7 @@ public class HttpJdbcClient implements Closeable {
// NOCOMMIT this seems race condition-y
http.setNetworkTimeout(timeoutInMs);
try {
return http.head("");
return http.head(StringUtils.EMPTY);
} finally {
http.setNetworkTimeout(oldTimeout);
}
@ -135,7 +136,7 @@ public class HttpJdbcClient implements Closeable {
}
private InfoResponse fetchServerInfo() throws SQLException {
BytesArray ba = http.put(out -> ProtoUtils.write(out, new InfoRequest()));
BytesArray ba = http.put(out -> ProtoUtils.write(out, new InfoRequest(System.getProperties())));
return doIO(ba, in -> readResponse(in, Action.INFO));
}

View File

@ -34,6 +34,6 @@ class SqlProtoHandler extends ProtoHandler<Response> {
@Override
protected void handle(HttpExchange http, DataInput in) throws IOException {
Request req = ProtoUtils.readRequest(in);
server.handle(req, wrap(resp -> sendHttpResponse(http, resp), ex -> error(http, ex)));
server.handle(req, wrap(resp -> sendHttpResponse(http, resp), ex -> fail(http, ex)));
}
}

View File

@ -5,6 +5,12 @@
*/
package org.elasticsearch.xpack.sql.net.client.jre;
import org.elasticsearch.xpack.sql.net.client.ClientException;
import org.elasticsearch.xpack.sql.net.client.ConnectionConfiguration;
import org.elasticsearch.xpack.sql.net.client.DataOutputConsumer;
import org.elasticsearch.xpack.sql.net.client.util.Bytes;
import org.elasticsearch.xpack.sql.net.client.util.IOUtils;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
@ -12,27 +18,22 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import org.elasticsearch.xpack.sql.net.client.ClientException;
import org.elasticsearch.xpack.sql.net.client.ConnectionConfiguration;
import org.elasticsearch.xpack.sql.net.client.DataOutputConsumer;
import org.elasticsearch.xpack.sql.net.client.util.Bytes;
import org.elasticsearch.xpack.sql.net.client.util.IOUtils;
import java.util.zip.GZIPInputStream;
public class JreHttpUrlConnection implements Closeable {
private boolean closed = false;
final HttpURLConnection con;
private final URL url;
private static final String GZIP = "gzip";
public JreHttpUrlConnection(URL url, ConnectionConfiguration cfg) throws ClientException {
this.url = url;
try {
con = (HttpURLConnection) url.openConnection();
} catch (IOException ex) {
throw new ClientException(ex, "Cannot setup connection to %s", url);
throw new ClientException(ex, "Cannot setup connection to %s (%s)", url, ex.getMessage());
}
con.setConnectTimeout((int) cfg.getConnectTimeout());
@ -43,8 +44,7 @@ public class JreHttpUrlConnection implements Closeable {
// HttpURL adds this header by default, HttpS does not
// adding it here to be consistent
con.setRequestProperty("Accept-Charset", "UTF-8");
// NOCOMMIT if we're going to accept gzip then we need to transparently unzip it on the way out...
// con.setRequestProperty("Accept-Encoding", "gzip");
//con.setRequestProperty("Accept-Encoding", GZIP);
}
public boolean head() throws ClientException {
@ -53,7 +53,7 @@ public class JreHttpUrlConnection implements Closeable {
int responseCode = con.getResponseCode();
return responseCode == HttpURLConnection.HTTP_OK;
} catch (IOException ex) {
throw new ClientException(ex, "Cannot HEAD address %s", url);
throw new ClientException(ex, "Cannot HEAD address %s (%s)", url, ex.getMessage());
}
}
@ -65,22 +65,24 @@ public class JreHttpUrlConnection implements Closeable {
try (OutputStream out = con.getOutputStream()) {
doc.accept(new DataOutputStream(out));
}
if (con.getResponseCode() >= 400) {
InputStream err = con.getErrorStream();
String response;
if (err == null) {
response = "server did not return a response";
} else {
response = new String(IOUtils.asBytes(err).bytes(), StandardCharsets.UTF_8);
}
throw new ClientException("Protocol/client error; server returned [" + con.getResponseMessage() + "]: " + response);
if (con.getResponseCode() >= 500) {
throw new ClientException("Server error: %s(%d;%s)", con.getResponseMessage(), con.getResponseCode(), IOUtils.asBytes(getStream(con, con.getErrorStream())).toString());
}
// NOCOMMIT seems weird that we buffer this into a byte stream and then wrap it in a byte array input stream.....
return IOUtils.asBytes(con.getInputStream());
if (con.getResponseCode() >= 400) {
throw new ClientException("Client error: %s(%d;%s)", con.getResponseMessage(), con.getResponseCode(), IOUtils.asBytes(getStream(con, con.getErrorStream())).toString());
}
return IOUtils.asBytes(getStream(con, con.getInputStream()));
} catch (IOException ex) {
throw new ClientException(ex, "Cannot POST address %s", url);
throw new ClientException(ex, "Cannot POST address %s (%s)", url, ex.getMessage());
}
}
private static InputStream getStream(HttpURLConnection con, InputStream stream) throws IOException {
if (GZIP.equals(con.getContentEncoding())) {
return new GZIPInputStream(stream);
}
return stream;
}
public void connect() {
if (closed) {
@ -89,7 +91,7 @@ public class JreHttpUrlConnection implements Closeable {
try {
con.connect();
} catch (IOException ex) {
throw new ClientException(ex, "Cannot open connection to %s", url);
throw new ClientException(ex, "Cannot open connection to %s (%s)", url, ex.getMessage());
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.sql.net.client.util;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
public class Bytes {
@ -28,4 +29,8 @@ public class Bytes {
public byte[] copy() {
return Arrays.copyOf(buf, size);
}
public String toString() {
return new String(buf, 0, size, StandardCharsets.UTF_8);
}
}

View File

@ -23,7 +23,7 @@ import java.io.IOException;
public abstract class ProtoHandler<R> implements HttpHandler, AutoCloseable {
protected static final Logger log = ESLoggerFactory.getLogger(ProtoHandler.class.getName());
protected final static Logger log = ESLoggerFactory.getLogger(ProtoHandler.class.getName());
private final TimeValue TV = TimeValue.timeValueSeconds(5);
protected final NodeInfo info;
protected final String clusterName;
@ -44,46 +44,46 @@ public abstract class ProtoHandler<R> implements HttpHandler, AutoCloseable {
log.debug("Received query call...");
try (DataInputStream in = new DataInputStream(http.getRequestBody())) {
String msg = headerReader.apply(in);
if (msg != null) {
http.sendResponseHeaders(RestStatus.BAD_REQUEST.getStatus(), -1);
http.close();
return;
}
handle(http, in);
} catch (Exception ex) {
error(http, ex);
fail(http, ex);
}
}
protected abstract void handle(HttpExchange http, DataInput in) throws IOException;
protected void sendHttpResponse(HttpExchange http, R response) throws IOException {
http.sendResponseHeaders(RestStatus.OK.getStatus(), 0);
// first do the conversion in case an exception is triggered
BytesReference data = toProto.apply(response);
if (http.getResponseHeaders().isEmpty()) {
http.sendResponseHeaders(RestStatus.OK.getStatus(), 0);
}
data.writeTo(http.getResponseBody());
http.close();
}
protected void error(HttpExchange http, Exception ex) {
log.error("Caught error", ex);
protected void fail(HttpExchange http, Exception ex) {
log.error("Caught error while transmitting response", ex);
try {
// the error conversion has failed, halt
if (http.getResponseHeaders().isEmpty()) {
http.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
}
} catch (IOException ex2) {
// ignore
log.error("Caught error while trying to send error", ex2);
} catch (IOException ioEx) {
log.error("Caught error while trying to catch error", ex);
} finally {
http.close();
}
}
@Override
public void close() {
public void close() throws Exception {
// no-op
}
}