[AMQ-7473] Add stopOnError configuration to stop the broker when locker has an exception

This commit is contained in:
jbonofre 2020-05-04 11:08:50 +02:00
parent ae7b5322eb
commit e9caa75b1a
9 changed files with 126 additions and 18 deletions

View File

@ -57,4 +57,5 @@ public abstract class AbstractLocker extends ServiceSupport implements Locker {
public void setLockable(LockableServiceSupport lockableServiceSupport) { public void setLockable(LockableServiceSupport lockableServiceSupport) {
this.lockable = lockableServiceSupport; this.lockable = lockableServiceSupport;
} }
} }

View File

@ -31,6 +31,13 @@ public interface Lockable {
*/ */
public void setUseLock(boolean useLock); public void setUseLock(boolean useLock);
/**
* Stop the broker if the locker get an exception while processing lock.
*
* @param stopOnError
*/
public void setStopOnError(boolean stopOnError);
/** /**
* Create a default locker * Create a default locker
* *

View File

@ -36,6 +36,7 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L
private static final Logger LOG = LoggerFactory.getLogger(LockableServiceSupport.class); private static final Logger LOG = LoggerFactory.getLogger(LockableServiceSupport.class);
boolean useLock = true; boolean useLock = true;
boolean stopOnError = false;
Locker locker; Locker locker;
long lockKeepAlivePeriod = 0; long lockKeepAlivePeriod = 0;
private ScheduledFuture<?> keepAliveTicket; private ScheduledFuture<?> keepAliveTicket;
@ -58,6 +59,15 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L
return this.useLock; return this.useLock;
} }
@Override
public void setStopOnError(boolean stopOnError) {
this.stopOnError = stopOnError;
}
public boolean isStopOnError() {
return this.stopOnError;
}
@Override @Override
public void setLocker(Locker locker) throws IOException { public void setLocker(Locker locker) throws IOException {
this.locker = locker; this.locker = locker;
@ -129,8 +139,14 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L
} }
} }
} catch (SuppressReplyException e) { } catch (SuppressReplyException e) {
if (stopOnError) {
stop = true;
}
LOG.warn("locker keepAlive resulted in", e); LOG.warn("locker keepAlive resulted in", e);
} catch (IOException e) { } catch (IOException e) {
if (stopOnError) {
stop = true;
}
LOG.warn("locker keepAlive resulted in", e); LOG.warn("locker keepAlive resulted in", e);
} }
if (stop) { if (stop) {

View File

@ -94,6 +94,8 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
private int transactionIsolation; private int transactionIsolation;
private File directory; private File directory;
private boolean changeAutoCommitAllowed = true; private boolean changeAutoCommitAllowed = true;
private int queryTimeout = -1;
private int networkTimeout = -1;
protected int maxProducersToAudit=1024; protected int maxProducersToAudit=1024;
protected int maxAuditDepth=1000; protected int maxAuditDepth=1000;
@ -513,7 +515,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
} }
public TransactionContext getTransactionContext() throws IOException { public TransactionContext getTransactionContext() throws IOException {
TransactionContext answer = new TransactionContext(this); TransactionContext answer = new TransactionContext(this, networkTimeout, queryTimeout);
if (transactionIsolation > 0) { if (transactionIsolation > 0) {
answer.setTransactionIsolation(transactionIsolation); answer.setTransactionIsolation(transactionIsolation);
} }
@ -564,6 +566,32 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
this.changeAutoCommitAllowed = changeAutoCommitAllowed; this.changeAutoCommitAllowed = changeAutoCommitAllowed;
} }
public int getNetworkTimeout() {
return networkTimeout;
}
/**
* Define the JDBC connection network timeout.
*
* @param networkTimeout the connection network timeout (in milliseconds).
*/
public void setNetworkTimeout(int networkTimeout) {
this.networkTimeout = networkTimeout;
}
public int getQueryTimeout() {
return queryTimeout;
}
/**
* Define the JDBC statement query timeout.
*
* @param queryTimeout the statement query timeout (in seconds).
*/
public void setQueryTimeout(int queryTimeout) {
this.queryTimeout = queryTimeout;
}
@Override @Override
public void deleteAllMessages() throws IOException { public void deleteAllMessages() throws IOException {
TransactionContext c = getTransactionContext(); TransactionContext c = getTransactionContext();

View File

@ -22,6 +22,7 @@ import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -49,10 +50,14 @@ public class TransactionContext {
private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED; private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED;
private LinkedList<Runnable> completions = new LinkedList<Runnable>(); private LinkedList<Runnable> completions = new LinkedList<Runnable>();
private ReentrantReadWriteLock exclusiveConnectionLock = new ReentrantReadWriteLock(); private ReentrantReadWriteLock exclusiveConnectionLock = new ReentrantReadWriteLock();
private int networkTimeout;
private int queryTimeout;
public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException { public TransactionContext(JDBCPersistenceAdapter persistenceAdapter, int networkTimeout, int queryTimeout) throws IOException {
this.persistenceAdapter = persistenceAdapter; this.persistenceAdapter = persistenceAdapter;
this.dataSource = persistenceAdapter.getDataSource(); this.dataSource = persistenceAdapter.getDataSource();
this.networkTimeout = networkTimeout;
this.queryTimeout = queryTimeout;
} }
public Connection getExclusiveConnection() throws IOException { public Connection getExclusiveConnection() throws IOException {
@ -68,6 +73,9 @@ public class TransactionContext {
toLock.lock(); toLock.lock();
try { try {
connection = dataSource.getConnection(); connection = dataSource.getConnection();
if (networkTimeout > 0) {
connection.setNetworkTimeout(Executors.newSingleThreadExecutor(), networkTimeout);
}
if (persistenceAdapter.isChangeAutoCommitAllowed()) { if (persistenceAdapter.isChangeAutoCommitAllowed()) {
boolean autoCommit = !inTx; boolean autoCommit = !inTx;
if (connection.getAutoCommit() != autoCommit) { if (connection.getAutoCommit() != autoCommit) {
@ -300,17 +308,29 @@ public class TransactionContext {
// simple delegate for the rest of the impl.. // simple delegate for the rest of the impl..
@Override @Override
public Statement createStatement() throws SQLException { public Statement createStatement() throws SQLException {
return delegate.createStatement(); Statement statement = delegate.createStatement();
if (queryTimeout > 0) {
statement.setQueryTimeout(queryTimeout);
}
return statement;
} }
@Override @Override
public PreparedStatement prepareStatement(String sql) throws SQLException { public PreparedStatement prepareStatement(String sql) throws SQLException {
return delegate.prepareStatement(sql); PreparedStatement statement = delegate.prepareStatement(sql);
if (queryTimeout > 0) {
statement.setQueryTimeout(queryTimeout);
}
return statement;
} }
@Override @Override
public CallableStatement prepareCall(String sql) throws SQLException { public CallableStatement prepareCall(String sql) throws SQLException {
return delegate.prepareCall(sql); CallableStatement statement = delegate.prepareCall(sql);
if (queryTimeout > 0) {
statement.setQueryTimeout(queryTimeout);
}
return statement;
} }
@Override @Override
@ -390,17 +410,29 @@ public class TransactionContext {
@Override @Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
return delegate.createStatement(resultSetType, resultSetConcurrency); Statement statement = delegate.createStatement(resultSetType, resultSetConcurrency);
if (queryTimeout > 0) {
statement.setQueryTimeout(queryTimeout);
}
return statement;
} }
@Override @Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency); PreparedStatement statement = delegate.prepareStatement(sql, resultSetType, resultSetConcurrency);
if (queryTimeout > 0) {
statement.setQueryTimeout(queryTimeout);
}
return statement;
} }
@Override @Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return delegate.prepareCall(sql, resultSetType, resultSetConcurrency); CallableStatement statement = delegate.prepareCall(sql, resultSetType, resultSetConcurrency);
if (queryTimeout > 0) {
statement.setQueryTimeout(queryTimeout);
}
return statement;
} }
@Override @Override
@ -445,32 +477,56 @@ public class TransactionContext {
@Override @Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return delegate.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); Statement statement = delegate.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
if (queryTimeout > 0) {
statement.setQueryTimeout(queryTimeout);
}
return statement;
} }
@Override @Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); PreparedStatement statement = delegate.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
if (queryTimeout > 0) {
statement.setQueryTimeout(queryTimeout);
}
return statement;
} }
@Override @Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return delegate.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability); CallableStatement statement = delegate.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
if (queryTimeout > 0) {
statement.setQueryTimeout(queryTimeout);
}
return statement;
} }
@Override @Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
return delegate.prepareStatement(sql, autoGeneratedKeys); PreparedStatement statement = delegate.prepareStatement(sql, autoGeneratedKeys);
if (queryTimeout > 0) {
statement.setQueryTimeout(queryTimeout);
}
return statement;
} }
@Override @Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
return delegate.prepareStatement(sql, columnIndexes); PreparedStatement statement = delegate.prepareStatement(sql, columnIndexes);
if (queryTimeout > 0) {
statement.setQueryTimeout(queryTimeout);
}
return statement;
} }
@Override @Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
return delegate.prepareStatement(sql, columnNames); PreparedStatement statement = delegate.prepareStatement(sql, columnNames);
if (queryTimeout > 0) {
statement.setQueryTimeout(queryTimeout);
}
return statement;
} }
@Override @Override

