Extract Common functionality out of CalculateHashesTask, build CalculateOrdinalDatesTask based on it

This commit is contained in:
Gary Graham 2020-02-25 16:09:08 -05:00
parent 512af5b87d
commit 97c98254d0
8 changed files with 86 additions and 252 deletions

View File

@ -177,8 +177,6 @@ public final class DateUtils {
Calendar cal = org.apache.commons.lang3.time.DateUtils.toCalendar(theDateValue);
SimpleDateFormat format = new SimpleDateFormat(PATTERN_INTEGER_DATE);
String theDateString = format.format(theDateValue);
String s = String.valueOf(cal.get(Calendar.YEAR)) + cal.get(Calendar.MONTH) + cal.get(Calendar.DAY_OF_MONTH);
return Integer.parseInt(theDateString);
}

View File

@ -2,4 +2,4 @@
type: fix
issue: 1499
title: When performing a search with a DateParam that has DAY precision, rely on new ordinal date field for comparison
instead of attempting to find oldest and newest instant that could be valid.

View File

@ -38,11 +38,11 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
public class CalculateOrdinalDateTask extends BaseTableColumnTask<CalculateOrdinalDateTask> {
public abstract class BaseColumnCalculatorTask extends BaseTableColumnTask<BaseColumnCalculatorTask> {
private static final Logger ourLog = LoggerFactory.getLogger(CalculateOrdinalDateTask.class);
protected static final Logger ourLog = LoggerFactory.getLogger(BaseColumnCalculatorTask.class);
private int myBatchSize = 10000;
private Map<String, Function<MandatoryKeyMap<String, Object>, Long>> myCalculators = new HashMap<>();
private Map<String, Function<MandatoryKeyMap<String, Object>, Object>> myCalculators = new HashMap<>();
private ThreadPoolExecutor myExecutor;
public void setBatchSize(int theBatchSize) {
@ -52,19 +52,24 @@ public class CalculateOrdinalDateTask extends BaseTableColumnTask<CalculateOrdin
/**
* Constructor
*/
public CalculateOrdinalDateTask(VersionEnum theRelease, String theVersion) {
public BaseColumnCalculatorTask(VersionEnum theRelease, String theVersion) {
super(theRelease.toString(), theVersion);
setDescription("Calculate resource search parameter index hashes");
}
/**
* Allows concrete implementations to decide if they should be skipped.
* @return a boolean indicating whether or not to skip execution of the task.
*/
protected abstract boolean shouldSkipTask();
@Override
public synchronized void doExecute() throws SQLException {
if (isDryRun()) {
if (isDryRun() || shouldSkipTask()) {
return;
}
Set<String> tableNames = JdbcUtils.getTableNames(getConnectionProperties());
initializeExecutor();
try {
while(true) {
@ -73,7 +78,7 @@ public class CalculateOrdinalDateTask extends BaseTableColumnTask<CalculateOrdin
JdbcTemplate jdbcTemplate = newJdbcTemplate();
jdbcTemplate.setMaxRows(100000);
String sql = "SELECT * FROM " + getTableName() + " WHERE " + getColumnName() + " IS NULL";
logInfo(ourLog, "Finding up to {} rows in {} that requires hashes", myBatchSize, getTableName());
logInfo(ourLog, "Finding up to {} rows in {} that requires calculations", myBatchSize, getTableName());
jdbcTemplate.query(sql, rch);
rch.done();
@ -149,24 +154,24 @@ public class CalculateOrdinalDateTask extends BaseTableColumnTask<CalculateOrdin
assert theRows != null;
for (Map<String, Object> nextRow : theRows) {
Map<String, Long> newValues = new HashMap<>();
Map<String, Object> newValues = new HashMap<>();
MandatoryKeyMap<String, Object> nextRowMandatoryKeyMap = new MandatoryKeyMap<>(nextRow);
// Apply calculators
for (Map.Entry<String, Function<MandatoryKeyMap<String, Object>, Long>> nextCalculatorEntry : myCalculators.entrySet()) {
for (Map.Entry<String, Function<MandatoryKeyMap<String, Object>, Object>> nextCalculatorEntry : myCalculators.entrySet()) {
String nextColumn = nextCalculatorEntry.getKey();
Function<MandatoryKeyMap<String, Object>, Long> nextCalculator = nextCalculatorEntry.getValue();
Long value = nextCalculator.apply(nextRowMandatoryKeyMap);
Function<MandatoryKeyMap<String, Object>, Object> nextCalculator = nextCalculatorEntry.getValue();
Object value = nextCalculator.apply(nextRowMandatoryKeyMap);
newValues.put(nextColumn, value);
}
// Generate update SQL
StringBuilder sqlBuilder = new StringBuilder();
List<Number> arguments = new ArrayList<>();
List<Object> arguments = new ArrayList<>();
sqlBuilder.append("UPDATE ");
sqlBuilder.append(getTableName());
sqlBuilder.append(" SET ");
for (Map.Entry<String, Long> nextNewValueEntry : newValues.entrySet()) {
for (Map.Entry<String, Object> nextNewValueEntry : newValues.entrySet()) {
if (arguments.size() > 0) {
sqlBuilder.append(", ");
}
@ -178,9 +183,7 @@ public class CalculateOrdinalDateTask extends BaseTableColumnTask<CalculateOrdin
// Apply update SQL
newJdbcTemplate().update(sqlBuilder.toString(), arguments.toArray());
}
return theRows.size();
});
logInfo(ourLog, "Updated {} rows on {} in {}", theRows.size(), getTableName(), sw.toString());
@ -188,7 +191,7 @@ public class CalculateOrdinalDateTask extends BaseTableColumnTask<CalculateOrdin
return myExecutor.submit(task);
}
public CalculateOrdinalDateTask addCalculator(String theColumnName, Function<MandatoryKeyMap<String, Object>, Long> theConsumer) {
public BaseColumnCalculatorTask addCalculator(String theColumnName, Function<MandatoryKeyMap<String, Object>, Object> theConsumer) {
Validate.isTrue(myCalculators.containsKey(theColumnName) == false);
myCalculators.put(theColumnName, theConsumer);
return this;
@ -248,6 +251,10 @@ public class CalculateOrdinalDateTask extends BaseTableColumnTask<CalculateOrdin
return (String) get(theKey);
}
/**
 * Looks up the value stored under the given key and returns it as a {@link Date}.
 *
 * @param theKey the key to look up
 * @return the value held for {@code theKey}, cast to {@link Date}
 * @throws ClassCastException if the stored value is not a {@link Date}
 */
public Date getDate(String theKey) {
    V rawValue = get(theKey);
    return (Date) rawValue;
}
@Override
protected Map<K, V> delegate() {
return myWrap;

View File

@ -165,7 +165,7 @@ public abstract class BaseTask<T extends BaseTask> {
doExecute();
}
public abstract void doExecute() throws SQLException;
protected abstract void doExecute() throws SQLException;
public void setFailureAllowed(boolean theFailureAllowed) {
myFailureAllowed = theFailureAllowed;

View File

@ -38,234 +38,30 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
public class CalculateHashesTask extends BaseTableColumnTask<CalculateHashesTask> {
private static final Logger ourLog = LoggerFactory.getLogger(CalculateHashesTask.class);
private int myBatchSize = 10000;
private Map<String, Function<MandatoryKeyMap<String, Object>, Long>> myCalculators = new HashMap<>();
private ThreadPoolExecutor myExecutor;
public void setBatchSize(int theBatchSize) {
myBatchSize = theBatchSize;
}
public class CalculateHashesTask extends BaseColumnCalculatorTask {
/**
* Constructor
*/
public CalculateHashesTask(VersionEnum theRelease, String theVersion) {
super(theRelease.toString(), theVersion);
super(theRelease, theVersion);
setDescription("Calculate resource search parameter index hashes");
}
@Override
public synchronized void doExecute() throws SQLException {
if (isDryRun()) {
return;
}
Set<String> tableNames = JdbcUtils.getTableNames(getConnectionProperties());
// This table was added shortly after hash indexes were added, so it is a reasonable indicator for whether this
// migration has already been run
if (tableNames.contains("HFJ_RES_REINDEX_JOB")) {
logInfo(ourLog, "The table HFJ_RES_REINDEX_JOB already exists. Skipping calculate hashes task.");
return;
}
initializeExecutor();
protected boolean shouldSkipTask() {
try {
while(true) {
MyRowCallbackHandler rch = new MyRowCallbackHandler();
getTxTemplate().execute(t -> {
JdbcTemplate jdbcTemplate = newJdbcTemplate();
jdbcTemplate.setMaxRows(100000);
String sql = "SELECT * FROM " + getTableName() + " WHERE " + getColumnName() + " IS NULL";
logInfo(ourLog, "Finding up to {} rows in {} that requires hashes", myBatchSize, getTableName());
jdbcTemplate.query(sql, rch);
rch.done();
return null;
});
rch.submitNext();
List<Future<?>> futures = rch.getFutures();
if (futures.isEmpty()) {
break;
}
logInfo(ourLog, "Waiting for {} tasks to complete", futures.size());
for (Future<?> next : futures) {
try {
next.get();
} catch (Exception e) {
throw new SQLException(e);
}
}
Set<String> tableNames = JdbcUtils.getTableNames(getConnectionProperties());
boolean shouldSkip = tableNames.contains("HFJ_RES_REINDEX_JOB");
// This table was added shortly after hash indexes were added, so it is a reasonable indicator for whether this
// migration has already been run
if (shouldSkip) {
logInfo(ourLog, "The table HFJ_RES_REINDEX_JOB already exists. Skipping calculate hashes task.");
}
} finally {
destroyExecutor();
}
}
private void destroyExecutor() {
myExecutor.shutdownNow();
}
private void initializeExecutor() {
int maximumPoolSize = Runtime.getRuntime().availableProcessors();
LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(maximumPoolSize);
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("worker-" + "-%d")
.daemon(false)
.priority(Thread.NORM_PRIORITY)
.build();
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
logInfo(ourLog, "Note: Executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
StopWatch sw = new StopWatch();
try {
executorQueue.put(theRunnable);
} catch (InterruptedException theE) {
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + theE.toString());
}
logInfo(ourLog, "Slot become available after {}ms", sw.getMillis());
}
};
myExecutor = new ThreadPoolExecutor(
1,
maximumPoolSize,
0L,
TimeUnit.MILLISECONDS,
executorQueue,
threadFactory,
rejectedExecutionHandler);
}
private Future<?> updateRows(List<Map<String, Object>> theRows) {
Runnable task = () -> {
StopWatch sw = new StopWatch();
getTxTemplate().execute(t -> {
// Loop through rows
assert theRows != null;
for (Map<String, Object> nextRow : theRows) {
Map<String, Long> newValues = new HashMap<>();
MandatoryKeyMap<String, Object> nextRowMandatoryKeyMap = new MandatoryKeyMap<>(nextRow);
// Apply calculators
for (Map.Entry<String, Function<MandatoryKeyMap<String, Object>, Long>> nextCalculatorEntry : myCalculators.entrySet()) {
String nextColumn = nextCalculatorEntry.getKey();
Function<MandatoryKeyMap<String, Object>, Long> nextCalculator = nextCalculatorEntry.getValue();
Long value = nextCalculator.apply(nextRowMandatoryKeyMap);
newValues.put(nextColumn, value);
}
// Generate update SQL
StringBuilder sqlBuilder = new StringBuilder();
List<Number> arguments = new ArrayList<>();
sqlBuilder.append("UPDATE ");
sqlBuilder.append(getTableName());
sqlBuilder.append(" SET ");
for (Map.Entry<String, Long> nextNewValueEntry : newValues.entrySet()) {
if (arguments.size() > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append(nextNewValueEntry.getKey()).append(" = ?");
arguments.add(nextNewValueEntry.getValue());
}
sqlBuilder.append(" WHERE SP_ID = ?");
arguments.add((Number) nextRow.get("SP_ID"));
// Apply update SQL
newJdbcTemplate().update(sqlBuilder.toString(), arguments.toArray());
}
return theRows.size();
});
logInfo(ourLog, "Updated {} rows on {} in {}", theRows.size(), getTableName(), sw.toString());
};
return myExecutor.submit(task);
}
public CalculateHashesTask addCalculator(String theColumnName, Function<MandatoryKeyMap<String, Object>, Long> theConsumer) {
Validate.isTrue(myCalculators.containsKey(theColumnName) == false);
myCalculators.put(theColumnName, theConsumer);
return this;
}
private class MyRowCallbackHandler implements RowCallbackHandler {
private List<Map<String, Object>> myRows = new ArrayList<>();
private List<Future<?>> myFutures = new ArrayList<>();
@Override
public void processRow(ResultSet rs) throws SQLException {
Map<String, Object> row = new ColumnMapRowMapper().mapRow(rs, 0);
myRows.add(row);
if (myRows.size() >= myBatchSize) {
submitNext();
}
}
private void submitNext() {
if (myRows.size() > 0) {
myFutures.add(updateRows(myRows));
myRows = new ArrayList<>();
}
}
public List<Future<?>> getFutures() {
return myFutures;
}
public void done() {
if (myRows.size() > 0) {
submitNext();
}
}
}
public static class MandatoryKeyMap<K, V> extends ForwardingMap<K, V> {
private final Map<K, V> myWrap;
public MandatoryKeyMap(Map<K, V> theWrap) {
myWrap = theWrap;
}
@Override
public V get(Object theKey) {
if (!containsKey(theKey)) {
throw new IllegalArgumentException("No key: " + theKey);
}
return super.get(theKey);
}
public String getString(String theKey) {
return (String) get(theKey);
}
@Override
protected Map<K, V> delegate() {
return myWrap;
}
public String getResourceType() {
return getString("RES_TYPE");
}
public String getParamName() {
return getString("SP_NAME");
return shouldSkip;
} catch (SQLException e) {
logInfo(ourLog, "Error retrieving table names, skipping task");
return true;
}
}
}

View File

@ -0,0 +1,35 @@
package ca.uhn.fhir.jpa.migrate.taskdef;
/*-
* #%L
* HAPI FHIR JPA Server - Migration
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.util.VersionEnum;
/**
 * Column calculator task used to populate the ordinal date columns of a date search parameter index table.
 * <p>
 * This class only supplies the task description and the skip decision; the per-column calculations
 * are registered by the caller through {@code addCalculator(...)} on the base class.
 */
public class CalculateOrdinalDatesTask extends BaseColumnCalculatorTask {

    /**
     * Constructor
     *
     * @param theRelease the release this migration task belongs to
     * @param theVersion the version identifier of this migration task
     */
    public CalculateOrdinalDatesTask(VersionEnum theRelease, String theVersion) {
        super(theRelease, theVersion);
        // Name the columns exactly as they are declared in the schema so the description is searchable
        setDescription("Calculate SP_VALUE_LOW_DATE_ORDINAL and SP_VALUE_HIGH_DATE_ORDINAL based on existing SP_VALUE_LOW and SP_VALUE_HIGH date values in Date Search Params");
    }

    /**
     * This task is never skipped.
     *
     * @return always {@code false}
     */
    @Override
    protected boolean shouldSkipTask() {
        return false; // TODO Is there a case where we should just not do this?
    }
}

View File

@ -21,19 +21,13 @@ package ca.uhn.fhir.jpa.migrate.tasks;
*/
import ca.uhn.fhir.jpa.migrate.DriverTypeEnum;
import ca.uhn.fhir.jpa.migrate.taskdef.AddColumnTask;
import ca.uhn.fhir.jpa.migrate.taskdef.ArbitrarySqlTask;
import ca.uhn.fhir.jpa.migrate.taskdef.BaseTableColumnTypeTask;
import ca.uhn.fhir.jpa.migrate.taskdef.CalculateHashesTask;
import ca.uhn.fhir.jpa.migrate.taskdef.*;
import ca.uhn.fhir.jpa.migrate.tasks.api.BaseMigrationTasks;
import ca.uhn.fhir.jpa.migrate.tasks.api.Builder;
import ca.uhn.fhir.jpa.model.entity.*;
import ca.uhn.fhir.util.VersionEnum;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
@SuppressWarnings({"SqlNoDataSourceInspection", "SpellCheckingInspection"})
@ -74,10 +68,10 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
Builder.BuilderWithTableName spidxDate = version.onTable("HFJ_SPIDX_DATE");
spidxDate.addColumn("20200225.1", "SP_VALUE_LOW_DATE_ORDINAL").nullable().type(BaseTableColumnTypeTask.ColumnTypeEnum.INT);
spidxDate.addColumn("20200225.2", "SP_VALUE_HIGH_DATE_ORDINAL").nullable().type(BaseTableColumnTypeTask.ColumnTypeEnum.INT);
spidxDate.addTask(new CalculateHashesTask(VersionEnum.V4_3_0, "20200225.3")
.setColumnName("HASH_IDENTITY")
.addCalculator("SP_VALUE_LOW_DATE_ORDINAL", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")))
.addCalculator("SP_VALUE_HIGH_DATE_ORDINAL", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")))
spidxDate.addTask(new CalculateOrdinalDatesTask(VersionEnum.V4_3_0, "20200225.3")
.setColumnName("SP_VALUE_LOW_DATE_ORDINAL") //It doesn't matter which of the two we choose as they will both be null.
.addCalculator("SP_VALUE_LOW_DATE_ORDINAL", t -> ResourceIndexedSearchParamDate.calculateOrdinalValue(t.getDate("SP_VALUE_LOW")))
.addCalculator("SP_VALUE_HIGH_DATE_ORDINAL", t -> ResourceIndexedSearchParamDate.calculateOrdinalValue(t.getDate("SP_VALUE_HIGH")))
);
//
}

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.model.api.TemporalPrecisionEnum;
import ca.uhn.fhir.model.primitive.InstantDt;
import ca.uhn.fhir.rest.param.DateParam;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.util.DateUtils;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
@ -57,8 +58,7 @@ public class ResourceIndexedSearchParamDate extends BaseResourceIndexedSearchPar
/**
* Field which stores an integer representation of the date in YYYYMMDD form
* e.g. 2019-01-20 -> 20190120
*/
@Column(name="SP_VALUE_LOW_DATE_ORDINAL")
public Integer myValueLowDateOrdinal;
@ -242,4 +242,8 @@ public class ResourceIndexedSearchParamDate extends BaseResourceIndexedSearchPar
return result;
}
/**
 * Converts a date into the integer "ordinal" day value stored in the ordinal date columns
 * (e.g. 2019-01-20 -> 20190120), widened to a {@link Long}.
 *
 * @param theDate the date to convert; may be {@code null}
 * @return the value of {@code DateUtils.convertDatetoDayInteger(theDate)} as a {@link Long},
 *         or {@code null} when {@code theDate} is {@code null}
 */
public static Long calculateOrdinalValue(Date theDate) {
    if (theDate == null) {
        // A row without a date has no ordinal value; the conversion itself would throw a NullPointerException.
        // NOTE(review): callers that keep re-selecting rows whose ordinal column is NULL should confirm
        // that a null result here cannot cause the same rows to be reprocessed repeatedly.
        return null;
    }
    return (long) DateUtils.convertDatetoDayInteger(theDate);
}
}