mirror of https://github.com/apache/druid.git
improved JDBC logging (#11676)
* improve jdbc and router query debug logging
* log errors too
* no stacktrace
* trace those stacks
parent 1ae1bbfc4f
commit 3044372fc1
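The heart of the JDBC-side change is a pair of static logFailure helpers added to DruidMeta: each logs an error and returns the same exception, so a call site can log and throw in a single expression. The sketch below is lifted from the DruidMeta diff further down; LOG is the class's org.apache.druid.java.util.common.logger.Logger:

    // Log a formatted message for the failure (no stacktrace), then hand the
    // exception back so the caller can throw it inline, e.g.:
    //   throw logFailure(new ISE("Too many connections, limit is[%,d]", limit));
    public static <T extends Throwable> T logFailure(T error, String message, Object... format)
    {
      LOG.error(message, format);
      return error;
    }

    // Log the exception itself, message plus stacktrace, before it is rethrown.
    public static <T extends Throwable> T logFailure(T error)
    {
      LOG.error(error, error.getMessage());
      return error;
    }

The generic return type keeps every throw a one-liner while mirroring the error into the server log, which is the point of the "log errors too" bullet above.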
AsyncQueryForwardingServlet.java

@@ -79,7 +79,7 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements QueryCountStatsProvider
 {
-  private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);
+  private static final EmittingLogger LOG = new EmittingLogger(AsyncQueryForwardingServlet.class);
   @Deprecated // use SmileMediaTypes.APPLICATION_JACKSON_SMILE
   private static final String APPLICATION_SMILE = "application/smile";
 
@@ -189,7 +189,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements QueryCountStatsProvider
       broadcastClient.stop();
     }
     catch (Exception e) {
-      log.warn(e, "Error stopping servlet");
+      LOG.warn(e, "Error stopping servlet");
     }
   }
 
@@ -219,6 +219,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements QueryCountStatsProvider
       String connectionId = getAvaticaProtobufConnectionId(protobufRequest);
       targetServer = hostFinder.findServerAvatica(connectionId);
       request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes);
+      LOG.debug("Forwarding protobuf JDBC connection [%s] to broker [%s]", connectionId, targetServer);
     } else if (isAvaticaJson) {
       Map<String, Object> requestMap = objectMapper.readValue(
           request.getInputStream(),
@@ -228,10 +229,12 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements QueryCountStatsProvider
       targetServer = hostFinder.findServerAvatica(connectionId);
       byte[] requestBytes = objectMapper.writeValueAsBytes(requestMap);
       request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes);
+      LOG.debug("Forwarding JDBC connection [%s] to broker [%s]", connectionId, targetServer.getHost());
     } else if (HttpMethod.DELETE.is(method)) {
       // query cancellation request
       targetServer = hostFinder.pickDefaultServer();
       broadcastQueryCancelRequest(request, targetServer);
+      LOG.debug("Broadcasting cancellation request to all brokers");
     } else if (isNativeQueryEndpoint && HttpMethod.POST.is(method)) {
       // query request
       try {
@@ -241,8 +244,10 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements QueryCountStatsProvider
         if (inputQuery.getId() == null) {
           inputQuery = inputQuery.withId(UUID.randomUUID().toString());
         }
+        LOG.debug("Forwarding JSON query [%s] to broker [%s]", inputQuery.getId(), targetServer.getHost());
       } else {
         targetServer = hostFinder.pickDefaultServer();
+        LOG.debug("Forwarding JSON request to broker [%s]", targetServer.getHost());
       }
       request.setAttribute(QUERY_ATTRIBUTE, inputQuery);
     }
@@ -259,6 +264,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements QueryCountStatsProvider
       SqlQuery inputSqlQuery = objectMapper.readValue(request.getInputStream(), SqlQuery.class);
       request.setAttribute(SQL_QUERY_ATTRIBUTE, inputSqlQuery);
       targetServer = hostFinder.findServerSql(inputSqlQuery);
+      LOG.debug("Forwarding SQL query to broker [%s]", targetServer.getHost());
     }
     catch (IOException e) {
       handleQueryParseException(request, response, objectMapper, e, false);
@@ -270,6 +276,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements QueryCountStatsProvider
       }
     } else {
       targetServer = hostFinder.pickDefaultServer();
+      LOG.debug("Forwarding query to broker [%s]", targetServer.getHost());
     }
 
     request.setAttribute(HOST_ATTRIBUTE, targetServer.getHost());
@@ -293,7 +300,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements QueryCountStatsProvider
     // issue async requests
     Response.CompleteListener completeListener = result -> {
       if (result.isFailed()) {
-        log.warn(
+        LOG.warn(
             result.getFailure(),
             "Failed to forward cancellation request to [%s]",
             server.getHost()
@@ -321,7 +328,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements QueryCountStatsProvider
       boolean isNativeQuery
   ) throws IOException
   {
-    log.warn(parseException, "Exception parsing query");
+    LOG.warn(parseException, "Exception parsing query");
 
     // Log the error message
     final String errorMessage = parseException.getMessage() == null
@@ -407,7 +414,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements QueryCountStatsProvider
             proxyRequest
         );
       } else {
-        log.error("Can not find Authenticator with Name [%s]", authenticationResult.getAuthenticatedBy());
+        LOG.error("Can not find Authenticator with Name [%s]", authenticationResult.getAuthenticatedBy());
       }
     }
     super.sendProxyRequest(
@@ -682,7 +689,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements QueryCountStatsProvider
         );
       }
       catch (Exception e) {
-        log.error(e, "Unable to log query [%s]!", query);
+        LOG.error(e, "Unable to log query [%s]!", query);
       }
 
       super.onComplete(result);
@@ -712,10 +719,10 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements QueryCountStatsProvider
         );
       }
       catch (IOException logError) {
-        log.error(logError, "Unable to log query [%s]!", query);
+        LOG.error(logError, "Unable to log query [%s]!", query);
       }
 
-      log.makeAlert(failure, "Exception handling request")
+      LOG.makeAlert(failure, "Exception handling request")
          .addData("exception", failure.toString())
          .addData("query", query)
          .addData("peer", req.getRemoteAddr())
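For a sense of what the router-side debug lines above trace: every Avatica JDBC request carries a connection id, and the router pins that id to a single broker via hostFinder.findServerAvatica. A hypothetical client snippet that would exercise these paths; the router host, credentials, and table name are placeholders:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Properties;

    public class JdbcThroughRouter
    {
      public static void main(String[] args) throws Exception
      {
        // Druid's documented Avatica connect string, pointed at a router.
        String url = "jdbc:avatica:remote:url=http://router.example.com:8888/druid/v2/sql/avatica/";
        Properties props = new Properties();
        props.setProperty("user", "admin");       // only if an authenticator is configured
        props.setProperty("password", "secret");
        try (Connection conn = DriverManager.getConnection(url, props);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM wikipedia")) {
          while (rs.next()) {
            System.out.println(rs.getLong(1));
          }
        }
      }
    }

With the servlet's logger at DEBUG, each request on this connection shows up in the router log as "Forwarding JDBC connection [...] to broker [...]".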
DruidConnection.java

@@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class DruidConnection
 {
-  private static final Logger log = new Logger(DruidConnection.class);
+  private static final Logger LOG = new Logger(DruidConnection.class);
   private static final Set<String> SENSITIVE_CONTEXT_FIELDS = Sets.newHashSet(
       "user", "password"
   );
@@ -71,6 +71,11 @@ public class DruidConnection
     this.statements = new ConcurrentHashMap<>();
   }
 
+  public String getConnectionId()
+  {
+    return connectionId;
+  }
+
   public DruidStatement createStatement(SqlLifecycleFactory sqlLifecycleFactory)
   {
     final int statementId = statementCounter.incrementAndGet();
@@ -79,11 +84,11 @@ public class DruidConnection
     if (statements.containsKey(statementId)) {
       // Will only happen if statementCounter rolls over before old statements are cleaned up. If this
       // ever happens then something fishy is going on, because we shouldn't have billions of statements.
-      throw new ISE("Uh oh, too many statements");
+      throw DruidMeta.logFailure(new ISE("Uh oh, too many statements"));
     }
 
     if (statements.size() >= maxStatements) {
-      throw new ISE("Too many open statements, limit is[%,d]", maxStatements);
+      throw DruidMeta.logFailure(new ISE("Too many open statements, limit is[%,d]", maxStatements));
     }
 
     // remove sensitive fields from the context, only the connection's context needs to have authentication
@@ -101,14 +106,14 @@ public class DruidConnection
           sqlLifecycleFactory.factorize(),
           () -> {
             // onClose function for the statement
-            log.debug("Connection[%s] closed statement[%s].", connectionId, statementId);
+            LOG.debug("Connection[%s] closed statement[%s].", connectionId, statementId);
             // statements will be accessed unsynchronized to avoid deadlock
             statements.remove(statementId);
           }
       );
 
       statements.put(statementId, statement);
-      log.debug("Connection[%s] opened statement[%s].", connectionId, statementId);
+      LOG.debug("Connection[%s] opened statement[%s].", connectionId, statementId);
       return statement;
     }
   }
@@ -146,11 +151,11 @@ public class DruidConnection
           statement.close();
         }
         catch (Exception e) {
-          log.warn("Connection[%s] failed to close statement[%s]!", connectionId, statement.getStatementId());
+          LOG.warn("Connection[%s] failed to close statement[%s]!", connectionId, statement.getStatementId());
        }
       }
 
-      log.debug("Connection[%s] closed.", connectionId);
+      LOG.debug("Connection[%s] closed.", connectionId);
       open = false;
     }
   }
DruidMeta.java

@@ -49,6 +49,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.joda.time.Interval;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -63,7 +64,19 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class DruidMeta extends MetaImpl
 {
-  private static final Logger log = new Logger(DruidMeta.class);
+  public static <T extends Throwable> T logFailure(T error, String message, Object... format)
+  {
+    LOG.error(message, format);
+    return error;
+  }
+
+  public static <T extends Throwable> T logFailure(T error)
+  {
+    LOG.error(error, error.getMessage());
+    return error;
+  }
+
+  private static final Logger LOG = new Logger(DruidMeta.class);
 
   private final SqlLifecycleFactory sqlLifecycleFactory;
   private final ScheduledExecutorService exec;
@@ -153,14 +166,19 @@ public class DruidMeta extends MetaImpl
       druidStatement = getDruidStatement(statement);
     }
     catch (NoSuchStatementException e) {
-      throw new IllegalStateException(e);
+      throw logFailure(new IllegalStateException(e));
     }
     final DruidConnection druidConnection = getDruidConnection(statement.connectionId);
     AuthenticationResult authenticationResult = authenticateConnection(druidConnection);
     if (authenticationResult == null) {
-      throw new ForbiddenException("Authentication failed.");
+      throw logFailure(
+          new ForbiddenException("Authentication failed."),
+          "Authentication failed for statement[%s]",
+          druidStatement.getStatementId()
+      );
     }
     statement.signature = druidStatement.prepare(sql, maxRowCount, authenticationResult).getSignature();
+    LOG.debug("Successfully prepared statement[%s] for execution", druidStatement.getStatementId());
     return statement;
   }
 
@@ -191,7 +209,11 @@ public class DruidMeta extends MetaImpl
     final DruidConnection druidConnection = getDruidConnection(statement.connectionId);
     AuthenticationResult authenticationResult = authenticateConnection(druidConnection);
     if (authenticationResult == null) {
-      throw new ForbiddenException("Authentication failed.");
+      throw logFailure(
+          new ForbiddenException("Authentication failed."),
+          "Authentication failed for statement[%s]",
+          druidStatement.getStatementId()
+      );
     }
     druidStatement.prepare(sql, maxRowCount, authenticationResult);
     final Frame firstFrame = druidStatement.execute(Collections.emptyList())
@@ -200,6 +222,7 @@ public class DruidMeta extends MetaImpl
         getEffectiveMaxRowsPerFrame(maxRowsInFirstFrame)
     );
     final Signature signature = druidStatement.getSignature();
+    LOG.debug("Successfully prepared statement[%s] and started execution", druidStatement.getStatementId());
     return new ExecuteResult(
         ImmutableList.of(
             MetaResultSet.create(
@@ -240,7 +263,9 @@ public class DruidMeta extends MetaImpl
       final int fetchMaxRowCount
   ) throws NoSuchStatementException, MissingResultsException
   {
-    return getDruidStatement(statement).nextFrame(offset, getEffectiveMaxRowsPerFrame(fetchMaxRowCount));
+    final int maxRows = getEffectiveMaxRowsPerFrame(fetchMaxRowCount);
+    LOG.debug("Fetching next frame from offset[%s] with [%s] rows for statement[%s]", offset, maxRows, statement.id);
+    return getDruidStatement(statement).nextFrame(offset, maxRows);
   }
 
   @Deprecated
@@ -270,6 +295,7 @@ public class DruidMeta extends MetaImpl
     );
 
     final Signature signature = druidStatement.getSignature();
+    LOG.debug("Successfully started execution of statement[%s]", druidStatement.getStatementId());
     return new ExecuteResult(
         ImmutableList.of(
             MetaResultSet.create(
@@ -320,7 +346,7 @@ public class DruidMeta extends MetaImpl
     final boolean isDone = druidStatement.isDone();
     final long currentOffset = druidStatement.getCurrentOffset();
     if (currentOffset != offset) {
-      throw new ISE("Requested offset[%,d] does not match currentOffset[%,d]", offset, currentOffset);
+      throw logFailure(new ISE("Requested offset[%,d] does not match currentOffset[%,d]", offset, currentOffset));
     }
     return !isDone;
   }
@@ -519,15 +545,23 @@ public class DruidMeta extends MetaImpl
     }
   }
 
+  @Nullable
   private AuthenticationResult authenticateConnection(final DruidConnection connection)
   {
     Map<String, Object> context = connection.context();
     for (Authenticator authenticator : authenticators) {
+      LOG.debug("Attempting authentication with authenticator[%s]", authenticator.getClass());
       AuthenticationResult authenticationResult = authenticator.authenticateJDBCContext(context);
       if (authenticationResult != null) {
+        LOG.debug(
+            "Authenticated identity[%s] for connection[%s]",
+            authenticationResult.getIdentity(),
+            connection.getConnectionId()
+        );
         return authenticationResult;
       }
     }
+    LOG.debug("No successful authentication");
     return null;
   }
 
@@ -551,7 +585,7 @@ public class DruidMeta extends MetaImpl
       if (connectionCount.get() > config.getMaxConnections()) {
         // We aren't going to make a connection after all.
         connectionCount.decrementAndGet();
-        throw new ISE("Too many connections, limit is[%,d]", config.getMaxConnections());
+        throw logFailure(new ISE("Too many connections, limit is[%,d]", config.getMaxConnections()));
       }
     }
 
@@ -563,10 +597,10 @@ public class DruidMeta extends MetaImpl
     if (putResult != null) {
       // Didn't actually insert the connection.
       connectionCount.decrementAndGet();
-      throw new ISE("Connection[%s] already open.", connectionId);
+      throw logFailure(new ISE("Connection[%s] already open.", connectionId));
     }
 
-    log.debug("Connection[%s] opened.", connectionId);
+    LOG.debug("Connection[%s] opened.", connectionId);
 
     // Call getDruidConnection to start the timeout timer.
     return getDruidConnection(connectionId);
@@ -587,13 +621,13 @@ public class DruidMeta extends MetaImpl
     final DruidConnection connection = connections.get(connectionId);
 
     if (connection == null) {
-      throw new NoSuchConnectionException(connectionId);
+      throw logFailure(new NoSuchConnectionException(connectionId));
     }
 
     return connection.sync(
         exec.schedule(
             () -> {
-              log.debug("Connection[%s] timed out.", connectionId);
+              LOG.debug("Connection[%s] timed out.", connectionId);
              closeConnection(new ConnectionHandle(connectionId));
            },
            new Interval(DateTimes.nowUtc(), config.getConnectionIdleTimeout()).toDurationMillis(),
@@ -608,7 +642,7 @@ public class DruidMeta extends MetaImpl
     final DruidConnection connection = getDruidConnection(statement.connectionId);
     final DruidStatement druidStatement = connection.getStatement(statement.id);
     if (druidStatement == null) {
-      throw new NoSuchStatementException(statement);
+      throw logFailure(new NoSuchStatementException(statement));
     }
     return druidStatement;
   }
@@ -620,12 +654,12 @@ public class DruidMeta extends MetaImpl
       final ExecuteResult result = prepareAndExecute(statement, sql, -1, -1, null);
       final MetaResultSet metaResultSet = Iterables.getOnlyElement(result.resultSets);
       if (!metaResultSet.firstFrame.done) {
-        throw new ISE("Expected all results to be in a single frame!");
+        throw logFailure(new ISE("Expected all results to be in a single frame!"));
       }
       return metaResultSet;
     }
     catch (Exception e) {
-      throw new RuntimeException(e);
+      throw logFailure(new RuntimeException(e));
     }
     finally {
       closeStatement(statement);
DruidStatement.java

@@ -360,6 +360,8 @@ public class DruidStatement implements Closeable
       synchronized (lock) {
         sqlLifecycle.finalizeStateAndEmitLogsAndMetrics(this.throwable, null, -1);
       }
+    } else {
+      DruidMeta.logFailure(this.throwable);
     }
     onClose.run();
   }
@@ -388,6 +390,7 @@ public class DruidStatement implements Closeable
   private DruidStatement closeAndPropagateThrowable(Throwable t)
   {
     this.throwable = t;
+    DruidMeta.logFailure(t);
     try {
       close();
     }