Merge branch 'philips-3.6.0' of github.com:jamesagnew/hapi-fhir into philips-3.6.0

This commit is contained in:
James Agnew 2018-10-31 17:19:28 -04:00
commit c8834cd29a
11 changed files with 325 additions and 153 deletions

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.rest.api; package ca.uhn.fhir.rest.api;
/*-
* #%L
* HAPI FHIR - Core Library
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;

View File

@ -31,6 +31,10 @@
<groupId>org.springframework</groupId> <groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId> <artifactId>spring-jdbc</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
</dependency>
<!-- This dependency includes the core HAPI-FHIR classes --> <!-- This dependency includes the core HAPI-FHIR classes -->
<dependency> <dependency>
@ -45,11 +49,6 @@
<artifactId>derby</artifactId> <artifactId>derby</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>

View File

@ -1,6 +1,7 @@
package ca.uhn.fhir.jpa.migrate; package ca.uhn.fhir.jpa.migrate;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -76,20 +77,13 @@ public enum DriverTypeEnum {
throw new InternalErrorException("Unable to find driver class: " + myDriverClassName, e); throw new InternalErrorException("Unable to find driver class: " + myDriverClassName, e);
} }
SingleConnectionDataSource dataSource = new SingleConnectionDataSource(){ BasicDataSource dataSource = new BasicDataSource();
@Override // dataSource.setAutoCommit(false);
protected Connection getConnectionFromDriver(Properties props) throws SQLException {
Connection connect = driver.connect(theUrl, props);
assert connect != null;
return connect;
}
};
dataSource.setAutoCommit(false);
dataSource.setDriverClassName(myDriverClassName); dataSource.setDriverClassName(myDriverClassName);
dataSource.setUrl(theUrl); dataSource.setUrl(theUrl);
dataSource.setUsername(theUsername); dataSource.setUsername(theUsername);
dataSource.setPassword(thePassword); dataSource.setPassword(thePassword);
dataSource.setSuppressClose(true); // dataSource.setSuppressClose(true);
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(); DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
transactionManager.setDataSource(dataSource); transactionManager.setDataSource(dataSource);

View File

@ -23,15 +23,21 @@ package ca.uhn.fhir.jpa.migrate.taskdef;
import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.StopWatch;
import com.google.common.collect.ForwardingMap; import com.google.common.collect.ForwardingMap;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.checkerframework.checker.nullness.compatqual.NullableDecl; import org.checkerframework.checker.nullness.compatqual.NullableDecl;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.ColumnMapRowMapper;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowCallbackHandler;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function; import java.util.function.Function;
public class CalculateHashesTask extends BaseTableColumnTask<CalculateHashesTask> { public class CalculateHashesTask extends BaseTableColumnTask<CalculateHashesTask> {
@ -39,33 +45,103 @@ public class CalculateHashesTask extends BaseTableColumnTask<CalculateHashesTask
private static final Logger ourLog = LoggerFactory.getLogger(CalculateHashesTask.class); private static final Logger ourLog = LoggerFactory.getLogger(CalculateHashesTask.class);
private int myBatchSize = 10000; private int myBatchSize = 10000;
private Map<String, Function<MandatoryKeyMap<String, Object>, Long>> myCalculators = new HashMap<>(); private Map<String, Function<MandatoryKeyMap<String, Object>, Long>> myCalculators = new HashMap<>();
private ThreadPoolExecutor myExecutor;
public void setBatchSize(int theBatchSize) { public void setBatchSize(int theBatchSize) {
myBatchSize = theBatchSize; myBatchSize = theBatchSize;
} }
/**
* Constructor
*/
public CalculateHashesTask() {
super();
}
@Override @Override
public void execute() { public synchronized void execute() throws SQLException {
if (isDryRun()) { if (isDryRun()) {
return; return;
} }
List<Map<String, Object>> rows; initializeExecutor();
do { try {
rows = getTxTemplate().execute(t -> {
while(true) {
MyRowCallbackHandler rch = new MyRowCallbackHandler();
getTxTemplate().execute(t -> {
JdbcTemplate jdbcTemplate = newJdbcTemnplate(); JdbcTemplate jdbcTemplate = newJdbcTemnplate();
jdbcTemplate.setMaxRows(myBatchSize); jdbcTemplate.setMaxRows(100000);
String sql = "SELECT * FROM " + getTableName() + " WHERE " + getColumnName() + " IS NULL"; String sql = "SELECT * FROM " + getTableName() + " WHERE " + getColumnName() + " IS NULL";
ourLog.info("Finding up to {} rows in {} that requires hashes", myBatchSize, getTableName()); ourLog.info("Finding up to {} rows in {} that requires hashes", myBatchSize, getTableName());
return jdbcTemplate.queryForList(sql);
jdbcTemplate.query(sql, rch);
rch.done();
return null;
}); });
updateRows(rows); rch.submitNext();
} while (rows.size() > 0); List<Future<?>> futures = rch.getFutures();
if (futures.isEmpty()) {
break;
} }
private void updateRows(List<Map<String, Object>> theRows) { ourLog.info("Waiting for {} tasks to complete", futures.size());
for (Future<?> next : futures) {
try {
next.get();
} catch (Exception e) {
throw new SQLException(e);
}
}
}
} finally {
destroyExecutor();
}
}
private void destroyExecutor() {
myExecutor.shutdownNow();
}
private void initializeExecutor() {
int maximumPoolSize = Runtime.getRuntime().availableProcessors();
LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(maximumPoolSize);
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("worker-" + "-%d")
.daemon(false)
.priority(Thread.NORM_PRIORITY)
.build();
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
StopWatch sw = new StopWatch();
try {
executorQueue.put(theRunnable);
} catch (InterruptedException theE) {
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + theE.toString());
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
}
};
myExecutor = new ThreadPoolExecutor(
1,
maximumPoolSize,
0L,
TimeUnit.MILLISECONDS,
executorQueue,
threadFactory,
rejectedExecutionHandler);
}
private Future<?> updateRows(List<Map<String, Object>> theRows) {
Runnable task = () -> {
StopWatch sw = new StopWatch(); StopWatch sw = new StopWatch();
getTxTemplate().execute(t -> { getTxTemplate().execute(t -> {
@ -108,6 +184,8 @@ public class CalculateHashesTask extends BaseTableColumnTask<CalculateHashesTask
return theRows.size(); return theRows.size();
}); });
ourLog.info("Updated {} rows on {} in {}", theRows.size(), getTableName(), sw.toString()); ourLog.info("Updated {} rows on {} in {}", theRows.size(), getTableName(), sw.toString());
};
return myExecutor.submit(task);
} }
public CalculateHashesTask addCalculator(String theColumnName, Function<MandatoryKeyMap<String, Object>, Long> theConsumer) { public CalculateHashesTask addCalculator(String theColumnName, Function<MandatoryKeyMap<String, Object>, Long> theConsumer) {
@ -116,6 +194,39 @@ public class CalculateHashesTask extends BaseTableColumnTask<CalculateHashesTask
return this; return this;
} }
private class MyRowCallbackHandler implements RowCallbackHandler {
private List<Map<String, Object>> myRows = new ArrayList<>();
private List<Future<?>> myFutures = new ArrayList<>();
@Override
public void processRow(ResultSet rs) throws SQLException {
Map<String, Object> row = new ColumnMapRowMapper().mapRow(rs, 0);
myRows.add(row);
if (myRows.size() >= myBatchSize) {
submitNext();
}
}
private void submitNext() {
if (myRows.size() > 0) {
myFutures.add(updateRows(myRows));
myRows = new ArrayList<>();
}
}
public List<Future<?>> getFutures() {
return myFutures;
}
public void done() {
if (myRows.size() > 0) {
submitNext();
}
}
}
public static class MandatoryKeyMap<K, V> extends ForwardingMap<K, V> { public static class MandatoryKeyMap<K, V> extends ForwardingMap<K, V> {

View File

@ -9,7 +9,7 @@ import java.util.Map;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
public class CreateHashesTest extends BaseTest { public class CalculateHashesTest extends BaseTest {
@Test @Test
public void testCreateHashes() { public void testCreateHashes() {
@ -50,4 +50,36 @@ public class CreateHashesTest extends BaseTest {
}); });
} }
@Test
public void testCreateHashesLargeNumber() {
executeSql("create table HFJ_SPIDX_TOKEN (SP_ID bigint not null, SP_MISSING boolean, SP_NAME varchar(100) not null, RES_ID bigint, RES_TYPE varchar(255) not null, SP_UPDATED timestamp, HASH_IDENTITY bigint, HASH_SYS bigint, HASH_SYS_AND_VALUE bigint, HASH_VALUE bigint, SP_SYSTEM varchar(200), SP_VALUE varchar(200), primary key (SP_ID))");
for (int i = 0; i < 777; i++) {
executeSql("insert into HFJ_SPIDX_TOKEN (SP_MISSING, SP_NAME, RES_ID, RES_TYPE, SP_UPDATED, SP_SYSTEM, SP_VALUE, SP_ID) values (false, 'identifier', 999, 'Patient', '2018-09-03 07:44:49.196', 'urn:oid:1.2.410.100110.10.41308301', '8888888" + i + "', " + i + ")");
}
Long count = getConnectionProperties().getTxTemplate().execute(t -> {
JdbcTemplate jdbcTemplate = getConnectionProperties().newJdbcTemplate();
return jdbcTemplate.queryForObject("SELECT count(*) FROM HFJ_SPIDX_TOKEN WHERE HASH_VALUE IS NULL", Long.class);
});
assertEquals(777L, count.longValue());
CalculateHashesTask task = new CalculateHashesTask();
task.setTableName("HFJ_SPIDX_TOKEN");
task.setColumnName("HASH_IDENTITY");
task.addCalculator("HASH_IDENTITY", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")));
task.addCalculator("HASH_SYS", t -> ResourceIndexedSearchParamToken.calculateHashSystem(t.getResourceType(), t.getParamName(), t.getString("SP_SYSTEM")));
task.addCalculator("HASH_SYS_AND_VALUE", t -> ResourceIndexedSearchParamToken.calculateHashSystemAndValue(t.getResourceType(), t.getParamName(), t.getString("SP_SYSTEM"), t.getString("SP_VALUE")));
task.addCalculator("HASH_VALUE", t -> ResourceIndexedSearchParamToken.calculateHashValue(t.getResourceType(), t.getParamName(), t.getString("SP_VALUE")));
task.setBatchSize(3);
getMigrator().addTask(task);
getMigrator().migrate();
count = getConnectionProperties().getTxTemplate().execute(t -> {
JdbcTemplate jdbcTemplate = getConnectionProperties().newJdbcTemplate();
return jdbcTemplate.queryForObject("SELECT count(*) FROM HFJ_SPIDX_TOKEN WHERE HASH_VALUE IS NULL", Long.class);
});
assertEquals(0L, count.longValue());
}
} }

