diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java index 8e1e447de63..3a716126bcf 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java @@ -78,13 +78,13 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { private int mySyncSize = DEFAULT_SYNC_SIZE; -// @Autowired -// private DataSource myDataSource; -// @PostConstruct -// public void start() { -// JpaTransactionManager txManager = (JpaTransactionManager) myManagedTxManager; -// } - + // @Autowired + // private DataSource myDataSource; + // @PostConstruct + // public void start() { + // JpaTransactionManager txManager = (JpaTransactionManager) myManagedTxManager; + // } + /** * Constructor */ @@ -106,7 +106,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { } @Override - @Transactional(propagation=Propagation.NEVER) + @Transactional(propagation = Propagation.NEVER) public List getResources(final String theUuid, int theFrom, int theTo) { if (myNeverUseLocalSearchForUnitTests == false) { SearchTask task = myIdToSearchTask.get(theUuid); @@ -186,9 +186,9 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { } @Override - public IBundleProvider registerSearch(final IDao theCallingDao, SearchParameterMap theParams, String theResourceType) { + public IBundleProvider registerSearch(final IDao theCallingDao, final SearchParameterMap theParams, String theResourceType) { StopWatch w = new StopWatch(); - String searchUuid = UUID.randomUUID().toString(); + final String searchUuid = UUID.randomUUID().toString(); Class resourceTypeClass = myContext.getResourceDefinition(theResourceType).getImplementingClass(); final ISearchBuilder sb = theCallingDao.newSearchBuilder(); @@ -196,36 +196,37 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { if (theParams.isLoadSynchronous()) { - // Load the results synchronously - final List pids = new ArrayList(); - - Iterator resultIter = sb.createQuery(theParams, searchUuid); - while (resultIter.hasNext()) { - pids.add(resultIter.next()); - if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) { - break; - } - } - - /* - * For synchronous queries, we load all the includes right away - * since we're returning a static bundle with all the results - * pre-loaded. This is ok because syncronous requests are not - * expected to be paged - * - * On the other hand for async queries we load includes/revincludes - * individually for pages as we return them to clients - */ - final Set includedPids = new HashSet(); - includedPids.addAll(sb.loadReverseIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated())); - includedPids.addAll(sb.loadReverseIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated())); - // Execute the query and make sure we return distinct results TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED); return txTemplate.execute(new TransactionCallback() { @Override public SimpleBundleProvider doInTransaction(TransactionStatus theStatus) { + + // Load the results synchronously + final List pids = new ArrayList(); + + Iterator resultIter = sb.createQuery(theParams, searchUuid); + while (resultIter.hasNext()) { + pids.add(resultIter.next()); + if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) { + break; + } + } + + /* + * For synchronous queries, we load all the includes right away + * since we're returning a static bundle with all the results + * pre-loaded. This is ok because syncronous requests are not + * expected to be paged + * + * On the other hand for async queries we load includes/revincludes + * individually for pages as we return them to clients + */ + final Set includedPids = new HashSet(); + includedPids.addAll(sb.loadReverseIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated())); + includedPids.addAll(sb.loadReverseIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated())); + List resources = new ArrayList(); sb.loadResourcesByPid(pids, resources, includedPids, false, myEntityManager, myContext, theCallingDao); return new SimpleBundleProvider(resources); @@ -441,7 +442,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { try { saveSearch(); - + TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW); txTemplate.execute(new TransactionCallbackWithoutResult() { @@ -454,7 +455,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { ourLog.info("Completed search for {} resources in {}ms", mySyncedPids.size(), sw.getMillis()); } catch (Throwable t) { - + /* * Don't print a stack trace for client errors.. that's just noisy */ @@ -465,8 +466,8 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { logged = true; ourLog.warn("Failed during search due to invalid request: {}", t.toString()); } - } - + } + if (!logged) { ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t); } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/ConnectionWrapper.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/ConnectionWrapper.java new file mode 100644 index 00000000000..51872f77f2b --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/ConnectionWrapper.java @@ -0,0 +1,289 @@ +package ca.uhn.fhir.jpa.config; + +import java.sql.*; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +public class ConnectionWrapper implements Connection { + + private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(ConnectionWrapper.class); + + private Connection myWrap; + + public ConnectionWrapper(Connection theConnection) { + myWrap = theConnection; + } + + @Override + public void abort(Executor theExecutor) throws SQLException { + myWrap.abort(theExecutor); + } + + @Override + public void clearWarnings() throws SQLException { + myWrap.clearWarnings(); + } + + @Override + public void close() throws SQLException { + ourLog.info("** Closing connection"); + myWrap.close(); + } + + @Override + public void commit() throws SQLException { + myWrap.commit(); + } + + @Override + public Array createArrayOf(String theTypeName, Object[] theElements) throws SQLException { + return myWrap.createArrayOf(theTypeName, theElements); + } + + @Override + public Blob createBlob() throws SQLException { + return myWrap.createBlob(); + } + + @Override + public Clob createClob() throws SQLException { + return myWrap.createClob(); + } + + @Override + public NClob createNClob() throws SQLException { + return myWrap.createNClob(); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return myWrap.createSQLXML(); + } + + @Override + public Statement createStatement() throws SQLException { + return myWrap.createStatement(); + } + + @Override + public Statement createStatement(int theResultSetType, int theResultSetConcurrency) throws SQLException { + return myWrap.createStatement(theResultSetType, theResultSetConcurrency); + } + + @Override + public Statement createStatement(int theResultSetType, int theResultSetConcurrency, int theResultSetHoldability) throws SQLException { + return myWrap.createStatement(theResultSetType, theResultSetConcurrency, theResultSetHoldability); + } + + @Override + public Struct createStruct(String theTypeName, Object[] theAttributes) throws SQLException { + return myWrap.createStruct(theTypeName, theAttributes); + } + + @Override + public boolean getAutoCommit() throws SQLException { + return myWrap.getAutoCommit(); + } + + @Override + public String getCatalog() throws SQLException { + return myWrap.getCatalog(); + } + + @Override + public Properties getClientInfo() throws SQLException { + return myWrap.getClientInfo(); + } + + @Override + public String getClientInfo(String theName) throws SQLException { + return getClientInfo(theName); + } + + @Override + public int getHoldability() throws SQLException { + return myWrap.getHoldability(); + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return myWrap.getMetaData(); + } + + @Override + public int getNetworkTimeout() throws SQLException { + return myWrap.getNetworkTimeout(); + } + + @Override + public String getSchema() throws SQLException { + return myWrap.getSchema(); + } + + @Override + public int getTransactionIsolation() throws SQLException { + return myWrap.getTransactionIsolation(); + } + + @Override + public Map> getTypeMap() throws SQLException { + return myWrap.getTypeMap(); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return myWrap.getWarnings(); + } + + @Override + public boolean isClosed() throws SQLException { + return myWrap.isClosed(); + } + + @Override + public boolean isReadOnly() throws SQLException { + return myWrap.isReadOnly(); + } + + @Override + public boolean isValid(int theTimeout) throws SQLException { + return myWrap.isValid(theTimeout); + } + + @Override + public boolean isWrapperFor(Class theIface) throws SQLException { + return myWrap.isWrapperFor(theIface); + } + + @Override + public String nativeSQL(String theSql) throws SQLException { + return myWrap.nativeSQL(theSql); + } + + @Override + public CallableStatement prepareCall(String theSql) throws SQLException { + return myWrap.prepareCall(theSql); + } + + @Override + public CallableStatement prepareCall(String theSql, int theResultSetType, int theResultSetConcurrency) throws SQLException { + return myWrap.prepareCall(theSql, theResultSetType, theResultSetConcurrency); + } + + @Override + public CallableStatement prepareCall(String theSql, int theResultSetType, int theResultSetConcurrency, int theResultSetHoldability) throws SQLException { + return myWrap.prepareCall(theSql, theResultSetType, theResultSetConcurrency, theResultSetHoldability); + } + + @Override + public PreparedStatement prepareStatement(String theSql) throws SQLException { + return myWrap.prepareStatement(theSql); + } + + @Override + public PreparedStatement prepareStatement(String theSql, int theAutoGeneratedKeys) throws SQLException { + return myWrap.prepareStatement(theSql, theAutoGeneratedKeys); + } + + @Override + public PreparedStatement prepareStatement(String theSql, int theResultSetType, int theResultSetConcurrency) throws SQLException { + return myWrap.prepareStatement(theSql, theResultSetType, theResultSetConcurrency); + } + + @Override + public PreparedStatement prepareStatement(String theSql, int theResultSetType, int theResultSetConcurrency, int theResultSetHoldability) throws SQLException { + return myWrap.prepareStatement(theSql, theResultSetType, theResultSetConcurrency, theResultSetHoldability); + } + + @Override + public PreparedStatement prepareStatement(String theSql, int[] theColumnIndexes) throws SQLException { + return myWrap.prepareStatement(theSql, theColumnIndexes); + } + + @Override + public PreparedStatement prepareStatement(String theSql, String[] theColumnNames) throws SQLException { + return myWrap.prepareStatement(theSql, theColumnNames); + } + + @Override + public void releaseSavepoint(Savepoint theSavepoint) throws SQLException { + myWrap.releaseSavepoint(theSavepoint); + } + + @Override + public void rollback() throws SQLException { + myWrap.rollback(); + } + + @Override + public void rollback(Savepoint theSavepoint) throws SQLException { + myWrap.rollback(theSavepoint); + } + + @Override + public void setAutoCommit(boolean theAutoCommit) throws SQLException { + myWrap.setAutoCommit(theAutoCommit); + } + + @Override + public void setCatalog(String theCatalog) throws SQLException { + myWrap.setCatalog(theCatalog); + } + + @Override + public void setClientInfo(Properties theProperties) throws SQLClientInfoException { + myWrap.setClientInfo(theProperties); + } + + @Override + public void setClientInfo(String theName, String theValue) throws SQLClientInfoException { + myWrap.setClientInfo(theName, theValue); + } + + @Override + public void setHoldability(int theHoldability) throws SQLException { + myWrap.setHoldability(theHoldability); + } + + @Override + public void setNetworkTimeout(Executor theExecutor, int theMilliseconds) throws SQLException { + myWrap.setNetworkTimeout(theExecutor, theMilliseconds); + } + + @Override + public void setReadOnly(boolean theReadOnly) throws SQLException { + myWrap.setReadOnly(theReadOnly); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return myWrap.setSavepoint(); + } + + @Override + public Savepoint setSavepoint(String theName) throws SQLException { + return myWrap.setSavepoint(theName); + } + + @Override + public void setSchema(String theSchema) throws SQLException { + myWrap.setSchema(theSchema); + } + + @Override + public void setTransactionIsolation(int theLevel) throws SQLException { + myWrap.setTransactionIsolation(theLevel); + } + + @Override + public void setTypeMap(Map> theMap) throws SQLException { + myWrap.setTypeMap(theMap); + } + + @Override + public T unwrap(Class theIface) throws SQLException { + return myWrap.unwrap(theIface); + } + +} \ No newline at end of file diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java index 3a2e97ed30b..098a1b6d110 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/config/TestDstu3Config.java @@ -1,5 +1,7 @@ package ca.uhn.fhir.jpa.config; +import java.sql.Connection; +import java.sql.SQLException; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -8,9 +10,7 @@ import javax.sql.DataSource; import org.apache.commons.dbcp2.BasicDataSource; import org.hibernate.jpa.HibernatePersistenceProvider; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Lazy; +import org.springframework.context.annotation.*; import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; import org.springframework.transaction.annotation.EnableTransactionManagement; @@ -18,21 +18,51 @@ import org.springframework.transaction.annotation.EnableTransactionManagement; import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor; import ca.uhn.fhir.validation.ResultSeverityEnum; -import net.ttddyy.dsproxy.listener.logging.SLF4JLogLevel; import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder; @Configuration @EnableTransactionManagement() public class TestDstu3Config extends BaseJavaConfigDstu3 { + static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TestDstu3Config.class); + @Bean() public DaoConfig daoConfig() { return new DaoConfig(); } + private boolean myLogConnection = false; + @Bean() public DataSource dataSource() { - BasicDataSource retVal = new BasicDataSource(); + BasicDataSource retVal = new BasicDataSource() { + + @Override + public Connection getConnection() throws SQLException { + if (myLogConnection) { + logGetConnectionStackTrace(); + return new ConnectionWrapper(super.getConnection()); + } else { + return super.getConnection(); + } + } + + private void logGetConnectionStackTrace() { + try { + throw new Exception(); + } catch (Exception e) { + StringBuilder b = new StringBuilder(); + b.append("New connection request:"); + for (StackTraceElement next : e.getStackTrace()) { + if (next.getClassName().contains("fhir")) { + b.append("\n ").append(next.getClassName()).append(" ").append(next.getFileName()).append(":").append(next.getLineNumber()); + } + } + ourLog.info(b.toString()); + } + } + + }; retVal.setDriver(new org.apache.derby.jdbc.EmbeddedDriver()); retVal.setUrl("jdbc:derby:memory:myUnitTestDB;create=true"); retVal.setUsername(""); @@ -43,12 +73,12 @@ public class TestDstu3Config extends BaseJavaConfigDstu3 { * and catch any potential deadlocks caused by database connection * starvation */ - int maxThreads = (int)(Math.random() * 6) + 1; + int maxThreads = (int) (Math.random() * 6) + 1; retVal.setMaxTotal(maxThreads); DataSource dataSource = ProxyDataSourceBuilder .create(retVal) -// .logQueryBySlf4j(SLF4JLogLevel.INFO, "SQL") + // .logQueryBySlf4j(SLF4JLogLevel.INFO, "SQL") .logSlowQueryBySlf4j(10, TimeUnit.SECONDS) .countQuery() .build(); @@ -56,13 +86,6 @@ public class TestDstu3Config extends BaseJavaConfigDstu3 { return dataSource; } - @Bean() - public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) { - JpaTransactionManager retVal = new JpaTransactionManager(); - retVal.setEntityManagerFactory(entityManagerFactory); - return retVal; - } - @Bean() public LocalContainerEntityManagerFactoryBean entityManagerFactory() { LocalContainerEntityManagerFactoryBean retVal = new LocalContainerEntityManagerFactoryBean(); @@ -102,4 +125,11 @@ public class TestDstu3Config extends BaseJavaConfigDstu3 { return requestValidator; } + @Bean() + public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) { + JpaTransactionManager retVal = new JpaTransactionManager(); + retVal.setEntityManagerFactory(entityManagerFactory); + return retVal; + } + }