merging 923300 and 923694 - https://issues.apache.org/activemq/browse/AMQ-2594 - jdbc order of messages

git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@923699 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-03-16 11:46:50 +00:00
parent db9c6f13c5
commit 753646d68b
15 changed files with 477 additions and 52 deletions

View File

@ -171,7 +171,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
if (cacheEnabled) { if (cacheEnabled) {
cacheEnabled=false; cacheEnabled=false;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size); LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size
+ ", lastCachedIdSeq: " + (lastCachedId == null ? -1 : lastCachedId.getBrokerSequenceId())
+ " current node seqId: " + node.getMessageId().getBrokerSequenceId());
} }
// sync with store on disabling the cache // sync with store on disabling the cache
if (lastCachedId != null) { if (lastCachedId != null) {

View File

@ -34,11 +34,11 @@ public interface JDBCAdapter {
void doDropTables(TransactionContext c) throws SQLException, IOException; void doDropTables(TransactionContext c) throws SQLException, IOException;
void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration) throws SQLException, IOException; void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration) throws SQLException, IOException;
void doAddMessageReference(TransactionContext c, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException; void doAddMessageReference(TransactionContext c, long sequence, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException;
byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException; byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException;
String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException; String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException;
@ -58,7 +58,7 @@ public interface JDBCAdapter {
SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException; SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException;
long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException; long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException;
void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException; void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException;

View File

@ -44,7 +44,8 @@ public class JDBCMessageStore extends AbstractMessageStore {
protected final WireFormat wireFormat; protected final WireFormat wireFormat;
protected final JDBCAdapter adapter; protected final JDBCAdapter adapter;
protected final JDBCPersistenceAdapter persistenceAdapter; protected final JDBCPersistenceAdapter persistenceAdapter;
protected AtomicLong lastMessageId = new AtomicLong(-1); protected AtomicLong lastStoreSequenceId = new AtomicLong(-1);
protected ActiveMQMessageAudit audit; protected ActiveMQMessageAudit audit;
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) { public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) {
@ -67,6 +68,8 @@ public class JDBCMessageStore extends AbstractMessageStore {
return; return;
} }
long sequenceId = persistenceAdapter.getNextSequenceId();
// Serialize the Message.. // Serialize the Message..
byte data[]; byte data[];
try { try {
@ -79,7 +82,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
// Get a connection and insert the message into the DB. // Get a connection and insert the message into the DB.
TransactionContext c = persistenceAdapter.getTransactionContext(context); TransactionContext c = persistenceAdapter.getTransactionContext(context);
try { try {
adapter.doAddMessage(c, messageId, destination, data, message.getExpiration()); adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration());
} catch (SQLException e) { } catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e); JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
@ -92,7 +95,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
// Get a connection and insert the message into the DB. // Get a connection and insert the message into the DB.
TransactionContext c = persistenceAdapter.getTransactionContext(context); TransactionContext c = persistenceAdapter.getTransactionContext(context);
try { try {
adapter.doAddMessageReference(c, messageId, destination, expirationTime, messageRef); adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef);
} catch (SQLException e) { } catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e); JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
@ -102,13 +105,10 @@ public class JDBCMessageStore extends AbstractMessageStore {
} }
public Message getMessage(MessageId messageId) throws IOException { public Message getMessage(MessageId messageId) throws IOException {
long id = messageId.getBrokerSequenceId();
// Get a connection and pull the message out of the DB // Get a connection and pull the message out of the DB
TransactionContext c = persistenceAdapter.getTransactionContext(); TransactionContext c = persistenceAdapter.getTransactionContext();
try { try {
byte data[] = adapter.doGetMessage(c, id); byte data[] = adapter.doGetMessage(c, messageId);
if (data == null) { if (data == null) {
return null; return null;
} }
@ -143,7 +143,8 @@ public class JDBCMessageStore extends AbstractMessageStore {
} }
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
long seq = ack.getLastMessageId().getBrokerSequenceId();
long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId());
// Get a connection and remove the message from the DB // Get a connection and remove the message from the DB
TransactionContext c = persistenceAdapter.getTransactionContext(context); TransactionContext c = persistenceAdapter.getTransactionContext(context);
@ -225,14 +226,14 @@ public class JDBCMessageStore extends AbstractMessageStore {
TransactionContext c = persistenceAdapter.getTransactionContext(); TransactionContext c = persistenceAdapter.getTransactionContext();
try { try {
adapter.doRecoverNextMessages(c, destination, lastMessageId.get(), maxReturned, new JDBCMessageRecoveryListener() { adapter.doRecoverNextMessages(c, destination, lastStoreSequenceId.get(), maxReturned, new JDBCMessageRecoveryListener() {
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
if (listener.hasSpace()) { if (listener.hasSpace()) {
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId); msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg); listener.recoverMessage(msg);
lastMessageId.set(sequenceId); lastStoreSequenceId.set(sequenceId);
return true; return true;
} }
return false; return false;
@ -259,13 +260,38 @@ public class JDBCMessageStore extends AbstractMessageStore {
* @see org.apache.activemq.store.MessageStore#resetBatching() * @see org.apache.activemq.store.MessageStore#resetBatching()
*/ */
public void resetBatching() { public void resetBatching() {
lastMessageId.set(-1); if (LOG.isDebugEnabled()) {
LOG.debug(destination.getPhysicalName() + " resetBatch, existing last seqId: " + lastStoreSequenceId.get());
}
lastStoreSequenceId.set(-1);
} }
@Override @Override
public void setBatch(MessageId messageId) { public void setBatch(MessageId messageId) {
lastMessageId.set(messageId.getBrokerSequenceId()); long storeSequenceId = -1;
try {
storeSequenceId = getStoreSequenceIdForMessageId(messageId);
} catch (IOException ignoredAsAlreadyLogged) {
// reset batch in effect with default -1 value
}
if (LOG.isDebugEnabled()) {
LOG.debug(destination.getPhysicalName() + " setBatch: new sequenceId: " + storeSequenceId + ",existing last seqId: " + lastStoreSequenceId.get());
}
lastStoreSequenceId.set(storeSequenceId);
} }
private long getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
long result = -1;
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
result = adapter.getStoreSequenceId(c, messageId);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
} finally {
c.close();
}
return result;
}
} }

View File

@ -46,6 +46,7 @@ import org.apache.activemq.store.memory.MemoryTransactionStore;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.FactoryFinder; import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -94,6 +95,8 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
protected int auditRecoveryDepth = 1024; protected int auditRecoveryDepth = 1024;
protected ActiveMQMessageAudit audit; protected ActiveMQMessageAudit audit;
protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
public JDBCPersistenceAdapter() { public JDBCPersistenceAdapter() {
} }
@ -153,6 +156,28 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
} }
} }
public void initSequenceIdGenerator() {
TransactionContext c = null;
try {
c = getTransactionContext();
getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
public void messageId(MessageId id) {
audit.isDuplicate(id);
}
});
} catch (Exception e) {
LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
} finally {
if (c != null) {
try {
c.close();
} catch (Throwable e) {
}
}
}
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit); MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
if (transactionStore != null) { if (transactionStore != null) {
@ -656,4 +681,10 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
this.auditRecoveryDepth = auditRecoveryDepth; this.auditRecoveryDepth = auditRecoveryDepth;
} }
public long getNextSequenceId() {
synchronized(sequenceGenerator) {
return sequenceGenerator.getNextSequenceId();
}
}
} }

