mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2594 - jdbc and broker seq id - intial commit
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@923300 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a1c427e0a3
commit
24a7626b06
|
@ -171,7 +171,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||||
if (cacheEnabled) {
|
if (cacheEnabled) {
|
||||||
cacheEnabled=false;
|
cacheEnabled=false;
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size);
|
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size
|
||||||
|
+ ", lastCachedIdSeq: " + (lastCachedId == null ? -1 : lastCachedId.getBrokerSequenceId())
|
||||||
|
+ " current node seqId: " + node.getMessageId().getBrokerSequenceId());
|
||||||
}
|
}
|
||||||
// sync with store on disabling the cache
|
// sync with store on disabling the cache
|
||||||
if (lastCachedId != null) {
|
if (lastCachedId != null) {
|
||||||
|
|
|
@ -34,11 +34,11 @@ public interface JDBCAdapter {
|
||||||
|
|
||||||
void doDropTables(TransactionContext c) throws SQLException, IOException;
|
void doDropTables(TransactionContext c) throws SQLException, IOException;
|
||||||
|
|
||||||
void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration) throws SQLException, IOException;
|
void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration) throws SQLException, IOException;
|
||||||
|
|
||||||
void doAddMessageReference(TransactionContext c, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException;
|
void doAddMessageReference(TransactionContext c, long sequence, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException;
|
||||||
|
|
||||||
byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException;
|
byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException;
|
||||||
|
|
||||||
String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException;
|
String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException;
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ public interface JDBCAdapter {
|
||||||
|
|
||||||
SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException;
|
SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException;
|
||||||
|
|
||||||
long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException;
|
long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException;
|
||||||
|
|
||||||
void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException;
|
void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException;
|
||||||
|
|
||||||
|
|
|
@ -44,9 +44,10 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
protected final WireFormat wireFormat;
|
protected final WireFormat wireFormat;
|
||||||
protected final JDBCAdapter adapter;
|
protected final JDBCAdapter adapter;
|
||||||
protected final JDBCPersistenceAdapter persistenceAdapter;
|
protected final JDBCPersistenceAdapter persistenceAdapter;
|
||||||
protected AtomicLong lastMessageId = new AtomicLong(-1);
|
protected AtomicLong lastStoreSequenceId = new AtomicLong(-1);
|
||||||
protected ActiveMQMessageAudit audit;
|
|
||||||
|
|
||||||
|
protected ActiveMQMessageAudit audit;
|
||||||
|
|
||||||
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) {
|
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) {
|
||||||
super(destination);
|
super(destination);
|
||||||
this.persistenceAdapter = persistenceAdapter;
|
this.persistenceAdapter = persistenceAdapter;
|
||||||
|
@ -67,6 +68,8 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long sequenceId = persistenceAdapter.getNextSequenceId();
|
||||||
|
|
||||||
// Serialize the Message..
|
// Serialize the Message..
|
||||||
byte data[];
|
byte data[];
|
||||||
try {
|
try {
|
||||||
|
@ -78,8 +81,8 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
|
|
||||||
// Get a connection and insert the message into the DB.
|
// Get a connection and insert the message into the DB.
|
||||||
TransactionContext c = persistenceAdapter.getTransactionContext(context);
|
TransactionContext c = persistenceAdapter.getTransactionContext(context);
|
||||||
try {
|
try {
|
||||||
adapter.doAddMessage(c, messageId, destination, data, message.getExpiration());
|
adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration());
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
|
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
|
||||||
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
|
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
|
||||||
|
@ -92,7 +95,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
// Get a connection and insert the message into the DB.
|
// Get a connection and insert the message into the DB.
|
||||||
TransactionContext c = persistenceAdapter.getTransactionContext(context);
|
TransactionContext c = persistenceAdapter.getTransactionContext(context);
|
||||||
try {
|
try {
|
||||||
adapter.doAddMessageReference(c, messageId, destination, expirationTime, messageRef);
|
adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef);
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
|
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
|
||||||
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
|
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
|
||||||
|
@ -102,13 +105,10 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Message getMessage(MessageId messageId) throws IOException {
|
public Message getMessage(MessageId messageId) throws IOException {
|
||||||
|
|
||||||
long id = messageId.getBrokerSequenceId();
|
|
||||||
|
|
||||||
// Get a connection and pull the message out of the DB
|
// Get a connection and pull the message out of the DB
|
||||||
TransactionContext c = persistenceAdapter.getTransactionContext();
|
TransactionContext c = persistenceAdapter.getTransactionContext();
|
||||||
try {
|
try {
|
||||||
byte data[] = adapter.doGetMessage(c, id);
|
byte data[] = adapter.doGetMessage(c, messageId);
|
||||||
if (data == null) {
|
if (data == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -143,7 +143,8 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
|
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
|
||||||
long seq = ack.getLastMessageId().getBrokerSequenceId();
|
|
||||||
|
long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId());
|
||||||
|
|
||||||
// Get a connection and remove the message from the DB
|
// Get a connection and remove the message from the DB
|
||||||
TransactionContext c = persistenceAdapter.getTransactionContext(context);
|
TransactionContext c = persistenceAdapter.getTransactionContext(context);
|
||||||
|
@ -225,14 +226,14 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
TransactionContext c = persistenceAdapter.getTransactionContext();
|
TransactionContext c = persistenceAdapter.getTransactionContext();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
adapter.doRecoverNextMessages(c, destination, lastMessageId.get(), maxReturned, new JDBCMessageRecoveryListener() {
|
adapter.doRecoverNextMessages(c, destination, lastStoreSequenceId.get(), maxReturned, new JDBCMessageRecoveryListener() {
|
||||||
|
|
||||||
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
|
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
|
||||||
if (listener.hasSpace()) {
|
if (listener.hasSpace()) {
|
||||||
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
|
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
|
||||||
msg.getMessageId().setBrokerSequenceId(sequenceId);
|
msg.getMessageId().setBrokerSequenceId(sequenceId);
|
||||||
listener.recoverMessage(msg);
|
listener.recoverMessage(msg);
|
||||||
lastMessageId.set(sequenceId);
|
lastStoreSequenceId.set(sequenceId);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -259,13 +260,38 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
* @see org.apache.activemq.store.MessageStore#resetBatching()
|
* @see org.apache.activemq.store.MessageStore#resetBatching()
|
||||||
*/
|
*/
|
||||||
public void resetBatching() {
|
public void resetBatching() {
|
||||||
lastMessageId.set(-1);
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(destination.getPhysicalName() + " resetBatch, existing last seqId: " + lastStoreSequenceId.get());
|
||||||
|
}
|
||||||
|
lastStoreSequenceId.set(-1);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setBatch(MessageId messageId) {
|
public void setBatch(MessageId messageId) {
|
||||||
lastMessageId.set(messageId.getBrokerSequenceId());
|
long storeSequenceId = -1;
|
||||||
|
try {
|
||||||
|
storeSequenceId = getStoreSequenceIdForMessageId(messageId);
|
||||||
|
} catch (IOException ignoredAsAlreadyLogged) {
|
||||||
|
// reset batch in effect with default -1 value
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(destination.getPhysicalName() + " setBatch: new sequenceId: " + storeSequenceId + ",existing last seqId: " + lastStoreSequenceId.get());
|
||||||
|
}
|
||||||
|
lastStoreSequenceId.set(storeSequenceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private long getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
|
||||||
|
long result = -1;
|
||||||
|
TransactionContext c = persistenceAdapter.getTransactionContext();
|
||||||
|
try {
|
||||||
|
result = adapter.getStoreSequenceId(c, messageId);
|
||||||
|
} catch (SQLException e) {
|
||||||
|
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
|
||||||
|
throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
|
||||||
|
} finally {
|
||||||
|
c.close();
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.activemq.store.memory.MemoryTransactionStore;
|
||||||
import org.apache.activemq.usage.SystemUsage;
|
import org.apache.activemq.usage.SystemUsage;
|
||||||
import org.apache.activemq.util.FactoryFinder;
|
import org.apache.activemq.util.FactoryFinder;
|
||||||
import org.apache.activemq.util.IOExceptionSupport;
|
import org.apache.activemq.util.IOExceptionSupport;
|
||||||
|
import org.apache.activemq.util.LongSequenceGenerator;
|
||||||
import org.apache.activemq.wireformat.WireFormat;
|
import org.apache.activemq.wireformat.WireFormat;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -93,6 +94,8 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
|
||||||
protected boolean enableAudit=true;
|
protected boolean enableAudit=true;
|
||||||
protected int auditRecoveryDepth = 1024;
|
protected int auditRecoveryDepth = 1024;
|
||||||
protected ActiveMQMessageAudit audit;
|
protected ActiveMQMessageAudit audit;
|
||||||
|
|
||||||
|
protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
|
||||||
|
|
||||||
public JDBCPersistenceAdapter() {
|
public JDBCPersistenceAdapter() {
|
||||||
}
|
}
|
||||||
|
@ -152,6 +155,28 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void initSequenceIdGenerator() {
|
||||||
|
TransactionContext c = null;
|
||||||
|
try {
|
||||||
|
c = getTransactionContext();
|
||||||
|
getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
|
||||||
|
public void messageId(MessageId id) {
|
||||||
|
audit.isDuplicate(id);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
|
||||||
|
} finally {
|
||||||
|
if (c != null) {
|
||||||
|
try {
|
||||||
|
c.close();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
|
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
|
||||||
MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
|
MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
|
||||||
|
@ -655,5 +680,11 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
|
||||||
public void setAuditRecoveryDepth(int auditRecoveryDepth) {
|
public void setAuditRecoveryDepth(int auditRecoveryDepth) {
|
||||||
this.auditRecoveryDepth = auditRecoveryDepth;
|
this.auditRecoveryDepth = auditRecoveryDepth;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getNextSequenceId() {
|
||||||
|
synchronized(sequenceGenerator) {
|
||||||
|
return sequenceGenerator.getNextSequenceId();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,10 +46,10 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
|
||||||
}
|
}
|
||||||
|
|
||||||
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
|
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
|
||||||
long seq = messageId.getBrokerSequenceId();
|
|
||||||
// Get a connection and insert the message into the DB.
|
// Get a connection and insert the message into the DB.
|
||||||
TransactionContext c = persistenceAdapter.getTransactionContext(context);
|
TransactionContext c = persistenceAdapter.getTransactionContext(context);
|
||||||
try {
|
try {
|
||||||
|
long seq = adapter.getStoreSequenceId(c, messageId);
|
||||||
adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq);
|
adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq);
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
|
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
|
||||||
|
|
|
@ -134,7 +134,7 @@ public class Statements {
|
||||||
|
|
||||||
public String getFindMessageStatement() {
|
public String getFindMessageStatement() {
|
||||||
if (findMessageStatement == null) {
|
if (findMessageStatement == null) {
|
||||||
findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE ID=?";
|
findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE MSGID_PROD=? AND MSGID_SEQ=?";
|
||||||
}
|
}
|
||||||
return findMessageStatement;
|
return findMessageStatement;
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,13 +22,9 @@ import java.sql.ResultSet;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.Statement;
|
import java.sql.Statement;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.MessageId;
|
import org.apache.activemq.command.MessageId;
|
||||||
|
@ -59,7 +55,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
||||||
private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
|
private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
|
||||||
protected Statements statements;
|
protected Statements statements;
|
||||||
protected boolean batchStatments = true;
|
protected boolean batchStatments = true;
|
||||||
private Set<Long> lastRecoveredMessagesIds = Collections.synchronizedSet(new TreeSet<Long>());
|
|
||||||
|
|
||||||
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
|
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
|
||||||
s.setBytes(index, data);
|
s.setBytes(index, data);
|
||||||
|
@ -169,7 +164,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, byte[] data,
|
public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
|
||||||
long expiration) throws SQLException, IOException {
|
long expiration) throws SQLException, IOException {
|
||||||
PreparedStatement s = c.getAddMessageStatement();
|
PreparedStatement s = c.getAddMessageStatement();
|
||||||
try {
|
try {
|
||||||
|
@ -179,7 +174,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
||||||
c.setAddMessageStatement(s);
|
c.setAddMessageStatement(s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.setLong(1, messageID.getBrokerSequenceId());
|
s.setLong(1, sequence);
|
||||||
s.setString(2, messageID.getProducerId().toString());
|
s.setString(2, messageID.getProducerId().toString());
|
||||||
s.setLong(3, messageID.getProducerSequenceId());
|
s.setLong(3, messageID.getProducerSequenceId());
|
||||||
s.setString(4, destination.getQualifiedName());
|
s.setString(4, destination.getQualifiedName());
|
||||||
|
@ -199,7 +194,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void doAddMessageReference(TransactionContext c, MessageId messageID, ActiveMQDestination destination,
|
public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
|
||||||
long expirationTime, String messageRef) throws SQLException, IOException {
|
long expirationTime, String messageRef) throws SQLException, IOException {
|
||||||
PreparedStatement s = c.getAddMessageStatement();
|
PreparedStatement s = c.getAddMessageStatement();
|
||||||
try {
|
try {
|
||||||
|
@ -227,7 +222,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException {
|
public long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException {
|
||||||
PreparedStatement s = null;
|
PreparedStatement s = null;
|
||||||
ResultSet rs = null;
|
ResultSet rs = null;
|
||||||
try {
|
try {
|
||||||
|
@ -245,12 +240,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException {
|
public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
|
||||||
PreparedStatement s = null;
|
PreparedStatement s = null;
|
||||||
ResultSet rs = null;
|
ResultSet rs = null;
|
||||||
try {
|
try {
|
||||||
s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
|
s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
|
||||||
s.setLong(1, seq);
|
s.setString(1, id.getProducerId().toString());
|
||||||
|
s.setLong(2, id.getProducerSequenceId());
|
||||||
rs = s.executeQuery();
|
rs = s.executeQuery();
|
||||||
if (!rs.next()) {
|
if (!rs.next()) {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -135,6 +135,7 @@ public class NegativeQueueTest extends TestCase {
|
||||||
|
|
||||||
public void testWithNoPrefetch() throws Exception{
|
public void testWithNoPrefetch() throws Exception{
|
||||||
PREFETCH_SIZE = 1;
|
PREFETCH_SIZE = 1;
|
||||||
|
NUM_CONSUMERS = 20;
|
||||||
blastAndConsume();
|
blastAndConsume();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,7 +193,7 @@ public class NegativeQueueTest extends TestCase {
|
||||||
consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix], consumerSession, QUEUE_2_NAME, latch1, consumerList1));
|
consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix], consumerSession, QUEUE_2_NAME, latch1, consumerList1));
|
||||||
}
|
}
|
||||||
|
|
||||||
latch1.await(300000, TimeUnit.MILLISECONDS);
|
latch1.await(200000, TimeUnit.MILLISECONDS);
|
||||||
if(DEBUG){
|
if(DEBUG){
|
||||||
System.out.println("");
|
System.out.println("");
|
||||||
System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize());
|
System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize());
|
||||||
|
@ -295,6 +296,11 @@ public class NegativeQueueTest extends TestCase {
|
||||||
PolicyEntry policy = new PolicyEntry();
|
PolicyEntry policy = new PolicyEntry();
|
||||||
policy.setMemoryLimit(QUEUE_MEMORY_LIMIT);
|
policy.setMemoryLimit(QUEUE_MEMORY_LIMIT);
|
||||||
policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
|
policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
|
||||||
|
|
||||||
|
// disable the cache to be sure setBatch is the problem
|
||||||
|
// will get lots of duplicates
|
||||||
|
// policy.setUseCache(false);
|
||||||
|
|
||||||
PolicyMap pMap = new PolicyMap();
|
PolicyMap pMap = new PolicyMap();
|
||||||
pMap.setDefaultEntry(policy);
|
pMap.setDefaultEntry(policy);
|
||||||
answer.setDestinationPolicy(pMap);
|
answer.setDestinationPolicy(pMap);
|
||||||
|
|
|
@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||||
|
@ -33,6 +34,7 @@ import org.apache.activemq.command.MessageId;
|
||||||
abstract public class PersistenceAdapterTestSupport extends TestCase {
|
abstract public class PersistenceAdapterTestSupport extends TestCase {
|
||||||
|
|
||||||
protected PersistenceAdapter pa;
|
protected PersistenceAdapter pa;
|
||||||
|
protected BrokerService brokerService = new BrokerService();
|
||||||
|
|
||||||
abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
|
abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,254 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.jms.ConnectionFactory;
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
// https://issues.apache.org/activemq/browse/AMQ-2594
|
||||||
|
public abstract class StoreOrderTest {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(StoreOrderTest.class);
|
||||||
|
|
||||||
|
protected BrokerService broker;
|
||||||
|
private ActiveMQConnection connection;
|
||||||
|
public Destination destination = new ActiveMQQueue("StoreOrderTest?consumer.prefetchSize=0");
|
||||||
|
|
||||||
|
protected abstract void setPersistentAdapter(BrokerService brokerService) throws Exception;
|
||||||
|
protected void dumpMessages() throws Exception {}
|
||||||
|
|
||||||
|
public class TransactedSend implements Runnable {
|
||||||
|
|
||||||
|
private CountDownLatch readyForCommit;
|
||||||
|
private CountDownLatch firstDone;
|
||||||
|
private boolean first;
|
||||||
|
private Session session;
|
||||||
|
private MessageProducer producer;
|
||||||
|
|
||||||
|
public TransactedSend(CountDownLatch readyForCommit,
|
||||||
|
CountDownLatch firstDone, boolean b) throws Exception {
|
||||||
|
this.readyForCommit = readyForCommit;
|
||||||
|
this.firstDone = firstDone;
|
||||||
|
this.first = b;
|
||||||
|
session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
producer = session.createProducer(destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
if (!first) {
|
||||||
|
firstDone.await(30, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
producer.send(session.createTextMessage(first ? "first" : "second"));
|
||||||
|
if (first) {
|
||||||
|
firstDone.countDown();
|
||||||
|
}
|
||||||
|
readyForCommit.countDown();
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
fail("unexpected ex on run " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void commit() throws Exception {
|
||||||
|
session.commit();
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
broker = createBroker();
|
||||||
|
initConnection();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void initConnection() throws Exception {
|
||||||
|
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
|
||||||
|
connection = (ActiveMQConnection) connectionFactory.createConnection();
|
||||||
|
connection.setWatchTopicAdvisories(false);
|
||||||
|
connection.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void stopBroker() throws Exception {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
if (broker != null) {
|
||||||
|
broker.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void validateUnorderedTxCommit() throws Exception {
|
||||||
|
|
||||||
|
Executor executor = Executors.newCachedThreadPool();
|
||||||
|
CountDownLatch readyForCommit = new CountDownLatch(2);
|
||||||
|
CountDownLatch firstDone = new CountDownLatch(1);
|
||||||
|
|
||||||
|
TransactedSend first = new TransactedSend(readyForCommit, firstDone, true);
|
||||||
|
TransactedSend second = new TransactedSend(readyForCommit, firstDone, false);
|
||||||
|
executor.execute(first);
|
||||||
|
executor.execute(second);
|
||||||
|
|
||||||
|
assertTrue("both started", readyForCommit.await(20, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
LOG.info("commit out of order");
|
||||||
|
// send interleaved so sequence id at time of commit could be reversed
|
||||||
|
second.commit();
|
||||||
|
|
||||||
|
// force usage over the limit before second commit to flush cache
|
||||||
|
enqueueOneMessage();
|
||||||
|
|
||||||
|
// can get lost in the cursor as it is behind the last sequenceId that was cached
|
||||||
|
first.commit();
|
||||||
|
|
||||||
|
LOG.info("send/commit done..");
|
||||||
|
|
||||||
|
dumpMessages();
|
||||||
|
|
||||||
|
String received1, received2, received3 = null;
|
||||||
|
if (true) {
|
||||||
|
LOG.info("receive and rollback...");
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
received1 = receive(session);
|
||||||
|
received2 = receive(session);
|
||||||
|
received3 = receive(session);
|
||||||
|
|
||||||
|
assertEquals("second", received1);
|
||||||
|
assertEquals("middle", received2);
|
||||||
|
assertEquals("first", received3);
|
||||||
|
|
||||||
|
session.rollback();
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
LOG.info("restart broker");
|
||||||
|
stopBroker();
|
||||||
|
broker = createRestartedBroker();
|
||||||
|
initConnection();
|
||||||
|
|
||||||
|
if (true) {
|
||||||
|
LOG.info("receive and rollback after restart...");
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
received1 = receive(session);
|
||||||
|
received2 = receive(session);
|
||||||
|
received3 = receive(session);
|
||||||
|
assertEquals("second", received1);
|
||||||
|
assertEquals("middle", received2);
|
||||||
|
assertEquals("first", received3);
|
||||||
|
session.rollback();
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("receive and ack each message");
|
||||||
|
received1 = receiveOne();
|
||||||
|
received2 = receiveOne();
|
||||||
|
received3 = receiveOne();
|
||||||
|
|
||||||
|
assertEquals("second", received1);
|
||||||
|
assertEquals("middle", received2);
|
||||||
|
assertEquals("first", received3);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void enqueueOneMessage() throws Exception {
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
producer.send(session.createTextMessage("middle"));
|
||||||
|
session.commit();
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private String receiveOne() throws Exception {
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
String received = receive(session);
|
||||||
|
session.commit();
|
||||||
|
session.close();
|
||||||
|
return received;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String receive(Session session) throws Exception {
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
String result = null;
|
||||||
|
TextMessage message = (TextMessage) consumer.receive(5000);
|
||||||
|
if (message != null) {
|
||||||
|
LOG.info("got message: " + message);
|
||||||
|
result = message.getText();
|
||||||
|
}
|
||||||
|
consumer.close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerService createBroker() throws Exception {
|
||||||
|
boolean deleteMessagesOnStartup = true;
|
||||||
|
return startBroker(deleteMessagesOnStartup);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerService createRestartedBroker() throws Exception {
|
||||||
|
boolean deleteMessagesOnStartup = false;
|
||||||
|
return startBroker(deleteMessagesOnStartup);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerService startBroker(boolean deleteMessagesOnStartup) throws Exception {
|
||||||
|
BrokerService newBroker = new BrokerService();
|
||||||
|
configureBroker(newBroker);
|
||||||
|
newBroker.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup);
|
||||||
|
newBroker.start();
|
||||||
|
return newBroker;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void configureBroker(BrokerService brokerService) throws Exception {
|
||||||
|
setPersistentAdapter(brokerService);
|
||||||
|
brokerService.setAdvisorySupport(false);
|
||||||
|
|
||||||
|
PolicyMap map = new PolicyMap();
|
||||||
|
PolicyEntry defaultEntry = new PolicyEntry();
|
||||||
|
defaultEntry.setMemoryLimit(1024*3);
|
||||||
|
defaultEntry.setCursorMemoryHighWaterMark(68);
|
||||||
|
map.setDefaultEntry(defaultEntry);
|
||||||
|
brokerService.setDestinationPolicy(map);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -28,6 +28,9 @@ public class JDBCPersistenceAdapterTest extends PersistenceAdapterTestSupport {
|
||||||
|
|
||||||
protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException {
|
protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException {
|
||||||
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
|
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
|
||||||
|
brokerService.setSchedulerSupport(false);
|
||||||
|
brokerService.setPersistenceAdapter(jdbc);
|
||||||
|
jdbc.setBrokerService(brokerService);
|
||||||
EmbeddedDataSource dataSource = new EmbeddedDataSource();
|
EmbeddedDataSource dataSource = new EmbeddedDataSource();
|
||||||
dataSource.setDatabaseName("derbyDb");
|
dataSource.setDatabaseName("derbyDb");
|
||||||
dataSource.setCreateDatabase("create");
|
dataSource.setCreateDatabase("create");
|
||||||
|
|
|
@ -0,0 +1,63 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.jdbc;
|
||||||
|
|
||||||
|
import java.sql.PreparedStatement;
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
|
import org.apache.activemq.openwire.OpenWireFormat;
|
||||||
|
import org.apache.activemq.store.StoreOrderTest;
|
||||||
|
import org.apache.activemq.util.ByteSequence;
|
||||||
|
import org.apache.activemq.wireformat.WireFormat;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.derby.jdbc.EmbeddedDataSource;
|
||||||
|
|
||||||
|
// https://issues.apache.org/activemq/browse/AMQ-2594
|
||||||
|
public class JDBCStoreOrderTest extends StoreOrderTest {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(JDBCStoreOrderTest.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void dumpMessages() throws Exception {
|
||||||
|
WireFormat wireFormat = new OpenWireFormat();
|
||||||
|
java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
|
||||||
|
PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG FROM ACTIVEMQ_MSGS");
|
||||||
|
ResultSet result = statement.executeQuery();
|
||||||
|
while(result.next()) {
|
||||||
|
long id = result.getLong(1);
|
||||||
|
Message message = (Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
|
||||||
|
LOG.error("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
|
||||||
|
}
|
||||||
|
statement.close();
|
||||||
|
conn.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setPersistentAdapter(BrokerService brokerService)
|
||||||
|
throws Exception {
|
||||||
|
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
|
||||||
|
EmbeddedDataSource dataSource = new EmbeddedDataSource();
|
||||||
|
dataSource.setDatabaseName("derbyDb");
|
||||||
|
dataSource.setCreateDatabase("create");
|
||||||
|
jdbc.setDataSource(dataSource);
|
||||||
|
brokerService.setPersistenceAdapter(jdbc);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,35 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.kahadb;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.store.StoreOrderTest;
|
||||||
|
|
||||||
|
// https://issues.apache.org/activemq/browse/AMQ-2594
|
||||||
|
public class KahaDBStoreOrderTest extends StoreOrderTest {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setPersistentAdapter(BrokerService brokerService)
|
||||||
|
throws Exception {
|
||||||
|
KahaDBStore kaha = new KahaDBStore();
|
||||||
|
File directory = new File("target/activemq-data/kahadb/storeOrder");
|
||||||
|
kaha.setDirectory(directory);
|
||||||
|
brokerService.setPersistenceAdapter(kaha);
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,6 +21,9 @@
|
||||||
log4j.rootLogger=INFO, out, stdout
|
log4j.rootLogger=INFO, out, stdout
|
||||||
|
|
||||||
#log4j.logger.org.apache.activemq=DEBUG
|
#log4j.logger.org.apache.activemq=DEBUG
|
||||||
|
#log4j.logger.org.apache.activemq.store.jdbc=DEBUG
|
||||||
|
log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
|
||||||
|
log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG
|
||||||
|
|
||||||
# CONSOLE appender not used by default
|
# CONSOLE appender not used by default
|
||||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||||
|
|
Loading…
Reference in New Issue