Add support for clustered database migrations (#4177)

* initial attempt

* cleanup

* add other databases

* fix test

* fix dangling connection

* the HapiMigratorTest passes when all the tests are run, but not individual tests.  Methinks the locking just isn't working.

* omg it was autoCommit all along ugh!  it works now

* cleanup

* cleanup

* cleanup

* cleanup

* cleanup

* documentation

* javadoc

* cleanup

* final cleanup

* simple solution is working

* fix tests

* final fixmes.  this is actually a merge candidate

* pre-review cleanup

* changelog

* pre-review cleanup

* pre-review cleanup

* fix test

* review feedback

Co-authored-by: Ken Stevens <ken@smilecdr.com>
This commit is contained in:
Ken Stevens 2022-10-25 11:20:31 -04:00 committed by GitHub
parent 1190eb96ac
commit e7ac40d050
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 481 additions and 85 deletions

View File

@ -0,0 +1,4 @@
---
type: fix
issue: 4177
title: "Support has been added for clustered database upgrades in the new (non-FlyWay) migrator codebase. Only one database migration is permitted at once."

View File

@ -87,6 +87,12 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-test-utilities</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,86 @@
package ca.uhn.fhir.jpa.migrate;
/*-
* #%L
* HAPI FHIR Server - SQL Migration
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;
/**
* The approach used in this class is borrowed from org.flywaydb.community.database.ignite.thin.IgniteThinDatabase
*/
public class HapiMigrationLock implements AutoCloseable {
private static final Logger ourLog = LoggerFactory.getLogger(HapiMigrationLock.class);
public static final int SLEEP_MILLIS_BETWEEN_LOCK_RETRIES = 1000;
public static final int MAX_RETRY_ATTEMPTS = 50;
private final String myLockDescription = UUID.randomUUID().toString();
private final HapiMigrationStorageSvc myMigrationStorageSvc;
/**
* This constructor should only ever be called from within a try-with-resources so the lock is released when the block is exited
*/
public HapiMigrationLock(HapiMigrationStorageSvc theMigrationStorageSvc) {
myMigrationStorageSvc = theMigrationStorageSvc;
lock();
}
private void lock() {
int retryCount = 0;
do {
try {
if (insertLockingRow()) {
return;
}
retryCount++;
ourLog.info("Waiting for lock on " + this);
Thread.sleep(SLEEP_MILLIS_BETWEEN_LOCK_RETRIES);
} catch (InterruptedException ex) {
// Ignore - if interrupted, we still need to wait for lock to become available
}
} while (retryCount < MAX_RETRY_ATTEMPTS);
throw new HapiMigrationException(Msg.code(2153) + "Unable to obtain table lock - another database migration may be running. If no " +
"other database migration is running, then the previous migration did not shut down properly and the " +
"lock record needs to be deleted manually. The lock record is located in the " + myMigrationStorageSvc.getMigrationTablename() + " table with " +
"INSTALLED_RANK = " + HapiMigrationStorageSvc.LOCK_PID);
}
private boolean insertLockingRow() {
try {
return myMigrationStorageSvc.insertLockRecord(myLockDescription);
} catch (Exception e) {
return false;
}
}
@Override
public void close() {
boolean result = myMigrationStorageSvc.deleteLockRecord(myLockDescription);
if (!result) {
ourLog.error("Failed to delete migration lock record for description = [{}]", myLockDescription);
}
}
}

View File

@ -20,21 +20,30 @@ package ca.uhn.fhir.jpa.migrate;
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.migrate.dao.HapiMigrationDao;
import ca.uhn.fhir.jpa.migrate.entity.HapiMigrationEntity;
import ca.uhn.fhir.jpa.migrate.taskdef.BaseTask;
import org.flywaydb.core.api.MigrationVersion;
import java.util.Optional;
import java.util.Set;
public class HapiMigrationStorageSvc {
public static final String UNKNOWN_VERSION = "unknown";
private static final String LOCK_TYPE = "hapi-fhir-lock";
static final Integer LOCK_PID = -100;
private final HapiMigrationDao myHapiMigrationDao;
public HapiMigrationStorageSvc(HapiMigrationDao theHapiMigrationDao) {
myHapiMigrationDao = theHapiMigrationDao;
}
public String getMigrationTablename() {
return myHapiMigrationDao.getMigrationTablename();
}
/**
* Returns a list of migration tasks that have not yet been successfully run against the database
* @param theTaskList the full list of tasks for this release
@ -84,4 +93,37 @@ public class HapiMigrationStorageSvc {
public void createMigrationTableIfRequired() {
myHapiMigrationDao.createMigrationTableIfRequired();
}
/**
*
* @param theLockDescription value of the Description for the lock record
* @return true if the record was successfully deleted
*/
public boolean deleteLockRecord(String theLockDescription) {
verifyNoOtherLocksPresent(theLockDescription);
// Remove the locking row
return myHapiMigrationDao.deleteLockRecord(LOCK_PID, theLockDescription);
}
void verifyNoOtherLocksPresent(String theLockDescription) {
Optional<HapiMigrationEntity> otherLockFound = myHapiMigrationDao.findFirstByPidAndNotDescription(LOCK_PID, theLockDescription);
// Check that there are no other locks in place. This should not happen!
if (otherLockFound.isPresent()) {
throw new HapiMigrationException(Msg.code(2152) + "Internal error: on unlocking, a competing lock was found");
}
}
public boolean insertLockRecord(String theLockDescription) {
HapiMigrationEntity entity = new HapiMigrationEntity();
entity.setPid(LOCK_PID);
entity.setType(LOCK_TYPE);
entity.setDescription(theLockDescription);
entity.setExecutionTime(0);
entity.setSuccess(true);
return myHapiMigrationDao.save(entity);
}
}

View File

@ -42,7 +42,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
public class HapiMigrator {
private static final Logger ourLog = LoggerFactory.getLogger(HapiMigrator.class);
private MigrationTaskList myTaskList = new MigrationTaskList();
private final MigrationTaskList myTaskList = new MigrationTaskList();
private boolean myDryRun;
private boolean myNoColumnShrink;
private final DriverTypeEnum myDriverType;
@ -101,44 +101,28 @@ public class HapiMigrator {
public MigrationResult migrate() {
ourLog.info("Loaded {} migration tasks", myTaskList.size());
MigrationTaskList newTaskList = myHapiMigrationStorageSvc.diff(myTaskList);
ourLog.info("{} of these {} migration tasks are new. Executing them now.", newTaskList.size(), myTaskList.size());
MigrationResult retval = new MigrationResult();
try (DriverTypeEnum.ConnectionProperties connectionProperties = getDriverType().newConnectionProperties(getDataSource())) {
// Lock the migration table so only one server migrates the database at once
try (HapiMigrationLock ignored = new HapiMigrationLock(myHapiMigrationStorageSvc)) {
MigrationTaskList newTaskList = myHapiMigrationStorageSvc.diff(myTaskList);
ourLog.info("{} of these {} migration tasks are new. Executing them now.", newTaskList.size(), myTaskList.size());
newTaskList.forEach(next -> {
try (DriverTypeEnum.ConnectionProperties connectionProperties = getDriverType().newConnectionProperties(getDataSource())) {
next.setDriverType(getDriverType());
next.setDryRun(isDryRun());
next.setNoColumnShrink(isNoColumnShrink());
next.setConnectionProperties(connectionProperties);
newTaskList.forEach(next -> {
StopWatch sw = new StopWatch();
try {
if (isDryRun()) {
ourLog.info("Dry run {} {}", next.getMigrationVersion(), next.getDescription());
} else {
ourLog.info("Executing {} {}", next.getMigrationVersion(), next.getDescription());
}
preExecute(next);
next.execute();
postExecute(next, sw, true);
retval.changes += next.getChangesCount();
retval.executedStatements.addAll(next.getExecutedStatements());
retval.succeededTasks.add(next);
} catch (SQLException|HapiMigrationException e) {
retval.failedTasks.add(next);
postExecute(next, sw, false);
String description = next.getDescription();
if (isBlank(description)) {
description = next.getClass().getSimpleName();
}
String prefix = "Failure executing task \"" + description + "\", aborting! Cause: ";
throw new HapiMigrationException(Msg.code(47) + prefix + e, retval, e);
}
});
next.setDriverType(getDriverType());
next.setDryRun(isDryRun());
next.setNoColumnShrink(isNoColumnShrink());
next.setConnectionProperties(connectionProperties);
executeTask(next, retval);
});
}
} catch (Exception e) {
ourLog.error("Migration failed", e);
throw e;
}
ourLog.info(retval.summary());
@ -151,6 +135,32 @@ public class HapiMigrator {
return retval;
}
private void executeTask(BaseTask theTask, MigrationResult theMigrationResult) {
StopWatch sw = new StopWatch();
try {
if (isDryRun()) {
ourLog.info("Dry run {} {}", theTask.getMigrationVersion(), theTask.getDescription());
} else {
ourLog.info("Executing {} {}", theTask.getMigrationVersion(), theTask.getDescription());
}
preExecute(theTask);
theTask.execute();
postExecute(theTask, sw, true);
theMigrationResult.changes += theTask.getChangesCount();
theMigrationResult.executedStatements.addAll(theTask.getExecutedStatements());
theMigrationResult.succeededTasks.add(theTask);
} catch (SQLException | HapiMigrationException e) {
theMigrationResult.failedTasks.add(theTask);
postExecute(theTask, sw, false);
String description = theTask.getDescription();
if (isBlank(description)) {
description = theTask.getClass().getSimpleName();
}
String prefix = "Failure executing task \"" + description + "\", aborting! Cause: ";
throw new HapiMigrationException(Msg.code(47) + prefix + e, theMigrationResult, e);
}
}
private void preExecute(BaseTask theTask) {
myCallbacks.forEach(action -> action.preExecution(theTask));
@ -177,11 +187,6 @@ public class HapiMigrator {
myTaskList.add(theTask);
}
@Nonnull
public List<IHapiMigrationCallback> getCallbacks() {
return myCallbacks;
}
public void setCallbacks(@Nonnull List<IHapiMigrationCallback> theCallbacks) {
Validate.notNull(theCallbacks);
myCallbacks = theCallbacks;

View File

@ -37,6 +37,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -55,6 +56,10 @@ public class HapiMigrationDao {
myMigrationQueryBuilder = new MigrationQueryBuilder(theDriverType, theMigrationTablename);
}
public String getMigrationTablename() {
return myMigrationTablename;
}
public Set<MigrationVersion> fetchSuccessfulMigrationVersions() {
List<HapiMigrationEntity> allEntries = findAll();
return allEntries.stream()
@ -68,24 +73,31 @@ public class HapiMigrationDao {
myJdbcTemplate.execute(myMigrationQueryBuilder.deleteAll());
}
public HapiMigrationEntity save(HapiMigrationEntity theEntity) {
/**
*
* @param theEntity to save. If the pid is null, the next available pid will be set
* @return true if any database records were changed
*/
public boolean save(HapiMigrationEntity theEntity) {
Validate.notNull(theEntity.getDescription(), "Description may not be null");
Validate.notNull(theEntity.getExecutionTime(), "Execution time may not be null");
Validate.notNull(theEntity.getSuccess(), "Success may not be null");
Integer highestKey = getHighestKey();
if (highestKey == null || highestKey < 0) {
highestKey = 0;
if (theEntity.getPid() == null) {
Integer highestKey = getHighestKey();
if (highestKey == null || highestKey < 0) {
highestKey = 0;
}
Integer nextAvailableKey = highestKey + 1;
theEntity.setPid(nextAvailableKey);
}
Integer nextAvailableKey = highestKey + 1;
theEntity.setPid(nextAvailableKey);
theEntity.setType("JDBC");
theEntity.setScript("HAPI FHIR");
theEntity.setInstalledBy(VersionEnum.latestVersion().name());
theEntity.setInstalledOn(new Date());
String insertRecordStatement = myMigrationQueryBuilder.insertPreparedStatement();
int result = myJdbcTemplate.update(insertRecordStatement, theEntity.asPreparedStatementSetter());
return theEntity;
int changedRecordCount = myJdbcTemplate.update(insertRecordStatement, theEntity.asPreparedStatementSetter());
return changedRecordCount > 0;
}
private Integer getHighestKey() {
@ -106,6 +118,9 @@ public class HapiMigrationDao {
String createIndexStatement = myMigrationQueryBuilder.createIndexStatement();
ourLog.info(createIndexStatement);
myJdbcTemplate.execute(createIndexStatement);
HapiMigrationEntity entity = HapiMigrationEntity.tableCreatedRecord();
myJdbcTemplate.update(myMigrationQueryBuilder.insertPreparedStatement(), entity.asPreparedStatementSetter());
}
private boolean migrationTableExists() {
@ -132,4 +147,18 @@ public class HapiMigrationDao {
ourLog.debug("Executing query: [{}]", allQuery);
return myJdbcTemplate.query(allQuery, HapiMigrationEntity.rowMapper());
}
/**
* @return true if the record was successfully deleted
*/
public boolean deleteLockRecord(Integer theLockPid, String theLockDescription) {
int recordsChanged = myJdbcTemplate.update(myMigrationQueryBuilder.deleteLockRecordStatement(theLockPid, theLockDescription));
return recordsChanged > 0;
}
public Optional<HapiMigrationEntity> findFirstByPidAndNotDescription(Integer theLockPid, String theLockDescription) {
String query = myMigrationQueryBuilder.findByPidAndNotDescriptionQuery(theLockPid, theLockDescription);
return myJdbcTemplate.query(query, HapiMigrationEntity.rowMapper()).stream().findFirst();
}
}

View File

@ -24,15 +24,13 @@ import ca.uhn.fhir.jpa.migrate.DriverTypeEnum;
import ca.uhn.fhir.jpa.migrate.entity.HapiMigrationEntity;
import ca.uhn.fhir.jpa.migrate.taskdef.ColumnTypeEnum;
import ca.uhn.fhir.jpa.migrate.taskdef.ColumnTypeToDriverTypeToSqlType;
import com.healthmarketscience.common.util.AppendableExt;
import com.healthmarketscience.sqlbuilder.BinaryCondition;
import com.healthmarketscience.sqlbuilder.CreateIndexQuery;
import com.healthmarketscience.sqlbuilder.CreateTableQuery;
import com.healthmarketscience.sqlbuilder.DeleteQuery;
import com.healthmarketscience.sqlbuilder.FunctionCall;
import com.healthmarketscience.sqlbuilder.InsertQuery;
import com.healthmarketscience.sqlbuilder.SelectQuery;
import com.healthmarketscience.sqlbuilder.SqlObject;
import com.healthmarketscience.sqlbuilder.ValidationContext;
import com.healthmarketscience.sqlbuilder.dbspec.basic.DbColumn;
import com.healthmarketscience.sqlbuilder.dbspec.basic.DbSchema;
import com.healthmarketscience.sqlbuilder.dbspec.basic.DbSpec;
@ -40,7 +38,6 @@ import com.healthmarketscience.sqlbuilder.dbspec.basic.DbTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Types;
public class MigrationQueryBuilder {
@ -137,30 +134,6 @@ public class MigrationQueryBuilder {
.toString();
}
/**
sqlbuilder doesn't know about different database types, so we have to manually map boolean columns ourselves :-(
I'm gaining a new appreciation for Hibernate. I tried using hibernate for maintaining this table, but it added
a lot of overhead for managing just one table. Seeing this sort of nonsense though makes me wonder if we should
just use it instead of sqlbuilder. Disappointed that sqlbuilder doesn't handle boolean well out of the box.
(It does support a static option via System.setProperty("com.healthmarketscience.sqlbuilder.useBooleanLiterals", "true");
but that only works at classloading time which isn't much help to us.
*/
private SqlObject getBooleanValue(Boolean theBoolean) {
SqlObject retval = new SqlObject() {
@Override
protected void collectSchemaObjects(ValidationContext vContext) {}
@Override
public void appendTo(AppendableExt app) throws IOException {
String stringValue = ColumnTypeToDriverTypeToSqlType.toBooleanValue(myDriverType, theBoolean);
app.append(stringValue);
}
};
return retval;
}
public String createTableStatement() {
return new CreateTableQuery(myTable, true)
.validate()
@ -178,6 +151,25 @@ public class MigrationQueryBuilder {
public String findAllQuery() {
return new SelectQuery()
.addFromTable(myTable)
.addCondition(BinaryCondition.notEqualTo(myInstalledRankCol, HapiMigrationEntity.CREATE_TABLE_PID))
.addAllColumns()
.validate()
.toString();
}
public String deleteLockRecordStatement(Integer theLockPid, String theLockDescription) {
return new DeleteQuery(myTable)
.addCondition(BinaryCondition.equalTo(myInstalledRankCol, theLockPid))
.addCondition(BinaryCondition.equalTo(myDescriptionCol, theLockDescription))
.validate()
.toString();
}
public String findByPidAndNotDescriptionQuery(Integer theLockPid, String theLockDescription) {
return new SelectQuery()
.addFromTable(myTable)
.addCondition(BinaryCondition.equalTo(myInstalledRankCol, theLockPid))
.addCondition(BinaryCondition.notEqualTo(myDescriptionCol, theLockDescription))
.addAllColumns()
.validate()
.toString();

View File

@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.migrate.entity;
*/
import ca.uhn.fhir.jpa.migrate.taskdef.BaseTask;
import ca.uhn.fhir.util.VersionEnum;
import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.jdbc.core.RowMapper;
@ -41,6 +42,9 @@ public class HapiMigrationEntity {
public static final int TYPE_MAX_SIZE = 20;
public static final int SCRIPT_MAX_SIZE = 1000;
public static final int INSTALLED_BY_MAX_SIZE = 100;
public static final int CREATE_TABLE_PID = -1;
public static final String INITIAL_RECORD_DESCRIPTION = "<< HAPI FHIR Schema History table created >>";
public static final String INITIAL_RECORD_SCRIPT = "HAPI FHIR";
@Id
@SequenceGenerator(name = "SEQ_FLY_HFJ_MIGRATION", sequenceName = "SEQ_FLY_HFJ_MIGRATION")
@GeneratedValue(strategy = GenerationType.AUTO, generator = "SEQ_FLY_HFJ_MIGRATION")
@ -74,6 +78,19 @@ public class HapiMigrationEntity {
@Column(name = "SUCCESS")
private Boolean mySuccess;
public static HapiMigrationEntity tableCreatedRecord() {
HapiMigrationEntity retVal = new HapiMigrationEntity();
retVal.setPid(CREATE_TABLE_PID);
retVal.setDescription(INITIAL_RECORD_DESCRIPTION);
retVal.setType("TABLE");
retVal.setScript(INITIAL_RECORD_SCRIPT);
retVal.setInstalledBy(VersionEnum.latestVersion().name());
retVal.setInstalledOn(new Date());
retVal.setExecutionTime(0);
retVal.setSuccess(true);
return retVal;
}
public Integer getPid() {
return myPid;
}

View File

@ -5,10 +5,8 @@ import org.apache.commons.dbcp2.BasicDataSource;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import javax.sql.DataSource;
public abstract class BaseMigrationTest {
private static final String TABLE_NAME = "TEST_MIGRATION_TABLE";
static final String TABLE_NAME = "TEST_MIGRATION_TABLE";
protected static HapiMigrationDao ourHapiMigrationDao;
protected static HapiMigrationStorageSvc ourHapiMigrationStorageSvc;
@ -19,7 +17,7 @@ public abstract class BaseMigrationTest {
ourHapiMigrationStorageSvc = new HapiMigrationStorageSvc(ourHapiMigrationDao);
}
private static DataSource getDataSource() {
static BasicDataSource getDataSource() {
BasicDataSource retVal = new BasicDataSource();
retVal.setDriver(new org.h2.Driver());
retVal.setUrl("jdbc:h2:mem:test_migration");

View File

@ -8,14 +8,18 @@ import ca.uhn.fhir.jpa.migrate.tasks.api.BaseMigrationTasks;
import ca.uhn.fhir.jpa.migrate.tasks.api.Builder;
import org.flywaydb.core.api.MigrationVersion;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
class HapiMigrationStorageSvcTest extends BaseMigrationTest {
private static final String RELEASE = "V5_5_0";
@ -60,6 +64,35 @@ class HapiMigrationStorageSvcTest extends BaseMigrationTest {
assertEquals(RELEASE_VERSION_PREFIX + LAST_SUCCEEDED_VERSION, newLatest);
}
@Test
void insert_delete() {
String description = UUID.randomUUID().toString();
int initialCount = countRecords();
assertTrue(ourHapiMigrationStorageSvc.insertLockRecord(description));
assertEquals(initialCount + 1, countRecords());
ourHapiMigrationStorageSvc.deleteLockRecord(description);
assertEquals(initialCount, countRecords());
}
@Test
void verifyNoOtherLocksPresent() {
String otherLock = UUID.randomUUID().toString();
String thisLock = UUID.randomUUID().toString();
ourHapiMigrationStorageSvc.verifyNoOtherLocksPresent(thisLock);
assertTrue(ourHapiMigrationStorageSvc.insertLockRecord(otherLock));
try {
ourHapiMigrationStorageSvc.verifyNoOtherLocksPresent(thisLock);
fail();
} catch (HapiMigrationException e) {
assertEquals("HAPI-2152: Internal error: on unlocking, a competing lock was found", e.getMessage());
}
}
private int countRecords() {
JdbcTemplate jdbcTemplate = new JdbcTemplate(getDataSource());
return jdbcTemplate.queryForObject("SELECT COUNT(*) FROM " + BaseMigrationTest.TABLE_NAME, Integer.class);
}
void createTasks() {
MigrationTaskList taskList = buildTasks();
assertEquals(7, taskList.size());

View File

@ -0,0 +1,184 @@
package ca.uhn.fhir.jpa.migrate;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.jpa.migrate.taskdef.BaseTask;
import ca.uhn.test.concurrency.IPointcutLatch;
import ca.uhn.test.concurrency.PointcutLatch;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
class HapiMigratorIT {
private static final Logger ourLog = LoggerFactory.getLogger(HapiMigratorIT.class);
private static final String MIGRATION_TABLENAME = "TEST_MIGRATOR_TABLE";
private final BasicDataSource myDataSource = BaseMigrationTest.getDataSource();
private final JdbcTemplate myJdbcTemplate = new JdbcTemplate(myDataSource);
@BeforeEach
void before() {
HapiMigrator migrator = buildMigrator();
migrator.createMigrationTableIfRequired();
Integer count = myJdbcTemplate.queryForObject("SELECT COUNT(*) FROM " + MIGRATION_TABLENAME, Integer.class);
assertTrue(count > 0);
}
@AfterEach
void after() {
myJdbcTemplate.execute("DROP TABLE " + MIGRATION_TABLENAME);
assertEquals(0, myDataSource.getNumActive());
}
@Test
void test_onecall_noblock() throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newSingleThreadExecutor();
LatchMigrationTask latchMigrationTask = new LatchMigrationTask("only", "1");
HapiMigrator migrator = buildMigrator(latchMigrationTask);
latchMigrationTask.setExpectedCount(1);
Future<MigrationResult> future = executor.submit(() -> migrator.migrate());
latchMigrationTask.awaitExpected();
latchMigrationTask.release("1");
MigrationResult result = future.get();
assertThat(result.succeededTasks, hasSize(1));
}
@Test
void test_twocalls_block() throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(2);
// Create two migrators to simulate two servers running at the same time
LatchMigrationTask latchMigrationTask1 = new LatchMigrationTask("first", "1");
HapiMigrator migrator1 = buildMigrator(latchMigrationTask1);
LatchMigrationTask latchMigrationTask2 = new LatchMigrationTask("second new", "2");
LatchMigrationTask latchMigrationTask3 = new LatchMigrationTask("third repeat", "1");
HapiMigrator migrator2 = buildMigrator(latchMigrationTask2);
migrator2.addTask(latchMigrationTask3);
// We only expect the first migration to run because the second one will block on the lock and by the time the lock
// is released, the first one will have already run so there will be nothing to do
latchMigrationTask1.setExpectedCount(1);
Future<MigrationResult> future1 = executor.submit(() -> migrator1.migrate());
latchMigrationTask1.awaitExpected();
// We wait until the first migration is in the middle of executing the migration task before we start the second one
// Release the first migration task so it can complete and unblock to allow the second one to start
latchMigrationTask1.release("1");
latchMigrationTask2.setExpectedCount(1);
Future<MigrationResult> future2 = executor.submit(() -> migrator2.migrate());
latchMigrationTask2.awaitExpected();
// This second call shouldn't be necessary, but it will help the test fail faster with a clearer error
latchMigrationTask2.release("2");
latchMigrationTask3.release("3");
MigrationResult result1 = future1.get();
MigrationResult result2 = future2.get();
// Tasks were only run on the first migration
assertThat(result1.succeededTasks, hasSize(1));
assertThat(result2.succeededTasks, hasSize(1));
}
@Nonnull
private HapiMigrator buildMigrator(LatchMigrationTask theLatchMigrationTask) {
HapiMigrator retval = buildMigrator();
retval.addTask(theLatchMigrationTask);
return retval;
}
@Nonnull
private HapiMigrator buildMigrator() {
return new HapiMigrator(MIGRATION_TABLENAME, myDataSource, DriverTypeEnum.H2_EMBEDDED);
}
private class LatchMigrationTask extends BaseTask implements IPointcutLatch {
private final PointcutLatch myLatch;
private final PointcutLatch myWaitLatch;
protected LatchMigrationTask(String name, String theSchemaVersion) {
super(theSchemaVersion, theSchemaVersion);
myLatch = new PointcutLatch("MigrationTask " + name + " called");
myWaitLatch = new PointcutLatch("MigrationTask " + name + " wait");
myWaitLatch.setExpectedCount(1);
}
@Override
public void validate() {
}
@Override
protected void doExecute() {
try {
myLatch.call(this);
myWaitLatch.awaitExpected();
ourLog.info("Latch released with parameter {}", myWaitLatch.getLatchInvocationParameter());
// We sleep a bit to ensure the other thread has a chance to try to get the lock. We don't have a hook there, so sleep instead
// Maybe we can await on a log message?
Thread.sleep(200);
ourLog.info("Completing execution of {}", myWaitLatch.getLatchInvocationParameter());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
protected void generateHashCode(HashCodeBuilder theBuilder) {
}
@Override
protected void generateEquals(EqualsBuilder theBuilder, BaseTask theOtherObject) {
}
@Override
public void clear() {
myLatch.clear();
}
@Override
public void setExpectedCount(int theCount) {
myLatch.setExpectedCount(theCount);
}
@Override
public List<HookParams> awaitExpected() throws InterruptedException {
return myLatch.awaitExpected();
}
public void release(String theLatchInvocationParameter) {
myWaitLatch.call(theLatchInvocationParameter);
}
}
}

View File

@ -9,9 +9,9 @@ import java.util.Set;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
class HapiMigrationDaoTest extends BaseMigrationTest {
class HapiMigrationDaoIT extends BaseMigrationTest {
@Test
public void findAll_empty_returnsNothing() {
@ -23,16 +23,16 @@ class HapiMigrationDaoTest extends BaseMigrationTest {
public void findAll_2records_returnsBoth() {
HapiMigrationEntity record1 = buildEntity("DESC1", "1.1");
HapiMigrationEntity result1 = ourHapiMigrationDao.save(record1);
assertEquals(1, result1.getPid());
boolean result1 = ourHapiMigrationDao.save(record1);
assertTrue(result1);
{
Set<MigrationVersion> all = ourHapiMigrationDao.fetchSuccessfulMigrationVersions();
assertThat(all, hasSize(1));
}
HapiMigrationEntity record2 = buildEntity("DESC2", "1.2");
HapiMigrationEntity result2 = ourHapiMigrationDao.save(record2);
assertEquals(2, result2.getPid());
boolean result2 = ourHapiMigrationDao.save(record2);
assertTrue(result2);
{
Set<MigrationVersion> all = ourHapiMigrationDao.fetchSuccessfulMigrationVersions();
assertThat(all, hasSize(2));