View File

@ -46,10 +46,10 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
} }
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
long seq = messageId.getBrokerSequenceId();
// Get a connection and insert the message into the DB. // Get a connection and insert the message into the DB.
TransactionContext c = persistenceAdapter.getTransactionContext(context); TransactionContext c = persistenceAdapter.getTransactionContext(context);
try { try {
long seq = adapter.getStoreSequenceId(c, messageId);
adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq); adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq);
} catch (SQLException e) { } catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e); JDBCPersistenceAdapter.log("JDBC Failure: ", e);

View File

@ -134,7 +134,7 @@ public class Statements {
public String getFindMessageStatement() { public String getFindMessageStatement() {
if (findMessageStatement == null) { if (findMessageStatement == null) {
findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE ID=?"; findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE MSGID_PROD=? AND MSGID_SEQ=?";
} }
return findMessageStatement; return findMessageStatement;
} }

View File

@ -23,11 +23,8 @@ import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
@ -57,7 +54,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class); private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
protected Statements statements; protected Statements statements;
protected boolean batchStatments = true; protected boolean batchStatments = true;
private Set<Long> lastRecoveredMessagesIds = new TreeSet<Long>();
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
s.setBytes(index, data); s.setBytes(index, data);
@ -167,7 +163,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
} }
public void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, byte[] data, public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
long expiration) throws SQLException, IOException { long expiration) throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement(); PreparedStatement s = c.getAddMessageStatement();
try { try {
@ -177,7 +173,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
c.setAddMessageStatement(s); c.setAddMessageStatement(s);
} }
} }
s.setLong(1, messageID.getBrokerSequenceId()); s.setLong(1, sequence);
s.setString(2, messageID.getProducerId().toString()); s.setString(2, messageID.getProducerId().toString());
s.setLong(3, messageID.getProducerSequenceId()); s.setLong(3, messageID.getProducerSequenceId());
s.setString(4, destination.getQualifiedName()); s.setString(4, destination.getQualifiedName());
@ -197,7 +193,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
} }
public void doAddMessageReference(TransactionContext c, MessageId messageID, ActiveMQDestination destination, public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
long expirationTime, String messageRef) throws SQLException, IOException { long expirationTime, String messageRef) throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement(); PreparedStatement s = c.getAddMessageStatement();
try { try {
@ -225,7 +221,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
} }
public long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException { public long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
@ -243,12 +239,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
} }
public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException { public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
try { try {
s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
s.setLong(1, seq); s.setString(1, id.getProducerId().toString());
s.setLong(2, id.getProducerSequenceId());
rs = s.executeQuery(); rs = s.executeQuery();
if (!rs.next()) { if (!rs.next()) {
return null; return null;

View File

@ -67,18 +67,20 @@ public class LocalTransaction extends Transaction {
setState(Transaction.FINISHED_STATE); setState(Transaction.FINISHED_STATE);
context.getTransactions().remove(xid); context.getTransactions().remove(xid);
transactionStore.commit(getTransactionId(), false); synchronized (transactionStore) {
transactionStore.commit(getTransactionId(), false);
try { try {
fireAfterCommit(); fireAfterCommit();
} catch (Throwable e) { } catch (Throwable e) {
// I guess this could happen. Post commit task failed // I guess this could happen. Post commit task failed
// to execute properly. // to execute properly.
LOG.warn("POST COMMIT FAILED: ", e); LOG.warn("POST COMMIT FAILED: ", e);
XAException xae = new XAException("POST COMMIT FAILED"); XAException xae = new XAException("POST COMMIT FAILED");
xae.errorCode = XAException.XAER_RMERR; xae.errorCode = XAException.XAER_RMERR;
xae.initCause(e); xae.initCause(e);
throw xae; throw xae;
}
} }
} }
@ -90,16 +92,18 @@ public class LocalTransaction extends Transaction {
} }
setState(Transaction.FINISHED_STATE); setState(Transaction.FINISHED_STATE);
context.getTransactions().remove(xid); context.getTransactions().remove(xid);
transactionStore.rollback(getTransactionId()); synchronized (transactionStore) {
transactionStore.rollback(getTransactionId());
try { try {
fireAfterRollback(); fireAfterRollback();
} catch (Throwable e) { } catch (Throwable e) {
LOG.warn("POST ROLLBACK FAILED: ", e); LOG.warn("POST ROLLBACK FAILED: ", e);
XAException xae = new XAException("POST ROLLBACK FAILED"); XAException xae = new XAException("POST ROLLBACK FAILED");
xae.errorCode = XAException.XAER_RMERR; xae.errorCode = XAException.XAER_RMERR;
xae.initCause(e); xae.initCause(e);
throw xae; throw xae;
}
} }
} }

View File

@ -135,6 +135,7 @@ public class NegativeQueueTest extends TestCase {
public void testWithNoPrefetch() throws Exception{ public void testWithNoPrefetch() throws Exception{
PREFETCH_SIZE = 1; PREFETCH_SIZE = 1;
NUM_CONSUMERS = 20;
blastAndConsume(); blastAndConsume();
} }
@ -192,7 +193,7 @@ public class NegativeQueueTest extends TestCase {
consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix], consumerSession, QUEUE_2_NAME, latch1, consumerList1)); consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix], consumerSession, QUEUE_2_NAME, latch1, consumerList1));
} }
latch1.await(300000, TimeUnit.MILLISECONDS); latch1.await(200000, TimeUnit.MILLISECONDS);
if(DEBUG){ if(DEBUG){
System.out.println(""); System.out.println("");
System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize()); System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize());
@ -295,6 +296,11 @@ public class NegativeQueueTest extends TestCase {
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
policy.setMemoryLimit(QUEUE_MEMORY_LIMIT); policy.setMemoryLimit(QUEUE_MEMORY_LIMIT);
policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy()); policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
// disable the cache to be sure setBatch is the problem
// will get lots of duplicates
// policy.setUseCache(false);
PolicyMap pMap = new PolicyMap(); PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy); pMap.setDefaultEntry(policy);
answer.setDestinationPolicy(pMap); answer.setDestinationPolicy(pMap);

