SQL: Improve JDBC communication (elastic/x-pack-elasticsearch#2660)
* Improve JDBC communication Jdbc HTTP client uses only one url for messages and relies on / for ping Fixed ES prefix being discarded (missing /) Add HEAD handler for JDBC endpoint Original commit: elastic/x-pack-elasticsearch@389f82262e
This commit is contained in:
parent
6a1806a3eb
commit
57fcbb81cb
|
@ -36,6 +36,7 @@ public abstract class ProtoHttpServer {
|
|||
executor = new ThreadPoolExecutor(0, 10, 250, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
|
||||
|
||||
server = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), port), 0);
|
||||
server.createContext("/", new RootHandler());
|
||||
server.createContext(protoSuffix, handler);
|
||||
server.setExecutor(executor);
|
||||
server.start();
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.qa.sql.embed;
|
||||
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
import com.sun.net.httpserver.HttpHandler;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
class RootHandler implements HttpHandler {
|
||||
|
||||
private static final Logger log = ESLoggerFactory.getLogger(RootHandler.class.getName());
|
||||
|
||||
@Override
|
||||
public void handle(HttpExchange http) throws IOException {
|
||||
log.debug("Received query call...");
|
||||
|
||||
if ("HEAD".equals(http.getRequestMethod())) {
|
||||
http.sendResponseHeaders(RestStatus.OK.getStatus(), 0);
|
||||
http.close();
|
||||
return;
|
||||
}
|
||||
|
||||
fail(http, new UnsupportedOperationException("only HEAD allowed"));
|
||||
}
|
||||
|
||||
protected void fail(HttpExchange http, Exception ex) {
|
||||
log.error("Caught error while transmitting response", ex);
|
||||
try {
|
||||
// the error conversion has failed, halt
|
||||
if (http.getResponseHeaders().isEmpty()) {
|
||||
http.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
|
||||
}
|
||||
} catch (IOException ioEx) {
|
||||
log.error("Caught error while trying to catch error", ex);
|
||||
} finally {
|
||||
http.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@ import java.nio.file.Paths;
|
|||
import java.sql.CallableStatement;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DatabaseMetaData;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ParameterMetaData;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
|
@ -28,6 +29,8 @@ import java.sql.Statement;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
|
||||
public final class Debug {
|
||||
|
||||
// cache for streams created by ourselves
|
||||
|
@ -40,6 +43,18 @@ public final class Debug {
|
|||
private static volatile DebugLog ERR = null, OUT = null;
|
||||
private static volatile PrintStream SYS_ERR = null, SYS_OUT = null;
|
||||
|
||||
/**
|
||||
* Create a proxied Connection which performs logging of all methods being invoked.
|
||||
* Typically Debug will read its configuration from the configuration and act accordingly however
|
||||
* there are two cases where the output is specified programmatically, namely through
|
||||
* {@link DriverManager#setLogWriter(PrintWriter)} and {@link DataSource#setLogWriter(PrintWriter)}.
|
||||
* The former is the 'legacy' way, having a global impact on all drivers while the latter allows per
|
||||
* instance configuration.
|
||||
*
|
||||
* As both approaches are not widely used, Debug will take the principle of least surprise and pick its
|
||||
* own configuration first; if that does not exist it will fallback to the managed approaches (assuming they
|
||||
* are specified, otherwise logging is simply disabled).
|
||||
*/
|
||||
public static Connection proxy(JdbcConfiguration info, Connection connection, PrintWriter managedPrinter) {
|
||||
return createProxy(Connection.class, new ConnectionProxy(logger(info, managedPrinter), connection));
|
||||
}
|
||||
|
|
|
@ -32,10 +32,6 @@ import java.util.TimeZone;
|
|||
public class JdbcConfiguration extends ConnectionConfiguration {
|
||||
static final String URL_PREFIX = "jdbc:es:";
|
||||
|
||||
static final String USER = "user";
|
||||
|
||||
static final String PASSWORD = "password";
|
||||
|
||||
static final String DEBUG = "debug";
|
||||
static final String DEBUG_DEFAULT = "false";
|
||||
|
||||
|
@ -124,6 +120,9 @@ public class JdbcConfiguration extends ConnectionConfiguration {
|
|||
index = urlFile.indexOf("?");
|
||||
if (index > 0) {
|
||||
urlFile = urlFile.substring(0, index);
|
||||
if (!urlFile.endsWith("/")) {
|
||||
urlFile = urlFile + "/";
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -184,12 +183,12 @@ public class JdbcConfiguration extends ConnectionConfiguration {
|
|||
}
|
||||
|
||||
public String userName() {
|
||||
return settings().getProperty(USER);
|
||||
return settings().getProperty(AUTH_USER);
|
||||
}
|
||||
|
||||
public String password() {
|
||||
// NOCOMMIT make sure we're doing right by the password. Compare with other jdbc drivers and our security code.
|
||||
return settings().getProperty(PASSWORD);
|
||||
return settings().getProperty(AUTH_PASS);
|
||||
}
|
||||
|
||||
private int port() {
|
||||
|
|
|
@ -68,13 +68,15 @@ public class JdbcDriver implements java.sql.Driver, Closeable {
|
|||
return null;
|
||||
}
|
||||
|
||||
JdbcConfiguration info = initInfo(url, props);
|
||||
JdbcConnection con = new JdbcConnection(info);
|
||||
return info.debug() ? Debug.proxy(info, con, DriverManager.getLogWriter()) : con;
|
||||
JdbcConfiguration cfg = initCfg(url, props);
|
||||
JdbcConnection con = new JdbcConnection(cfg);
|
||||
return cfg.debug() ? Debug.proxy(cfg, con, DriverManager.getLogWriter()) : con;
|
||||
}
|
||||
|
||||
private static JdbcConfiguration initInfo(String url, Properties props) {
|
||||
private static JdbcConfiguration initCfg(String url, Properties props) {
|
||||
JdbcConfiguration ci = new JdbcConfiguration(url, props);
|
||||
|
||||
// if there's a timeout set on the DriverManager, make sure to use it
|
||||
if (DriverManager.getLoginTimeout() > 0) {
|
||||
ci.connectTimeout(TimeUnit.SECONDS.toMillis(DriverManager.getLoginTimeout()));
|
||||
}
|
||||
|
|
|
@ -212,8 +212,14 @@ class JdbcStatement implements Statement, JdbcWrapper {
|
|||
@Override
|
||||
public int getFetchSize() throws SQLException {
|
||||
checkOpen();
|
||||
// NOCOMMIT this will return a bad value because we use -1 to mean "default" but default is something defined in connection string
|
||||
return requestMeta.fetchSize();
|
||||
int fetchSize = requestMeta.fetchSize();
|
||||
// the spec is somewhat unclear. It looks like there are 3 states:
|
||||
// unset (in this case -1 which the user cannot set) - in this case, the default fetch size is returned
|
||||
// 0 meaning the hint is disabled (the user has called setFetch)
|
||||
// >0 means actual hint
|
||||
|
||||
// tl;dr - if invalid, it means it was not set so return default - otherwise return the set value
|
||||
return fetchSize < 0 ? cfg.pageSize() : fetchSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.sql.jdbc.jdbcx;
|
|||
import org.elasticsearch.xpack.sql.jdbc.debug.Debug;
|
||||
import org.elasticsearch.xpack.sql.jdbc.jdbc.JdbcConfiguration;
|
||||
import org.elasticsearch.xpack.sql.jdbc.jdbc.JdbcConnection;
|
||||
import org.elasticsearch.xpack.sql.net.client.ConnectionConfiguration;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.PrintWriter;
|
||||
|
@ -20,13 +21,12 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.logging.Logger;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
|
||||
public class JdbcDataSource implements DataSource, Wrapper, Closeable {
|
||||
|
||||
private String url;
|
||||
private PrintWriter writer;
|
||||
private int loginTimeout;
|
||||
private Properties info;
|
||||
private Properties props;
|
||||
|
||||
public JdbcDataSource() {}
|
||||
|
||||
|
@ -42,6 +42,9 @@ public class JdbcDataSource implements DataSource, Wrapper, Closeable {
|
|||
|
||||
@Override
|
||||
public void setLoginTimeout(int seconds) throws SQLException {
|
||||
if (seconds < 0) {
|
||||
throw new SQLException("Negative timeout specified " + seconds);
|
||||
}
|
||||
loginTimeout = seconds;
|
||||
}
|
||||
|
||||
|
@ -63,32 +66,40 @@ public class JdbcDataSource implements DataSource, Wrapper, Closeable {
|
|||
this.url = url;
|
||||
}
|
||||
|
||||
public Properties getInfo() {
|
||||
return info;
|
||||
public Properties getProperties() {
|
||||
return props;
|
||||
}
|
||||
|
||||
public void setInfo(Properties props) {
|
||||
this.info = props;
|
||||
public void setProperties(Properties props) {
|
||||
this.props = props;
|
||||
}
|
||||
|
||||
private Properties createConfig() {
|
||||
Properties p = props != null ? new Properties(props) : new Properties();
|
||||
return p;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection getConnection() throws SQLException {
|
||||
return doGetConnection(info);
|
||||
return doGetConnection(props);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection getConnection(String username, String password) throws SQLException {
|
||||
// TODO: set user/pass props
|
||||
throw new UnsupportedOperationException();
|
||||
//return doGetConnection(null);
|
||||
Properties p = createConfig();
|
||||
p.setProperty(ConnectionConfiguration.AUTH_USER, username);
|
||||
p.setProperty(ConnectionConfiguration.AUTH_PASS, password);
|
||||
return doGetConnection(p);
|
||||
}
|
||||
|
||||
private Connection doGetConnection(Properties p) {
|
||||
JdbcConfiguration ci = new JdbcConfiguration(url, p);
|
||||
JdbcConfiguration cfg = new JdbcConfiguration(url, p);
|
||||
if (loginTimeout > 0) {
|
||||
ci.connectTimeout(TimeUnit.SECONDS.toMillis(loginTimeout));
|
||||
cfg.connectTimeout(TimeUnit.SECONDS.toMillis(loginTimeout));
|
||||
}
|
||||
return new JdbcConnection(ci);
|
||||
JdbcConnection con = new JdbcConnection(cfg);
|
||||
// enable logging if needed
|
||||
return Debug.proxy(cfg, con, writer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -25,11 +25,18 @@ import java.sql.SQLException;
|
|||
class HttpClient {
|
||||
|
||||
private final JdbcConfiguration cfg;
|
||||
private final URL baseUrl;
|
||||
private final URL url;
|
||||
|
||||
HttpClient(JdbcConfiguration connectionInfo) {
|
||||
this.cfg = connectionInfo;
|
||||
baseUrl = connectionInfo.asUrl();
|
||||
URL baseUrl = connectionInfo.asUrl();
|
||||
try {
|
||||
// the baseUrl ends up / so the suffix can be directly appended
|
||||
// NOCOMMIT Do something with the error trace. Useful for filing bugs and debugging.
|
||||
this.url = new URL(baseUrl, "_sql/jdbc?error_trace=true");
|
||||
} catch (MalformedURLException ex) {
|
||||
throw new JdbcException(ex, "Cannot connect to JDBC endpoint [" + baseUrl.toString() + "_sql/jdbc]");
|
||||
}
|
||||
}
|
||||
|
||||
void setNetworkTimeout(long millis) {
|
||||
|
@ -40,32 +47,23 @@ class HttpClient {
|
|||
return cfg.networkTimeout();
|
||||
}
|
||||
|
||||
private URL url(String subPath) {
|
||||
try {
|
||||
return new URL(baseUrl, subPath);
|
||||
} catch (MalformedURLException ex) {
|
||||
throw new JdbcException(ex, "Invalid subpath [" + subPath + "]");
|
||||
}
|
||||
}
|
||||
|
||||
boolean head(String path) { // NOCOMMIT remove path?
|
||||
boolean head() {
|
||||
try {
|
||||
URL root = new URL(url, "/");
|
||||
return AccessController.doPrivileged((PrivilegedAction<Boolean>) () -> {
|
||||
return JreHttpUrlConnection.http(url(path), cfg, JreHttpUrlConnection::head);
|
||||
return JreHttpUrlConnection.http(root, cfg, JreHttpUrlConnection::head);
|
||||
});
|
||||
} catch (MalformedURLException ex) {
|
||||
throw new JdbcException(ex, "Cannot ping server");
|
||||
} catch (ClientException ex) {
|
||||
throw new JdbcException(ex, "Transport failure");
|
||||
}
|
||||
}
|
||||
|
||||
BytesArray put(CheckedConsumer<DataOutput, IOException> os) throws SQLException {
|
||||
return put("_sql/jdbc?error_trace=true", os); // NOCOMMIT Do something with the error trace. Useful for filing bugs and debugging.
|
||||
}
|
||||
|
||||
BytesArray put(String path, CheckedConsumer<DataOutput, IOException> os) throws SQLException { // NOCOMMIT remove path?
|
||||
try {
|
||||
return AccessController.doPrivileged((PrivilegedAction<BytesArray>) () -> {
|
||||
return JreHttpUrlConnection.http(url(path), cfg, con -> {
|
||||
return JreHttpUrlConnection.http(url, cfg, con -> {
|
||||
return new BytesArray(con.post(os));
|
||||
});
|
||||
});
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequest;
|
|||
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageResponse;
|
||||
import org.elasticsearch.xpack.sql.jdbc.util.BytesArray;
|
||||
import org.elasticsearch.xpack.sql.jdbc.util.FastByteArrayInputStream;
|
||||
import org.elasticsearch.xpack.sql.net.client.util.StringUtils;
|
||||
import org.elasticsearch.xpack.sql.protocol.shared.Request;
|
||||
import org.elasticsearch.xpack.sql.protocol.shared.Response;
|
||||
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
|
||||
|
@ -55,17 +54,18 @@ public class JdbcHttpClient implements Closeable {
|
|||
|
||||
public boolean ping(long timeoutInMs) {
|
||||
long oldTimeout = http.getNetworkTimeout();
|
||||
// NOCOMMIT this seems race condition-y
|
||||
http.setNetworkTimeout(timeoutInMs);
|
||||
try {
|
||||
return http.head(StringUtils.EMPTY);
|
||||
// this works since the connection is single-threaded and its configuration not shared
|
||||
// with others connections
|
||||
http.setNetworkTimeout(timeoutInMs);
|
||||
return http.head();
|
||||
} finally {
|
||||
http.setNetworkTimeout(oldTimeout);
|
||||
}
|
||||
}
|
||||
|
||||
public Cursor query(String sql, RequestMeta meta) throws SQLException {
|
||||
int fetch = meta.fetchSize() >= 0 ? meta.fetchSize() : conCfg.pageSize();
|
||||
int fetch = meta.fetchSize() > 0 ? meta.fetchSize() : conCfg.pageSize();
|
||||
QueryInitRequest request = new QueryInitRequest(sql, fetch, conCfg.timeZone(), timeout(meta));
|
||||
BytesArray ba = http.put(out -> Proto.INSTANCE.writeRequest(request, out));
|
||||
QueryInitResponse response = doIO(ba, in -> (QueryInitResponse) readResponse(request, in));
|
||||
|
|
|
@ -35,7 +35,7 @@ public class JdbcConfigurationTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testMultiPathSuffix() throws Exception {
|
||||
assertThat(ci("jdbc:es://a:1/foo/bar/tar").asUrl().toString(), is("http://a:1/foo/bar/tar"));
|
||||
assertThat(ci("jdbc:es://a:1/foo/bar/tar/").asUrl().toString(), is("http://a:1/foo/bar/tar"));
|
||||
}
|
||||
|
||||
public void testDebug() throws Exception {
|
||||
|
@ -59,7 +59,7 @@ public class JdbcConfigurationTests extends ESTestCase {
|
|||
|
||||
public void testDebugOutWithSuffix() throws Exception {
|
||||
JdbcConfiguration ci = ci("jdbc:es://a:1/foo/bar/tar?debug=true&debug.output=jdbc.out");
|
||||
assertThat(ci.asUrl().toString(), is("http://a:1/foo/bar/tar"));
|
||||
assertThat(ci.asUrl().toString(), is("http://a:1/foo/bar/tar/"));
|
||||
assertThat(ci.debug(), is(true));
|
||||
assertThat(ci.debugOut(), is("jdbc.out"));
|
||||
}
|
||||
|
|
|
@ -29,32 +29,31 @@ public class ConnectionConfiguration {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
// Timeouts
|
||||
|
||||
// 30s
|
||||
private static final String CONNECT_TIMEOUT = "connect.timeout";
|
||||
public static final String CONNECT_TIMEOUT = "connect.timeout";
|
||||
private static final String CONNECT_TIMEOUT_DEFAULT = String.valueOf(TimeUnit.SECONDS.toMillis(30));
|
||||
|
||||
// 1m
|
||||
private static final String NETWORK_TIMEOUT = "network.timeout";
|
||||
public static final String NETWORK_TIMEOUT = "network.timeout";
|
||||
private static final String NETWORK_TIMEOUT_DEFAULT = String.valueOf(TimeUnit.MINUTES.toMillis(1));
|
||||
|
||||
// 1m
|
||||
private static final String QUERY_TIMEOUT = "query.timeout";
|
||||
public static final String QUERY_TIMEOUT = "query.timeout";
|
||||
private static final String QUERY_TIMEOUT_DEFAULT = String.valueOf(TimeUnit.MINUTES.toMillis(1));
|
||||
|
||||
// 5m
|
||||
private static final String PAGE_TIMEOUT = "page.timeout";
|
||||
public static final String PAGE_TIMEOUT = "page.timeout";
|
||||
private static final String PAGE_TIMEOUT_DEFAULT = String.valueOf(TimeUnit.MINUTES.toMillis(5));
|
||||
|
||||
private static final String PAGE_SIZE = "page.size";
|
||||
public static final String PAGE_SIZE = "page.size";
|
||||
private static final String PAGE_SIZE_DEFAULT = "1000";
|
||||
|
||||
// Auth
|
||||
|
||||
private static final String AUTH_USER = "user";
|
||||
private static final String AUTH_PASS = "pass";
|
||||
public static final String AUTH_USER = "user";
|
||||
public static final String AUTH_PASS = "pass";
|
||||
|
||||
// Proxy
|
||||
|
||||
|
|
|
@ -178,4 +178,4 @@ public class JreHttpUrlConnection implements Closeable {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue