More work on detecting deadlocks

This commit is contained in:
James Agnew 2017-07-21 18:40:40 -04:00
parent fa050c4665
commit f5f1f5bd67
3 changed files with 372 additions and 52 deletions

View File

@ -78,13 +78,13 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
private int mySyncSize = DEFAULT_SYNC_SIZE;
// @Autowired
// private DataSource myDataSource;
// @PostConstruct
// public void start() {
// JpaTransactionManager txManager = (JpaTransactionManager) myManagedTxManager;
// }
// @Autowired
// private DataSource myDataSource;
// @PostConstruct
// public void start() {
// JpaTransactionManager txManager = (JpaTransactionManager) myManagedTxManager;
// }
/**
* Constructor
*/
@ -106,7 +106,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
@Override
@Transactional(propagation=Propagation.NEVER)
@Transactional(propagation = Propagation.NEVER)
public List<Long> getResources(final String theUuid, int theFrom, int theTo) {
if (myNeverUseLocalSearchForUnitTests == false) {
SearchTask task = myIdToSearchTask.get(theUuid);
@ -186,9 +186,9 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
@Override
public IBundleProvider registerSearch(final IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
public IBundleProvider registerSearch(final IDao theCallingDao, final SearchParameterMap theParams, String theResourceType) {
StopWatch w = new StopWatch();
String searchUuid = UUID.randomUUID().toString();
final String searchUuid = UUID.randomUUID().toString();
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(theResourceType).getImplementingClass();
final ISearchBuilder sb = theCallingDao.newSearchBuilder();
@ -196,36 +196,37 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
if (theParams.isLoadSynchronous()) {
// Load the results synchronously
final List<Long> pids = new ArrayList<Long>();
Iterator<Long> resultIter = sb.createQuery(theParams, searchUuid);
while (resultIter.hasNext()) {
pids.add(resultIter.next());
if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) {
break;
}
}
/*
* For synchronous queries, we load all the includes right away
* since we're returning a static bundle with all the results
* pre-loaded. This is ok because syncronous requests are not
* expected to be paged
*
* On the other hand for async queries we load includes/revincludes
* individually for pages as we return them to clients
*/
final Set<Long> includedPids = new HashSet<Long>();
includedPids.addAll(sb.loadReverseIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated()));
includedPids.addAll(sb.loadReverseIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated()));
// Execute the query and make sure we return distinct results
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
return txTemplate.execute(new TransactionCallback<SimpleBundleProvider>() {
@Override
public SimpleBundleProvider doInTransaction(TransactionStatus theStatus) {
// Load the results synchronously
final List<Long> pids = new ArrayList<Long>();
Iterator<Long> resultIter = sb.createQuery(theParams, searchUuid);
while (resultIter.hasNext()) {
pids.add(resultIter.next());
if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) {
break;
}
}
/*
* For synchronous queries, we load all the includes right away
* since we're returning a static bundle with all the results
* pre-loaded. This is ok because syncronous requests are not
* expected to be paged
*
* On the other hand for async queries we load includes/revincludes
* individually for pages as we return them to clients
*/
final Set<Long> includedPids = new HashSet<Long>();
includedPids.addAll(sb.loadReverseIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated()));
includedPids.addAll(sb.loadReverseIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated()));
List<IBaseResource> resources = new ArrayList<IBaseResource>();
sb.loadResourcesByPid(pids, resources, includedPids, false, myEntityManager, myContext, theCallingDao);
return new SimpleBundleProvider(resources);
@ -441,7 +442,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
try {
saveSearch();
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@ -454,7 +455,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
ourLog.info("Completed search for {} resources in {}ms", mySyncedPids.size(), sw.getMillis());
} catch (Throwable t) {
/*
* Don't print a stack trace for client errors.. that's just noisy
*/
@ -465,8 +466,8 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
logged = true;
ourLog.warn("Failed during search due to invalid request: {}", t.toString());
}
}
}
if (!logged) {
ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t);
}

View File

@ -0,0 +1,289 @@
package ca.uhn.fhir.jpa.config;
import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
public class ConnectionWrapper implements Connection {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(ConnectionWrapper.class);
private Connection myWrap;
public ConnectionWrapper(Connection theConnection) {
myWrap = theConnection;
}
@Override
public void abort(Executor theExecutor) throws SQLException {
myWrap.abort(theExecutor);
}
@Override
public void clearWarnings() throws SQLException {
myWrap.clearWarnings();
}
@Override
public void close() throws SQLException {
ourLog.info("** Closing connection");
myWrap.close();
}
@Override
public void commit() throws SQLException {
myWrap.commit();
}
@Override
public Array createArrayOf(String theTypeName, Object[] theElements) throws SQLException {
return myWrap.createArrayOf(theTypeName, theElements);
}
@Override
public Blob createBlob() throws SQLException {
return myWrap.createBlob();
}
@Override
public Clob createClob() throws SQLException {
return myWrap.createClob();
}
@Override
public NClob createNClob() throws SQLException {
return myWrap.createNClob();
}
@Override
public SQLXML createSQLXML() throws SQLException {
return myWrap.createSQLXML();
}
@Override
public Statement createStatement() throws SQLException {
return myWrap.createStatement();
}
@Override
public Statement createStatement(int theResultSetType, int theResultSetConcurrency) throws SQLException {
return myWrap.createStatement(theResultSetType, theResultSetConcurrency);
}
@Override
public Statement createStatement(int theResultSetType, int theResultSetConcurrency, int theResultSetHoldability) throws SQLException {
return myWrap.createStatement(theResultSetType, theResultSetConcurrency, theResultSetHoldability);
}
@Override
public Struct createStruct(String theTypeName, Object[] theAttributes) throws SQLException {
return myWrap.createStruct(theTypeName, theAttributes);
}
@Override
public boolean getAutoCommit() throws SQLException {
return myWrap.getAutoCommit();
}
@Override
public String getCatalog() throws SQLException {
return myWrap.getCatalog();
}
@Override
public Properties getClientInfo() throws SQLException {
return myWrap.getClientInfo();
}
@Override
public String getClientInfo(String theName) throws SQLException {
return getClientInfo(theName);
}
@Override
public int getHoldability() throws SQLException {
return myWrap.getHoldability();
}
@Override
public DatabaseMetaData getMetaData() throws SQLException {
return myWrap.getMetaData();
}
@Override
public int getNetworkTimeout() throws SQLException {
return myWrap.getNetworkTimeout();
}
@Override
public String getSchema() throws SQLException {
return myWrap.getSchema();
}
@Override
public int getTransactionIsolation() throws SQLException {
return myWrap.getTransactionIsolation();
}
@Override
public Map<String, Class<?>> getTypeMap() throws SQLException {
return myWrap.getTypeMap();
}
@Override
public SQLWarning getWarnings() throws SQLException {
return myWrap.getWarnings();
}
@Override
public boolean isClosed() throws SQLException {
return myWrap.isClosed();
}
@Override
public boolean isReadOnly() throws SQLException {
return myWrap.isReadOnly();
}
@Override
public boolean isValid(int theTimeout) throws SQLException {
return myWrap.isValid(theTimeout);
}
@Override
public boolean isWrapperFor(Class<?> theIface) throws SQLException {
return myWrap.isWrapperFor(theIface);
}
@Override
public String nativeSQL(String theSql) throws SQLException {
return myWrap.nativeSQL(theSql);
}
@Override
public CallableStatement prepareCall(String theSql) throws SQLException {
return myWrap.prepareCall(theSql);
}
@Override
public CallableStatement prepareCall(String theSql, int theResultSetType, int theResultSetConcurrency) throws SQLException {
return myWrap.prepareCall(theSql, theResultSetType, theResultSetConcurrency);
}
@Override
public CallableStatement prepareCall(String theSql, int theResultSetType, int theResultSetConcurrency, int theResultSetHoldability) throws SQLException {
return myWrap.prepareCall(theSql, theResultSetType, theResultSetConcurrency, theResultSetHoldability);
}
@Override
public PreparedStatement prepareStatement(String theSql) throws SQLException {
return myWrap.prepareStatement(theSql);
}
@Override
public PreparedStatement prepareStatement(String theSql, int theAutoGeneratedKeys) throws SQLException {
return myWrap.prepareStatement(theSql, theAutoGeneratedKeys);
}
@Override
public PreparedStatement prepareStatement(String theSql, int theResultSetType, int theResultSetConcurrency) throws SQLException {
return myWrap.prepareStatement(theSql, theResultSetType, theResultSetConcurrency);
}
@Override
public PreparedStatement prepareStatement(String theSql, int theResultSetType, int theResultSetConcurrency, int theResultSetHoldability) throws SQLException {
return myWrap.prepareStatement(theSql, theResultSetType, theResultSetConcurrency, theResultSetHoldability);
}
@Override
public PreparedStatement prepareStatement(String theSql, int[] theColumnIndexes) throws SQLException {
return myWrap.prepareStatement(theSql, theColumnIndexes);
}
@Override
public PreparedStatement prepareStatement(String theSql, String[] theColumnNames) throws SQLException {
return myWrap.prepareStatement(theSql, theColumnNames);
}
@Override
public void releaseSavepoint(Savepoint theSavepoint) throws SQLException {
myWrap.releaseSavepoint(theSavepoint);
}
@Override
public void rollback() throws SQLException {
myWrap.rollback();
}
@Override
public void rollback(Savepoint theSavepoint) throws SQLException {
myWrap.rollback(theSavepoint);
}
@Override
public void setAutoCommit(boolean theAutoCommit) throws SQLException {
myWrap.setAutoCommit(theAutoCommit);
}
@Override
public void setCatalog(String theCatalog) throws SQLException {
myWrap.setCatalog(theCatalog);
}
@Override
public void setClientInfo(Properties theProperties) throws SQLClientInfoException {
myWrap.setClientInfo(theProperties);
}
@Override
public void setClientInfo(String theName, String theValue) throws SQLClientInfoException {
myWrap.setClientInfo(theName, theValue);
}
@Override
public void setHoldability(int theHoldability) throws SQLException {
myWrap.setHoldability(theHoldability);
}
@Override
public void setNetworkTimeout(Executor theExecutor, int theMilliseconds) throws SQLException {
myWrap.setNetworkTimeout(theExecutor, theMilliseconds);
}
@Override
public void setReadOnly(boolean theReadOnly) throws SQLException {
myWrap.setReadOnly(theReadOnly);
}
@Override
public Savepoint setSavepoint() throws SQLException {
return myWrap.setSavepoint();
}
@Override
public Savepoint setSavepoint(String theName) throws SQLException {
return myWrap.setSavepoint(theName);
}
@Override
public void setSchema(String theSchema) throws SQLException {
myWrap.setSchema(theSchema);
}
@Override
public void setTransactionIsolation(int theLevel) throws SQLException {
myWrap.setTransactionIsolation(theLevel);
}
@Override
public void setTypeMap(Map<String, Class<?>> theMap) throws SQLException {
myWrap.setTypeMap(theMap);
}
@Override
public <T> T unwrap(Class<T> theIface) throws SQLException {
return myWrap.unwrap(theIface);
}
}

View File

@ -1,5 +1,7 @@
package ca.uhn.fhir.jpa.config;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@ -8,9 +10,7 @@ import javax.sql.DataSource;
import org.apache.commons.dbcp2.BasicDataSource;
import org.hibernate.jpa.HibernatePersistenceProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.*;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@ -18,21 +18,51 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import net.ttddyy.dsproxy.listener.logging.SLF4JLogLevel;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
@Configuration
@EnableTransactionManagement()
public class TestDstu3Config extends BaseJavaConfigDstu3 {
static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TestDstu3Config.class);
@Bean()
public DaoConfig daoConfig() {
return new DaoConfig();
}
private boolean myLogConnection = false;
@Bean()
public DataSource dataSource() {
BasicDataSource retVal = new BasicDataSource();
BasicDataSource retVal = new BasicDataSource() {
@Override
public Connection getConnection() throws SQLException {
if (myLogConnection) {
logGetConnectionStackTrace();
return new ConnectionWrapper(super.getConnection());
} else {
return super.getConnection();
}
}
private void logGetConnectionStackTrace() {
try {
throw new Exception();
} catch (Exception e) {
StringBuilder b = new StringBuilder();
b.append("New connection request:");
for (StackTraceElement next : e.getStackTrace()) {
if (next.getClassName().contains("fhir")) {
b.append("\n ").append(next.getClassName()).append(" ").append(next.getFileName()).append(":").append(next.getLineNumber());
}
}
ourLog.info(b.toString());
}
}
};
retVal.setDriver(new org.apache.derby.jdbc.EmbeddedDriver());
retVal.setUrl("jdbc:derby:memory:myUnitTestDB;create=true");
retVal.setUsername("");
@ -43,12 +73,12 @@ public class TestDstu3Config extends BaseJavaConfigDstu3 {
* and catch any potential deadlocks caused by database connection
* starvation
*/
int maxThreads = (int)(Math.random() * 6) + 1;
int maxThreads = (int) (Math.random() * 6) + 1;
retVal.setMaxTotal(maxThreads);
DataSource dataSource = ProxyDataSourceBuilder
.create(retVal)
// .logQueryBySlf4j(SLF4JLogLevel.INFO, "SQL")
// .logQueryBySlf4j(SLF4JLogLevel.INFO, "SQL")
.logSlowQueryBySlf4j(10, TimeUnit.SECONDS)
.countQuery()
.build();
@ -56,13 +86,6 @@ public class TestDstu3Config extends BaseJavaConfigDstu3 {
return dataSource;
}
@Bean()
public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
JpaTransactionManager retVal = new JpaTransactionManager();
retVal.setEntityManagerFactory(entityManagerFactory);
return retVal;
}
@Bean()
public LocalContainerEntityManagerFactoryBean entityManagerFactory() {
LocalContainerEntityManagerFactoryBean retVal = new LocalContainerEntityManagerFactoryBean();
@ -102,4 +125,11 @@ public class TestDstu3Config extends BaseJavaConfigDstu3 {
return requestValidator;
}
@Bean()
public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
JpaTransactionManager retVal = new JpaTransactionManager();
retVal.setEntityManagerFactory(entityManagerFactory);
return retVal;
}
}