https://issues.apache.org/activemq/browse/AMQ-2696 - last broker sequence id and durable subscribers

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@934408 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-04-15 13:52:06 +00:00
parent a0d2282a41
commit 952d036856
7 changed files with 77 additions and 33 deletions

View File

@ -229,8 +229,13 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
sequenceGenerator.setLastSequenceId(seq); sequenceGenerator.setLastSequenceId(seq);
long brokerSeq = 0; long brokerSeq = 0;
if (seq != 0) { if (seq != 0) {
Message last = (Message)wireFormat.unmarshal(new ByteSequence(getAdapter().doGetMessageById(c, seq))); byte[] msg = getAdapter().doGetMessageById(c, seq);
brokerSeq = last.getMessageId().getBrokerSequenceId(); if (msg != null) {
Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
brokerSeq = last.getMessageId().getBrokerSequenceId();
} else {
LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
}
} }
return brokerSeq; return brokerSeq;
} catch (SQLException e) { } catch (SQLException e) {

View File

@ -309,7 +309,7 @@ public class Statements {
public String getDeleteOldMessagesStatement() { public String getDeleteOldMessagesStatement() {
if (deleteOldMessagesStatement == null) { if (deleteOldMessagesStatement == null) {
deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName() deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
+ " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID <= " + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID < "
+ "( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID) " + "( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID) "
+ "FROM " + getFullAckTableName() + " WHERE " + "FROM " + getFullAckTableName() + " WHERE "
+ getFullAckTableName() + ".CONTAINER=" + getFullMessageTableName() + getFullAckTableName() + ".CONTAINER=" + getFullMessageTableName()

View File

@ -156,8 +156,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
long seq2 = 0; long seq2 = 0;
if (rs.next()) { if (rs.next()) {
seq2 = rs.getLong(1); seq2 = rs.getLong(1);
// if there is no such message, ignore the value
if (this.doGetMessageById(c, seq2) == null) {
seq2 = 0;
}
} }
return Math.max(seq1, seq2); long seq = Math.max(seq1, seq2);
return seq;
} finally { } finally {
close(rs); close(rs);
close(s); close(s);

View File

@ -90,7 +90,7 @@ public class NetworkBrokerDetachTest {
} }
// variants for each store.... // variants for each store....
private void configureBroker(BrokerService broker) throws Exception { protected void configureBroker(BrokerService broker) throws Exception {
//KahaPersistenceAdapter persistenceAdapter = new KahaPersistenceAdapter(); //KahaPersistenceAdapter persistenceAdapter = new KahaPersistenceAdapter();
//persistenceAdapter.setDirectory(new File("target/activemq-data/kaha/" + broker.getBrokerName() + "/NetworBrokerDetatchTest")); //persistenceAdapter.setDirectory(new File("target/activemq-data/kaha/" + broker.getBrokerName() + "/NetworBrokerDetatchTest"));
//broker.setPersistenceAdapter(persistenceAdapter); //broker.setPersistenceAdapter(persistenceAdapter);

View File

@ -0,0 +1,19 @@
package org.apache.activemq.store.jdbc;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.NetworkBrokerDetachTest;
import org.apache.derby.jdbc.EmbeddedDataSource;
public class JDBCNetworkBrokerDetachTest extends NetworkBrokerDetachTest {
protected void configureBroker(BrokerService broker) throws Exception {
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
EmbeddedDataSource dataSource = new EmbeddedDataSource();
dataSource.setDatabaseName(broker.getBrokerName());
dataSource.setCreateDatabase("create");
jdbc.setDataSource(dataSource);
jdbc.deleteAllMessages();
broker.setPersistenceAdapter(jdbc);
}
}

View File

@ -70,32 +70,23 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
} }
private void createBroker() throws Exception { private void createBroker() throws Exception {
try { broker = new BrokerService();
broker = new BrokerService(); broker.setBrokerName("durable-broker");
broker.setBrokerName("durable-broker"); broker.setDeleteAllMessagesOnStartup(true);
broker.setDeleteAllMessagesOnStartup(true); broker.setPersistenceAdapter(createPersistenceAdapter());
broker.setPersistenceAdapter(createPersistenceAdapter()); broker.setPersistent(true);
broker.setPersistent(true); broker.start();
broker.start();
} catch (Exception e) {
e.printStackTrace();
}
connection = createConnection(); connection = createConnection();
} }
private void createRestartedBroker() throws Exception { private void createRestartedBroker() throws Exception {
try { broker = new BrokerService();
broker = new BrokerService(); broker.setBrokerName("durable-broker");
broker.setBrokerName("durable-broker"); broker.setDeleteAllMessagesOnStartup(false);
broker.setDeleteAllMessagesOnStartup(false); broker.setPersistenceAdapter(createPersistenceAdapter());
broker.setPersistenceAdapter(createPersistenceAdapter()); broker.setPersistent(true);
broker.setPersistent(true); broker.start();
broker.start();
} catch (Exception e) {
e.printStackTrace();
}
connection = createConnection(); connection = createConnection();
} }
@ -231,6 +222,28 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
assertTextMessageEquals("Msg:2", consumer.receive(5000)); assertTextMessageEquals("Msg:2", consumer.receive(5000));
assertNull(consumer.receive(5000)); assertNull(consumer.receive(5000));
} }
public void testDurableSubscriptionBrokerRestart() throws Exception {
// Create the durable sub.
connection.start();
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
// Ensure that consumer will receive messages sent before it was created
Topic topic = session.createTopic("TestTopic?consumer.retroactive=true");
consumer = session.createDurableSubscriber(topic, "sub1");
producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(session.createTextMessage("Msg:1"));
assertTextMessageEquals("Msg:1", consumer.receive(5000));
// Make sure cleanup kicks in
Thread.sleep(1000);
// Restart the broker.
restartBroker();
}
public void testDurableSubscriptionPersistsPastBrokerRestart() throws Exception { public void testDurableSubscriptionPersistsPastBrokerRestart() throws Exception {

View File

@ -16,11 +16,11 @@
*/ */
package org.apache.activemq.usecases; package org.apache.activemq.usecases;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.derby.jdbc.EmbeddedDataSource;
/** /**
* @version $Revision: 1.1.1.1 $ * @version $Revision: 1.1.1.1 $
@ -28,11 +28,13 @@ import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory;
public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport { public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport {
protected PersistenceAdapter createPersistenceAdapter() throws IOException { protected PersistenceAdapter createPersistenceAdapter() throws IOException {
File dataDir = new File("target/test-data/durableJDBC"); JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
JournalPersistenceAdapterFactory factory = new JournalPersistenceAdapterFactory(); EmbeddedDataSource dataSource = new EmbeddedDataSource();
factory.setDataDirectoryFile(dataDir); dataSource.setDatabaseName("derbyDb");
factory.setUseJournal(false); dataSource.setCreateDatabase("create");
return factory.createPersistenceAdapter(); jdbc.setDataSource(dataSource);
jdbc.setCleanupPeriod(1000); // set up small cleanup period
return jdbc;
} }
} }