https://issues.apache.org/jira/browse/AMQ-3654 - JDBC Master/Slave : Slave cannot acquire lock when the master loose database connection. Adding an leasebasedlocker that can survive db disconnect, and jdbcioexceptionhandler that will pause/resume transports on db outage

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1350006 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-06-13 20:45:10 +00:00
parent 9bf65b4f09
commit dc258ab2ae
14 changed files with 734 additions and 22 deletions

View File

@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException;
@ -142,7 +143,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private ExecutorService executorService;
private MessageTransformer transformer;
private boolean clearDispatchList;
boolean inProgressClearRequiredFlag;
AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
private MessageAck pendingAck;
private long lastDeliveredSequenceId;
@ -685,15 +686,15 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
void inProgressClearRequired() {
inProgressClearRequiredFlag = true;
inProgressClearRequiredFlag.incrementAndGet();
// deal with delivered messages async to avoid lock contention with in progress acks
clearDispatchList = true;
}
void clearMessagesInProgress() {
if (inProgressClearRequiredFlag) {
if (inProgressClearRequiredFlag.get() > 0) {
synchronized (unconsumedMessages.getMutex()) {
if (inProgressClearRequiredFlag) {
if (inProgressClearRequiredFlag.get() > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " clearing unconsumed list (" + unconsumedMessages.size() + ") on transport interrupt");
}
@ -706,7 +707,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
// allow dispatch on this connection to resume
session.connection.transportInterruptionProcessingComplete();
inProgressClearRequiredFlag = false;
inProgressClearRequiredFlag.decrementAndGet();
}
}
}

View File

@ -37,8 +37,9 @@ public interface DatabaseLocker extends Service {
/**
* Used by a timer to keep alive the lock.
* If the method returns false the broker should be terminated
* if an exception is thrown, the lock state cannot be determined
*/
boolean keepAlive();
boolean keepAlive() throws IOException;
/**
* set the delay interval in milliseconds between lock acquire attempts

View File

@ -170,7 +170,7 @@ public class DefaultDatabaseLocker implements DatabaseLocker {
}
}
public boolean keepAlive() {
public boolean keepAlive() throws IOException {
boolean result = false;
try {
lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement());

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import java.io.IOException;
import org.apache.activemq.util.DefaultIOExceptionHandler;
/**
* @org.apache.xbean.XBean
*/
public class JDBCIOExceptionHandler extends DefaultIOExceptionHandler {
public JDBCIOExceptionHandler() {
setIgnoreSQLExceptions(false);
setStopStartConnectors(true);
}
@Override
protected boolean hasLockOwnership() throws IOException {
boolean hasLock = true;
if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
JDBCPersistenceAdapter jdbcPersistenceAdapter = (JDBCPersistenceAdapter) broker.getPersistenceAdapter();
DatabaseLocker locker = jdbcPersistenceAdapter.getDatabaseLocker();
if (locker != null) {
try {
if (!locker.keepAlive()) {
hasLock = false;
}
} catch (IOException ignored) {
}
if (!hasLock) {
throw new IOException("PersistenceAdapter lock no longer valid using: " + locker);
}
}
}
return hasLock;
}
}

View File

@ -623,7 +623,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
}
}
} catch (IOException e) {
LOG.error("Failed to get database when trying keepalive: " + e, e);
LOG.warn("databaselocker keepalive resulted in: " + e, e);
}
if (stop) {
stopBroker();
@ -632,7 +632,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
protected void stopBroker() {
// we can no longer keep the lock so lets fail
LOG.info("No longer able to keep the exclusive lock so giving up being a master");
LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master");
try {
brokerService.stop();
} catch (Exception e) {

View File

@ -0,0 +1,269 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import org.apache.activemq.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents an exclusive lease on a database to avoid multiple brokers running
* against the same logical database.
*
* @org.apache.xbean.XBean element="lease-database-locker"
*
*/
public class LeaseDatabaseLocker implements DatabaseLocker {
private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class);
public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 5000;
protected DataSource dataSource;
protected Statements statements;
protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
protected boolean stopping;
protected int maxAllowableDiffFromDBTime = 2000;
protected long diffFromCurrentTime = Long.MAX_VALUE;
protected String leaseHolderId;
protected int queryTimeout = -1;
JDBCPersistenceAdapter persistenceAdapter;
public void setPersistenceAdapter(JDBCPersistenceAdapter adapter) throws IOException {
this.dataSource = adapter.getLockDataSource();
this.statements = adapter.getStatements();
this.persistenceAdapter = adapter;
}
public void start() throws Exception {
stopping = false;
LOG.info(getLeaseHolderId() + " attempting to acquire the exclusive lease to become the Master broker");
String sql = statements.getLeaseObtainStatement();
LOG.debug(getLeaseHolderId() + " locking Query is "+sql);
while (!stopping) {
Connection connection = null;
PreparedStatement statement = null;
try {
connection = getConnection();
initTimeDiff(connection);
statement = connection.prepareStatement(sql);
setQueryTimeout(statement);
final long now = System.currentTimeMillis() + diffFromCurrentTime;
statement.setString(1, getLeaseHolderId());
statement.setLong(2, now + lockAcquireSleepInterval);
statement.setLong(3, now);
int result = statement.executeUpdate();
if (result == 1) {
// we got the lease, verify we still have it
if (keepAlive()) {
break;
}
}
reportLeasOwnerShipAndDuration(connection);
} catch (Exception e) {
LOG.debug(getLeaseHolderId() + " lease aquire failure: "+ e, e);
} finally {
close(statement);
close(connection);
}
LOG.info(getLeaseHolderId() + " failed to acquire lease. Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval);
}
if (stopping) {
throw new RuntimeException(getLeaseHolderId() + " failing lease aquire due to stop");
}
LOG.info(getLeaseHolderId() + ", becoming the master on dataSource: " + dataSource);
}
private void setQueryTimeout(PreparedStatement statement) throws SQLException {
if (queryTimeout > 0) {
statement.setQueryTimeout(queryTimeout);
}
}
private Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
private void close(Connection connection) {
if (null != connection) {
try {
connection.close();
} catch (SQLException e1) {
LOG.debug(getLeaseHolderId() + " caught exception while closing connection: " + e1, e1);
}
}
}
private void close(PreparedStatement statement) {
if (null != statement) {
try {
statement.close();
} catch (SQLException e1) {
LOG.debug(getLeaseHolderId() + ", caught while closing statement: " + e1, e1);
}
}
}
private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException {
PreparedStatement statement = null;
try {
statement = connection.prepareStatement(statements.getLeaseOwnerStatement());
ResultSet resultSet = statement.executeQuery();
while (resultSet.next()) {
LOG.info(getLeaseHolderId() + " Lease held by " + resultSet.getString(1) + " till " + new Date(resultSet.getLong(2)));
}
} finally {
close(statement);
}
}
protected long initTimeDiff(Connection connection) throws SQLException {
if (maxAllowableDiffFromDBTime > 0 && Long.MAX_VALUE == diffFromCurrentTime) {
diffFromCurrentTime = determineTimeDifference(connection);
}
return diffFromCurrentTime;
}
private long determineTimeDifference(Connection connection) throws SQLException {
PreparedStatement statement = connection.prepareStatement(statements.getCurrentDateTime());
ResultSet resultSet = statement.executeQuery();
long result = 0l;
if (resultSet.next()) {
Timestamp timestamp = resultSet.getTimestamp(1);
long diff = System.currentTimeMillis() - timestamp.getTime();
LOG.info(getLeaseHolderId() + " diff from db: " + diff + ", db time: " + timestamp);
if (diff > maxAllowableDiffFromDBTime || diff < -maxAllowableDiffFromDBTime) {
// off by more than maxAllowableDiffFromDBTime so lets adjust
result = diff;
}
}
return result;
}
public void stop() throws Exception {
releaseLease();
stopping = true;
}
private void releaseLease() {
Connection connection = null;
PreparedStatement statement = null;
try {
connection = getConnection();
statement = connection.prepareStatement(statements.getLeaseUpdateStatement());
statement.setString(1, null);
statement.setLong(2, 0l);
statement.setString(3, getLeaseHolderId());
if (statement.executeUpdate() == 1) {
LOG.info(getLeaseHolderId() + ", released lease");
}
} catch (Exception e) {
LOG.error(getLeaseHolderId() + " failed to release lease: " + e, e);
} finally {
close(statement);
close(connection);
}
}
@Override
public boolean keepAlive() throws IOException {
boolean result = false;
final String sql = statements.getLeaseUpdateStatement();
LOG.debug(getLeaseHolderId() + ", lease keepAlive Query is " + sql);
Connection connection = null;
PreparedStatement statement = null;
try {
connection = getConnection();
initTimeDiff(connection);
statement = connection.prepareStatement(sql);
setQueryTimeout(statement);
final long now = System.currentTimeMillis() + diffFromCurrentTime;
statement.setString(1, getLeaseHolderId());
statement.setLong(2, now + lockAcquireSleepInterval);
statement.setString(3, getLeaseHolderId());
result = (statement.executeUpdate() == 1);
} catch (Exception e) {
LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e);
IOException ioe = IOExceptionSupport.create(e);
persistenceAdapter.getBrokerService().handleIOException(ioe);
throw ioe;
} finally {
close(statement);
close(connection);
}
return result;
}
public long getLockAcquireSleepInterval() {
return lockAcquireSleepInterval;
}
public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
this.lockAcquireSleepInterval = lockAcquireSleepInterval;
}
public int getQueryTimeout() {
return queryTimeout;
}
public void setQueryTimeout(int queryTimeout) {
this.queryTimeout = queryTimeout;
}
public String getLeaseHolderId() {
if (leaseHolderId == null) {
if (persistenceAdapter.getBrokerService() != null) {
leaseHolderId = persistenceAdapter.getBrokerService().getBrokerName();
}
}
return leaseHolderId;
}
public void setLeaseHolderId(String leaseHolderId) {
this.leaseHolderId = leaseHolderId;
}
public int getMaxAllowableDiffFromDBTime() {
return maxAllowableDiffFromDBTime;
}
public void setMaxAllowableDiffFromDBTime(int maxAllowableDiffFromDBTime) {
this.maxAllowableDiffFromDBTime = maxAllowableDiffFromDBTime;
}
}

View File

@ -85,6 +85,10 @@ public class Statements {
private String updateDurableLastAckWithPriorityStatement;
private String updateDurableLastAckWithPriorityInTxStatement;
private String findXidByIdStatement;
private String leaseObtainStatement;
private String currentDateTimeStatement;
private String leaseUpdateStatement;
private String leaseOwnerStatement;
public String[] getCreateSchemaStatements() {
if (createSchemaStatements == null) {
@ -103,9 +107,9 @@ public class Statements {
+ " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType
+ ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))",
"CREATE TABLE " + getFullLockTableName()
+ "( ID " + longDataType + " NOT NULL, TIME " + longDataType
+ "( ID " + longDataType + " NOT NULL, TIME " + longDataType
+ ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )",
"INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)",
"INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)",
"ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType,
"CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)",
"ALTER TABLE " + getFullMessageTableName() + " ADD XID " + binaryDataType,
@ -421,6 +425,39 @@ public class Statements {
return lockCreateStatement;
}
public String getLeaseObtainStatement() {
if (leaseObtainStatement == null) {
leaseObtainStatement = "UPDATE " + getFullLockTableName()
+ " SET BROKER_NAME=?, TIME=?"
+ " WHERE (TIME IS NULL OR TIME < ?) AND ID = 1";
}
return leaseObtainStatement;
}
public String getCurrentDateTime() {
if (currentDateTimeStatement == null) {
currentDateTimeStatement = "SELECT CURRENT_TIMESTAMP FROM " + getFullLockTableName();
}
return currentDateTimeStatement;
}
public String getLeaseUpdateStatement() {
if (leaseUpdateStatement == null) {
leaseUpdateStatement = "UPDATE " + getFullLockTableName()
+ " SET BROKER_NAME=?, TIME=?"
+ " WHERE BROKER_NAME=? AND ID = 1";
}
return leaseUpdateStatement;
}
public String getLeaseOwnerStatement() {
if (leaseOwnerStatement == null) {
leaseOwnerStatement = "SELECT BROKER_NAME, TIME FROM " + getFullLockTableName()
+ " WHERE ID = 1";
}
return leaseOwnerStatement;
}
public String getLockUpdateStatement() {
if (lockUpdateStatement == null) {
lockUpdateStatement = "UPDATE " + getFullLockTableName() + " SET TIME = ? WHERE ID = 1";
@ -911,4 +948,20 @@ public class Statements {
public void setFindXidByIdStatement(String findXidByIdStatement) {
this.findXidByIdStatement = findXidByIdStatement;
}
public void setLeaseObtainStatement(String leaseObtainStatement) {
this.leaseObtainStatement = leaseObtainStatement;
}
public void setCurrentDateTimeStatement(String currentDateTimeStatement) {
this.currentDateTimeStatement = currentDateTimeStatement;
}
public void setLeaseUpdateStatement(String leaseUpdateStatement) {
this.leaseUpdateStatement = leaseUpdateStatement;
}
public void setLeaseOwnerStatement(String leaseOwnerStatement) {
this.leaseOwnerStatement = leaseOwnerStatement;
}
}

View File

@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory
.getLogger(DefaultIOExceptionHandler.class);
private BrokerService broker;
protected BrokerService broker;
private boolean ignoreAllErrors = false;
private boolean ignoreNoSpaceErrors = true;
private boolean ignoreSQLExceptions = true;
@ -94,13 +94,14 @@ import org.slf4j.LoggerFactory;
new Thread("restart transport connectors post IO exception") {
public void run() {
try {
while (isPersistenceAdapterDown()) {
while (hasLockOwnership() && isPersistenceAdapterDown()) {
LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
}
broker.startAllConnectors();
} catch (Exception e) {
LOG.warn("Failure occurred while restarting broker connectors", e);
LOG.warn("Stopping broker due to failure while restarting broker connectors", e);
stopBroker(e);
} finally {
stopStartInProgress.compareAndSet(true, false);
}
@ -119,7 +120,11 @@ import org.slf4j.LoggerFactory;
return;
}
LOG.info("Stopping the broker due to IO exception, " + exception, exception);
stopBroker(exception);
}
private void stopBroker(Exception exception) {
LOG.info("Stopping the broker due to exception, " + exception, exception);
new Thread("Stopping the broker due to IO exception") {
public void run() {
try {
@ -131,6 +136,10 @@ import org.slf4j.LoggerFactory;
}.start();
}
protected boolean hasLockOwnership() throws IOException {
return true;
}
public void setBrokerService(BrokerService broker) {
this.broker = broker;
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest extends DbRestartJDBCQueueMasterSlaveLeaseIntactTest {
private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.class);
private long restartDelay = 500;
@Override
protected void configureBroker(BrokerService brokerService) {
brokerService.setIoExceptionHandler(new JDBCIOExceptionHandler());
}
@Override
protected void delayTillRestartRequired() {
if (restartDelay > 500) {
LOG.info("delay for more than lease quantum. While Db is offline, master should stay alive but could loose lease");
} else {
LOG.info("delay for less than lease quantum. While Db is offline, master should stay alive");
}
try {
TimeUnit.MILLISECONDS.sleep(restartDelay);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
protected void verifyExpectedBroker(int inflightMessageCount) {
if (inflightMessageCount == 0 || (inflightMessageCount == failureCount + 10 && restartDelay <= 500)) {
assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
}
}
@Override
public void setUp() throws Exception {
restartDelay = 500;
super.setUp();
}
public void testSendReceiveWithLeaseExpiry() throws Exception {
restartDelay = 3000;
testSendReceive();
}
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DbRestartJDBCQueueMasterSlaveLeaseIntactTest extends DbRestartJDBCQueueMasterSlaveLeaseTest {
private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseIntactTest.class);
@Override
protected void delayTillRestartRequired() {
LOG.info("delay for less than lease quantum. While Db is offline, master should stay alive");
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
protected void verifyExpectedBroker(int inflightMessageCount) {
if (inflightMessageCount == 0 || inflightMessageCount == failureCount + 10) {
assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
}
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DbRestartJDBCQueueMasterSlaveLeaseTest extends DbRestartJDBCQueueMasterSlaveTest {
private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseTest.class);
@Override
protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
super.configureJdbcPersistenceAdapter(persistenceAdapter);
persistenceAdapter.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
persistenceAdapter.setLockKeepAlivePeriod(getLockKeepAlivePeriod());
persistenceAdapter.setDatabaseLocker(new LeaseDatabaseLocker());
}
private long getLockKeepAlivePeriod() {
return 500;
}
private long getLockAcquireSleepInterval() {
return 2000;
}
@Override
protected void delayTillRestartRequired() {
LOG.info("restart db after lease has expired. While Db is offline, master should stay alive, them lease up for grabs");
try {
TimeUnit.MILLISECONDS.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
protected void verifyExpectedBroker(int inflightMessageCount) {
if (inflightMessageCount == 0) {
assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
}
// the lock is up for grabs after the expiry
}
}

View File

@ -16,11 +16,13 @@
*/
package org.apache.activemq.broker.ft;
import java.sql.SQLException;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import org.apache.activemq.ActiveMQConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.derby.jdbc.EmbeddedDataSource;
@ -28,7 +30,8 @@ import org.apache.derby.jdbc.EmbeddedDataSource;
public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest {
private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveTest.class);
protected void messageSent() throws Exception {
protected void messageSent() throws Exception {
verifyExpectedBroker(inflightMessageCount);
if (++inflightMessageCount == failureCount) {
LOG.info("STOPPING DB!@!!!!");
final EmbeddedDataSource ds = getExistingDataSource();
@ -37,16 +40,32 @@ public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest
Thread dbRestartThread = new Thread("db-re-start-thread") {
public void run() {
LOG.info("Waiting for master broker to Stop");
master.waitUntilStopped();
delayTillRestartRequired();
ds.setShutdownDatabase("false");
try {
ds.getConnection().close();
} catch (SQLException ignored) {}
LOG.info("DB RESTARTED!@!!!!");
}
};
dbRestartThread.start();
}
verifyExpectedBroker(inflightMessageCount);
}
protected void verifyExpectedBroker(int inflightMessageCount) {
if (inflightMessageCount == 0) {
assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
} else if (inflightMessageCount == failureCount + 10) {
assertEquals("connected to slave", slave.get().getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
}
}
protected void delayTillRestartRequired() {
LOG.info("Waiting for master broker to Stop");
master.waitUntilStopped();
}
protected void sendToProducer(MessageProducer producer,
Destination producerDestination, Message message) throws JMSException {
{

View File

@ -16,9 +16,18 @@
*/
package org.apache.activemq.broker.ft;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.sql.Connection;
import java.sql.SQLException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.store.jdbc.DataSourceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportServer;
import org.apache.derby.jdbc.EmbeddedDataSource;
public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
@ -34,18 +43,22 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
protected void createMaster() throws Exception {
master = new BrokerService();
master.setBrokerName("master");
master.addConnector(MASTER_URL);
master.setUseJmx(false);
master.setPersistent(true);
master.setDeleteAllMessagesOnStartup(true);
JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
persistenceAdapter.setDataSource(getExistingDataSource());
persistenceAdapter.setLockKeepAlivePeriod(500);
persistenceAdapter.setLockAcquireSleepInterval(500);
configureJdbcPersistenceAdapter(persistenceAdapter);
master.setPersistenceAdapter(persistenceAdapter);
configureBroker(master);
master.start();
}
protected void configureBroker(BrokerService master) {
}
protected void createSlave() throws Exception {
// use a separate thread as the slave will block waiting for
// the exclusive db lock
@ -53,7 +66,10 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
public void run() {
try {
BrokerService broker = new BrokerService();
broker.addConnector(SLAVE_URL);
broker.setBrokerName("slave");
TransportConnector connector = new TransportConnector();
connector.setUri(new URI(SLAVE_URL));
broker.addConnector(connector);
// no need for broker.setMasterConnectorURI(masterConnectorURI)
// as the db lock provides the slave/master initialisation
broker.setUseJmx(false);
@ -62,9 +78,12 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
persistenceAdapter.setDataSource(getExistingDataSource());
persistenceAdapter.setCreateTablesOnStartup(false);
broker.setPersistenceAdapter(persistenceAdapter);
configureJdbcPersistenceAdapter(persistenceAdapter);
configureBroker(broker);
broker.start();
slave.set(broker);
slaveStarted.countDown();
} catch (IllegalStateException expectedOnShutdown) {
} catch (Exception e) {
fail("failed to start slave broker, reason:" + e);
}
@ -73,6 +92,11 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
t.start();
}
protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
persistenceAdapter.setLockKeepAlivePeriod(500);
persistenceAdapter.setLockAcquireSleepInterval(500);
}
protected EmbeddedDataSource getExistingDataSource() throws Exception {
return sharedDs;
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.Connection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.BrokerService;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.Before;
import org.junit.Test;
import static junit.framework.Assert.assertFalse;
import static junit.framework.Assert.assertTrue;
public class LeaseDatabaseLockerTest {
JDBCPersistenceAdapter jdbc;
BrokerService brokerService;
EmbeddedDataSource dataSource;
@Before
public void setUpStore() throws Exception {
dataSource = new EmbeddedDataSource();
dataSource.setDatabaseName("derbyDb");
dataSource.setCreateDatabase("create");
jdbc = new JDBCPersistenceAdapter();
jdbc.setDataSource(dataSource);
brokerService = new BrokerService();
jdbc.setBrokerService(brokerService);
jdbc.getAdapter().doCreateTables(jdbc.getTransactionContext());
}
@Test
public void testLockInterleave() throws Exception {
LeaseDatabaseLocker lockerA = new LeaseDatabaseLocker();
brokerService.setBrokerName("First");
lockerA.setPersistenceAdapter(jdbc);
final LeaseDatabaseLocker lockerB = new LeaseDatabaseLocker();
brokerService.setBrokerName("Second");
lockerB.setPersistenceAdapter(jdbc);
final AtomicBoolean blocked = new AtomicBoolean(true);
final Connection connection = dataSource.getConnection();
printLockTable(connection);
lockerA.start();
printLockTable(connection);
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable() {
@Override
public void run() {
try {
lockerB.start();
blocked.set(false);
printLockTable(connection);
} catch (Exception e) {
e.printStackTrace();
}
}
});
assertTrue("B is blocked", blocked.get());
assertTrue("A is good", lockerA.keepAlive());
printLockTable(connection);
lockerA.stop();
printLockTable(connection);
TimeUnit.MILLISECONDS.sleep(2 * lockerB.getLockAcquireSleepInterval());
assertFalse("lockerB has the lock", blocked.get());
lockerB.stop();
printLockTable(connection);
}
private void printLockTable(Connection connection) throws IOException {
//((DefaultJDBCAdapter)jdbc.getAdapter()).printQuery(connection, "SELECT * from ACTIVEMQ_LOCK", System.err);
}
}