View File

@ -1727,6 +1727,12 @@
<copy todir="target/site/apidocs-jpaserver"> <copy todir="target/site/apidocs-jpaserver">
<fileset dir="hapi-fhir-jpaserver-base/target/site/apidocs"/> <fileset dir="hapi-fhir-jpaserver-base/target/site/apidocs"/>
</copy> </copy>
<copy todir="target/site/apidocs-client">
<fileset dir="hapi-fhir-client/target/site/apidocs"/>
</copy>
<copy todir="target/site/apidocs-server">
<fileset dir="hapi-fhir-server/target/site/apidocs"/>
</copy>
<copy todir="target/site/xref-jpaserver"> <copy todir="target/site/xref-jpaserver">
<fileset dir="hapi-fhir-jpaserver-base/target/site/xref"/> <fileset dir="hapi-fhir-jpaserver-base/target/site/xref"/>
</copy> </copy>
@ -2131,6 +2137,8 @@
<module>hapi-fhir-structures-dstu2</module> <module>hapi-fhir-structures-dstu2</module>
<module>hapi-fhir-structures-dstu3</module> <module>hapi-fhir-structures-dstu3</module>
<module>hapi-fhir-structures-r4</module> <module>hapi-fhir-structures-r4</module>
<module>hapi-fhir-client</module>
<module>hapi-fhir-server</module>
<module>hapi-fhir-jpaserver-base</module> <module>hapi-fhir-jpaserver-base</module>
<module>hapi-fhir-jaxrsserver-base</module> <module>hapi-fhir-jaxrsserver-base</module>
<!-- <module>hapi-fhir-cobertura</module> --> <!-- <module>hapi-fhir-cobertura</module> -->

