From 46131e9cdaf3fc377e307342d793846c4f36186a Mon Sep 17 00:00:00 2001 From: Joel Bernstein Date: Fri, 5 Apr 2019 08:52:02 -0400 Subject: [PATCH] SOLR-13374: Add fetchSize parameter to the jdbc Streaming Expression --- .../client/solrj/io/stream/JDBCStream.java | 24 +++++++++++++++---- .../solrj/io/stream/JDBCStreamTest.java | 5 +++- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java index 40810353daa..35d23ebb98b 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java @@ -138,6 +138,7 @@ public class JDBCStream extends TupleStream implements Expressible { private String connectionUrl; private String sqlQuery; private StreamComparator definedSort; + private int fetchSize; // Internal private Connection connection; @@ -153,7 +154,7 @@ public class JDBCStream extends TupleStream implements Expressible { } public JDBCStream(String connectionUrl, String sqlQuery, StreamComparator definedSort, Properties connectionProperties, String driverClassName) throws IOException { - init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClassName); + init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClassName, 5000); } public JDBCStream(StreamExpression expression, StreamFactory factory) throws IOException{ @@ -163,7 +164,9 @@ public class JDBCStream extends TupleStream implements Expressible { StreamExpressionNamedParameter sqlQueryExpression = factory.getNamedOperand(expression, "sql"); StreamExpressionNamedParameter definedSortExpression = factory.getNamedOperand(expression, SORT); StreamExpressionNamedParameter driverClassNameExpression = factory.getNamedOperand(expression, "driver"); - + StreamExpressionNamedParameter fetchSizeExpression = factory.getNamedOperand(expression, "fetchSize"); + + // Validate there are no unknown parameters - zkHost and alias are namedParameter so we don't need to count it twice if(expression.getParameters().size() != namedParams.size()){ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - unknown operands found", expression)); @@ -177,6 +180,12 @@ public class JDBCStream extends TupleStream implements Expressible { } } + int fetchSize = 5000; + if(null != fetchSizeExpression && fetchSizeExpression.getParameter() instanceof StreamExpressionValue){ + String fetchSizeString = ((StreamExpressionValue)fetchSizeExpression.getParameter()).getValue(); + fetchSize = Integer.parseInt(fetchSizeString); + } + // connectionUrl, required String connectionUrl = null; if(null != connectionUrlExpression && connectionUrlExpression.getParameter() instanceof StreamExpressionValue){ @@ -211,15 +220,16 @@ public class JDBCStream extends TupleStream implements Expressible { } // We've got all the required items - init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClass); + init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClass, fetchSize); } - private void init(String connectionUrl, String sqlQuery, StreamComparator definedSort, Properties connectionProperties, String driverClassName) { + private void init(String connectionUrl, String sqlQuery, StreamComparator definedSort, Properties connectionProperties, String driverClassName, int fetchSize) { this.connectionUrl = connectionUrl; this.sqlQuery = sqlQuery; this.definedSort = definedSort; this.connectionProperties = connectionProperties; this.driverClassName = driverClassName; + this.fetchSize = fetchSize; } public void setStreamContext(StreamContext context) { @@ -267,6 +277,7 @@ public class JDBCStream extends TupleStream implements Expressible { try{ resultSet = statement.executeQuery(sqlQuery); + resultSet.setFetchSize(fetchSize); } catch (SQLException e) { throw new IOException(String.format(Locale.ROOT, "Failed to execute sqlQuery '%s' against JDBC connection '%s'.\n" + e.getMessage(), sqlQuery, connectionUrl), e); @@ -531,7 +542,10 @@ public class JDBCStream extends TupleStream implements Expressible { // sql expression.addParameter(new StreamExpressionNamedParameter("sql", sqlQuery)); - + + // fetchSize + expression.addParameter(new StreamExpressionNamedParameter("fetchSize", Integer.toString(fetchSize))); + // sort expression.addParameter(new StreamExpressionNamedParameter(SORT, definedSort.toExpression(factory))); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java index a44de5b7628..d6ac88d1560 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java @@ -33,6 +33,7 @@ import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.ComparatorOrder; import org.apache.solr.client.solrj.io.comp.FieldComparator; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.client.solrj.io.stream.metrics.CountMetric; import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric; @@ -329,7 +330,7 @@ public class JDBCStreamTest extends SolrCloudTestCase { + " rating_f as rating" + " )," + " select(" - + " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\")," + + " jdbc(fetchSize=300, connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\")," + " ID as personId," + " NAME as personName," + " COUNTRY_NAME as country" @@ -339,6 +340,8 @@ public class JDBCStreamTest extends SolrCloudTestCase { stream = factory.constructStream(expression); + String expr = ((Expressible)stream).toExpression(factory).toString(); + assertTrue(expr.contains("fetchSize=300")); stream.setStreamContext(streamContext); tuples = getTuples(stream);