View File

@ -243,7 +243,7 @@ public class AMQ4636Test {
public TestTransactionContext( public TestTransactionContext(
JDBCPersistenceAdapter jdbcPersistenceAdapter) JDBCPersistenceAdapter jdbcPersistenceAdapter)
throws IOException { throws IOException {
super(jdbcPersistenceAdapter); super(jdbcPersistenceAdapter, -1, -1);
} }
@Override @Override

View File

@ -277,7 +277,7 @@ public class TrapMessageInJDBCStoreTest extends TestCase {
public TestTransactionContext( public TestTransactionContext(
JDBCPersistenceAdapter jdbcPersistenceAdapter) JDBCPersistenceAdapter jdbcPersistenceAdapter)
throws IOException { throws IOException {
super(jdbcPersistenceAdapter); super(jdbcPersistenceAdapter, -1, -1);
} }
public void executeBatch() throws SQLException { public void executeBatch() throws SQLException {

View File

@ -335,7 +335,7 @@ public class JmsTransactionCommitFailureTest {
@Override @Override
public TransactionContext getTransactionContext() throws IOException { public TransactionContext getTransactionContext() throws IOException {
TransactionContext answer = new TransactionContext(this) { TransactionContext answer = new TransactionContext(this, -1, -1) {
@Override @Override
public void executeBatch() throws SQLException { public void executeBatch() throws SQLException {
if (isCommitFailureEnabled) { if (isCommitFailureEnabled) {

View File

@ -41,7 +41,7 @@ public class TransactionContextTest {
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
underTest = new TransactionContext(jdbcPersistenceAdapter); underTest = new TransactionContext(jdbcPersistenceAdapter, 0, 0);
} }
@Test @Test