SOLR-8612: closing JDBC Statement on exceptions from JdbcDataSource in DataImportHandler aka DIH (Kristine Jetzke via Mikhail Khludnev)

This commit is contained in:
Mikhail Khludnev 2016-06-02 22:53:15 +03:00
parent c8570ed821
commit 24fa92959d
3 changed files with 257 additions and 37 deletions

View File

@ -249,6 +249,8 @@ Bug Fixes
* SOLR-8940: Fix group.sort option (hossman)
* SOLR-8612: closing JDBC Statement on failures in DataImportHandler (DIH) (Kristine Jetzke via Mikhail Khludnev)
Optimizations
----------------------
* SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.

View File

@ -29,14 +29,12 @@ import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.lang.invoke.MethodHandles;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.*;
import java.util.concurrent.Callable;
@ -61,6 +59,8 @@ public class JdbcDataSource extends
private Connection conn;
private ResultSetIterator resultSetIterator;
private Map<String, Integer> fieldNameVsType = new HashMap<>();
private boolean convertType = false;
@ -276,15 +276,19 @@ public class JdbcDataSource extends
@Override
public Iterator<Map<String, Object>> getData(String query) {
ResultSetIterator r = new ResultSetIterator(query);
return r.getIterator();
if (resultSetIterator != null) {
resultSetIterator.close();
resultSetIterator = null;
}
resultSetIterator = new ResultSetIterator(query);
return resultSetIterator.getIterator();
}
private void logError(String msg, Exception e) {
LOG.warn(msg, e);
}
private List<String> readFieldNames(ResultSetMetaData metaData)
protected List<String> readFieldNames(ResultSetMetaData metaData)
throws SQLException {
List<String> colNames = new ArrayList<>();
int count = metaData.getColumnCount();
@ -299,35 +303,38 @@ public class JdbcDataSource extends
private Statement stmt = null;
private List<String> colNames;
private Iterator<Map<String, Object>> rSetIterator;
public ResultSetIterator(String query) {
final List<String> colNames;
try {
Connection c = getConnection();
stmt = createStatement(c);
stmt = createStatement(c, batchSize, maxRows);
LOG.debug("Executing SQL: " + query);
long start = System.nanoTime();
resultSet = executeStatement(stmt, query);
LOG.trace("Time taken for sql :"
+ TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
colNames = readFieldNames(resultSet.getMetaData());
setColNames(resultSet);
} catch (Exception e) {
close();
wrapAndThrow(SEVERE, e, "Unable to execute query: " + query);
return;
}
if (resultSet == null) {
close();
rSetIterator = new ArrayList<Map<String, Object>>().iterator();
return;
}
rSetIterator = createIterator(stmt, resultSet, convertType, colNames, fieldNameVsType);
rSetIterator = createIterator(convertType, fieldNameVsType);
}
protected Statement createStatement(Connection c) throws SQLException {
protected Statement createStatement(final Connection c, final int batchSize, final int maxRows)
throws SQLException {
Statement statement = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
statement.setFetchSize(batchSize);
statement.setMaxRows(maxRows);
@ -341,18 +348,25 @@ public class JdbcDataSource extends
return null;
}
protected void setColNames(final ResultSet resultSet) throws SQLException {
if (resultSet != null) {
colNames = readFieldNames(resultSet.getMetaData());
} else {
colNames = Collections.emptyList();
}
}
protected Iterator<Map<String,Object>> createIterator(Statement stmt, ResultSet resultSet, boolean convertType,
List<String> colNames, Map<String,Integer> fieldNameVsType) {
protected Iterator<Map<String,Object>> createIterator(final boolean convertType,
final Map<String,Integer> fieldNameVsType) {
return new Iterator<Map<String,Object>>() {
@Override
public boolean hasNext() {
return hasnext(resultSet, stmt);
return hasnext();
}
@Override
public Map<String,Object> next() {
return getARow(resultSet, convertType, colNames, fieldNameVsType);
return getARow(convertType, fieldNameVsType);
}
@Override
@ -363,17 +377,16 @@ public class JdbcDataSource extends
protected Map<String,Object> getARow(ResultSet resultSet, boolean convertType, List<String> colNames,
Map<String,Integer> fieldNameVsType) {
if (resultSet == null)
protected Map<String,Object> getARow(boolean convertType, Map<String,Integer> fieldNameVsType) {
if (getResultSet() == null)
return null;
Map<String, Object> result = new HashMap<>();
for (String colName : colNames) {
for (String colName : getColNames()) {
try {
if (!convertType) {
// Use underlying database's type information except for BigDecimal and BigInteger
// which cannot be serialized by JavaBin/XML. See SOLR-6165
Object value = resultSet.getObject(colName);
Object value = getResultSet().getObject(colName);
if (value instanceof BigDecimal || value instanceof BigInteger) {
result.put(colName, value.toString());
} else {
@ -387,28 +400,28 @@ public class JdbcDataSource extends
type = Types.VARCHAR;
switch (type) {
case Types.INTEGER:
result.put(colName, resultSet.getInt(colName));
result.put(colName, getResultSet().getInt(colName));
break;
case Types.FLOAT:
result.put(colName, resultSet.getFloat(colName));
result.put(colName, getResultSet().getFloat(colName));
break;
case Types.BIGINT:
result.put(colName, resultSet.getLong(colName));
result.put(colName, getResultSet().getLong(colName));
break;
case Types.DOUBLE:
result.put(colName, resultSet.getDouble(colName));
result.put(colName, getResultSet().getDouble(colName));
break;
case Types.DATE:
result.put(colName, resultSet.getTimestamp(colName));
result.put(colName, getResultSet().getTimestamp(colName));
break;
case Types.BOOLEAN:
result.put(colName, resultSet.getBoolean(colName));
result.put(colName, getResultSet().getBoolean(colName));
break;
case Types.BLOB:
result.put(colName, resultSet.getBytes(colName));
result.put(colName, getResultSet().getBytes(colName));
break;
default:
result.put(colName, resultSet.getString(colName));
result.put(colName, getResultSet().getString(colName));
break;
}
} catch (SQLException e) {
@ -419,11 +432,13 @@ public class JdbcDataSource extends
return result;
}
protected boolean hasnext(ResultSet resultSet, Statement stmt) {
if (resultSet == null)
protected boolean hasnext() {
if (getResultSet() == null) {
close();
return false;
}
try {
if (resultSet.next()) {
if (getResultSet().next()) {
return true;
} else {
close();
@ -438,15 +453,15 @@ public class JdbcDataSource extends
protected void close() {
try {
if (resultSet != null)
resultSet.close();
if (stmt != null)
stmt.close();
if (getResultSet() != null)
getResultSet().close();
if (getStatement() != null)
getStatement().close();
} catch (Exception e) {
logError("Exception while closing result set", e);
} finally {
resultSet = null;
stmt = null;
setResultSet(null);
setStatement(null);
}
}
@ -454,6 +469,31 @@ public class JdbcDataSource extends
return rSetIterator;
}
protected final Statement getStatement() {
return stmt;
}
protected final void setStatement(Statement stmt) {
this.stmt = stmt;
}
protected final ResultSet getResultSet() {
return resultSet;
}
protected final void setResultSet(ResultSet resultSet) {
this.resultSet = resultSet;
}
protected final List<String> getColNames() {
return colNames;
}
protected final void setColNames(List<String> colNames) {
this.colNames = colNames;
}
}
protected Connection getConnection() throws Exception {
@ -488,6 +528,9 @@ public class JdbcDataSource extends
@Override
public void close() {
if (resultSetIterator != null) {
resultSetIterator.close();
}
try {
closeConnection();
} finally {

View File

@ -22,11 +22,15 @@ import java.nio.file.Files;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import javax.sql.DataSource;
import org.apache.solr.handler.dataimport.JdbcDataSource.ResultSetIterator;
import org.easymock.EasyMock;
import org.easymock.IMocksControl;
import org.junit.After;
@ -201,6 +205,177 @@ public class TestJdbcDataSource extends AbstractDataImportHandlerTestCase {
mockControl.verify();
}
@Test
public void testClosesStatementWhenExceptionThrownOnExecuteQuery() throws Exception {
MockInitialContextFactory.bind("java:comp/env/jdbc/JndiDB", dataSource);
props.put(JdbcDataSource.JNDI_NAME, "java:comp/env/jdbc/JndiDB");
EasyMock.expect(dataSource.getConnection()).andReturn(connection);
jdbcDataSource.init(context, props);
connection.setAutoCommit(false);
SQLException sqlException = new SQLException("fake");
Statement statement = mockControl.createMock(Statement.class);
EasyMock.expect(connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY))
.andReturn(statement);
statement.setFetchSize(500);
statement.setMaxRows(0);
EasyMock.expect(statement.execute("query")).andThrow(sqlException);
statement.close();
mockControl.replay();
try {
jdbcDataSource.getData("query");
fail("exception expected");
} catch (DataImportHandlerException ex) {
assertSame(sqlException, ex.getCause());
}
mockControl.verify();
}
@Test
public void testClosesStatementWhenResultSetNull() throws Exception {
MockInitialContextFactory.bind("java:comp/env/jdbc/JndiDB", dataSource);
props.put(JdbcDataSource.JNDI_NAME, "java:comp/env/jdbc/JndiDB");
EasyMock.expect(dataSource.getConnection()).andReturn(connection);
jdbcDataSource.init(context, props);
connection.setAutoCommit(false);
Statement statement = mockControl.createMock(Statement.class);
EasyMock.expect(connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY))
.andReturn(statement);
statement.setFetchSize(500);
statement.setMaxRows(0);
EasyMock.expect(statement.execute("query")).andReturn(false);
statement.close();
mockControl.replay();
jdbcDataSource.getData("query");
mockControl.verify();
}
@Test
public void testClosesStatementWhenHasNextCalledAndResultSetNull() throws Exception {
MockInitialContextFactory.bind("java:comp/env/jdbc/JndiDB", dataSource);
props.put(JdbcDataSource.JNDI_NAME, "java:comp/env/jdbc/JndiDB");
EasyMock.expect(dataSource.getConnection()).andReturn(connection);
jdbcDataSource.init(context, props);
connection.setAutoCommit(false);
Statement statement = mockControl.createMock(Statement.class);
EasyMock.expect(connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY))
.andReturn(statement);
statement.setFetchSize(500);
statement.setMaxRows(0);
EasyMock.expect(statement.execute("query")).andReturn(true);
ResultSet resultSet = mockControl.createMock(ResultSet.class);
EasyMock.expect(statement.getResultSet()).andReturn(resultSet);
ResultSetMetaData metaData = mockControl.createMock(ResultSetMetaData.class);
EasyMock.expect(resultSet.getMetaData()).andReturn(metaData);
EasyMock.expect(metaData.getColumnCount()).andReturn(0);
statement.close();
mockControl.replay();
Iterator<Map<String,Object>> data = jdbcDataSource.getData("query");
ResultSetIterator resultSetIterator = (ResultSetIterator) data.getClass().getDeclaredField("this$1").get(data);
resultSetIterator.setResultSet(null);
data.hasNext();
mockControl.verify();
}
@Test
public void testClosesResultSetAndStatementWhenDataSourceIsClosed() throws Exception {
MockInitialContextFactory.bind("java:comp/env/jdbc/JndiDB", dataSource);
props.put(JdbcDataSource.JNDI_NAME, "java:comp/env/jdbc/JndiDB");
EasyMock.expect(dataSource.getConnection()).andReturn(connection);
jdbcDataSource.init(context, props);
connection.setAutoCommit(false);
Statement statement = mockControl.createMock(Statement.class);
EasyMock.expect(connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY))
.andReturn(statement);
statement.setFetchSize(500);
statement.setMaxRows(0);
EasyMock.expect(statement.execute("query")).andReturn(true);
ResultSet resultSet = mockControl.createMock(ResultSet.class);
EasyMock.expect(statement.getResultSet()).andReturn(resultSet);
ResultSetMetaData metaData = mockControl.createMock(ResultSetMetaData.class);
EasyMock.expect(resultSet.getMetaData()).andReturn(metaData);
EasyMock.expect(metaData.getColumnCount()).andReturn(0);
resultSet.close();
statement.close();
connection.commit();
connection.close();
mockControl.replay();
jdbcDataSource.getData("query");
jdbcDataSource.close();
mockControl.verify();
}
@Test
public void testClosesCurrentResultSetIteratorWhenNewOneIsCreated() throws Exception {
MockInitialContextFactory.bind("java:comp/env/jdbc/JndiDB", dataSource);
props.put(JdbcDataSource.JNDI_NAME, "java:comp/env/jdbc/JndiDB");
EasyMock.expect(dataSource.getConnection()).andReturn(connection);
jdbcDataSource.init(context, props);
connection.setAutoCommit(false);
Statement statement = mockControl.createMock(Statement.class);
EasyMock.expect(connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY))
.andReturn(statement);
statement.setFetchSize(500);
statement.setMaxRows(0);
EasyMock.expect(statement.execute("query")).andReturn(true);
ResultSet resultSet = mockControl.createMock(ResultSet.class);
EasyMock.expect(statement.getResultSet()).andReturn(resultSet);
ResultSetMetaData metaData = mockControl.createMock(ResultSetMetaData.class);
EasyMock.expect(resultSet.getMetaData()).andReturn(metaData);
EasyMock.expect(metaData.getColumnCount()).andReturn(0);
resultSet.close();
statement.close();
EasyMock.expect(connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY))
.andReturn(statement);
statement.setFetchSize(500);
statement.setMaxRows(0);
EasyMock.expect(statement.execute("other query")).andReturn(false);
statement.close();
mockControl.replay();
jdbcDataSource.getData("query");
jdbcDataSource.getData("other query");
mockControl.verify();
}
@Test
public void testRetrieveFromDriverManager() throws Exception {
DriverManager.registerDriver(driver);