View File

@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTextMessage;
@ -33,6 +34,7 @@ import org.apache.activemq.command.MessageId;
abstract public class PersistenceAdapterTestSupport extends TestCase { abstract public class PersistenceAdapterTestSupport extends TestCase {
protected PersistenceAdapter pa; protected PersistenceAdapter pa;
protected BrokerService brokerService = new BrokerService();
abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception; abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;

View File

@ -0,0 +1,254 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
// https://issues.apache.org/activemq/browse/AMQ-2594
public abstract class StoreOrderTest {
private static final Log LOG = LogFactory.getLog(StoreOrderTest.class);
protected BrokerService broker;
private ActiveMQConnection connection;
public Destination destination = new ActiveMQQueue("StoreOrderTest?consumer.prefetchSize=0");
protected abstract void setPersistentAdapter(BrokerService brokerService) throws Exception;
protected void dumpMessages() throws Exception {}
public class TransactedSend implements Runnable {
private CountDownLatch readyForCommit;
private CountDownLatch firstDone;
private boolean first;
private Session session;
private MessageProducer producer;
public TransactedSend(CountDownLatch readyForCommit,
CountDownLatch firstDone, boolean b) throws Exception {
this.readyForCommit = readyForCommit;
this.firstDone = firstDone;
this.first = b;
session = connection.createSession(true, Session.SESSION_TRANSACTED);
producer = session.createProducer(destination);
}
public void run() {
try {
if (!first) {
firstDone.await(30, TimeUnit.SECONDS);
}
producer.send(session.createTextMessage(first ? "first" : "second"));
if (first) {
firstDone.countDown();
}
readyForCommit.countDown();
} catch (Exception e) {
e.printStackTrace();
fail("unexpected ex on run " + e);
}
}
public void commit() throws Exception {
session.commit();
session.close();
}
}
@Before
public void setup() throws Exception {
broker = createBroker();
initConnection();
}
public void initConnection() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.setWatchTopicAdvisories(false);
connection.start();
}
@After
public void stopBroker() throws Exception {
if (connection != null) {
connection.close();
}
if (broker != null) {
broker.stop();
}
}
@Test
public void validateUnorderedTxCommit() throws Exception {
Executor executor = Executors.newCachedThreadPool();
CountDownLatch readyForCommit = new CountDownLatch(2);
CountDownLatch firstDone = new CountDownLatch(1);
TransactedSend first = new TransactedSend(readyForCommit, firstDone, true);
TransactedSend second = new TransactedSend(readyForCommit, firstDone, false);
executor.execute(first);
executor.execute(second);
assertTrue("both started", readyForCommit.await(20, TimeUnit.SECONDS));
LOG.info("commit out of order");
// send interleaved so sequence id at time of commit could be reversed
second.commit();
// force usage over the limit before second commit to flush cache
enqueueOneMessage();
// can get lost in the cursor as it is behind the last sequenceId that was cached
first.commit();
LOG.info("send/commit done..");
dumpMessages();
String received1, received2, received3 = null;
if (true) {
LOG.info("receive and rollback...");
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
received1 = receive(session);
received2 = receive(session);
received3 = receive(session);
assertEquals("second", received1);
assertEquals("middle", received2);
assertEquals("first", received3);
session.rollback();
session.close();
}
LOG.info("restart broker");
stopBroker();
broker = createRestartedBroker();
initConnection();
if (true) {
LOG.info("receive and rollback after restart...");
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
received1 = receive(session);
received2 = receive(session);
received3 = receive(session);
assertEquals("second", received1);
assertEquals("middle", received2);
assertEquals("first", received3);
session.rollback();
session.close();
}
LOG.info("receive and ack each message");
received1 = receiveOne();
received2 = receiveOne();
received3 = receiveOne();
assertEquals("second", received1);
assertEquals("middle", received2);
assertEquals("first", received3);
}
private void enqueueOneMessage() throws Exception {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage("middle"));
session.commit();
session.close();
}
private String receiveOne() throws Exception {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
String received = receive(session);
session.commit();
session.close();
return received;
}
private String receive(Session session) throws Exception {
MessageConsumer consumer = session.createConsumer(destination);
String result = null;
TextMessage message = (TextMessage) consumer.receive(5000);
if (message != null) {
LOG.info("got message: " + message);
result = message.getText();
}
consumer.close();
return result;
}
protected BrokerService createBroker() throws Exception {
boolean deleteMessagesOnStartup = true;
return startBroker(deleteMessagesOnStartup);
}
protected BrokerService createRestartedBroker() throws Exception {
boolean deleteMessagesOnStartup = false;
return startBroker(deleteMessagesOnStartup);
}
protected BrokerService startBroker(boolean deleteMessagesOnStartup) throws Exception {
BrokerService newBroker = new BrokerService();
configureBroker(newBroker);
newBroker.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup);
newBroker.start();
return newBroker;
}
protected void configureBroker(BrokerService brokerService) throws Exception {
setPersistentAdapter(brokerService);
brokerService.setAdvisorySupport(false);
PolicyMap map = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setMemoryLimit(1024*3);
defaultEntry.setCursorMemoryHighWaterMark(68);
map.setDefaultEntry(defaultEntry);
brokerService.setDestinationPolicy(map);
}
}

