add druid jdbc handler config for minimum number of rows per frame (#10880)

* add druid jdbc handler config for minimum number of rows per frame

* javadocs and docs adjustments

* spelling

* adjust docs per review with minor tweaks

* adjust more
This commit is contained in:
Clint Wylie 2021-02-23 02:11:04 -08:00 committed by GitHub
parent 3a0a0c033f
commit f34c6eb3c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 234 additions and 9 deletions

View File

@ -1644,7 +1644,8 @@ The Druid SQL server is configured through the following properties on the Broke
|`druid.sql.enable`|Whether to enable SQL at all, including background metadata fetching. If false, this overrides all other SQL-related properties and disables SQL metadata, serving, and planning completely.|true|
|`druid.sql.avatica.enable`|Whether to enable JDBC querying at `/druid/v2/sql/avatica/`.|true|
|`druid.sql.avatica.maxConnections`|Maximum number of open connections for the Avatica server. These are not HTTP connections, but are logical client connections that may span multiple HTTP connections.|25|
|`druid.sql.avatica.maxRowsPerFrame`|Maximum number of rows to return in a single JDBC frame. Setting this property to -1 indicates that no row limit should be applied. Clients can optionally specify a row limit in their requests; if a client specifies a row limit, the lesser value of the client-provided limit and `maxRowsPerFrame` will be used.|5,000|
|`druid.sql.avatica.maxRowsPerFrame`|Maximum acceptable value for the JDBC client `Statement.setFetchSize` method. This setting determines the maximum number of rows that Druid will populate in a single 'fetch' for a JDBC `ResultSet`. Set this property to -1 to enforce no row limit on the server-side and potentially return the entire set of rows on the initial statement execution. If the JDBC client calls `Statement.setFetchSize` with a value other than -1, Druid uses the lesser value of the client-provided limit and `maxRowsPerFrame`. If `maxRowsPerFrame` is smaller than `minRowsPerFrame`, then the `ResultSet` size will be fixed. To handle queries that produce results with a large number of rows, you can increase value of `druid.sql.avatica.maxRowsPerFrame` to reduce the number of fetches required to completely transfer the result set.|5,000|
|`druid.sql.avatica.minRowsPerFrame`|Minimum acceptable value for the JDBC client `Statement.setFetchSize` method. The value for this property must greater than 0. If the JDBC client calls `Statement.setFetchSize` with a lesser value, Druid uses `minRowsPerFrame` instead. If `maxRowsPerFrame` is less than `minRowsPerFrame`, Druid uses the minimum value of the two. For handling queries which produce results with a large number of rows, you can increase this value to reduce the number of fetches required to completely transfer the result set.|100|
|`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous open statements per Avatica client connection.|4|
|`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle timeout.|PT5M|
|`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at `/druid/v2/sql/`.|true|

View File

@ -20,21 +20,31 @@
package org.apache.druid.sql.avatica;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.joda.time.Period;
class AvaticaServerConfig
{
@JsonProperty
public int maxConnections = 25;
public static int DEFAULT_MAX_CONNECTIONS = 25;
public static int DEFAULT_MAX_STATEMENTS_PER_CONNECTION = 4;
public static Period DEFAULT_CONNECTION_IDLE_TIMEOUT = new Period("PT5M");
public static int DEFAULT_MIN_ROWS_PER_FRAME = 100;
public static int DEFAULT_MAX_ROWS_PER_FRAME = 5000;
@JsonProperty
public int maxStatementsPerConnection = 4;
public int maxConnections = DEFAULT_MAX_CONNECTIONS;
@JsonProperty
public Period connectionIdleTimeout = new Period("PT5M");
public int maxStatementsPerConnection = DEFAULT_MAX_STATEMENTS_PER_CONNECTION;
@JsonProperty
public int maxRowsPerFrame = 5000;
public Period connectionIdleTimeout = DEFAULT_CONNECTION_IDLE_TIMEOUT;
@JsonProperty
public int minRowsPerFrame = DEFAULT_MIN_ROWS_PER_FRAME;
@JsonProperty
public int maxRowsPerFrame = DEFAULT_MAX_ROWS_PER_FRAME;
public int getMaxConnections()
{
@ -55,4 +65,16 @@ class AvaticaServerConfig
{
return maxRowsPerFrame;
}
public int getMinRowsPerFrame()
{
Preconditions.checkArgument(
minRowsPerFrame > 0,
"'druid.sql.avatica.minRowsPerFrame' must be set to a value greater than 0"
);
if (maxRowsPerFrame > 0) {
return Math.min(getMaxRowsPerFrame(), minRowsPerFrame);
}
return minRowsPerFrame;
}
}

View File

@ -627,17 +627,40 @@ public class DruidMeta extends MetaImpl
}
}
/**
* Determine JDBC 'frame' size, that is the number of results which will be returned to a single
* {@link java.sql.ResultSet}. This value corresponds to {@link java.sql.Statement#setFetchSize(int)} (which is a user
* hint, we don't have to honor it), and this method modifies it, ensuring the actual chosen value falls within
* {@link AvaticaServerConfig#minRowsPerFrame} and {@link AvaticaServerConfig#maxRowsPerFrame}.
*
* A value of -1 supplied as input indicates that the client has no preference for fetch size, and can handle
* unlimited results (at our discretion). Similarly, a value of -1 for {@link AvaticaServerConfig#maxRowsPerFrame}
* also indicates that there is no upper limit on fetch size on the server side.
*
* {@link AvaticaServerConfig#minRowsPerFrame} must be configured to a value greater than 0, because it will be
* checked against if any additional frames are required (which means one of the input or maximum was set to a value
* other than -1).
*/
private int getEffectiveMaxRowsPerFrame(int clientMaxRowsPerFrame)
{
// no configured row limit, use the client provided limit
if (config.getMaxRowsPerFrame() < 0) {
return clientMaxRowsPerFrame;
return adjustForMinumumRowsPerFrame(clientMaxRowsPerFrame);
}
// client provided no row limit, use the configured row limit
if (clientMaxRowsPerFrame < 0) {
return config.getMaxRowsPerFrame();
return adjustForMinumumRowsPerFrame(config.getMaxRowsPerFrame());
}
return Math.min(clientMaxRowsPerFrame, config.getMaxRowsPerFrame());
return adjustForMinumumRowsPerFrame(Math.min(clientMaxRowsPerFrame, config.getMaxRowsPerFrame()));
}
/**
* coerce fetch size to be, at minimum, {@link AvaticaServerConfig#minRowsPerFrame}
*/
private int adjustForMinumumRowsPerFrame(int rowsPerFrame)
{
final int adjustedRowsPerFrame = Math.max(config.getMinRowsPerFrame(), rowsPerFrame);
return adjustedRowsPerFrame;
}
private static String withEscapeClause(String toEscape)

View File

@ -24,6 +24,8 @@ import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.jackson.JacksonModule;
@ -35,11 +37,14 @@ import org.easymock.Mock;
import org.eclipse.jetty.server.Handler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import javax.validation.Validation;
import javax.validation.Validator;
import java.util.Properties;
import java.util.Set;
@RunWith(EasyMockRunner.class)
@ -52,6 +57,9 @@ public class AvaticaModuleTest
@Mock
private DruidMeta druidMeta;
@Rule
public ExpectedException expectedException = ExpectedException.none();
private AvaticaModule target;
private Injector injector;
@ -95,6 +103,76 @@ public class AvaticaModuleTest
{
AvaticaServerConfig config = injector.getInstance(AvaticaServerConfig.class);
Assert.assertNotNull(config);
Assert.assertEquals(AvaticaServerConfig.DEFAULT_MAX_CONNECTIONS, config.getMaxConnections());
Assert.assertEquals(
AvaticaServerConfig.DEFAULT_MAX_STATEMENTS_PER_CONNECTION,
config.getMaxStatementsPerConnection()
);
Assert.assertEquals(AvaticaServerConfig.DEFAULT_CONNECTION_IDLE_TIMEOUT, config.getConnectionIdleTimeout());
Assert.assertEquals(AvaticaServerConfig.DEFAULT_MIN_ROWS_PER_FRAME, config.getMinRowsPerFrame());
Assert.assertEquals(AvaticaServerConfig.DEFAULT_MAX_ROWS_PER_FRAME, config.getMaxRowsPerFrame());
}
@Test
public void testAvaticaServerConfigProperties()
{
Properties properties = new Properties();
final JsonConfigProvider<AvaticaServerConfig> provider = JsonConfigProvider.of(
"druid.sql.avatica",
AvaticaServerConfig.class
);
properties.setProperty("druid.sql.avatica.maxRowsPerFrame", "50000");
properties.setProperty("druid.sql.avatica.minRowsPerFrame", "10000");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final AvaticaServerConfig config = provider.get().get();
Assert.assertNotNull(config);
Assert.assertEquals(AvaticaServerConfig.DEFAULT_MAX_CONNECTIONS, config.getMaxConnections());
Assert.assertEquals(
AvaticaServerConfig.DEFAULT_MAX_STATEMENTS_PER_CONNECTION,
config.getMaxStatementsPerConnection()
);
Assert.assertEquals(AvaticaServerConfig.DEFAULT_CONNECTION_IDLE_TIMEOUT, config.getConnectionIdleTimeout());
Assert.assertEquals(10_000, config.getMinRowsPerFrame());
Assert.assertEquals(50_000, config.getMaxRowsPerFrame());
}
@Test
public void testAvaticaServerConfigPropertiesSmallerMaxIsAlsoMin()
{
Properties properties = new Properties();
final JsonConfigProvider<AvaticaServerConfig> provider = JsonConfigProvider.of(
"druid.sql.avatica",
AvaticaServerConfig.class
);
properties.setProperty("druid.sql.avatica.maxRowsPerFrame", "50");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final AvaticaServerConfig config = provider.get().get();
Assert.assertNotNull(config);
Assert.assertEquals(AvaticaServerConfig.DEFAULT_MAX_CONNECTIONS, config.getMaxConnections());
Assert.assertEquals(
AvaticaServerConfig.DEFAULT_MAX_STATEMENTS_PER_CONNECTION,
config.getMaxStatementsPerConnection()
);
Assert.assertEquals(AvaticaServerConfig.DEFAULT_CONNECTION_IDLE_TIMEOUT, config.getConnectionIdleTimeout());
Assert.assertEquals(50, config.getMinRowsPerFrame());
Assert.assertEquals(50, config.getMaxRowsPerFrame());
}
@Test
public void testAvaticaServerConfigPropertiesBadMinRowsPerFrame()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("'druid.sql.avatica.minRowsPerFrame' must be set to a value greater than 0");
Properties properties = new Properties();
final JsonConfigProvider<AvaticaServerConfig> provider = JsonConfigProvider.of(
"druid.sql.avatica",
AvaticaServerConfig.class
);
properties.setProperty("druid.sql.avatica.minRowsPerFrame", "-1");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final AvaticaServerConfig config = provider.get().get();
Assert.assertNotNull(config);
config.getMinRowsPerFrame();
}
@Test

View File

@ -918,6 +918,107 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
);
}
@Test
public void testMinRowsPerFrame() throws Exception
{
final int minFetchSize = 1000;
final AvaticaServerConfig smallFrameConfig = new AvaticaServerConfig()
{
@Override
public int getMaxConnections()
{
return 2;
}
@Override
public int getMaxStatementsPerConnection()
{
return 4;
}
@Override
public int getMinRowsPerFrame()
{
return minFetchSize;
}
};
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final List<Meta.Frame> frames = new ArrayList<>();
SchemaPlus rootSchema =
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
DruidMeta smallFrameDruidMeta = new DruidMeta(
CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
macroTable,
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME
)
),
smallFrameConfig,
injector
)
{
@Override
public Frame fetch(
final StatementHandle statement,
final long offset,
final int fetchMaxRowCount
) throws NoSuchStatementException, MissingResultsException
{
// overriding fetch allows us to track how many frames are processed after the first frame, and also fetch size
Assert.assertEquals(minFetchSize, fetchMaxRowCount);
Frame frame = super.fetch(statement, offset, fetchMaxRowCount);
frames.add(frame);
return frame;
}
};
final DruidAvaticaHandler handler = new DruidAvaticaHandler(
smallFrameDruidMeta,
new DruidNode("dummy", "dummy", false, 1, null, true, false),
new AvaticaMonitor()
);
final int port = ThreadLocalRandom.current().nextInt(9999) + 20000;
Server smallFrameServer = new Server(new InetSocketAddress("127.0.0.1", port));
smallFrameServer.setHandler(handler);
smallFrameServer.start();
String smallFrameUrl = StringUtils.format(
"jdbc:avatica:remote:url=http://127.0.0.1:%d%s",
port,
DruidAvaticaHandler.AVATICA_PATH
);
Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl, "regularUser", "druid");
// use a prepared statement because avatica currently ignores fetchSize on the initial fetch of a Statement
PreparedStatement statement = smallFrameClient.prepareStatement("SELECT dim1 FROM druid.foo");
// set a fetch size below the minimum configured threshold
statement.setFetchSize(2);
final ResultSet resultSet = statement.executeQuery();
List<Map<String, Object>> rows = getRows(resultSet);
// expect minimum threshold to be used, which should be enough to do this all in first fetch
Assert.assertEquals(0, frames.size());
Assert.assertEquals(
ImmutableList.of(
ImmutableMap.of("dim1", ""),
ImmutableMap.of("dim1", "10.1"),
ImmutableMap.of("dim1", "2"),
ImmutableMap.of("dim1", "1"),
ImmutableMap.of("dim1", "def"),
ImmutableMap.of("dim1", "abc")
),
rows
);
}
@Test
@SuppressWarnings("unchecked")
public void testSqlRequestLog() throws Exception