SOLR-13374: Add fetchSize parameter to the jdbc Streaming Expression

This commit is contained in:
Joel Bernstein 2019-04-05 08:52:02 -04:00
parent aadc94a6e3
commit 46131e9cda
2 changed files with 23 additions and 6 deletions

View File

@ -138,6 +138,7 @@ public class JDBCStream extends TupleStream implements Expressible {
private String connectionUrl; private String connectionUrl;
private String sqlQuery; private String sqlQuery;
private StreamComparator definedSort; private StreamComparator definedSort;
private int fetchSize;
// Internal // Internal
private Connection connection; private Connection connection;
@ -153,7 +154,7 @@ public class JDBCStream extends TupleStream implements Expressible {
} }
public JDBCStream(String connectionUrl, String sqlQuery, StreamComparator definedSort, Properties connectionProperties, String driverClassName) throws IOException { public JDBCStream(String connectionUrl, String sqlQuery, StreamComparator definedSort, Properties connectionProperties, String driverClassName) throws IOException {
init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClassName); init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClassName, 5000);
} }
public JDBCStream(StreamExpression expression, StreamFactory factory) throws IOException{ public JDBCStream(StreamExpression expression, StreamFactory factory) throws IOException{
@ -163,7 +164,9 @@ public class JDBCStream extends TupleStream implements Expressible {
StreamExpressionNamedParameter sqlQueryExpression = factory.getNamedOperand(expression, "sql"); StreamExpressionNamedParameter sqlQueryExpression = factory.getNamedOperand(expression, "sql");
StreamExpressionNamedParameter definedSortExpression = factory.getNamedOperand(expression, SORT); StreamExpressionNamedParameter definedSortExpression = factory.getNamedOperand(expression, SORT);
StreamExpressionNamedParameter driverClassNameExpression = factory.getNamedOperand(expression, "driver"); StreamExpressionNamedParameter driverClassNameExpression = factory.getNamedOperand(expression, "driver");
StreamExpressionNamedParameter fetchSizeExpression = factory.getNamedOperand(expression, "fetchSize");
// Validate there are no unknown parameters - zkHost and alias are namedParameter so we don't need to count it twice // Validate there are no unknown parameters - zkHost and alias are namedParameter so we don't need to count it twice
if(expression.getParameters().size() != namedParams.size()){ if(expression.getParameters().size() != namedParams.size()){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - unknown operands found", expression)); throw new IOException(String.format(Locale.ROOT,"invalid expression %s - unknown operands found", expression));
@ -177,6 +180,12 @@ public class JDBCStream extends TupleStream implements Expressible {
} }
} }
int fetchSize = 5000;
if(null != fetchSizeExpression && fetchSizeExpression.getParameter() instanceof StreamExpressionValue){
String fetchSizeString = ((StreamExpressionValue)fetchSizeExpression.getParameter()).getValue();
fetchSize = Integer.parseInt(fetchSizeString);
}
// connectionUrl, required // connectionUrl, required
String connectionUrl = null; String connectionUrl = null;
if(null != connectionUrlExpression && connectionUrlExpression.getParameter() instanceof StreamExpressionValue){ if(null != connectionUrlExpression && connectionUrlExpression.getParameter() instanceof StreamExpressionValue){
@ -211,15 +220,16 @@ public class JDBCStream extends TupleStream implements Expressible {
} }
// We've got all the required items // We've got all the required items
init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClass); init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClass, fetchSize);
} }
private void init(String connectionUrl, String sqlQuery, StreamComparator definedSort, Properties connectionProperties, String driverClassName) { private void init(String connectionUrl, String sqlQuery, StreamComparator definedSort, Properties connectionProperties, String driverClassName, int fetchSize) {
this.connectionUrl = connectionUrl; this.connectionUrl = connectionUrl;
this.sqlQuery = sqlQuery; this.sqlQuery = sqlQuery;
this.definedSort = definedSort; this.definedSort = definedSort;
this.connectionProperties = connectionProperties; this.connectionProperties = connectionProperties;
this.driverClassName = driverClassName; this.driverClassName = driverClassName;
this.fetchSize = fetchSize;
} }
public void setStreamContext(StreamContext context) { public void setStreamContext(StreamContext context) {
@ -267,6 +277,7 @@ public class JDBCStream extends TupleStream implements Expressible {
try{ try{
resultSet = statement.executeQuery(sqlQuery); resultSet = statement.executeQuery(sqlQuery);
resultSet.setFetchSize(fetchSize);
} catch (SQLException e) { } catch (SQLException e) {
throw new IOException(String.format(Locale.ROOT, "Failed to execute sqlQuery '%s' against JDBC connection '%s'.\n" throw new IOException(String.format(Locale.ROOT, "Failed to execute sqlQuery '%s' against JDBC connection '%s'.\n"
+ e.getMessage(), sqlQuery, connectionUrl), e); + e.getMessage(), sqlQuery, connectionUrl), e);
@ -531,7 +542,10 @@ public class JDBCStream extends TupleStream implements Expressible {
// sql // sql
expression.addParameter(new StreamExpressionNamedParameter("sql", sqlQuery)); expression.addParameter(new StreamExpressionNamedParameter("sql", sqlQuery));
// fetchSize
expression.addParameter(new StreamExpressionNamedParameter("fetchSize", Integer.toString(fetchSize)));
// sort // sort
expression.addParameter(new StreamExpressionNamedParameter(SORT, definedSort.toExpression(factory))); expression.addParameter(new StreamExpressionNamedParameter(SORT, definedSort.toExpression(factory)));

View File

@ -33,6 +33,7 @@ import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder; import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric; import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric; import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
@ -329,7 +330,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
+ " rating_f as rating" + " rating_f as rating"
+ " )," + " ),"
+ " select(" + " select("
+ " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\")," + " jdbc(fetchSize=300, connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
+ " ID as personId," + " ID as personId,"
+ " NAME as personName," + " NAME as personName,"
+ " COUNTRY_NAME as country" + " COUNTRY_NAME as country"
@ -339,6 +340,8 @@ public class JDBCStreamTest extends SolrCloudTestCase {
stream = factory.constructStream(expression); stream = factory.constructStream(expression);
String expr = ((Expressible)stream).toExpression(factory).toString();
assertTrue(expr.contains("fetchSize=300"));
stream.setStreamContext(streamContext); stream.setStreamContext(streamContext);
tuples = getTuples(stream); tuples = getTuples(stream);