SOLR-8612: closing JDBC Statement on exceptions from JdbcDataSource in DataImportHandler aka DIH (Kristine Jetzke via Mikhail Khludnev)

This commit is contained in:
Mikhail Khludnev 2016-06-02 22:53:15 +03:00
parent 5dfaf0392f
commit 22e5d31cdc
3 changed files with 257 additions and 37 deletions

View File

@ -218,6 +218,8 @@ Bug Fixes
* SOLR-8940: Fix group.sort option (hossman) * SOLR-8940: Fix group.sort option (hossman)
* SOLR-8612: closing JDBC Statement on failures in DataImportHandler (DIH) (Kristine Jetzke via Mikhail Khludnev)
Optimizations Optimizations
---------------------- ----------------------
* SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation. * SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.

View File

@ -29,14 +29,12 @@ import javax.naming.InitialContext;
import javax.naming.NamingException; import javax.naming.NamingException;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.Reader; import java.io.Reader;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.BigInteger; import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.*; import java.sql.*;
import java.util.*; import java.util.*;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -60,6 +58,8 @@ public class JdbcDataSource extends
private long connLastUsed = 0; private long connLastUsed = 0;
private Connection conn; private Connection conn;
private ResultSetIterator resultSetIterator;
private Map<String, Integer> fieldNameVsType = new HashMap<>(); private Map<String, Integer> fieldNameVsType = new HashMap<>();
@ -276,15 +276,19 @@ public class JdbcDataSource extends
@Override @Override
public Iterator<Map<String, Object>> getData(String query) { public Iterator<Map<String, Object>> getData(String query) {
ResultSetIterator r = new ResultSetIterator(query); if (resultSetIterator != null) {
return r.getIterator(); resultSetIterator.close();
resultSetIterator = null;
}
resultSetIterator = new ResultSetIterator(query);
return resultSetIterator.getIterator();
} }
private void logError(String msg, Exception e) { private void logError(String msg, Exception e) {
LOG.warn(msg, e); LOG.warn(msg, e);
} }
private List<String> readFieldNames(ResultSetMetaData metaData) protected List<String> readFieldNames(ResultSetMetaData metaData)
throws SQLException { throws SQLException {
List<String> colNames = new ArrayList<>(); List<String> colNames = new ArrayList<>();
int count = metaData.getColumnCount(); int count = metaData.getColumnCount();
@ -299,35 +303,38 @@ public class JdbcDataSource extends
private Statement stmt = null; private Statement stmt = null;
private List<String> colNames;
private Iterator<Map<String, Object>> rSetIterator; private Iterator<Map<String, Object>> rSetIterator;
public ResultSetIterator(String query) { public ResultSetIterator(String query) {
final List<String> colNames;
try { try {
Connection c = getConnection(); Connection c = getConnection();
stmt = createStatement(c); stmt = createStatement(c, batchSize, maxRows);
LOG.debug("Executing SQL: " + query); LOG.debug("Executing SQL: " + query);
long start = System.nanoTime(); long start = System.nanoTime();
resultSet = executeStatement(stmt, query); resultSet = executeStatement(stmt, query);
LOG.trace("Time taken for sql :" LOG.trace("Time taken for sql :"
+ TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS)); + TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
colNames = readFieldNames(resultSet.getMetaData()); setColNames(resultSet);
} catch (Exception e) { } catch (Exception e) {
close();
wrapAndThrow(SEVERE, e, "Unable to execute query: " + query); wrapAndThrow(SEVERE, e, "Unable to execute query: " + query);
return; return;
} }
if (resultSet == null) { if (resultSet == null) {
close();
rSetIterator = new ArrayList<Map<String, Object>>().iterator(); rSetIterator = new ArrayList<Map<String, Object>>().iterator();
return; return;
} }
rSetIterator = createIterator(stmt, resultSet, convertType, colNames, fieldNameVsType); rSetIterator = createIterator(convertType, fieldNameVsType);
} }
protected Statement createStatement(Connection c) throws SQLException { protected Statement createStatement(final Connection c, final int batchSize, final int maxRows)
throws SQLException {
Statement statement = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); Statement statement = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
statement.setFetchSize(batchSize); statement.setFetchSize(batchSize);
statement.setMaxRows(maxRows); statement.setMaxRows(maxRows);
@ -340,19 +347,26 @@ public class JdbcDataSource extends
} }
return null; return null;
} }
protected void setColNames(final ResultSet resultSet) throws SQLException {
if (resultSet != null) {
colNames = readFieldNames(resultSet.getMetaData());
} else {
colNames = Collections.emptyList();
}
}
protected Iterator<Map<String,Object>> createIterator(final boolean convertType,
protected Iterator<Map<String,Object>> createIterator(Statement stmt, ResultSet resultSet, boolean convertType, final Map<String,Integer> fieldNameVsType) {
List<String> colNames, Map<String,Integer> fieldNameVsType) {
return new Iterator<Map<String,Object>>() { return new Iterator<Map<String,Object>>() {
@Override @Override
public boolean hasNext() { public boolean hasNext() {
return hasnext(resultSet, stmt); return hasnext();
} }
@Override @Override
public Map<String,Object> next() { public Map<String,Object> next() {
return getARow(resultSet, convertType, colNames, fieldNameVsType); return getARow(convertType, fieldNameVsType);
} }
@Override @Override
@ -363,17 +377,16 @@ public class JdbcDataSource extends
protected Map<String,Object> getARow(ResultSet resultSet, boolean convertType, List<String> colNames, protected Map<String,Object> getARow(boolean convertType, Map<String,Integer> fieldNameVsType) {
Map<String,Integer> fieldNameVsType) { if (getResultSet() == null)
if (resultSet == null)
return null; return null;
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
for (String colName : colNames) { for (String colName : getColNames()) {
try { try {
if (!convertType) { if (!convertType) {
// Use underlying database's type information except for BigDecimal and BigInteger // Use underlying database's type information except for BigDecimal and BigInteger
// which cannot be serialized by JavaBin/XML. See SOLR-6165 // which cannot be serialized by JavaBin/XML. See SOLR-6165
Object value = resultSet.getObject(colName); Object value = getResultSet().getObject(colName);
if (value instanceof BigDecimal || value instanceof BigInteger) { if (value instanceof BigDecimal || value instanceof BigInteger) {
result.put(colName, value.toString()); result.put(colName, value.toString());
} else { } else {
@ -387,28 +400,28 @@ public class JdbcDataSource extends
type = Types.VARCHAR; type = Types.VARCHAR;
switch (type) { switch (type) {
case Types.INTEGER: case Types.INTEGER:
result.put(colName, resultSet.getInt(colName)); result.put(colName, getResultSet().getInt(colName));
break; break;
case Types.FLOAT: case Types.FLOAT:
result.put(colName, resultSet.getFloat(colName)); result.put(colName, getResultSet().getFloat(colName));
break; break;
case Types.BIGINT: case Types.BIGINT:
result.put(colName, resultSet.getLong(colName)); result.put(colName, getResultSet().getLong(colName));
break; break;
case Types.DOUBLE: case Types.DOUBLE:
result.put(colName, resultSet.getDouble(colName)); result.put(colName, getResultSet().getDouble(colName));
break; break;
case Types.DATE: case Types.DATE:
result.put(colName, resultSet.getTimestamp(colName)); result.put(colName, getResultSet().getTimestamp(colName));
break; break;
case Types.BOOLEAN: case Types.BOOLEAN:
result.put(colName, resultSet.getBoolean(colName)); result.put(colName, getResultSet().getBoolean(colName));
break; break;
case Types.BLOB: case Types.BLOB:
result.put(colName, resultSet.getBytes(colName)); result.put(colName, getResultSet().getBytes(colName));
break; break;
default: default:
result.put(colName, resultSet.getString(colName)); result.put(colName, getResultSet().getString(colName));
break; break;
} }
} catch (SQLException e) { } catch (SQLException e) {
@ -419,11 +432,13 @@ public class JdbcDataSource extends
return result; return result;
} }
protected boolean hasnext(ResultSet resultSet, Statement stmt) { protected boolean hasnext() {
if (resultSet == null) if (getResultSet() == null) {
close();
return false; return false;
}
try { try {
if (resultSet.next()) { if (getResultSet().next()) {
return true; return true;
} else { } else {
close(); close();
@ -438,15 +453,15 @@ public class JdbcDataSource extends
protected void close() { protected void close() {
try { try {
if (resultSet != null) if (getResultSet() != null)
resultSet.close(); getResultSet().close();
if (stmt != null) if (getStatement() != null)
stmt.close(); getStatement().close();
} catch (Exception e) { } catch (Exception e) {
logError("Exception while closing result set", e); logError("Exception while closing result set", e);
} finally { } finally {
resultSet = null; setResultSet(null);
stmt = null; setStatement(null);
} }
} }
@ -454,6 +469,31 @@ public class JdbcDataSource extends
return rSetIterator; return rSetIterator;
} }
protected final Statement getStatement() {
return stmt;
}
protected final void setStatement(Statement stmt) {
this.stmt = stmt;
}
protected final ResultSet getResultSet() {
return resultSet;
}
protected final void setResultSet(ResultSet resultSet) {
this.resultSet = resultSet;
}
protected final List<String> getColNames() {
return colNames;
}
protected final void setColNames(List<String> colNames) {
this.colNames = colNames;
}
} }
protected Connection getConnection() throws Exception { protected Connection getConnection() throws Exception {
@ -488,6 +528,9 @@ public class JdbcDataSource extends
@Override @Override
public void close() { public void close() {
if (resultSetIterator != null) {
resultSetIterator.close();
}
try { try {
closeConnection(); closeConnection();
} finally { } finally {

View File

@ -22,11 +22,15 @@ import java.nio.file.Files;
import java.sql.Connection; import java.sql.Connection;
import java.sql.Driver; import java.sql.Driver;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement;
import java.util.*; import java.util.*;
import javax.sql.DataSource; import javax.sql.DataSource;
import org.apache.solr.handler.dataimport.JdbcDataSource.ResultSetIterator;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.easymock.IMocksControl; import org.easymock.IMocksControl;
import org.junit.After; import org.junit.After;
@ -201,6 +205,177 @@ public class TestJdbcDataSource extends AbstractDataImportHandlerTestCase {
mockControl.verify(); mockControl.verify();
} }
@Test
public void testClosesStatementWhenExceptionThrownOnExecuteQuery() throws Exception {
MockInitialContextFactory.bind("java:comp/env/jdbc/JndiDB", dataSource);
props.put(JdbcDataSource.JNDI_NAME, "java:comp/env/jdbc/JndiDB");
EasyMock.expect(dataSource.getConnection()).andReturn(connection);
jdbcDataSource.init(context, props);
connection.setAutoCommit(false);
SQLException sqlException = new SQLException("fake");
Statement statement = mockControl.createMock(Statement.class);
EasyMock.expect(connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY))
.andReturn(statement);
statement.setFetchSize(500);
statement.setMaxRows(0);
EasyMock.expect(statement.execute("query")).andThrow(sqlException);
statement.close();
mockControl.replay();
try {
jdbcDataSource.getData("query");
fail("exception expected");
} catch (DataImportHandlerException ex) {
assertSame(sqlException, ex.getCause());
}
mockControl.verify();
}
@Test
public void testClosesStatementWhenResultSetNull() throws Exception {
MockInitialContextFactory.bind("java:comp/env/jdbc/JndiDB", dataSource);
props.put(JdbcDataSource.JNDI_NAME, "java:comp/env/jdbc/JndiDB");
EasyMock.expect(dataSource.getConnection()).andReturn(connection);
jdbcDataSource.init(context, props);
connection.setAutoCommit(false);
Statement statement = mockControl.createMock(Statement.class);
EasyMock.expect(connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY))
.andReturn(statement);
statement.setFetchSize(500);
statement.setMaxRows(0);
EasyMock.expect(statement.execute("query")).andReturn(false);
statement.close();
mockControl.replay();
jdbcDataSource.getData("query");
mockControl.verify();
}
@Test
public void testClosesStatementWhenHasNextCalledAndResultSetNull() throws Exception {
MockInitialContextFactory.bind("java:comp/env/jdbc/JndiDB", dataSource);
props.put(JdbcDataSource.JNDI_NAME, "java:comp/env/jdbc/JndiDB");
EasyMock.expect(dataSource.getConnection()).andReturn(connection);
jdbcDataSource.init(context, props);
connection.setAutoCommit(false);
Statement statement = mockControl.createMock(Statement.class);
EasyMock.expect(connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY))
.andReturn(statement);
statement.setFetchSize(500);
statement.setMaxRows(0);
EasyMock.expect(statement.execute("query")).andReturn(true);
ResultSet resultSet = mockControl.createMock(ResultSet.class);
EasyMock.expect(statement.getResultSet()).andReturn(resultSet);
ResultSetMetaData metaData = mockControl.createMock(ResultSetMetaData.class);
EasyMock.expect(resultSet.getMetaData()).andReturn(metaData);
EasyMock.expect(metaData.getColumnCount()).andReturn(0);
statement.close();
mockControl.replay();
Iterator<Map<String,Object>> data = jdbcDataSource.getData("query");
ResultSetIterator resultSetIterator = (ResultSetIterator) data.getClass().getDeclaredField("this$1").get(data);
resultSetIterator.setResultSet(null);
data.hasNext();
mockControl.verify();
}
@Test
public void testClosesResultSetAndStatementWhenDataSourceIsClosed() throws Exception {
MockInitialContextFactory.bind("java:comp/env/jdbc/JndiDB", dataSource);
props.put(JdbcDataSource.JNDI_NAME, "java:comp/env/jdbc/JndiDB");
EasyMock.expect(dataSource.getConnection()).andReturn(connection);
jdbcDataSource.init(context, props);
connection.setAutoCommit(false);
Statement statement = mockControl.createMock(Statement.class);
EasyMock.expect(connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY))
.andReturn(statement);
statement.setFetchSize(500);
statement.setMaxRows(0);
EasyMock.expect(statement.execute("query")).andReturn(true);
ResultSet resultSet = mockControl.createMock(ResultSet.class);
EasyMock.expect(statement.getResultSet()).andReturn(resultSet);
ResultSetMetaData metaData = mockControl.createMock(ResultSetMetaData.class);
EasyMock.expect(resultSet.getMetaData()).andReturn(metaData);
EasyMock.expect(metaData.getColumnCount()).andReturn(0);
resultSet.close();
statement.close();
connection.commit();
connection.close();
mockControl.replay();
jdbcDataSource.getData("query");
jdbcDataSource.close();
mockControl.verify();
}
@Test
public void testClosesCurrentResultSetIteratorWhenNewOneIsCreated() throws Exception {
MockInitialContextFactory.bind("java:comp/env/jdbc/JndiDB", dataSource);
props.put(JdbcDataSource.JNDI_NAME, "java:comp/env/jdbc/JndiDB");
EasyMock.expect(dataSource.getConnection()).andReturn(connection);
jdbcDataSource.init(context, props);
connection.setAutoCommit(false);
Statement statement = mockControl.createMock(Statement.class);
EasyMock.expect(connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY))
.andReturn(statement);
statement.setFetchSize(500);
statement.setMaxRows(0);
EasyMock.expect(statement.execute("query")).andReturn(true);
ResultSet resultSet = mockControl.createMock(ResultSet.class);
EasyMock.expect(statement.getResultSet()).andReturn(resultSet);
ResultSetMetaData metaData = mockControl.createMock(ResultSetMetaData.class);
EasyMock.expect(resultSet.getMetaData()).andReturn(metaData);
EasyMock.expect(metaData.getColumnCount()).andReturn(0);
resultSet.close();
statement.close();
EasyMock.expect(connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY))
.andReturn(statement);
statement.setFetchSize(500);
statement.setMaxRows(0);
EasyMock.expect(statement.execute("other query")).andReturn(false);
statement.close();
mockControl.replay();
jdbcDataSource.getData("query");
jdbcDataSource.getData("other query");
mockControl.verify();
}
@Test @Test
public void testRetrieveFromDriverManager() throws Exception { public void testRetrieveFromDriverManager() throws Exception {
DriverManager.registerDriver(driver); DriverManager.registerDriver(driver);