View File

@ -28,6 +28,8 @@ public class JDBCPersistenceAdapterTest extends PersistenceAdapterTestSupport {
protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException { protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException {
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
brokerService.setPersistenceAdapter(jdbc);
jdbc.setBrokerService(brokerService);
EmbeddedDataSource dataSource = new EmbeddedDataSource(); EmbeddedDataSource dataSource = new EmbeddedDataSource();
dataSource.setDatabaseName("derbyDb"); dataSource.setDatabaseName("derbyDb");
dataSource.setCreateDatabase("create"); dataSource.setCreateDatabase("create");

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.Message;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.StoreOrderTest;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.jdbc.EmbeddedDataSource;
// https://issues.apache.org/activemq/browse/AMQ-2594
public class JDBCStoreOrderTest extends StoreOrderTest {
private static final Log LOG = LogFactory.getLog(JDBCStoreOrderTest.class);
@Override
protected void dumpMessages() throws Exception {
WireFormat wireFormat = new OpenWireFormat();
java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG FROM ACTIVEMQ_MSGS");
ResultSet result = statement.executeQuery();
while(result.next()) {
long id = result.getLong(1);
Message message = (Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
LOG.error("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
}
statement.close();
conn.close();
}
@Override
protected void setPersistentAdapter(BrokerService brokerService)
throws Exception {
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
EmbeddedDataSource dataSource = new EmbeddedDataSource();
dataSource.setDatabaseName("derbyDb");
dataSource.setCreateDatabase("create");
jdbc.setDataSource(dataSource);
brokerService.setPersistenceAdapter(jdbc);
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.StoreOrderTest;
// https://issues.apache.org/activemq/browse/AMQ-2594
public class KahaDBStoreOrderTest extends StoreOrderTest {
@Override
protected void setPersistentAdapter(BrokerService brokerService)
throws Exception {
KahaDBStore kaha = new KahaDBStore();
File directory = new File("target/activemq-data/kahadb/storeOrder");
kaha.setDirectory(directory);
brokerService.setPersistenceAdapter(kaha);
}
}

View File

@ -21,6 +21,9 @@
log4j.rootLogger=INFO, out, stdout log4j.rootLogger=INFO, out, stdout
#log4j.logger.org.apache.activemq=DEBUG #log4j.logger.org.apache.activemq=DEBUG
#log4j.logger.org.apache.activemq.store.jdbc=DEBUG
log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG
# CONSOLE appender not used by default # CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout=org.apache.log4j.ConsoleAppender