diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/rest/api/SearchTotalModeEnum.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/rest/api/SearchTotalModeEnum.java index d10613d01a3..1a0d781e921 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/rest/api/SearchTotalModeEnum.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/rest/api/SearchTotalModeEnum.java @@ -1,5 +1,25 @@ package ca.uhn.fhir.rest.api; +/*- + * #%L + * HAPI FHIR - Core Library + * %% + * Copyright (C) 2014 - 2018 University Health Network + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + import java.util.HashMap; import java.util.Map; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/SearchParameterMap.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/SearchParameterMap.java index f9c38feeed7..111656582d0 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/SearchParameterMap.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/SearchParameterMap.java @@ -28,9 +28,9 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java index b6eb3a89253..44ac751ae71 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java @@ -168,7 +168,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { verifySearchHasntFailedOrThrowInternalErrorException(search); if (search.getStatus() == SearchStatusEnum.FINISHED) { - ourLog.info("Search entity marked as finished"); + ourLog.info("Search entity marked as finished with {} results", search.getNumFound()); break; } if (search.getNumFound() >= theTo) { @@ -189,7 +189,8 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { search = newSearch.get(); String resourceType = search.getResourceType(); SearchParameterMap params = search.getSearchParameterMap(); - SearchContinuationTask task = new SearchContinuationTask(search, myDaoRegistry.getResourceDao(resourceType), params, resourceType); + IFhirResourceDao resourceDao = myDaoRegistry.getResourceDao(resourceType); + SearchContinuationTask task = new SearchContinuationTask(search, resourceDao, params, resourceType); myIdToSearchTask.put(search.getUuid(), task); myExecutor.submit(task); } @@ -228,10 +229,9 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); txTemplate.afterPropertiesSet(); return txTemplate.execute(t -> { - Optional searchOpt = mySearchDao.findById(theSearch.getId()); - Search search = searchOpt.orElseThrow(IllegalStateException::new); - if (search.getStatus() != SearchStatusEnum.PASSCMPLET) { - throw new IllegalStateException("Can't change to LOADING because state is " + search.getStatus()); + myEntityManager.refresh(theSearch); + if (theSearch.getStatus() != SearchStatusEnum.PASSCMPLET) { + throw new IllegalStateException("Can't change to LOADING because state is " + theSearch.getStatus()); } theSearch.setStatus(SearchStatusEnum.LOADING); Search newSearch = mySearchDao.save(theSearch); @@ -239,6 +239,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { }); } catch (Exception e) { ourLog.warn("Failed to activate search: {}", e.toString()); + ourLog.trace("Failed to activate search", e); return Optional.empty(); } } @@ -438,6 +439,11 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { myManagedTxManager = theTxManager; } + @VisibleForTesting + public void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) { + myDaoRegistry = theDaoRegistry; + } + public abstract class BaseTask implements Callable { private final SearchParameterMap myParams; private final IDao myCallingDao; @@ -486,14 +492,41 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { } public List getResourcePids(int theFromIndex, int theToIndex) { - ourLog.info("Requesting search PIDs from {}-{}", theFromIndex, theToIndex); + ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex); boolean keepWaiting; do { synchronized (mySyncedPids) { ourLog.trace("Search status is {}", mySearch.getStatus()); - keepWaiting = mySyncedPids.size() < theToIndex && mySearch.getStatus() == SearchStatusEnum.LOADING; + boolean haveEnoughResults = mySyncedPids.size() >= theToIndex; + if (!haveEnoughResults) { + switch (mySearch.getStatus()) { + case LOADING: + keepWaiting = true; + break; + case PASSCMPLET: + /* + * If we get here, it means that the user requested resources that crossed the + * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the + * user has requested resources 0-60, then they would get 0-50 back but the search + * coordinator would then stop searching.SearchCoordinatorSvcImplTest + */ + List remainingResources = SearchCoordinatorSvcImpl.this.getResources(mySearch.getUuid(), mySyncedPids.size(), theToIndex); + ourLog.debug("Adding {} resources to the existing {} synced resource IDs", remainingResources.size(), mySyncedPids.size()); + mySyncedPids.addAll(remainingResources); + keepWaiting = false; + break; + case FAILED: + case FINISHED: + default: + keepWaiting = false; + break; + } + } else { + keepWaiting = false; + } } + if (keepWaiting) { ourLog.info("Waiting, as we only have {} results", mySyncedPids.size()); try { @@ -808,6 +841,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { * Construct the SQL query we'll be sending to the database */ IResultIterator theResultIterator = sb.createQuery(myParams, mySearch.getUuid()); + assert (theResultIterator != null); /* * The following loop actually loads the PIDs of the resources @@ -869,6 +903,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { txTemplate.afterPropertiesSet(); txTemplate.execute(t -> { List previouslyAddedResourcePids = mySearchResultDao.findWithSearchUuid(getSearch()); + ourLog.debug("Have {} previously added IDs in search: {}", previouslyAddedResourcePids.size(), getSearch().getUuid()); setPreviouslyAddedResourcePids(previouslyAddedResourcePids); return null; }); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4Test.java index 8c678a06312..f3faff4ccf0 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4Test.java @@ -17,13 +17,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.Matchers.stringContainsInOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.io.BufferedReader; import java.io.IOException; @@ -41,6 +35,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.TreeSet; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -159,6 +154,50 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test { myDaoConfig.setSearchPreFetchThresholds(new DaoConfig().getSearchPreFetchThresholds()); } + + @Test + public void testSearchLinksWorkWithIncludes() { + for (int i = 0; i < 5; i++) { + + Organization o = new Organization(); + o.setId("O" + i); + o.setName("O" + i); + IIdType oid = ourClient.update().resource(o).execute().getId().toUnqualifiedVersionless(); + + Patient p = new Patient(); + p.setId("P" + i); + p.getManagingOrganization().setReference(oid.getValue()); + ourClient.update().resource(p).execute(); + + } + + Bundle output = ourClient + .search() + .forResource("Patient") + .include(IBaseResource.INCLUDE_ALL) + .count(3) + .returnBundle(Bundle.class) + .execute(); + + List ids = output.getEntry().stream().map(t -> t.getResource().getIdElement().toUnqualifiedVersionless().getValue()).collect(Collectors.toList()); + ourLog.info("Ids: {}", ids); + assertEquals(6, output.getEntry().size()); + assertNotNull(output.getLink("next")); + + // Page 2 + output = ourClient + .loadPage() + .next(output) + .execute(); + + ids = output.getEntry().stream().map(t -> t.getResource().getIdElement().toUnqualifiedVersionless().getValue()).collect(Collectors.toList()); + ourLog.info("Ids: {}", ids); + assertEquals(4, output.getEntry().size()); + assertNull(output.getLink("next")); + + } + + @Test public void testDeleteConditional() { @@ -1658,27 +1697,25 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test { .returnResourceType(Bundle.class) .execute(); - TreeSet ids = new TreeSet<>(); + ArrayList ids = new ArrayList<>(); for (int i = 0; i < responseBundle.getEntry().size(); i++) { - for (BundleEntryComponent nextEntry : responseBundle.getEntry()) { - ids.add(nextEntry.getResource().getIdElement().getIdPart()); - } + BundleEntryComponent nextEntry = responseBundle.getEntry().get(i); + ids.add(nextEntry.getResource().getIdElement().getIdPart()); } BundleLinkComponent nextLink = responseBundle.getLink("next"); - ourLog.info("Have {} IDs with next link: ", ids.size(), nextLink); + ourLog.info("Have {} IDs with next link[{}] : {}", ids.size(), nextLink, ids); while (nextLink != null) { String nextUrl = nextLink.getUrl(); responseBundle = ourClient.fetchResourceFromUrl(Bundle.class, nextUrl); for (int i = 0; i < responseBundle.getEntry().size(); i++) { - for (BundleEntryComponent nextEntry : responseBundle.getEntry()) { - ids.add(nextEntry.getResource().getIdElement().getIdPart()); - } + BundleEntryComponent nextEntry = responseBundle.getEntry().get(i); + ids.add(nextEntry.getResource().getIdElement().getIdPart()); } nextLink = responseBundle.getLink("next"); - ourLog.info("Have {} IDs with next link: ", ids.size(), nextLink); + ourLog.info("Have {} IDs with next link[{}] : {}", ids.size(), nextLink, ids); } assertThat(ids, hasItem(id.getIdPart())); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImplTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImplTest.java index fd793f2dcf7..bdfa14b4ee2 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImplTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImplTest.java @@ -30,8 +30,11 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; +import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.transaction.PlatformTransactionManager; @@ -53,7 +56,7 @@ public class SearchCoordinatorSvcImplTest { @Captor ArgumentCaptor> mySearchResultIterCaptor; @Mock - private IDao myCallingDao; + private IFhirResourceDao myCallingDao; @Mock private EntityManager myEntityManager; private int myExpectedNumberOfSearchBuildersCreated = 2; @@ -70,6 +73,9 @@ public class SearchCoordinatorSvcImplTest { @Mock private PlatformTransactionManager myTxManager; private DaoConfig myDaoConfig; + private Search myCurrentSearch; + @Mock + private DaoRegistry myDaoRegistry; @After public void after() { @@ -78,6 +84,7 @@ public class SearchCoordinatorSvcImplTest { @Before public void before() { + myCurrentSearch = null; mySvc = new SearchCoordinatorSvcImpl(); mySvc.setEntityManagerForUnitTest(myEntityManager); @@ -86,6 +93,7 @@ public class SearchCoordinatorSvcImplTest { mySvc.setSearchDaoForUnitTest(mySearchDao); mySvc.setSearchDaoIncludeForUnitTest(mySearchIncludeDao); mySvc.setSearchDaoResultForUnitTest(mySearchResultDao); + mySvc.setDaoRegistryForUnitTest(myDaoRegistry); myDaoConfig = new DaoConfig(); mySvc.setDaoConfigForUnitTest(myDaoConfig); @@ -151,24 +159,43 @@ public class SearchCoordinatorSvcImplTest { } } - +private static final Logger ourLog = LoggerFactory.getLogger(SearchCoordinatorSvcImplTest.class); @Test public void testAsyncSearchLargeResultSetBigCountSameCoordinator() { SearchParameterMap params = new SearchParameterMap(); params.add("name", new StringParam("ANAME")); List pids = createPidSequence(10, 800); - IResultIterator iter = new SlowIterator(pids.iterator(), 1); - when(mySearchBuider.createQuery(Mockito.same(params), any(String.class))).thenReturn(iter); - + SlowIterator iter = new SlowIterator(pids.iterator(), 1); + when(mySearchBuider.createQuery(any(), any(String.class))).thenReturn(iter); doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(any(List.class), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao)); + when(mySearchResultDao.findWithSearchUuid(any(), any())).thenAnswer(t -> { + List returnedValues = iter.getReturnedValues(); + Pageable page = (Pageable) t.getArguments()[1]; + int offset = (int) page.getOffset(); + int end = (int)(page.getOffset() + page.getPageSize()); + end = Math.min(end, returnedValues.size()); + offset = Math.min(offset, returnedValues.size()); + ourLog.info("findWithSearchUuid {} - {} out of {} values", offset, end, returnedValues.size()); + return new PageImpl<>(returnedValues.subList(offset, end)); + }); + IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective()); assertNotNull(result.getUuid()); assertEquals(null, result.size()); List resources; + when(mySearchDao.save(any())).thenAnswer(t -> { + Search search = (Search) t.getArguments()[0]; + myCurrentSearch = search; + return search; + }); + when(mySearchDao.findByUuid(any())).thenAnswer(t -> myCurrentSearch); + IFhirResourceDao dao = myCallingDao; + when(myDaoRegistry.getResourceDao(any())).thenReturn(dao); + resources = result.getResources(0, 100000); assertEquals(790, resources.size()); assertEquals("10", resources.get(0).getIdElement().getValueAsString()); @@ -178,7 +205,7 @@ public class SearchCoordinatorSvcImplTest { verify(mySearchDao, atLeastOnce()).save(searchCaptor.capture()); verify(mySearchResultDao, atLeastOnce()).saveAll(mySearchResultIterCaptor.capture()); - List allResults = new ArrayList(); + List allResults = new ArrayList<>(); for (Iterable next : mySearchResultIterCaptor.getAllValues()) { allResults.addAll(Lists.newArrayList(next)); } @@ -186,6 +213,8 @@ public class SearchCoordinatorSvcImplTest { assertEquals(790, allResults.size()); assertEquals(10, allResults.get(0).getResourcePid().longValue()); assertEquals(799, allResults.get(789).getResourcePid().longValue()); + + myExpectedNumberOfSearchBuildersCreated = 3; } @Test @@ -224,7 +253,7 @@ public class SearchCoordinatorSvcImplTest { List pids = createPidSequence(10, 800); IResultIterator iter = new SlowIterator(pids.iterator(), 2); when(mySearchBuider.createQuery(Mockito.same(params), any(String.class))).thenReturn(iter); - + when(mySearchDao.save(any())).thenAnswer(t -> t.getArguments()[0]); doAnswer(loadPids()).when(mySearchBuider).loadResourcesByPid(any(List.class), any(List.class), any(Set.class), anyBoolean(), any(EntityManager.class), any(FhirContext.class), same(myCallingDao)); IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective()); @@ -257,12 +286,6 @@ public class SearchCoordinatorSvcImplTest { assertEquals("20", resources.get(0).getIdElement().getValueAsString()); assertEquals("29", resources.get(9).getIdElement().getValueAsString()); - provider = new PersistedJpaBundleProvider(result.getUuid(), myCallingDao); - resources = provider.getResources(20, 99999); - assertEquals(770, resources.size()); - assertEquals("30", resources.get(0).getIdElement().getValueAsString()); - assertEquals("799", resources.get(769).getIdElement().getValueAsString()); - myExpectedNumberOfSearchBuildersCreated = 4; } @@ -452,11 +475,19 @@ public class SearchCoordinatorSvcImplTest { } } + /** + * THIS CLASS IS FOR UNIT TESTS ONLY - It is delioberately inefficient + * and keeps things in memory. + *

+ * Don't use it in real code! + */ public static class SlowIterator extends BaseIterator implements IResultIterator { + private static final Logger ourLog = LoggerFactory.getLogger(SlowIterator.class); private final IResultIterator myResultIteratorWrap; private int myDelay; private Iterator myWrap; + private List myReturnedValues = new ArrayList<>(); public SlowIterator(Iterator theWrap, int theDelay) { myWrap = theWrap; @@ -470,9 +501,17 @@ public class SearchCoordinatorSvcImplTest { myDelay = theDelay; } + public List getReturnedValues() { + return myReturnedValues; + } + @Override public boolean hasNext() { - return myWrap.hasNext(); + boolean retVal = myWrap.hasNext(); + if (!retVal) { + ourLog.info("No more results remaining"); + } + return retVal; } @Override @@ -482,7 +521,9 @@ public class SearchCoordinatorSvcImplTest { } catch (InterruptedException e) { // ignore } - return myWrap.next(); + Long retVal = myWrap.next(); + myReturnedValues.add(retVal); + return retVal; } @Override diff --git a/hapi-fhir-jpaserver-migrate/pom.xml b/hapi-fhir-jpaserver-migrate/pom.xml index a804fc80a9d..2618d55c8c7 100644 --- a/hapi-fhir-jpaserver-migrate/pom.xml +++ b/hapi-fhir-jpaserver-migrate/pom.xml @@ -31,6 +31,10 @@ org.springframework spring-jdbc + + org.apache.commons + commons-dbcp2 + @@ -45,11 +49,6 @@ derby test - - org.apache.commons - commons-dbcp2 - test - junit junit diff --git a/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/DriverTypeEnum.java b/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/DriverTypeEnum.java index 8e5cba77e1d..c3d8a8725ba 100644 --- a/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/DriverTypeEnum.java +++ b/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/DriverTypeEnum.java @@ -1,6 +1,7 @@ package ca.uhn.fhir.jpa.migrate; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; +import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,20 +77,13 @@ public enum DriverTypeEnum { throw new InternalErrorException("Unable to find driver class: " + myDriverClassName, e); } - SingleConnectionDataSource dataSource = new SingleConnectionDataSource(){ - @Override - protected Connection getConnectionFromDriver(Properties props) throws SQLException { - Connection connect = driver.connect(theUrl, props); - assert connect != null; - return connect; - } - }; - dataSource.setAutoCommit(false); + BasicDataSource dataSource = new BasicDataSource(); +// dataSource.setAutoCommit(false); dataSource.setDriverClassName(myDriverClassName); dataSource.setUrl(theUrl); dataSource.setUsername(theUsername); dataSource.setPassword(thePassword); - dataSource.setSuppressClose(true); +// dataSource.setSuppressClose(true); DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(); transactionManager.setDataSource(dataSource); diff --git a/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/taskdef/CalculateHashesTask.java b/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/taskdef/CalculateHashesTask.java index 620792a4754..0300e92fa32 100644 --- a/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/taskdef/CalculateHashesTask.java +++ b/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/taskdef/CalculateHashesTask.java @@ -23,15 +23,21 @@ package ca.uhn.fhir.jpa.migrate.taskdef; import ca.uhn.fhir.util.StopWatch; import com.google.common.collect.ForwardingMap; import org.apache.commons.lang3.Validate; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.checkerframework.checker.nullness.compatqual.NullableDecl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.ColumnMapRowMapper; import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowCallbackHandler; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.*; import java.util.function.Function; public class CalculateHashesTask extends BaseTableColumnTask { @@ -39,75 +45,147 @@ public class CalculateHashesTask extends BaseTableColumnTask, Long>> myCalculators = new HashMap<>(); + private ThreadPoolExecutor myExecutor; public void setBatchSize(int theBatchSize) { myBatchSize = theBatchSize; } + /** + * Constructor + */ + public CalculateHashesTask() { + super(); + } @Override - public void execute() { + public synchronized void execute() throws SQLException { if (isDryRun()) { return; } - List> rows; - do { - rows = getTxTemplate().execute(t -> { - JdbcTemplate jdbcTemplate = newJdbcTemnplate(); - jdbcTemplate.setMaxRows(myBatchSize); - String sql = "SELECT * FROM " + getTableName() + " WHERE " + getColumnName() + " IS NULL"; - ourLog.info("Finding up to {} rows in {} that requires hashes", myBatchSize, getTableName()); - return jdbcTemplate.queryForList(sql); - }); + initializeExecutor(); + try { - updateRows(rows); - } while (rows.size() > 0); - } + while(true) { + MyRowCallbackHandler rch = new MyRowCallbackHandler(); + getTxTemplate().execute(t -> { + JdbcTemplate jdbcTemplate = newJdbcTemnplate(); + jdbcTemplate.setMaxRows(100000); + String sql = "SELECT * FROM " + getTableName() + " WHERE " + getColumnName() + " IS NULL"; + ourLog.info("Finding up to {} rows in {} that requires hashes", myBatchSize, getTableName()); - private void updateRows(List> theRows) { - StopWatch sw = new StopWatch(); - getTxTemplate().execute(t -> { + jdbcTemplate.query(sql, rch); + rch.done(); - // Loop through rows - assert theRows != null; - for (Map nextRow : theRows) { + return null; + }); - Map newValues = new HashMap<>(); - MandatoryKeyMap nextRowMandatoryKeyMap = new MandatoryKeyMap<>(nextRow); - - // Apply calculators - for (Map.Entry, Long>> nextCalculatorEntry : myCalculators.entrySet()) { - String nextColumn = nextCalculatorEntry.getKey(); - Function, Long> nextCalculator = nextCalculatorEntry.getValue(); - Long value = nextCalculator.apply(nextRowMandatoryKeyMap); - newValues.put(nextColumn, value); + rch.submitNext(); + List> futures = rch.getFutures(); + if (futures.isEmpty()) { + break; } - // Generate update SQL - StringBuilder sqlBuilder = new StringBuilder(); - List arguments = new ArrayList<>(); - sqlBuilder.append("UPDATE "); - sqlBuilder.append(getTableName()); - sqlBuilder.append(" SET "); - for (Map.Entry nextNewValueEntry : newValues.entrySet()) { - if (arguments.size() > 0) { - sqlBuilder.append(", "); + ourLog.info("Waiting for {} tasks to complete", futures.size()); + for (Future next : futures) { + try { + next.get(); + } catch (Exception e) { + throw new SQLException(e); } - sqlBuilder.append(nextNewValueEntry.getKey()).append(" = ?"); - arguments.add(nextNewValueEntry.getValue()); } - sqlBuilder.append(" WHERE SP_ID = ?"); - arguments.add((Long) nextRow.get("SP_ID")); - - // Apply update SQL - newJdbcTemnplate().update(sqlBuilder.toString(), arguments.toArray()); } - return theRows.size(); - }); - ourLog.info("Updated {} rows on {} in {}", theRows.size(), getTableName(), sw.toString()); + } finally { + destroyExecutor(); + } + } + + private void destroyExecutor() { + myExecutor.shutdownNow(); + } + + private void initializeExecutor() { + int maximumPoolSize = Runtime.getRuntime().availableProcessors(); + + LinkedBlockingQueue executorQueue = new LinkedBlockingQueue<>(maximumPoolSize); + BasicThreadFactory threadFactory = new BasicThreadFactory.Builder() + .namingPattern("worker-" + "-%d") + .daemon(false) + .priority(Thread.NORM_PRIORITY) + .build(); + RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) { + ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size()); + StopWatch sw = new StopWatch(); + try { + executorQueue.put(theRunnable); + } catch (InterruptedException theE) { + throw new RejectedExecutionException("Task " + theRunnable.toString() + + " rejected from " + theE.toString()); + } + ourLog.info("Slot become available after {}ms", sw.getMillis()); + } + }; + myExecutor = new ThreadPoolExecutor( + 1, + maximumPoolSize, + 0L, + TimeUnit.MILLISECONDS, + executorQueue, + threadFactory, + rejectedExecutionHandler); + } + + private Future updateRows(List> theRows) { + Runnable task = () -> { + StopWatch sw = new StopWatch(); + getTxTemplate().execute(t -> { + + // Loop through rows + assert theRows != null; + for (Map nextRow : theRows) { + + Map newValues = new HashMap<>(); + MandatoryKeyMap nextRowMandatoryKeyMap = new MandatoryKeyMap<>(nextRow); + + // Apply calculators + for (Map.Entry, Long>> nextCalculatorEntry : myCalculators.entrySet()) { + String nextColumn = nextCalculatorEntry.getKey(); + Function, Long> nextCalculator = nextCalculatorEntry.getValue(); + Long value = nextCalculator.apply(nextRowMandatoryKeyMap); + newValues.put(nextColumn, value); + } + + // Generate update SQL + StringBuilder sqlBuilder = new StringBuilder(); + List arguments = new ArrayList<>(); + sqlBuilder.append("UPDATE "); + sqlBuilder.append(getTableName()); + sqlBuilder.append(" SET "); + for (Map.Entry nextNewValueEntry : newValues.entrySet()) { + if (arguments.size() > 0) { + sqlBuilder.append(", "); + } + sqlBuilder.append(nextNewValueEntry.getKey()).append(" = ?"); + arguments.add(nextNewValueEntry.getValue()); + } + sqlBuilder.append(" WHERE SP_ID = ?"); + arguments.add((Long) nextRow.get("SP_ID")); + + // Apply update SQL + newJdbcTemnplate().update(sqlBuilder.toString(), arguments.toArray()); + + } + + return theRows.size(); + }); + ourLog.info("Updated {} rows on {} in {}", theRows.size(), getTableName(), sw.toString()); + }; + return myExecutor.submit(task); } public CalculateHashesTask addCalculator(String theColumnName, Function, Long> theConsumer) { @@ -116,6 +194,39 @@ public class CalculateHashesTask extends BaseTableColumnTask> myRows = new ArrayList<>(); + private List> myFutures = new ArrayList<>(); + + @Override + public void processRow(ResultSet rs) throws SQLException { + Map row = new ColumnMapRowMapper().mapRow(rs, 0); + myRows.add(row); + + if (myRows.size() >= myBatchSize) { + submitNext(); + } + } + + private void submitNext() { + if (myRows.size() > 0) { + myFutures.add(updateRows(myRows)); + myRows = new ArrayList<>(); + } + } + + public List> getFutures() { + return myFutures; + } + + public void done() { + if (myRows.size() > 0) { + submitNext(); + } + } + } + public static class MandatoryKeyMap extends ForwardingMap { diff --git a/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/HapiFhirJpaMigrationTasks.java b/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/HapiFhirJpaMigrationTasks.java index acaf3be1c6a..c02246a3a14 100644 --- a/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/HapiFhirJpaMigrationTasks.java +++ b/hapi-fhir-jpaserver-migrate/src/main/java/ca/uhn/fhir/jpa/migrate/tasks/HapiFhirJpaMigrationTasks.java @@ -277,6 +277,7 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks { Builder.BuilderWithTableName spp = version.onTable("HFJ_RES_PARAM_PRESENT"); version.startSectionWithMessage("Starting work on table: " + spp.getTableName()); spp.dropIndex("IDX_RESPARMPRESENT_SPID_RESID"); + spp.dropColumn("SP_ID"); spp .addColumn("HASH_PRESENCE") .nullable() diff --git a/hapi-fhir-jpaserver-migrate/src/test/java/ca/uhn/fhir/jpa/migrate/taskdef/CreateHashesTest.java b/hapi-fhir-jpaserver-migrate/src/test/java/ca/uhn/fhir/jpa/migrate/taskdef/CalculateHashesTest.java similarity index 56% rename from hapi-fhir-jpaserver-migrate/src/test/java/ca/uhn/fhir/jpa/migrate/taskdef/CreateHashesTest.java rename to hapi-fhir-jpaserver-migrate/src/test/java/ca/uhn/fhir/jpa/migrate/taskdef/CalculateHashesTest.java index a5140a72b83..6e15f8734be 100644 --- a/hapi-fhir-jpaserver-migrate/src/test/java/ca/uhn/fhir/jpa/migrate/taskdef/CreateHashesTest.java +++ b/hapi-fhir-jpaserver-migrate/src/test/java/ca/uhn/fhir/jpa/migrate/taskdef/CalculateHashesTest.java @@ -9,7 +9,7 @@ import java.util.Map; import static org.junit.Assert.assertEquals; -public class CreateHashesTest extends BaseTest { +public class CalculateHashesTest extends BaseTest { @Test public void testCreateHashes() { @@ -50,4 +50,36 @@ public class CreateHashesTest extends BaseTest { }); } + @Test + public void testCreateHashesLargeNumber() { + executeSql("create table HFJ_SPIDX_TOKEN (SP_ID bigint not null, SP_MISSING boolean, SP_NAME varchar(100) not null, RES_ID bigint, RES_TYPE varchar(255) not null, SP_UPDATED timestamp, HASH_IDENTITY bigint, HASH_SYS bigint, HASH_SYS_AND_VALUE bigint, HASH_VALUE bigint, SP_SYSTEM varchar(200), SP_VALUE varchar(200), primary key (SP_ID))"); + + for (int i = 0; i < 777; i++) { + executeSql("insert into HFJ_SPIDX_TOKEN (SP_MISSING, SP_NAME, RES_ID, RES_TYPE, SP_UPDATED, SP_SYSTEM, SP_VALUE, SP_ID) values (false, 'identifier', 999, 'Patient', '2018-09-03 07:44:49.196', 'urn:oid:1.2.410.100110.10.41308301', '8888888" + i + "', " + i + ")"); + } + + Long count = getConnectionProperties().getTxTemplate().execute(t -> { + JdbcTemplate jdbcTemplate = getConnectionProperties().newJdbcTemplate(); + return jdbcTemplate.queryForObject("SELECT count(*) FROM HFJ_SPIDX_TOKEN WHERE HASH_VALUE IS NULL", Long.class); + }); + assertEquals(777L, count.longValue()); + + CalculateHashesTask task = new CalculateHashesTask(); + task.setTableName("HFJ_SPIDX_TOKEN"); + task.setColumnName("HASH_IDENTITY"); + task.addCalculator("HASH_IDENTITY", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME"))); + task.addCalculator("HASH_SYS", t -> ResourceIndexedSearchParamToken.calculateHashSystem(t.getResourceType(), t.getParamName(), t.getString("SP_SYSTEM"))); + task.addCalculator("HASH_SYS_AND_VALUE", t -> ResourceIndexedSearchParamToken.calculateHashSystemAndValue(t.getResourceType(), t.getParamName(), t.getString("SP_SYSTEM"), t.getString("SP_VALUE"))); + task.addCalculator("HASH_VALUE", t -> ResourceIndexedSearchParamToken.calculateHashValue(t.getResourceType(), t.getParamName(), t.getString("SP_VALUE"))); + task.setBatchSize(3); + getMigrator().addTask(task); + + getMigrator().migrate(); + + count = getConnectionProperties().getTxTemplate().execute(t -> { + JdbcTemplate jdbcTemplate = getConnectionProperties().newJdbcTemplate(); + return jdbcTemplate.queryForObject("SELECT count(*) FROM HFJ_SPIDX_TOKEN WHERE HASH_VALUE IS NULL", Long.class); + }); + assertEquals(0L, count.longValue()); + } } diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/method/BaseResourceReturningMethodBinding.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/method/BaseResourceReturningMethodBinding.java index 3de8c16a612..bd5c7f3a90c 100644 --- a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/method/BaseResourceReturningMethodBinding.java +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/method/BaseResourceReturningMethodBinding.java @@ -215,16 +215,24 @@ public abstract class BaseResourceReturningMethodBinding extends BaseMethodBindi linkPrev = RestfulServerUtils.createPagingLink(theIncludes, serverBase, searchId, theResult.getPreviousPageId(), theRequest.getParameters(), prettyPrint, theBundleType); } } else if (searchId != null) { - int offset = theOffset + resourceList.size(); - // We're doing offset pages - if (numTotalResults == null || offset < numTotalResults) { - linkNext = (RestfulServerUtils.createPagingLink(theIncludes, serverBase, searchId, offset, numToReturn, theRequest.getParameters(), prettyPrint, theBundleType)); + if (numTotalResults == null || theOffset + numToReturn < numTotalResults) { + linkNext = (RestfulServerUtils.createPagingLink(theIncludes, serverBase, searchId, theOffset + numToReturn, numToReturn, theRequest.getParameters(), prettyPrint, theBundleType)); } if (theOffset > 0) { int start = Math.max(0, theOffset - theLimit); linkPrev = RestfulServerUtils.createPagingLink(theIncludes, serverBase, searchId, start, theLimit, theRequest.getParameters(), prettyPrint, theBundleType); } +// int offset = theOffset + resourceList.size(); +// +// // We're doing offset pages +// if (numTotalResults == null || offset < numTotalResults) { +// linkNext = (RestfulServerUtils.createPagingLink(theIncludes, serverBase, searchId, offset, numToReturn, theRequest.getParameters(), prettyPrint, theBundleType)); +// } +// if (theOffset > 0) { +// int start = Math.max(0, theOffset - theLimit); +// linkPrev = RestfulServerUtils.createPagingLink(theIncludes, serverBase, searchId, start, theLimit, theRequest.getParameters(), prettyPrint, theBundleType); +// } } bundleFactory.addRootPropertiesToBundle(theResult.getUuid(), serverBase, theLinkSelf, linkPrev, linkNext, theResult.size(), theBundleType, theResult.getPublished()); diff --git a/pom.xml b/pom.xml index 50cc673a3af..d5ff60b05fb 100644 --- a/pom.xml +++ b/pom.xml @@ -1727,6 +1727,12 @@ + + + + + + @@ -2131,6 +2137,8 @@ hapi-fhir-structures-dstu2 hapi-fhir-structures-dstu3 hapi-fhir-structures-r4 + hapi-fhir-client + hapi-fhir-server hapi-fhir-jpaserver-base hapi-fhir-jaxrsserver-base diff --git a/src/changes/changes.xml b/src/changes/changes.xml index c1562231fa3..3f2c12cf3d6 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -142,6 +142,10 @@ to use a parameter annotated with @ResourceParam to receive the Parameters (or other) resource supplied by the client as the request body. + + The JPA server version migrator tool now runs in a multithreaded way, allowing it to + upgrade th database faster when migration tasks require data updates. + diff --git a/src/site/site.xml b/src/site/site.xml index 2c5ca40def0..d0e75165cc7 100644 --- a/src/site/site.xml +++ b/src/site/site.xml @@ -115,6 +115,8 @@ + + diff --git a/src/site/xdoc/docindex.xml b/src/site/xdoc/docindex.xml index 49cefb6481c..226d7e8b457 100644 --- a/src/site/xdoc/docindex.xml +++ b/src/site/xdoc/docindex.xml @@ -1,86 +1,88 @@ - - - - - Documentation - James Agnew - - - - -

- -

- Welcome to HAPI FHIR! We hope that the documentation here will be - helpful to you. -

- - - -

The Data Model

- - -

RESTful Client

- - -

RESTful Server

- - -

Other Features

- - - -

JavaDocs

- - -

Source Cross Reference

- - -
- - - - + + + + + Documentation + James Agnew + + + + +
+ +

+ Welcome to HAPI FHIR! We hope that the documentation here will be + helpful to you. +

+ + + +

The Data Model

+ + +

RESTful Client

+ + +

RESTful Server

+ + +

Other Features

+ + + +

JavaDocs

+ + +

Source Cross Reference

+ + +
+ + + +