mirror of https://github.com/apache/activemq.git
fix for AMQ-1885, allow jdbc slave broker to out live a db outage
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@685988 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f405a7e80d
commit
5a81bbb375
|
@ -176,6 +176,13 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
|
|||
LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
|
||||
} else {
|
||||
service.start();
|
||||
if (lockKeepAlivePeriod > 0) {
|
||||
getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
|
||||
public void run() {
|
||||
databaseLockKeepAlive();
|
||||
}
|
||||
}, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
if (brokerService != null) {
|
||||
brokerService.getBroker().nowMasterBroker();
|
||||
}
|
||||
|
@ -258,13 +265,6 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
|
|||
public DatabaseLocker getDatabaseLocker() throws IOException {
|
||||
if (databaseLocker == null) {
|
||||
databaseLocker = createDatabaseLocker();
|
||||
if (lockKeepAlivePeriod > 0) {
|
||||
getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
|
||||
public void run() {
|
||||
databaseLockKeepAlive();
|
||||
}
|
||||
}, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
return databaseLocker;
|
||||
}
|
||||
|
|
|
@ -122,6 +122,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
// This will fail usually since the tables will be
|
||||
// created already.
|
||||
try {
|
||||
LOG.debug("Executing SQL: " + dropStatments[i]);
|
||||
s.execute(dropStatments[i]);
|
||||
} catch (SQLException e) {
|
||||
LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: "
|
||||
|
@ -187,7 +188,9 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
}
|
||||
} finally {
|
||||
if (!batchStatments) {
|
||||
s.close();
|
||||
if (s!=null) {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,4 +14,4 @@
|
|||
## See the License for the specific language governing permissions and
|
||||
## limitations under the License.
|
||||
## ---------------------------------------------------------------------------
|
||||
class=org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter
|
||||
class=org.apache.activemq.store.jdbc.adapter.DB2JDBCAdapter
|
||||
|
|
|
@ -85,7 +85,6 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis
|
|||
*/
|
||||
public void testSendReceive() throws Exception {
|
||||
messages.clear();
|
||||
|
||||
for (int i = 0; i < data.length; i++) {
|
||||
Message message = session.createTextMessage(data[i]);
|
||||
message.setStringProperty("stringProperty", data[i]);
|
||||
|
@ -97,7 +96,7 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis
|
|||
}
|
||||
}
|
||||
|
||||
producer.send(producerDestination, message);
|
||||
sendToProducer(producer, producerDestination, message);
|
||||
messageSent();
|
||||
}
|
||||
|
||||
|
@ -105,6 +104,18 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis
|
|||
LOG.info("" + data.length + " messages(s) received, closing down connections");
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a message to a destination using the supplied producer
|
||||
* @param producer
|
||||
* @param producerDestination
|
||||
* @param message
|
||||
* @throws JMSException
|
||||
*/
|
||||
protected void sendToProducer(MessageProducer producer,
|
||||
Destination producerDestination, Message message) throws JMSException {
|
||||
producer.send(producerDestination, message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts messages are received.
|
||||
*
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
package org.apache.activemq.broker.ft;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageProducer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.derby.jdbc.EmbeddedDataSource;
|
||||
|
||||
public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest {
|
||||
private static final transient Log LOG = LogFactory.getLog(DbRestartJDBCQueueMasterSlaveTest.class);
|
||||
|
||||
protected void messageSent() throws Exception {
|
||||
if (++inflightMessageCount == failureCount) {
|
||||
final EmbeddedDataSource ds = getExistingDataSource();
|
||||
ds.setShutdownDatabase("shutdown");
|
||||
LOG.info("DB STOPPED!@!!!!");
|
||||
|
||||
Thread dbRestartThread = new Thread("db-re-start-thread") {
|
||||
public void run() {
|
||||
LOG.info("Waiting for master broker to Stop");
|
||||
master.waitUntilStopped();
|
||||
ds.setShutdownDatabase("false");
|
||||
LOG.info("DB RESTARTED!@!!!!");
|
||||
}
|
||||
};
|
||||
dbRestartThread.start();
|
||||
}
|
||||
}
|
||||
|
||||
protected void sendToProducer(MessageProducer producer,
|
||||
Destination producerDestination, Message message) throws JMSException {
|
||||
{
|
||||
// do some retries as db failures filter back to the client until broker sees
|
||||
// db lock failure and shuts down
|
||||
boolean sent = false;
|
||||
do {
|
||||
try {
|
||||
producer.send(producerDestination, message);
|
||||
sent = true;
|
||||
} catch (JMSException e) {
|
||||
LOG.info("Exception on producer send:", e);
|
||||
try {
|
||||
Thread.sleep(2000);
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
}
|
||||
} while(!sent);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,62 @@
|
|||
package org.apache.activemq.broker.ft;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.store.jdbc.DataSourceSupport;
|
||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||
import org.apache.derby.jdbc.EmbeddedDataSource;
|
||||
|
||||
public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
|
||||
protected EmbeddedDataSource sharedDs;
|
||||
protected String MASTER_URL = "tcp://localhost:62001";
|
||||
protected String SLAVE_URL = "tcp://localhost:62002";
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
// startup db
|
||||
sharedDs = (EmbeddedDataSource) new DataSourceSupport().getDataSource();
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
protected void createMaster() throws Exception {
|
||||
master = new BrokerService();
|
||||
master.addConnector(MASTER_URL);
|
||||
master.setUseJmx(false);
|
||||
master.setPersistent(true);
|
||||
master.setDeleteAllMessagesOnStartup(true);
|
||||
JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
|
||||
persistenceAdapter.setDataSource(getExistingDataSource());
|
||||
persistenceAdapter.setLockKeepAlivePeriod(500);
|
||||
master.setPersistenceAdapter(persistenceAdapter);
|
||||
master.start();
|
||||
}
|
||||
|
||||
protected void createSlave() throws Exception {
|
||||
// use a separate thread as the slave will block waiting for
|
||||
// the exclusive db lock
|
||||
Thread t = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
BrokerService broker = new BrokerService();
|
||||
broker.addConnector(SLAVE_URL);
|
||||
// no need for broker.setMasterConnectorURI(masterConnectorURI)
|
||||
// as the db lock provides the slave/master initialisation
|
||||
broker.setUseJmx(false);
|
||||
broker.setPersistent(true);
|
||||
JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
|
||||
persistenceAdapter.setDataSource(getExistingDataSource());
|
||||
persistenceAdapter.setCreateTablesOnStartup(false);
|
||||
broker.setPersistenceAdapter(persistenceAdapter);
|
||||
broker.start();
|
||||
slave.set(broker);
|
||||
slaveStarted.countDown();
|
||||
} catch (Exception e) {
|
||||
fail("failed to start slave broker, reason:" + e);
|
||||
}
|
||||
}
|
||||
};
|
||||
t.start();
|
||||
}
|
||||
|
||||
protected EmbeddedDataSource getExistingDataSource() throws Exception {
|
||||
return sharedDs;
|
||||
}
|
||||
}
|
|
@ -82,8 +82,7 @@ public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsT
|
|||
}
|
||||
|
||||
protected void messageSent() throws Exception {
|
||||
if (++inflightMessageCount >= failureCount) {
|
||||
inflightMessageCount = 0;
|
||||
if (++inflightMessageCount == failureCount) {
|
||||
Thread.sleep(1000);
|
||||
LOG.error("MASTER STOPPED!@!!!!");
|
||||
master.stop();
|
||||
|
|
Loading…
Reference in New Issue