Add threading to migrator
This commit is contained in:
parent
e6a62a10a2
commit
b2479e4a7b
|
@ -1,5 +1,25 @@
|
||||||
package ca.uhn.fhir.rest.api;
|
package ca.uhn.fhir.rest.api;
|
||||||
|
|
||||||
|
/*-
|
||||||
|
* #%L
|
||||||
|
* HAPI FHIR - Core Library
|
||||||
|
* %%
|
||||||
|
* Copyright (C) 2014 - 2018 University Health Network
|
||||||
|
* %%
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
* #L%
|
||||||
|
*/
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
|
@ -31,6 +31,10 @@
|
||||||
<groupId>org.springframework</groupId>
|
<groupId>org.springframework</groupId>
|
||||||
<artifactId>spring-jdbc</artifactId>
|
<artifactId>spring-jdbc</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.commons</groupId>
|
||||||
|
<artifactId>commons-dbcp2</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- This dependency includes the core HAPI-FHIR classes -->
|
<!-- This dependency includes the core HAPI-FHIR classes -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -45,11 +49,6 @@
|
||||||
<artifactId>derby</artifactId>
|
<artifactId>derby</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.commons</groupId>
|
|
||||||
<artifactId>commons-dbcp2</artifactId>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>junit</groupId>
|
<groupId>junit</groupId>
|
||||||
<artifactId>junit</artifactId>
|
<artifactId>junit</artifactId>
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package ca.uhn.fhir.jpa.migrate;
|
package ca.uhn.fhir.jpa.migrate;
|
||||||
|
|
||||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||||
|
import org.apache.commons.dbcp2.BasicDataSource;
|
||||||
import org.apache.commons.lang3.Validate;
|
import org.apache.commons.lang3.Validate;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -76,20 +77,13 @@ public enum DriverTypeEnum {
|
||||||
throw new InternalErrorException("Unable to find driver class: " + myDriverClassName, e);
|
throw new InternalErrorException("Unable to find driver class: " + myDriverClassName, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
SingleConnectionDataSource dataSource = new SingleConnectionDataSource(){
|
BasicDataSource dataSource = new BasicDataSource();
|
||||||
@Override
|
// dataSource.setAutoCommit(false);
|
||||||
protected Connection getConnectionFromDriver(Properties props) throws SQLException {
|
|
||||||
Connection connect = driver.connect(theUrl, props);
|
|
||||||
assert connect != null;
|
|
||||||
return connect;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
dataSource.setAutoCommit(false);
|
|
||||||
dataSource.setDriverClassName(myDriverClassName);
|
dataSource.setDriverClassName(myDriverClassName);
|
||||||
dataSource.setUrl(theUrl);
|
dataSource.setUrl(theUrl);
|
||||||
dataSource.setUsername(theUsername);
|
dataSource.setUsername(theUsername);
|
||||||
dataSource.setPassword(thePassword);
|
dataSource.setPassword(thePassword);
|
||||||
dataSource.setSuppressClose(true);
|
// dataSource.setSuppressClose(true);
|
||||||
|
|
||||||
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
|
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
|
||||||
transactionManager.setDataSource(dataSource);
|
transactionManager.setDataSource(dataSource);
|
||||||
|
|
|
@ -23,15 +23,21 @@ package ca.uhn.fhir.jpa.migrate.taskdef;
|
||||||
import ca.uhn.fhir.util.StopWatch;
|
import ca.uhn.fhir.util.StopWatch;
|
||||||
import com.google.common.collect.ForwardingMap;
|
import com.google.common.collect.ForwardingMap;
|
||||||
import org.apache.commons.lang3.Validate;
|
import org.apache.commons.lang3.Validate;
|
||||||
|
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||||
import org.checkerframework.checker.nullness.compatqual.NullableDecl;
|
import org.checkerframework.checker.nullness.compatqual.NullableDecl;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.jdbc.core.ColumnMapRowMapper;
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.springframework.jdbc.core.RowCallbackHandler;
|
||||||
|
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.SQLException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.*;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
public class CalculateHashesTask extends BaseTableColumnTask<CalculateHashesTask> {
|
public class CalculateHashesTask extends BaseTableColumnTask<CalculateHashesTask> {
|
||||||
|
@ -39,33 +45,103 @@ public class CalculateHashesTask extends BaseTableColumnTask<CalculateHashesTask
|
||||||
private static final Logger ourLog = LoggerFactory.getLogger(CalculateHashesTask.class);
|
private static final Logger ourLog = LoggerFactory.getLogger(CalculateHashesTask.class);
|
||||||
private int myBatchSize = 10000;
|
private int myBatchSize = 10000;
|
||||||
private Map<String, Function<MandatoryKeyMap<String, Object>, Long>> myCalculators = new HashMap<>();
|
private Map<String, Function<MandatoryKeyMap<String, Object>, Long>> myCalculators = new HashMap<>();
|
||||||
|
private ThreadPoolExecutor myExecutor;
|
||||||
|
|
||||||
public void setBatchSize(int theBatchSize) {
|
public void setBatchSize(int theBatchSize) {
|
||||||
myBatchSize = theBatchSize;
|
myBatchSize = theBatchSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*/
|
||||||
|
public CalculateHashesTask() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute() {
|
public synchronized void execute() throws SQLException {
|
||||||
if (isDryRun()) {
|
if (isDryRun()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<Map<String, Object>> rows;
|
initializeExecutor();
|
||||||
do {
|
try {
|
||||||
rows = getTxTemplate().execute(t -> {
|
|
||||||
|
while(true) {
|
||||||
|
MyRowCallbackHandler rch = new MyRowCallbackHandler();
|
||||||
|
getTxTemplate().execute(t -> {
|
||||||
JdbcTemplate jdbcTemplate = newJdbcTemnplate();
|
JdbcTemplate jdbcTemplate = newJdbcTemnplate();
|
||||||
jdbcTemplate.setMaxRows(myBatchSize);
|
jdbcTemplate.setMaxRows(100000);
|
||||||
String sql = "SELECT * FROM " + getTableName() + " WHERE " + getColumnName() + " IS NULL";
|
String sql = "SELECT * FROM " + getTableName() + " WHERE " + getColumnName() + " IS NULL";
|
||||||
ourLog.info("Finding up to {} rows in {} that requires hashes", myBatchSize, getTableName());
|
ourLog.info("Finding up to {} rows in {} that requires hashes", myBatchSize, getTableName());
|
||||||
return jdbcTemplate.queryForList(sql);
|
|
||||||
|
jdbcTemplate.query(sql, rch);
|
||||||
|
rch.done();
|
||||||
|
|
||||||
|
return null;
|
||||||
});
|
});
|
||||||
|
|
||||||
updateRows(rows);
|
rch.submitNext();
|
||||||
} while (rows.size() > 0);
|
List<Future<?>> futures = rch.getFutures();
|
||||||
|
if (futures.isEmpty()) {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateRows(List<Map<String, Object>> theRows) {
|
ourLog.info("Waiting for {} tasks to complete", futures.size());
|
||||||
|
for (Future<?> next : futures) {
|
||||||
|
try {
|
||||||
|
next.get();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new SQLException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
destroyExecutor();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void destroyExecutor() {
|
||||||
|
myExecutor.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeExecutor() {
|
||||||
|
int maximumPoolSize = Runtime.getRuntime().availableProcessors();
|
||||||
|
|
||||||
|
LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(maximumPoolSize);
|
||||||
|
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
|
||||||
|
.namingPattern("worker-" + "-%d")
|
||||||
|
.daemon(false)
|
||||||
|
.priority(Thread.NORM_PRIORITY)
|
||||||
|
.build();
|
||||||
|
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
|
||||||
|
@Override
|
||||||
|
public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
|
||||||
|
ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
|
||||||
|
StopWatch sw = new StopWatch();
|
||||||
|
try {
|
||||||
|
executorQueue.put(theRunnable);
|
||||||
|
} catch (InterruptedException theE) {
|
||||||
|
throw new RejectedExecutionException("Task " + theRunnable.toString() +
|
||||||
|
" rejected from " + theE.toString());
|
||||||
|
}
|
||||||
|
ourLog.info("Slot become available after {}ms", sw.getMillis());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
myExecutor = new ThreadPoolExecutor(
|
||||||
|
1,
|
||||||
|
maximumPoolSize,
|
||||||
|
0L,
|
||||||
|
TimeUnit.MILLISECONDS,
|
||||||
|
executorQueue,
|
||||||
|
threadFactory,
|
||||||
|
rejectedExecutionHandler);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Future<?> updateRows(List<Map<String, Object>> theRows) {
|
||||||
|
Runnable task = () -> {
|
||||||
StopWatch sw = new StopWatch();
|
StopWatch sw = new StopWatch();
|
||||||
getTxTemplate().execute(t -> {
|
getTxTemplate().execute(t -> {
|
||||||
|
|
||||||
|
@ -108,6 +184,8 @@ public class CalculateHashesTask extends BaseTableColumnTask<CalculateHashesTask
|
||||||
return theRows.size();
|
return theRows.size();
|
||||||
});
|
});
|
||||||
ourLog.info("Updated {} rows on {} in {}", theRows.size(), getTableName(), sw.toString());
|
ourLog.info("Updated {} rows on {} in {}", theRows.size(), getTableName(), sw.toString());
|
||||||
|
};
|
||||||
|
return myExecutor.submit(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
public CalculateHashesTask addCalculator(String theColumnName, Function<MandatoryKeyMap<String, Object>, Long> theConsumer) {
|
public CalculateHashesTask addCalculator(String theColumnName, Function<MandatoryKeyMap<String, Object>, Long> theConsumer) {
|
||||||
|
@ -116,6 +194,39 @@ public class CalculateHashesTask extends BaseTableColumnTask<CalculateHashesTask
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class MyRowCallbackHandler implements RowCallbackHandler {
|
||||||
|
|
||||||
|
private List<Map<String, Object>> myRows = new ArrayList<>();
|
||||||
|
private List<Future<?>> myFutures = new ArrayList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void processRow(ResultSet rs) throws SQLException {
|
||||||
|
Map<String, Object> row = new ColumnMapRowMapper().mapRow(rs, 0);
|
||||||
|
myRows.add(row);
|
||||||
|
|
||||||
|
if (myRows.size() >= myBatchSize) {
|
||||||
|
submitNext();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void submitNext() {
|
||||||
|
if (myRows.size() > 0) {
|
||||||
|
myFutures.add(updateRows(myRows));
|
||||||
|
myRows = new ArrayList<>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Future<?>> getFutures() {
|
||||||
|
return myFutures;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void done() {
|
||||||
|
if (myRows.size() > 0) {
|
||||||
|
submitNext();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static class MandatoryKeyMap<K, V> extends ForwardingMap<K, V> {
|
public static class MandatoryKeyMap<K, V> extends ForwardingMap<K, V> {
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,7 @@ import java.util.Map;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
public class CreateHashesTest extends BaseTest {
|
public class CalculateHashesTest extends BaseTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateHashes() {
|
public void testCreateHashes() {
|
||||||
|
@ -50,4 +50,36 @@ public class CreateHashesTest extends BaseTest {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateHashesLargeNumber() {
|
||||||
|
executeSql("create table HFJ_SPIDX_TOKEN (SP_ID bigint not null, SP_MISSING boolean, SP_NAME varchar(100) not null, RES_ID bigint, RES_TYPE varchar(255) not null, SP_UPDATED timestamp, HASH_IDENTITY bigint, HASH_SYS bigint, HASH_SYS_AND_VALUE bigint, HASH_VALUE bigint, SP_SYSTEM varchar(200), SP_VALUE varchar(200), primary key (SP_ID))");
|
||||||
|
|
||||||
|
for (int i = 0; i < 777; i++) {
|
||||||
|
executeSql("insert into HFJ_SPIDX_TOKEN (SP_MISSING, SP_NAME, RES_ID, RES_TYPE, SP_UPDATED, SP_SYSTEM, SP_VALUE, SP_ID) values (false, 'identifier', 999, 'Patient', '2018-09-03 07:44:49.196', 'urn:oid:1.2.410.100110.10.41308301', '8888888" + i + "', " + i + ")");
|
||||||
|
}
|
||||||
|
|
||||||
|
Long count = getConnectionProperties().getTxTemplate().execute(t -> {
|
||||||
|
JdbcTemplate jdbcTemplate = getConnectionProperties().newJdbcTemplate();
|
||||||
|
return jdbcTemplate.queryForObject("SELECT count(*) FROM HFJ_SPIDX_TOKEN WHERE HASH_VALUE IS NULL", Long.class);
|
||||||
|
});
|
||||||
|
assertEquals(777L, count.longValue());
|
||||||
|
|
||||||
|
CalculateHashesTask task = new CalculateHashesTask();
|
||||||
|
task.setTableName("HFJ_SPIDX_TOKEN");
|
||||||
|
task.setColumnName("HASH_IDENTITY");
|
||||||
|
task.addCalculator("HASH_IDENTITY", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")));
|
||||||
|
task.addCalculator("HASH_SYS", t -> ResourceIndexedSearchParamToken.calculateHashSystem(t.getResourceType(), t.getParamName(), t.getString("SP_SYSTEM")));
|
||||||
|
task.addCalculator("HASH_SYS_AND_VALUE", t -> ResourceIndexedSearchParamToken.calculateHashSystemAndValue(t.getResourceType(), t.getParamName(), t.getString("SP_SYSTEM"), t.getString("SP_VALUE")));
|
||||||
|
task.addCalculator("HASH_VALUE", t -> ResourceIndexedSearchParamToken.calculateHashValue(t.getResourceType(), t.getParamName(), t.getString("SP_VALUE")));
|
||||||
|
task.setBatchSize(3);
|
||||||
|
getMigrator().addTask(task);
|
||||||
|
|
||||||
|
getMigrator().migrate();
|
||||||
|
|
||||||
|
count = getConnectionProperties().getTxTemplate().execute(t -> {
|
||||||
|
JdbcTemplate jdbcTemplate = getConnectionProperties().newJdbcTemplate();
|
||||||
|
return jdbcTemplate.queryForObject("SELECT count(*) FROM HFJ_SPIDX_TOKEN WHERE HASH_VALUE IS NULL", Long.class);
|
||||||
|
});
|
||||||
|
assertEquals(0L, count.longValue());
|
||||||
|
}
|
||||||
}
|
}
|
8
pom.xml
8
pom.xml
|
@ -1727,6 +1727,12 @@
|
||||||
<copy todir="target/site/apidocs-jpaserver">
|
<copy todir="target/site/apidocs-jpaserver">
|
||||||
<fileset dir="hapi-fhir-jpaserver-base/target/site/apidocs"/>
|
<fileset dir="hapi-fhir-jpaserver-base/target/site/apidocs"/>
|
||||||
</copy>
|
</copy>
|
||||||
|
<copy todir="target/site/apidocs-client">
|
||||||
|
<fileset dir="hapi-fhir-client/target/site/apidocs"/>
|
||||||
|
</copy>
|
||||||
|
<copy todir="target/site/apidocs-server">
|
||||||
|
<fileset dir="hapi-fhir-server/target/site/apidocs"/>
|
||||||
|
</copy>
|
||||||
<copy todir="target/site/xref-jpaserver">
|
<copy todir="target/site/xref-jpaserver">
|
||||||
<fileset dir="hapi-fhir-jpaserver-base/target/site/xref"/>
|
<fileset dir="hapi-fhir-jpaserver-base/target/site/xref"/>
|
||||||
</copy>
|
</copy>
|
||||||
|
@ -2131,6 +2137,8 @@
|
||||||
<module>hapi-fhir-structures-dstu2</module>
|
<module>hapi-fhir-structures-dstu2</module>
|
||||||
<module>hapi-fhir-structures-dstu3</module>
|
<module>hapi-fhir-structures-dstu3</module>
|
||||||
<module>hapi-fhir-structures-r4</module>
|
<module>hapi-fhir-structures-r4</module>
|
||||||
|
<module>hapi-fhir-client</module>
|
||||||
|
<module>hapi-fhir-server</module>
|
||||||
<module>hapi-fhir-jpaserver-base</module>
|
<module>hapi-fhir-jpaserver-base</module>
|
||||||
<module>hapi-fhir-jaxrsserver-base</module>
|
<module>hapi-fhir-jaxrsserver-base</module>
|
||||||
<!-- <module>hapi-fhir-cobertura</module> -->
|
<!-- <module>hapi-fhir-cobertura</module> -->
|
||||||
|
|
|
@ -115,6 +115,8 @@
|
||||||
<item name="Model API (DSTU2)" href="./apidocs-dstu2/index.html" />
|
<item name="Model API (DSTU2)" href="./apidocs-dstu2/index.html" />
|
||||||
<item name="Model API (DSTU3)" href="./apidocs-dstu3/index.html" />
|
<item name="Model API (DSTU3)" href="./apidocs-dstu3/index.html" />
|
||||||
<item name="Model API (R4)" href="./apidocs-r4/index.html" />
|
<item name="Model API (R4)" href="./apidocs-r4/index.html" />
|
||||||
|
<item name="Client API" href="./apidocs-client/index.html" />
|
||||||
|
<item name="Server API" href="./apidocs-server/index.html" />
|
||||||
<item name="JPA Server API" href="./apidocs-jpaserver/index.html" />
|
<item name="JPA Server API" href="./apidocs-jpaserver/index.html" />
|
||||||
</item>
|
</item>
|
||||||
<item name="Command Line Tool (hapi-fhir-cli)" href="./doc_cli.html" />
|
<item name="Command Line Tool (hapi-fhir-cli)" href="./doc_cli.html" />
|
||||||
|
|
|
@ -70,6 +70,8 @@
|
||||||
<li><a href="./apidocs-dstu/index.html">Model API (DSTU1)</a></li>
|
<li><a href="./apidocs-dstu/index.html">Model API (DSTU1)</a></li>
|
||||||
<li><a href="./apidocs-dstu2/index.html" >Model API (DSTU2)</a></li>
|
<li><a href="./apidocs-dstu2/index.html" >Model API (DSTU2)</a></li>
|
||||||
<li><a href="./apidocs-dstu3/index.html" >Model API (STU3)</a></li>
|
<li><a href="./apidocs-dstu3/index.html" >Model API (STU3)</a></li>
|
||||||
|
<li><a href="./apidocs-client/index.html" >Client API</a></li>
|
||||||
|
<li><a href="./apidocs-server/index.html" >Server API</a></li>
|
||||||
<li><a href="./apidocs-jpaserver/index.html">JPA Server API</a></li>
|
<li><a href="./apidocs-jpaserver/index.html">JPA Server API</a></li>
|
||||||
</ul>
|
</ul>
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue