mirror of https://github.com/apache/lucene.git
SOLR-9729: JDBCStream improvements
This commit is contained in:
parent
ace423e958
commit
c20d1298d3
|
@ -162,6 +162,8 @@ Bug Fixes
|
||||||
|
|
||||||
* SOLR-9284: The HDFS BlockDirectoryCache should not let its keysToRelease or names maps grow indefinitely.
|
* SOLR-9284: The HDFS BlockDirectoryCache should not let its keysToRelease or names maps grow indefinitely.
|
||||||
(Mark Miller, Michael Sun)
|
(Mark Miller, Michael Sun)
|
||||||
|
|
||||||
|
* SOLR-9729: JDBCStream improvements (Kevin Risden)
|
||||||
|
|
||||||
Other Changes
|
Other Changes
|
||||||
----------------------
|
----------------------
|
||||||
|
|
|
@ -67,7 +67,7 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
// These are java types that we can directly support as an Object instance. Other supported
|
// These are java types that we can directly support as an Object instance. Other supported
|
||||||
// types will require some level of conversion (short -> long, etc...)
|
// types will require some level of conversion (short -> long, etc...)
|
||||||
// We'll use a static constructor to load this set.
|
// We'll use a static constructor to load this set.
|
||||||
private static HashSet<String> directSupportedTypes = new HashSet<String>();
|
private static final HashSet<String> directSupportedTypes = new HashSet<>();
|
||||||
static {
|
static {
|
||||||
directSupportedTypes.add(String.class.getName());
|
directSupportedTypes.add(String.class.getName());
|
||||||
directSupportedTypes.add(Double.class.getName());
|
directSupportedTypes.add(Double.class.getName());
|
||||||
|
@ -85,8 +85,8 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
private Connection connection;
|
private Connection connection;
|
||||||
private Properties connectionProperties;
|
private Properties connectionProperties;
|
||||||
private Statement statement;
|
private Statement statement;
|
||||||
private ResultSet resultSet;
|
|
||||||
private ResultSetValueSelector[] valueSelectors;
|
private ResultSetValueSelector[] valueSelectors;
|
||||||
|
protected ResultSet resultSet;
|
||||||
protected transient StreamContext streamContext;
|
protected transient StreamContext streamContext;
|
||||||
|
|
||||||
public JDBCStream(String connectionUrl, String sqlQuery, StreamComparator definedSort) throws IOException {
|
public JDBCStream(String connectionUrl, String sqlQuery, StreamComparator definedSort) throws IOException {
|
||||||
|
@ -107,7 +107,7 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
|
|
||||||
// Validate there are no unknown parameters - zkHost and alias are namedParameter so we don't need to count it twice
|
// Validate there are no unknown parameters - zkHost and alias are namedParameter so we don't need to count it twice
|
||||||
if(expression.getParameters().size() != namedParams.size()){
|
if(expression.getParameters().size() != namedParams.size()){
|
||||||
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - unknown operands found",expression));
|
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - unknown operands found", expression));
|
||||||
}
|
}
|
||||||
|
|
||||||
// All named params we don't care about will be passed to the driver on connection
|
// All named params we don't care about will be passed to the driver on connection
|
||||||
|
@ -124,7 +124,7 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
connectionUrl = ((StreamExpressionValue)connectionUrlExpression.getParameter()).getValue();
|
connectionUrl = ((StreamExpressionValue)connectionUrlExpression.getParameter()).getValue();
|
||||||
}
|
}
|
||||||
if(null == connectionUrl){
|
if(null == connectionUrl){
|
||||||
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - connection not found"));
|
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - connection not found", connectionUrlExpression));
|
||||||
}
|
}
|
||||||
|
|
||||||
// sql, required
|
// sql, required
|
||||||
|
@ -133,16 +133,16 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
sqlQuery = ((StreamExpressionValue)sqlQueryExpression.getParameter()).getValue();
|
sqlQuery = ((StreamExpressionValue)sqlQueryExpression.getParameter()).getValue();
|
||||||
}
|
}
|
||||||
if(null == sqlQuery){
|
if(null == sqlQuery){
|
||||||
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - sql not found"));
|
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - sql not found", sqlQueryExpression));
|
||||||
}
|
}
|
||||||
|
|
||||||
// definedSort, required
|
// definedSort, required
|
||||||
StreamComparator definedSort = null;
|
StreamComparator definedSort = null;
|
||||||
if(null != sqlQueryExpression && sqlQueryExpression.getParameter() instanceof StreamExpressionValue){
|
if(null != definedSortExpression && definedSortExpression.getParameter() instanceof StreamExpressionValue){
|
||||||
definedSort = factory.constructComparator(((StreamExpressionValue)definedSortExpression.getParameter()).getValue(), FieldComparator.class);
|
definedSort = factory.constructComparator(((StreamExpressionValue)definedSortExpression.getParameter()).getValue(), FieldComparator.class);
|
||||||
}
|
}
|
||||||
if(null == definedSort){
|
if(null == definedSort){
|
||||||
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - sort not found"));
|
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - sort not found", definedSortExpression));
|
||||||
}
|
}
|
||||||
|
|
||||||
// driverClass, optional
|
// driverClass, optional
|
||||||
|
@ -155,7 +155,7 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClass);
|
init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClass);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void init(String connectionUrl, String sqlQuery, StreamComparator definedSort, Properties connectionProperties, String driverClassName) throws IOException {
|
private void init(String connectionUrl, String sqlQuery, StreamComparator definedSort, Properties connectionProperties, String driverClassName) {
|
||||||
this.connectionUrl = connectionUrl;
|
this.connectionUrl = connectionUrl;
|
||||||
this.sqlQuery = sqlQuery;
|
this.sqlQuery = sqlQuery;
|
||||||
this.definedSort = definedSort;
|
this.definedSort = definedSort;
|
||||||
|
@ -188,7 +188,9 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
throw new SQLException("DriverManager.getDriver(url) returned null");
|
throw new SQLException("DriverManager.getDriver(url) returned null");
|
||||||
}
|
}
|
||||||
} catch(SQLException e){
|
} catch(SQLException e){
|
||||||
throw new IOException(String.format(Locale.ROOT, "Failed to determine JDBC driver from connection url '%s'. Usually this means the driver is not loaded - you can have JDBCStream try to load it by providing the 'driverClassName' value", connectionUrl), e);
|
throw new IOException(String.format(Locale.ROOT,
|
||||||
|
"Failed to determine JDBC driver from connection url '%s'. Usually this means the driver is not loaded - " +
|
||||||
|
"you can have JDBCStream try to load it by providing the 'driverClassName' value", connectionUrl), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -200,20 +202,23 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
try{
|
try{
|
||||||
statement = connection.createStatement();
|
statement = connection.createStatement();
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
throw new IOException(String.format(Locale.ROOT, "Failed to create a statement from JDBC connection '%s'", connectionUrl), e);
|
throw new IOException(String.format(Locale.ROOT, "Failed to create a statement from JDBC connection '%s'",
|
||||||
|
connectionUrl), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
try{
|
try{
|
||||||
resultSet = statement.executeQuery(sqlQuery);
|
resultSet = statement.executeQuery(sqlQuery);
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
throw new IOException(String.format(Locale.ROOT, "Failed to execute sqlQuery '%s' against JDBC connection '%s'", sqlQuery, connectionUrl), e);
|
throw new IOException(String.format(Locale.ROOT, "Failed to execute sqlQuery '%s' against JDBC connection '%s'",
|
||||||
|
sqlQuery, connectionUrl), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
try{
|
try{
|
||||||
// using the metadata, build selectors for each column
|
// using the metadata, build selectors for each column
|
||||||
valueSelectors = constructValueSelectors(resultSet.getMetaData());
|
valueSelectors = constructValueSelectors(resultSet.getMetaData());
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
throw new IOException(String.format(Locale.ROOT, "Failed to generate value selectors for sqlQuery '%s' against JDBC connection '%s'", sqlQuery, connectionUrl), e);
|
throw new IOException(String.format(Locale.ROOT,
|
||||||
|
"Failed to generate value selectors for sqlQuery '%s' against JDBC connection '%s'", sqlQuery, connectionUrl), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -222,8 +227,9 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
|
|
||||||
for(int columnIdx = 0; columnIdx < metadata.getColumnCount(); ++columnIdx){
|
for(int columnIdx = 0; columnIdx < metadata.getColumnCount(); ++columnIdx){
|
||||||
|
|
||||||
final int columnNumber = columnIdx + 1; // cause it starts at 1
|
final int columnNumber = columnIdx + 1; // cause it starts at 1
|
||||||
final String columnName = metadata.getColumnName(columnNumber);
|
// Use getColumnLabel instead of getColumnName to make sure fields renamed with AS are picked up properly
|
||||||
|
final String columnName = metadata.getColumnLabel(columnNumber);
|
||||||
String className = metadata.getColumnClassName(columnNumber);
|
String className = metadata.getColumnClassName(columnNumber);
|
||||||
String typeName = metadata.getColumnTypeName(columnNumber);
|
String typeName = metadata.getColumnTypeName(columnNumber);
|
||||||
|
|
||||||
|
@ -238,8 +244,7 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
return columnName;
|
return columnName;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
} else if(Short.class.getName().equals(className)) {
|
||||||
else if(Short.class.getName() == className){
|
|
||||||
valueSelectors[columnIdx] = new ResultSetValueSelector() {
|
valueSelectors[columnIdx] = new ResultSetValueSelector() {
|
||||||
public Object selectValue(ResultSet resultSet) throws SQLException {
|
public Object selectValue(ResultSet resultSet) throws SQLException {
|
||||||
Short obj = resultSet.getShort(columnNumber);
|
Short obj = resultSet.getShort(columnNumber);
|
||||||
|
@ -250,8 +255,7 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
return columnName;
|
return columnName;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
} else if(Integer.class.getName().equals(className)) {
|
||||||
else if(Integer.class.getName() == className){
|
|
||||||
valueSelectors[columnIdx] = new ResultSetValueSelector() {
|
valueSelectors[columnIdx] = new ResultSetValueSelector() {
|
||||||
public Object selectValue(ResultSet resultSet) throws SQLException {
|
public Object selectValue(ResultSet resultSet) throws SQLException {
|
||||||
Integer obj = resultSet.getInt(columnNumber);
|
Integer obj = resultSet.getInt(columnNumber);
|
||||||
|
@ -262,8 +266,7 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
return columnName;
|
return columnName;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
} else if(Float.class.getName().equals(className)) {
|
||||||
else if(Float.class.getName() == className){
|
|
||||||
valueSelectors[columnIdx] = new ResultSetValueSelector() {
|
valueSelectors[columnIdx] = new ResultSetValueSelector() {
|
||||||
public Object selectValue(ResultSet resultSet) throws SQLException {
|
public Object selectValue(ResultSet resultSet) throws SQLException {
|
||||||
Float obj = resultSet.getFloat(columnNumber);
|
Float obj = resultSet.getFloat(columnNumber);
|
||||||
|
@ -274,9 +277,10 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
return columnName;
|
return columnName;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
} else {
|
||||||
else{
|
throw new SQLException(String.format(Locale.ROOT,
|
||||||
throw new SQLException(String.format(Locale.ROOT, "Unable to determine the valueSelector for column '%s' (col #%d) of java class '%s' and type '%s'", columnName, columnNumber, className, typeName));
|
"Unable to determine the valueSelector for column '%s' (col #%d) of java class '%s' and type '%s'",
|
||||||
|
columnName, columnNumber, className, typeName));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -305,7 +309,7 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
public Tuple read() throws IOException {
|
public Tuple read() throws IOException {
|
||||||
|
|
||||||
try{
|
try{
|
||||||
Map<Object,Object> fields = new HashMap<Object,Object>();
|
Map<Object,Object> fields = new HashMap<>();
|
||||||
if(resultSet.next()){
|
if(resultSet.next()){
|
||||||
// we have a record
|
// we have a record
|
||||||
for(ResultSetValueSelector selector : valueSelectors){
|
for(ResultSetValueSelector selector : valueSelectors){
|
||||||
|
@ -391,7 +395,7 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<TupleStream> children() {
|
public List<TupleStream> children() {
|
||||||
return new ArrayList<TupleStream>();
|
return new ArrayList<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -404,6 +408,6 @@ public class JDBCStream extends TupleStream implements Expressible {
|
||||||
}
|
}
|
||||||
|
|
||||||
interface ResultSetValueSelector {
|
interface ResultSetValueSelector {
|
||||||
public String getColumnName();
|
String getColumnName();
|
||||||
public Object selectValue(ResultSet resultSet) throws SQLException;
|
Object selectValue(ResultSet resultSet) throws SQLException;
|
||||||
}
|
}
|
|
@ -351,11 +351,8 @@ public class JDBCStreamTest extends SolrCloudTestCase {
|
||||||
TupleStream stream;
|
TupleStream stream;
|
||||||
List<Tuple> tuples;
|
List<Tuple> tuples;
|
||||||
|
|
||||||
// Basic test
|
// Basic test for no alias
|
||||||
// the test here is the setting of the property get_column_name=true. In hsqldb if this value is set to true then the use of an
|
expression =
|
||||||
// as clause in a select will have no effect. As such even though we have PEOPLE.ID as PERSONID we will still expect the column
|
|
||||||
// name to come out as ID and not PERSONID
|
|
||||||
expression =
|
|
||||||
"innerJoin("
|
"innerJoin("
|
||||||
+ " select("
|
+ " select("
|
||||||
+ " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
|
+ " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
|
||||||
|
@ -363,7 +360,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
|
||||||
+ " rating_f as rating"
|
+ " rating_f as rating"
|
||||||
+ " ),"
|
+ " ),"
|
||||||
+ " select("
|
+ " select("
|
||||||
+ " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID as PERSONID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\", get_column_name=true),"
|
+ " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
|
||||||
+ " ID as personId,"
|
+ " ID as personId,"
|
||||||
+ " NAME as personName,"
|
+ " NAME as personName,"
|
||||||
+ " COUNTRY_NAME as country"
|
+ " COUNTRY_NAME as country"
|
||||||
|
@ -380,10 +377,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
|
||||||
assertOrderOf(tuples, "personName", "Emma","Grace","Hailey","Isabella","Lily","Madison","Mia","Natalie","Olivia","Samantha");
|
assertOrderOf(tuples, "personName", "Emma","Grace","Hailey","Isabella","Lily","Madison","Mia","Natalie","Olivia","Samantha");
|
||||||
assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
|
assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
|
||||||
|
|
||||||
// Basic test
|
// Basic test for alias
|
||||||
// the test here is the setting of the property get_column_name=false. In hsqldb if this value is set to false then the use of an
|
|
||||||
// as clause in a select will have effect. As such we have PEOPLE.ID as PERSONID we will still expect the column name to come out
|
|
||||||
// PERSONID and not ID
|
|
||||||
expression =
|
expression =
|
||||||
"innerJoin("
|
"innerJoin("
|
||||||
+ " select("
|
+ " select("
|
||||||
|
@ -392,7 +386,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
|
||||||
+ " rating_f as rating"
|
+ " rating_f as rating"
|
||||||
+ " ),"
|
+ " ),"
|
||||||
+ " select("
|
+ " select("
|
||||||
+ " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID as PERSONID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"PERSONID asc\", get_column_name=false),"
|
+ " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID as PERSONID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"PERSONID asc\"),"
|
||||||
+ " PERSONID as personId,"
|
+ " PERSONID as personId,"
|
||||||
+ " NAME as personName,"
|
+ " NAME as personName,"
|
||||||
+ " COUNTRY_NAME as country"
|
+ " COUNTRY_NAME as country"
|
||||||
|
|
Loading…
Reference in New Issue