View File

@ -100,6 +100,10 @@
permission is granted. This has been corrected so that transaction() allows both permission is granted. This has been corrected so that transaction() allows both
batch and transaction requests to proceed. batch and transaction requests to proceed.
</action> </action>
<action type="add">
The JPA server version migrator tool now runs in a multithreaded way, allowing it to
upgrade th database faster when migration tasks require data updates.
</action>
</release> </release>
<release version="3.5.0" date="2018-09-17"> <release version="3.5.0" date="2018-09-17">

View File

@ -115,6 +115,8 @@
<item name="Model API (DSTU2)" href="./apidocs-dstu2/index.html" /> <item name="Model API (DSTU2)" href="./apidocs-dstu2/index.html" />
<item name="Model API (DSTU3)" href="./apidocs-dstu3/index.html" /> <item name="Model API (DSTU3)" href="./apidocs-dstu3/index.html" />
<item name="Model API (R4)" href="./apidocs-r4/index.html" /> <item name="Model API (R4)" href="./apidocs-r4/index.html" />
<item name="Client API" href="./apidocs-client/index.html" />
<item name="Server API" href="./apidocs-server/index.html" />
<item name="JPA Server API" href="./apidocs-jpaserver/index.html" /> <item name="JPA Server API" href="./apidocs-jpaserver/index.html" />
</item> </item>
<item name="Command Line Tool (hapi-fhir-cli)" href="./doc_cli.html" /> <item name="Command Line Tool (hapi-fhir-cli)" href="./doc_cli.html" />

View File

@ -70,6 +70,8 @@
<li><a href="./apidocs-dstu/index.html">Model API (DSTU1)</a></li> <li><a href="./apidocs-dstu/index.html">Model API (DSTU1)</a></li>
<li><a href="./apidocs-dstu2/index.html" >Model API (DSTU2)</a></li> <li><a href="./apidocs-dstu2/index.html" >Model API (DSTU2)</a></li>
<li><a href="./apidocs-dstu3/index.html" >Model API (STU3)</a></li> <li><a href="./apidocs-dstu3/index.html" >Model API (STU3)</a></li>
<li><a href="./apidocs-client/index.html" >Client API</a></li>
<li><a href="./apidocs-server/index.html" >Server API</a></li>
<li><a href="./apidocs-jpaserver/index.html">JPA Server API</a></li> <li><a href="./apidocs-jpaserver/index.html">JPA Server API</a></li>
</ul> </ul>