Replaces custom URL parsing with URI parsing and moves baseURI from each client call into connection configuration.

relates elastic/x-pack-elasticsearch#2882

Original commit: elastic/x-pack-elasticsearch@c51059f56f
This commit is contained in:
Igor Motov 2017-11-27 18:09:05 -05:00 committed by GitHub
parent 378abf1d8f
commit 0228020c5c
13 changed files with 337 additions and 307 deletions

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryResponse;
import org.elasticsearch.xpack.sql.client.shared.ConnectionConfiguration;
import org.elasticsearch.xpack.sql.client.shared.SuppressForbidden;
import org.elasticsearch.xpack.sql.client.shared.JreHttpUrlConnection;
import org.elasticsearch.xpack.sql.client.shared.StringUtils;
@ -26,7 +27,6 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.Locale;
@ -35,6 +35,8 @@ import java.util.logging.LogManager;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.elasticsearch.xpack.sql.client.shared.UriUtils.parseURI;
import static org.elasticsearch.xpack.sql.client.shared.UriUtils.removeQuery;
import static org.jline.utils.AttributedStyle.BOLD;
import static org.jline.utils.AttributedStyle.BRIGHT;
import static org.jline.utils.AttributedStyle.DEFAULT;
@ -42,43 +44,39 @@ import static org.jline.utils.AttributedStyle.RED;
import static org.jline.utils.AttributedStyle.YELLOW;
public class Cli {
public static String DEFAULT_CONNECTION_STRING = "http://localhost:9200/";
public static URI DEFAULT_URI = URI.create(DEFAULT_CONNECTION_STRING);
public static void main(String... args) throws Exception {
/* Initialize the logger from the a properties file we bundle. This makes sure
* we get useful error messages from jLine. */
LogManager.getLogManager().readConfiguration(Cli.class.getResourceAsStream("/logging.properties"));
String hostAndPort = "localhost:9200";
final URI uri;
final String connectionString;
Properties properties = new Properties();
String user = null;
String password = null;
if (args.length > 0) {
hostAndPort = args[0];
if (false == hostAndPort.contains("://")) {
// Default to http
hostAndPort = "http://" + hostAndPort;
}
URI parsed;
connectionString = args[0];
try {
parsed = new URI(hostAndPort);
} catch (URISyntaxException e) {
exit("Invalid connection configuration [" + hostAndPort + "]: " + e.getMessage(), 1);
uri = removeQuery(parseURI(connectionString, DEFAULT_URI), connectionString, DEFAULT_URI);
} catch (IllegalArgumentException ex) {
exit(ex.getMessage(), 1);
return;
}
if (false == "".equals(parsed.getPath())) {
exit("Invalid connection configuration [" + hostAndPort + "]: Path not allowed", 1);
return;
}
user = parsed.getUserInfo();
user = uri.getUserInfo();
if (user != null) {
// TODO just use a URI the whole time
// Tracked by https://github.com/elastic/x-pack-elasticsearch/issues/2882
hostAndPort = parsed.getScheme() + "://" + parsed.getHost() + ":" + parsed.getPort();
int colonIndex = user.indexOf(':');
if (colonIndex >= 0) {
password = user.substring(colonIndex + 1);
user = user.substring(0, colonIndex);
}
}
} else {
uri = DEFAULT_URI;
connectionString = DEFAULT_CONNECTION_STRING;
}
try (Terminal term = TerminalBuilder.builder().build()) {
try {
if (user != null) {
@ -89,12 +87,12 @@ public class Cli {
password = new BufferedReader(term.reader()).readLine();
term.echo(true);
}
properties.setProperty(CliConfiguration.AUTH_USER, user);
properties.setProperty(CliConfiguration.AUTH_PASS, password);
properties.setProperty(ConnectionConfiguration.AUTH_USER, user);
properties.setProperty(ConnectionConfiguration.AUTH_PASS, password);
}
boolean debug = StringUtils.parseBoolean(System.getProperty("cli.debug", "false"));
Cli console = new Cli(debug, new CliConfiguration(hostAndPort + "/_sql/cli?error_trace", properties), term);
Cli console = new Cli(debug, new ConnectionConfiguration(uri, connectionString, properties), term);
console.run();
} catch (FatalException e) {
term.writer().println(e.getMessage());
@ -108,7 +106,7 @@ public class Cli {
private int fetchSize = AbstractQueryInitRequest.DEFAULT_FETCH_SIZE;
private String fetchSeparator = "";
Cli(boolean debug, CliConfiguration cfg, Terminal terminal) {
Cli(boolean debug, ConnectionConfiguration cfg, Terminal terminal) {
this.debug = debug;
term = terminal;
cliClient = new CliHttpClient(cfg);

View File

@ -1,86 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.xpack.sql.client.shared.ConnectionConfiguration;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Properties;
//
// Supports the following syntax
//
// http(s)://[host|ip]
// http(s)//[host|ip]:port/(prefix)
//
public class CliConfiguration extends ConnectionConfiguration {
private HostAndPort hostAndPort;
private String originalUrl;
private String urlFile = "/";
public CliConfiguration(String u, Properties props) {
super(props);
originalUrl = u;
parseUrl(u);
}
private void parseUrl(String u) {
if (u.endsWith("/")) {
u = u.substring(0, u.length() - 1);
}
// remove space
u = u.trim();
String hostAndPort = u;
int index = u.indexOf("://");
if (index > 0) {
u = u.substring(index + 3);
}
index = u.indexOf("/");
//
// parse host
//
if (index >= 0) {
hostAndPort = u.substring(0, index);
if (index + 1 < u.length()) {
urlFile = u.substring(index);
}
}
// look for port
index = hostAndPort.lastIndexOf(":");
if (index > 0) {
if (index + 1 >= hostAndPort.length()) {
throw new IllegalArgumentException("Invalid port specified");
}
String host = hostAndPort.substring(0, index);
String port = hostAndPort.substring(index + 1);
this.hostAndPort = new HostAndPort(host, Integer.parseInt(port));
}
else {
this.hostAndPort = new HostAndPort(u);
}
}
public URL asUrl() {
// TODO: need to assemble all the various params here
try {
return new URL(isSSLEnabled() ? "https" : "http", hostAndPort.ip, port(), urlFile);
} catch (MalformedURLException ex) {
throw new IllegalArgumentException("Cannot connect to server " + originalUrl, ex);
}
}
private int port() {
return hostAndPort.port > 0 ? hostAndPort.port : 9200;
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryResponse;
import org.elasticsearch.xpack.sql.client.shared.ConnectionConfiguration;
import org.elasticsearch.xpack.sql.client.shared.JreHttpUrlConnection;
import org.elasticsearch.xpack.sql.client.shared.JreHttpUrlConnection.ResponseOrException;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
@ -24,26 +25,26 @@ import java.time.Instant;
import java.util.TimeZone;
public class CliHttpClient {
private final CliConfiguration cfg;
private final ConnectionConfiguration cfg;
public CliHttpClient(CliConfiguration cfg) {
public CliHttpClient(ConnectionConfiguration cfg) {
this.cfg = cfg;
}
public InfoResponse serverInfo() throws SQLException {
InfoRequest request = new InfoRequest();
return (InfoResponse) sendRequest(request);
return (InfoResponse) post(request);
}
public QueryResponse queryInit(String query, int fetchSize) throws SQLException {
// TODO allow customizing the time zone - this is what session set/reset/get should be about
QueryInitRequest request = new QueryInitRequest(query, fetchSize, TimeZone.getTimeZone("UTC"), timeout());
return (QueryResponse) sendRequest(request);
return (QueryResponse) post(request);
}
public QueryResponse nextPage(byte[] cursor) throws SQLException {
QueryPageRequest request = new QueryPageRequest(cursor, timeout());
return (QueryResponse) sendRequest(request);
return (QueryResponse) post(request);
}
private TimeoutInfo timeout() {
@ -51,9 +52,9 @@ public class CliHttpClient {
return new TimeoutInfo(clientTime, cfg.queryTimeout(), cfg.pageTimeout());
}
private Response sendRequest(Request request) throws SQLException {
private Response post(Request request) throws SQLException {
return AccessController.doPrivileged((PrivilegedAction<ResponseOrException<Response>>) () ->
JreHttpUrlConnection.http(cfg.asUrl(), cfg, con ->
JreHttpUrlConnection.http("_sql/cli", "error_trace", cfg, con ->
con.post(
out -> Proto.INSTANCE.writeRequest(request, out),
in -> Proto.INSTANCE.readResponse(request, in)

View File

@ -10,8 +10,7 @@ import org.elasticsearch.xpack.sql.client.shared.StringUtils;
import org.elasticsearch.xpack.sql.jdbc.JdbcSQLException;
import org.elasticsearch.xpack.sql.jdbc.util.Version;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URI;
import java.sql.DriverPropertyInfo;
import java.util.ArrayList;
import java.util.Arrays;
@ -22,10 +21,12 @@ import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import static org.elasticsearch.xpack.sql.client.shared.UriUtils.parseURI;
import static org.elasticsearch.xpack.sql.client.shared.UriUtils.removeQuery;
//
// Supports the following syntax
//
// jdbc:es:
// jdbc:es://[host|ip]
// jdbc:es://[host|ip]:port/(prefix)
// jdbc:es://[host|ip]:port/(prefix)(?options=value&)
@ -35,7 +36,9 @@ import java.util.TimeZone;
//TODO: beef this up for Security/SSL
public class JdbcConfiguration extends ConnectionConfiguration {
static final String URL_PREFIX = "jdbc:es:";
static final String URL_PREFIX = "jdbc:es://";
public static URI DEFAULT_URI = URI.create("http://localhost:9200/");
static final String DEBUG = "debug";
static final String DEBUG_DEFAULT = "false";
@ -63,9 +66,7 @@ public class JdbcConfiguration extends ConnectionConfiguration {
}
// immutable properties
private final HostAndPort hostAndPort;
private final String originalUrl;
private final String urlFile;
private final boolean debug;
private final String debugOut;
@ -73,11 +74,9 @@ public class JdbcConfiguration extends ConnectionConfiguration {
private TimeZone timeZone;
public static JdbcConfiguration create(String u, Properties props) throws JdbcSQLException {
Object[] result = parseUrl(u);
String urlFile = (String) result[0];
HostAndPort hostAndPort = (HostAndPort) result[1];
Properties urlProps = (Properties) result[2];
URI uri = parseUrl(u);
Properties urlProps = parseProperties(uri, u);
uri = removeQuery(uri, u, DEFAULT_URI);
// override properties set in the URL with the ones specified programmatically
if (props != null) {
@ -85,7 +84,7 @@ public class JdbcConfiguration extends ConnectionConfiguration {
}
try {
return new JdbcConfiguration(u, urlFile, hostAndPort, urlProps);
return new JdbcConfiguration(uri, u, urlProps);
} catch (JdbcSQLException e) {
throw e;
} catch (Exception ex) {
@ -93,94 +92,34 @@ public class JdbcConfiguration extends ConnectionConfiguration {
}
}
private static Object[] parseUrl(String u) throws JdbcSQLException {
private static URI parseUrl(String u) throws JdbcSQLException {
String url = u;
String format = "jdbc:es://[host[:port]]*/[prefix]*[?[option=value]&]*";
if (!canAccept(u)) {
throw new JdbcSQLException("Expected [" + URL_PREFIX + "] url, received [" + u +"]");
throw new JdbcSQLException("Expected [" + URL_PREFIX + "] url, received [" + u + "]");
}
String urlFile = "/";
HostAndPort destination;
Properties props = new Properties();
try {
if (u.endsWith("/")) {
u = u.substring(0, u.length() - 1);
}
return parseURI(removeJdbcPrefix(u), DEFAULT_URI);
} catch (IllegalArgumentException ex) {
throw new JdbcSQLException(ex, "Invalid URL [" + url + "], format should be [" + format + "]");
}
}
// remove space
u = u.trim();
private static String removeJdbcPrefix(String connectionString) throws JdbcSQLException {
if (connectionString.startsWith(URL_PREFIX)) {
return "http://" + connectionString.substring(URL_PREFIX.length());
} else {
throw new JdbcSQLException("Expected [" + URL_PREFIX + "] url, received [" + connectionString + "]");
}
}
//
// remove prefix jdbc:es prefix
//
u = u.substring(URL_PREFIX.length(), u.length());
if (!u.startsWith("//")) {
throw new JdbcSQLException("Invalid URL [" + url + "], format should be [" + format + "]");
}
// remove //
u = u.substring(2);
String hostAndPort = u;
// / is required if any params are specified
// get it out of the way early on
int index = u.indexOf("/");
String params = null;
int pIndex = u.indexOf("?");
if (pIndex > 0) {
if (index < 0) {
throw new JdbcSQLException("Invalid URL [" + url + "], format should be [" + format + "]");
}
if (pIndex + 1 < u.length()) {
params = u.substring(pIndex + 1);
}
}
// parse url suffix (if any)
if (index >= 0) {
hostAndPort = u.substring(0, index);
if (index + 1 < u.length()) {
urlFile = u.substring(index);
index = urlFile.indexOf("?");
if (index > 0) {
urlFile = urlFile.substring(0, index);
if (!urlFile.endsWith("/")) {
urlFile = urlFile + "/";
}
}
}
}
//
// parse host
//
// look for port
index = hostAndPort.lastIndexOf(":");
if (index > 0) {
if (index + 1 >= hostAndPort.length()) {
throw new JdbcSQLException("Invalid port specified");
}
String host = hostAndPort.substring(0, index);
String port = hostAndPort.substring(index + 1);
destination = new HostAndPort(host, Integer.parseInt(port));
} else {
destination = new HostAndPort(hostAndPort);
}
//
// parse params
//
if (params != null) {
private static Properties parseProperties(URI uri, String u) throws JdbcSQLException {
Properties props = new Properties();
try {
if (uri.getRawQuery() != null) {
// parse properties
List<String> prms = StringUtils.tokenize(params, "&");
List<String> prms = StringUtils.tokenize(uri.getRawQuery(), "&");
for (String param : prms) {
List<String> args = StringUtils.tokenize(param, "=");
if (args.size() != 2) {
@ -196,18 +135,15 @@ public class JdbcConfiguration extends ConnectionConfiguration {
// Add the url to unexpected exceptions
throw new IllegalArgumentException("Failed to parse acceptable jdbc url [" + u + "]", e);
}
return new Object[] { urlFile, destination, props };
return props;
}
// constructor is private to force the use of a factory in order to catch and convert any validation exception
// and also do input processing as oppose to handling this from the constructor (which is tricky or impossible)
private JdbcConfiguration(String u, String urlFile, HostAndPort hostAndPort, Properties props) throws JdbcSQLException {
super(props);
private JdbcConfiguration(URI baseURI, String u, Properties props) throws JdbcSQLException {
super(baseURI, u, props);
this.originalUrl = u;
this.urlFile = urlFile;
this.hostAndPort = hostAndPort;
this.debug = parseValue(DEBUG, props.getProperty(DEBUG, DEBUG_DEFAULT), Boolean::parseBoolean);
this.debugOut = props.getProperty(DEBUG_OUTPUT, DEBUG_OUTPUT_DEFAULT);
@ -220,18 +156,6 @@ public class JdbcConfiguration extends ConnectionConfiguration {
return OPTION_NAMES;
}
public URL asUrl() throws JdbcSQLException {
try {
return new URL(isSSLEnabled() ? "https" : "http", hostAndPort.ip, port(), urlFile);
} catch (MalformedURLException ex) {
throw new JdbcSQLException(ex, "Cannot connect to server [" + originalUrl + "]");
}
}
private int port() {
return hostAndPort.port > 0 ? hostAndPort.port : 9200;
}
public boolean debug() {
return debug;
}

View File

@ -47,7 +47,7 @@ public class JdbcConnection implements Connection, JdbcWrapper {
cfg = connectionInfo;
client = new JdbcHttpClient(connectionInfo);
url = connectionInfo.asUrl().toExternalForm();
url = connectionInfo.connectionString();
userName = connectionInfo.authUser();
}

View File

@ -15,8 +15,6 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import org.elasticsearch.xpack.sql.protocol.shared.Response;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.SQLException;
@ -26,19 +24,9 @@ import java.sql.SQLException;
class HttpClient {
private final JdbcConfiguration cfg;
private final URL url;
HttpClient(JdbcConfiguration connectionInfo) throws SQLException {
this.cfg = connectionInfo;
URL baseUrl = connectionInfo.asUrl();
try {
// the baseUrl ends up / so the suffix can be directly appended
// TODO Do something with the error trace. Useful for filing bugs and debugging.
// Tracked by https://github.com/elastic/x-pack-elasticsearch/issues/3079
this.url = new URL(baseUrl, "_sql/jdbc?error_trace=true");
} catch (MalformedURLException ex) {
throw new JdbcException(ex, "Cannot connect to JDBC endpoint [" + baseUrl.toString() + "_sql/jdbc]");
}
}
void setNetworkTimeout(long millis) {
@ -51,21 +39,17 @@ class HttpClient {
boolean head() throws JdbcSQLException {
try {
URL root = new URL(url, "/");
return AccessController.doPrivileged((PrivilegedAction<Boolean>) () -> {
return JreHttpUrlConnection.http(root, cfg, JreHttpUrlConnection::head);
});
} catch (MalformedURLException ex) {
throw new JdbcSQLException(ex, "Cannot ping server");
return AccessController.doPrivileged((PrivilegedAction<Boolean>) () ->
JreHttpUrlConnection.http("", "error_trace", cfg, JreHttpUrlConnection::head));
} catch (ClientException ex) {
throw new JdbcSQLException(ex, "Transport failure");
throw new JdbcSQLException(ex, "Cannot ping server");
}
}
Response put(Request request) throws SQLException {
Response post(Request request) throws SQLException {
try {
return AccessController.doPrivileged((PrivilegedAction<ResponseOrException<Response>>) () ->
JreHttpUrlConnection.http(url, cfg, con ->
JreHttpUrlConnection.http("_sql/jdbc", "error_trace", cfg, con ->
con.post(
out -> Proto.INSTANCE.writeRequest(request, out),
in -> Proto.INSTANCE.readResponse(request, in)

View File

@ -56,7 +56,7 @@ public class JdbcHttpClient {
public Cursor query(String sql, RequestMeta meta) throws SQLException {
int fetch = meta.fetchSize() > 0 ? meta.fetchSize() : conCfg.pageSize();
QueryInitRequest request = new QueryInitRequest(sql, fetch, conCfg.timeZone(), timeout(meta));
QueryInitResponse response = (QueryInitResponse) http.put(request);
QueryInitResponse response = (QueryInitResponse) http.post(request);
return new DefaultCursor(this, response.cursor(), (Page) response.data, meta);
}
@ -66,7 +66,7 @@ public class JdbcHttpClient {
*/
public byte[] nextPage(byte[] cursor, Page page, RequestMeta meta) throws SQLException {
QueryPageRequest request = new QueryPageRequest(cursor, timeout(meta), page);
return ((QueryPageResponse) http.put(request)).cursor();
return ((QueryPageResponse) http.post(request)).cursor();
}
public InfoResponse serverInfo() throws SQLException {
@ -78,17 +78,17 @@ public class JdbcHttpClient {
private InfoResponse fetchServerInfo() throws SQLException {
InfoRequest request = new InfoRequest();
return (InfoResponse) http.put(request);
return (InfoResponse) http.post(request);
}
public List<String> metaInfoTables(String pattern) throws SQLException {
MetaTableRequest request = new MetaTableRequest(pattern);
return ((MetaTableResponse) http.put(request)).tables;
return ((MetaTableResponse) http.post(request)).tables;
}
public List<MetaColumnInfo> metaInfoColumns(String tablePattern, String columnPattern) throws SQLException {
MetaColumnRequest request = new MetaColumnRequest(tablePattern, columnPattern);
return ((MetaColumnResponse) http.put(request)).columns;
return ((MetaColumnResponse) http.post(request)).columns;
}
public void setNetworkTimeout(long millis) {

View File

@ -20,35 +20,39 @@ public class JdbcConfigurationTests extends ESTestCase {
public void testJustThePrefix() throws Exception {
Exception e = expectThrows(JdbcSQLException.class, () -> ci("jdbc:es:"));
assertEquals("Invalid URL [jdbc:es:], format should be [jdbc:es://[host[:port]]*/[prefix]*[?[option=value]&]*]", e.getMessage());
assertEquals("Expected [jdbc:es://] url, received [jdbc:es:]", e.getMessage());
}
public void testJustTheHost() throws Exception {
assertThat(ci("jdbc:es://localhost").asUrl().toString(), is("http://localhost:9200/"));
assertThat(ci("jdbc:es://localhost").baseUri().toString(), is("http://localhost:9200/"));
}
public void testHostAndPort() throws Exception {
assertThat(ci("jdbc:es://localhost:1234").asUrl().toString(), is("http://localhost:1234/"));
assertThat(ci("jdbc:es://localhost:1234").baseUri().toString(), is("http://localhost:1234/"));
}
public void testTrailingSlashForHost() throws Exception {
assertThat(ci("jdbc:es://localhost:1234/").asUrl().toString(), is("http://localhost:1234/"));
assertThat(ci("jdbc:es://localhost:1234/").baseUri().toString(), is("http://localhost:1234/"));
}
public void testMultiPathSuffix() throws Exception {
assertThat(ci("jdbc:es://a:1/foo/bar/tar/").asUrl().toString(), is("http://a:1/foo/bar/tar"));
assertThat(ci("jdbc:es://a:1/foo/bar/tar").baseUri().toString(), is("http://a:1/foo/bar/tar"));
}
public void testV6Localhost() throws Exception {
assertThat(ci("jdbc:es://[::1]:54161/foo/bar").baseUri().toString(), is("http://[::1]:54161/foo/bar"));
}
public void testDebug() throws Exception {
JdbcConfiguration ci = ci("jdbc:es://a:1/?debug=true");
assertThat(ci.asUrl().toString(), is("http://a:1/"));
assertThat(ci.baseUri().toString(), is("http://a:1/"));
assertThat(ci.debug(), is(true));
assertThat(ci.debugOut(), is("err"));
}
public void testDebugOut() throws Exception {
JdbcConfiguration ci = ci("jdbc:es://a:1/?debug=true&debug.output=jdbc.out");
assertThat(ci.asUrl().toString(), is("http://a:1/"));
assertThat(ci.baseUri().toString(), is("http://a:1/"));
assertThat(ci.debug(), is(true));
assertThat(ci.debugOut(), is("jdbc.out"));
}
@ -60,8 +64,19 @@ public class JdbcConfigurationTests extends ESTestCase {
public void testDebugOutWithSuffix() throws Exception {
JdbcConfiguration ci = ci("jdbc:es://a:1/foo/bar/tar?debug=true&debug.output=jdbc.out");
assertThat(ci.asUrl().toString(), is("http://a:1/foo/bar/tar/"));
assertThat(ci.baseUri().toString(), is("http://a:1/foo/bar/tar"));
assertThat(ci.debug(), is(true));
assertThat(ci.debugOut(), is("jdbc.out"));
}
public void testHttpWithSSLEnabled() throws Exception {
JdbcConfiguration ci = ci("jdbc:es://test?ssl=true");
assertThat(ci.baseUri().toString(), is("https://test:9200/"));
}
public void testHttpWithSSLDisabled() throws Exception {
JdbcConfiguration ci = ci("jdbc:es://test?ssl=false");
assertThat(ci.baseUri().toString(), is("http://test:9200/"));
}
}

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.sql.client.shared;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -27,25 +29,6 @@ import static java.util.Collections.emptyList;
*/
public class ConnectionConfiguration {
public static class HostAndPort {
public final String ip;
public final int port;
public HostAndPort(String ip) {
this(ip, 0);
}
public HostAndPort(String ip, int port) {
this.ip = ip;
this.port = port;
}
@Override
public String toString() {
return (port > 0 ? ip + ":" + port : ip);
}
}
// Timeouts
// 30s
@ -81,6 +64,9 @@ public class ConnectionConfiguration {
OPTION_NAMES.addAll(ProxyConfig.OPTION_NAMES);
}
// Base URI for all request
private final URI baseURI;
private final String connectionString;
// Proxy
private long connectTimeout;
@ -95,7 +81,8 @@ public class ConnectionConfiguration {
private final SslConfig sslConfig;
private final ProxyConfig proxyConfig;
public ConnectionConfiguration(Properties props) throws ClientException {
public ConnectionConfiguration(URI baseURI, String connectionString, Properties props) throws ClientException {
this.connectionString = connectionString;
Properties settings = props != null ? props : new Properties();
checkPropertyNames(settings, optionNames());
@ -113,6 +100,28 @@ public class ConnectionConfiguration {
sslConfig = new SslConfig(settings);
proxyConfig = new ProxyConfig(settings);
this.baseURI = normalizeSchema(baseURI, connectionString, sslConfig.isEnabled());
}
private static URI normalizeSchema(URI uri, String connectionString, boolean isSSLEnabled) {
// Make sure the protocol is correct
final String scheme;
if (isSSLEnabled) {
// It's ok to upgrade from http to https
scheme = "https";
} else {
// Silently downgrading from https to http can cause security issues
if ("https".equals(uri.getScheme())) {
throw new ClientException("SSL is disabled");
}
scheme = "http";
}
try {
return new URI(scheme, null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment());
} catch (URISyntaxException ex) {
throw new ClientException("Cannot parse process baseURI [" + connectionString + "] " + ex.getMessage());
}
}
private Collection<String> optionNames() {
@ -203,4 +212,16 @@ public class ConnectionConfiguration {
public String authPass() {
return pass;
}
public URI baseUri() {
return baseURI;
}
/**
* Returns the original connections string
*/
public String connectionString() {
return connectionString;
}
}

View File

@ -14,7 +14,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.Proxy;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.AccessController;
import java.security.PrivilegedAction;
@ -44,7 +47,16 @@ public class JreHttpUrlConnection implements Closeable {
*/
public static final String SQL_STATE_BAD_SERVER = "bad_server";
public static <R> R http(URL url, ConnectionConfiguration cfg, Function<JreHttpUrlConnection, R> handler) {
public static <R> R http(String path, String query, ConnectionConfiguration cfg, Function<JreHttpUrlConnection, R> handler) {
final URI uriPath = cfg.baseUri().resolve(path); // update path if needed
final String uriQuery = query == null ? uriPath.getQuery() : query; // update query if needed
final URL url;
try {
url = new URI(uriPath.getScheme(), null, uriPath.getHost(), uriPath.getPort(), uriPath.getPath(), uriQuery,
uriPath.getFragment()).toURL();
} catch (URISyntaxException | MalformedURLException ex) {
throw new ClientException(ex, "Cannot build url using base: [" + uriPath + "] query: [" + query + "] path: [" + path + "]");
}
try (JreHttpUrlConnection con = new JreHttpUrlConnection(url, cfg)) {
return handler.apply(con);
}
@ -95,8 +107,8 @@ public class JreHttpUrlConnection implements Closeable {
HttpsURLConnection https = (HttpsURLConnection) con;
SSLSocketFactory factory = cfg.sslConfig().sslSocketFactory();
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
https.setSSLSocketFactory(factory);
return null;
https.setSSLSocketFactory(factory);
return null;
});
}
}
@ -120,9 +132,9 @@ public class JreHttpUrlConnection implements Closeable {
}
public <R> ResponseOrException<R> post(
CheckedConsumer<DataOutput, IOException> doc,
CheckedFunction<DataInput, R, IOException> parser
) throws ClientException {
CheckedConsumer<DataOutput, IOException> doc,
CheckedFunction<DataInput, R, IOException> parser
) throws ClientException {
try {
con.setRequestMethod("POST");
con.setDoOutput(true);
@ -147,13 +159,13 @@ public class JreHttpUrlConnection implements Closeable {
* think that the failure is not the fault of the application.
*/
return new ResponseOrException<>(new SQLException("Server encountered an error ["
+ failure.reason() + "]. [" + failure.remoteTrace() + "]", SQL_STATE_BAD_SERVER));
+ failure.reason() + "]. [" + failure.remoteTrace() + "]", SQL_STATE_BAD_SERVER));
}
SqlExceptionType type = SqlExceptionType.fromRemoteFailureType(failure.type());
if (type == null) {
return new ResponseOrException<>(new SQLException("Server sent bad type ["
+ failure.type() + "]. Original type was [" + failure.reason() + "]. ["
+ failure.remoteTrace() + "]", SQL_STATE_BAD_SERVER));
+ failure.type() + "]. Original type was [" + failure.reason() + "]. ["
+ failure.remoteTrace() + "]", SQL_STATE_BAD_SERVER));
}
return new ResponseOrException<>(type.asException(failure.reason()));
} catch (IOException ex) {
@ -253,21 +265,21 @@ public class JreHttpUrlConnection implements Closeable {
public static SqlExceptionType fromRemoteFailureType(String type) {
switch (type) {
case "analysis_exception":
case "resource_not_found_exception":
case "verification_exception":
return DATA;
case "planning_exception":
case "mapping_exception":
return NOT_SUPPORTED;
case "parsing_exception":
return SYNTAX;
case "security_exception":
return SECURITY;
case "timeout_exception":
return TIMEOUT;
default:
return null;
case "analysis_exception":
case "resource_not_found_exception":
case "verification_exception":
return DATA;
case "planning_exception":
case "mapping_exception":
return NOT_SUPPORTED;
case "parsing_exception":
return SYNTAX;
case "security_exception":
return SECURITY;
case "timeout_exception":
return TIMEOUT;
default:
return null;
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.client.shared;
import java.net.URI;
import java.net.URISyntaxException;
public final class UriUtils {
private UriUtils() {
}
/**
* Parses the URL provided by the user and
*/
public static URI parseURI(String connectionString, URI defaultURI) {
final URI uri = parseWithNoScheme(connectionString);
final String path = "".equals(uri.getPath()) ? defaultURI.getPath() : uri.getPath();
final String query = uri.getQuery() == null ? defaultURI.getQuery() : uri.getQuery();
final int port = uri.getPort() < 0 ? defaultURI.getPort() : uri.getPort();
try {
return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), port, path, query, defaultURI.getFragment());
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid connection configuration [" + connectionString + "]: " + e.getMessage(), e);
}
}
private static URI parseWithNoScheme(String connectionString) {
URI uri;
// check if URI can be parsed correctly without adding scheme
// if the connection string is in format host:port or just host, the host is going to be null
// if the connection string contains IPv6 localhost [::1] the parsing will fail
try {
uri = new URI(connectionString);
if (uri.getHost() == null || uri.getScheme() == null) {
uri = null;
}
} catch (URISyntaxException e) {
uri = null;
}
if (uri == null) {
// We couldn't parse URI without adding scheme, let's try again with scheme this time
try {
return new URI("http://" + connectionString);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid connection configuration [" + connectionString + "]: " + e.getMessage(), e);
}
} else {
// We managed to parse URI and all necessary pieces are present, let's make sure the scheme is correct
if ("http".equals(uri.getScheme()) == false && "https".equals(uri.getScheme()) == false) {
throw new IllegalArgumentException(
"Invalid connection configuration [" + connectionString + "]: Only http and https protocols are supported");
}
return uri;
}
}
/**
* Removes the query part of the URI
*/
public static URI removeQuery(URI uri, String connectionString, URI defaultURI) {
try {
return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), uri.getPath(), null, defaultURI.getFragment());
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid connection configuration [" + connectionString + "]: " + e.getMessage(), e);
}
}
}

View File

@ -15,6 +15,7 @@ import org.junit.ClassRule;
import org.junit.rules.ExternalResource;
import java.io.DataInput;
import java.net.URI;
import java.net.URL;
import java.security.AccessController;
import java.security.PrivilegedAction;
@ -72,7 +73,7 @@ public class SSLTests extends ESTestCase {
prop.setProperty("ssl.truststore.pass", "password");
//prop.setProperty("ssl.accept.self.signed.certs", "true");
cfg = new ConnectionConfiguration(prop);
cfg = new ConnectionConfiguration(URI.create(sslServer.toString()), sslServer.toString(), prop);
}
@After
@ -101,14 +102,14 @@ public class SSLTests extends ESTestCase {
public void testSslHead() throws Exception {
assertTrue(AccessController.doPrivileged((PrivilegedAction<Boolean>) () -> {
return JreHttpUrlConnection.http(sslServer, cfg, JreHttpUrlConnection::head);
return JreHttpUrlConnection.http("", null, cfg, JreHttpUrlConnection::head);
}));
}
public void testSslPost() throws Exception {
String message = UUID.randomUUID().toString();
String received = AccessController.doPrivileged((PrivilegedAction<ResponseOrException<String>>) () ->
JreHttpUrlConnection.http(sslServer, cfg, c ->
JreHttpUrlConnection.http("", null, cfg, c ->
c.post(
out -> out.writeUTF(message),
DataInput::readUTF

View File

@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.client.shared;
import org.elasticsearch.test.ESTestCase;
import java.net.URI;
import static org.elasticsearch.xpack.sql.client.shared.UriUtils.parseURI;
import static org.elasticsearch.xpack.sql.client.shared.UriUtils.removeQuery;
public class UriUtilsTests extends ESTestCase {
public static URI DEFAULT_URI = URI.create("http://localhost:9200/");
public void testHostAndPort() throws Exception {
assertEquals(URI.create("http://server:9200/"), parseURI("server:9200", DEFAULT_URI));
}
public void testJustHost() throws Exception {
assertEquals(URI.create("http://server:9200/"), parseURI("server", DEFAULT_URI));
}
public void testHttpWithPort() throws Exception {
assertEquals(URI.create("http://server:9201/"), parseURI("http://server:9201", DEFAULT_URI));
}
public void testHttpsWithPort() throws Exception {
assertEquals(URI.create("https://server:9201/"), parseURI("https://server:9201", DEFAULT_URI));
}
public void testHttpNoPort() throws Exception {
assertEquals(URI.create("https://server:9200/"), parseURI("https://server", DEFAULT_URI));
}
public void testLocalhostV6() throws Exception {
assertEquals(URI.create("http://[::1]:51082/"), parseURI("[::1]:51082", DEFAULT_URI));
}
public void testHttpsWithUser() throws Exception {
assertEquals(URI.create("https://user@server:9200/"), parseURI("https://user@server", DEFAULT_URI));
}
public void testUserPassHost() throws Exception {
assertEquals(URI.create("http://user:password@server:9200/"), parseURI("user:password@server", DEFAULT_URI));
}
public void testHttpPath() throws Exception {
assertEquals(URI.create("https://server:9201/some_path"), parseURI("https://server:9201/some_path", DEFAULT_URI));
}
public void testHttpQuery() throws Exception {
assertEquals(URI.create("https://server:9201/?query"), parseURI("https://server:9201/?query", DEFAULT_URI));
}
public void testUnsupportedProtocol() throws Exception {
assertEquals(
"Invalid connection configuration [ftp://server:9201/]: Only http and https protocols are supported",
expectThrows(IllegalArgumentException.class, () -> parseURI("ftp://server:9201/", DEFAULT_URI)).getMessage()
);
}
public void testMalformed() throws Exception {
assertEquals(
"Invalid connection configuration []: Expected authority at index 7: http://",
expectThrows(IllegalArgumentException.class, () -> parseURI("", DEFAULT_URI)).getMessage()
);
}
public void testRemoveQuery() throws Exception {
assertEquals(URI.create("http://server:9100"),
removeQuery(URI.create("http://server:9100?query"), "http://server:9100?query", DEFAULT_URI));
}
public void testRemoveQueryTrailingSlash() throws Exception {
assertEquals(URI.create("http://server:9100/"),
removeQuery(URI.create("http://server:9100/?query"), "http://server:9100/?query", DEFAULT_URI));
}
public void testRemoveQueryNoQuery() throws Exception {
assertEquals(URI.create("http://server:9100"),
removeQuery(URI.create("http://server:9100"), "http://server:9100", DEFAULT_URI));
}
}