Run online index operations non-transactionally on Postgres (#3413)

* Run online index operations non-transactionally on Postgres

* narrow non-transactional
This commit is contained in:
michaelabuckley 2022-02-22 21:49:53 -05:00 committed by GitHub
parent fcada39044
commit 96cadbba3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 22 deletions

View File

@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -135,6 +134,8 @@ public class AddIndexTask extends BaseTableTask {
switch (getDriverType()) { switch (getDriverType()) {
case POSTGRES_9_4: case POSTGRES_9_4:
postgresOnline = "CONCURRENTLY "; postgresOnline = "CONCURRENTLY ";
// This runs without a lock, and can't be done transactionally.
setTransactional(false);
break; break;
case ORACLE_12C: case ORACLE_12C:
oracleOnlineDeferred = " ONLINE DEFERRED INVALIDATION"; oracleOnlineDeferred = " ONLINE DEFERRED INVALIDATION";

View File

@ -54,6 +54,16 @@ public abstract class BaseTask {
private String myDescription; private String myDescription;
private int myChangesCount; private int myChangesCount;
private boolean myDryRun; private boolean myDryRun;
/**
* Some migrations can not be run in a transaction.
* When this is true, {@link BaseTask#executeSql} will run without a transaction
*/
public void setTransactional(boolean theTransactional) {
myTransactional = theTransactional;
}
private boolean myTransactional = true;
private boolean myDoNothing; private boolean myDoNothing;
private List<ExecutedStatement> myExecutedStatements = new ArrayList<>(); private List<ExecutedStatement> myExecutedStatements = new ArrayList<>();
private Set<DriverTypeEnum> myOnlyAppliesToPlatforms = new HashSet<>(); private Set<DriverTypeEnum> myOnlyAppliesToPlatforms = new HashSet<>();
@ -134,27 +144,15 @@ public abstract class BaseTask {
* @param theArguments The SQL statement arguments * @param theArguments The SQL statement arguments
*/ */
public void executeSql(String theTableName, @Language("SQL") String theSql, Object... theArguments) { public void executeSql(String theTableName, @Language("SQL") String theSql, Object... theArguments) {
if (isDryRun() == false) { if (!isDryRun()) {
Integer changes = getConnectionProperties().getTxTemplate().execute(t -> { Integer changes;
JdbcTemplate jdbcTemplate = getConnectionProperties().newJdbcTemplate(); if (myTransactional) {
try { changes = getConnectionProperties().getTxTemplate().execute(t -> {
int changesCount = jdbcTemplate.update(theSql, theArguments); return doExecuteSql(theSql, theArguments);
if (!"true".equals(System.getProperty("unit_test_mode"))) { });
logInfo(ourLog, "SQL \"{}\" returned {}", theSql, changesCount); } else {
} changes = doExecuteSql(theSql, theArguments);
return changesCount; }
} catch (DataAccessException e) {
if (myFailureAllowed) {
ourLog.info("Task {} did not exit successfully, but task is allowed to fail", getFlywayVersion());
ourLog.debug("Error was: {}", e.getMessage(), e);
return 0;
} else {
throw new DataAccessException(Msg.code(61) + "Failed during task " + getFlywayVersion() + ": " + e, e) {
private static final long serialVersionUID = 8211678931579252166L;
};
}
}
});
myChangesCount += changes; myChangesCount += changes;
} }
@ -162,6 +160,27 @@ public abstract class BaseTask {
captureExecutedStatement(theTableName, theSql, theArguments); captureExecutedStatement(theTableName, theSql, theArguments);
} }
private int doExecuteSql(@Language("SQL") String theSql, Object[] theArguments) {
JdbcTemplate jdbcTemplate = getConnectionProperties().newJdbcTemplate();
try {
int changesCount = jdbcTemplate.update(theSql, theArguments);
if (!"true".equals(System.getProperty("unit_test_mode"))) {
logInfo(ourLog, "SQL \"{}\" returned {}", theSql, changesCount);
}
return changesCount;
} catch (DataAccessException e) {
if (myFailureAllowed) {
ourLog.info("Task {} did not exit successfully, but task is allowed to fail", getFlywayVersion());
ourLog.debug("Error was: {}", e.getMessage(), e);
return 0;
} else {
throw new DataAccessException(Msg.code(61) + "Failed during task " + getFlywayVersion() + ": " + e, e) {
private static final long serialVersionUID = 8211678931579252166L;
};
}
}
}
protected void captureExecutedStatement(String theTableName, @Language("SQL") String theSql, Object[] theArguments) { protected void captureExecutedStatement(String theTableName, @Language("SQL") String theSql, Object[] theArguments) {
myExecutedStatements.add(new ExecutedStatement(theTableName, theSql, theArguments)); myExecutedStatements.add(new ExecutedStatement(theTableName, theSql, theArguments));
} }

View File

@ -92,6 +92,7 @@ public class DropIndexTask extends BaseTableTask {
case POSTGRES_9_4: case POSTGRES_9_4:
sql.add("alter table " + getTableName() + " drop constraint if exists " + myIndexName + " cascade"); sql.add("alter table " + getTableName() + " drop constraint if exists " + myIndexName + " cascade");
sql.add("drop index " + (myOnline?"CONCURRENTLY ":"") + "if exists " + myIndexName + " cascade"); sql.add("drop index " + (myOnline?"CONCURRENTLY ":"") + "if exists " + myIndexName + " cascade");
setTransactional(!myOnline);
break; break;
} }
} else { } else {
@ -103,6 +104,7 @@ public class DropIndexTask extends BaseTableTask {
break; break;
case POSTGRES_9_4: case POSTGRES_9_4:
sql.add("drop index " + (myOnline?"CONCURRENTLY ":"") + myIndexName); sql.add("drop index " + (myOnline?"CONCURRENTLY ":"") + myIndexName);
setTransactional(!myOnline);
break; break;
case DERBY_EMBEDDED: case DERBY_EMBEDDED:
case H2_EMBEDDED: case H2_EMBEDDED: