diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java index 37a07a8787..54ab84d863 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java @@ -82,5 +82,5 @@ public interface JDBCAdapter { long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException; - void doMessageIdScan(TransactionContext c, ActiveMQDestination destination, long limit, JDBCMessageIdScanListener listener) throws SQLException, IOException; + void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java index 4b3e569ed8..a1dfd689db 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java @@ -19,5 +19,5 @@ package org.apache.activemq.store.jdbc; import org.apache.activemq.command.MessageId; public interface JDBCMessageIdScanListener { - boolean messageId(MessageId id); + void messageId(MessageId id); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index f342d2e7a2..da7e7176f9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -53,26 +53,6 @@ public class JDBCMessageStore extends AbstractMessageStore { this.adapter = adapter; this.wireFormat = wireFormat; this.audit = audit; - initAudit(); - } - - /* - * revisit: This can be destination agnostic and back in the jdbc persistence adapter start - */ - public void initAudit() { - if (audit != null) { - try { - TransactionContext c = persistenceAdapter.getTransactionContext(null); - adapter.doMessageIdScan(c, destination, 100, new JDBCMessageIdScanListener() { - public boolean messageId(MessageId id) { - audit.isDuplicate(id); - return true; - } - }); - } catch (Exception e) { - LOG.error("Failed to reload store message audit for queue store " + destination); - } - } } public void addMessage(ConnectionContext context, Message message) throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index c2f7f064c6..318807947d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -35,6 +35,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.MessageId; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; @@ -90,6 +91,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist protected int maxProducersToAudit=1024; protected int maxAuditDepth=1000; protected boolean enableAudit=true; + protected int auditRecoveryDepth = 1024; protected ActiveMQMessageAudit audit; public JDBCPersistenceAdapter() { @@ -126,15 +128,33 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist return Collections.EMPTY_SET; } - protected ActiveMQMessageAudit createMessageAudit() { - if (enableAudit && audit == null) { - audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); + protected void createMessageAudit() { + if (enableAudit && audit == null) { + audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); + TransactionContext c = null; + + try { + c = getTransactionContext(); + getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() { + public void messageId(MessageId id) { + audit.isDuplicate(id); + } + }); + } catch (Exception e) { + LOG.error("Failed to reload store message audit for JDBC persistence adapter", e); + } finally { + if (c != null) { + try { + c.close(); + } catch (Throwable e) { + } + } + } } - return audit; } public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { - MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit()); + MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit); if (transactionStore != null) { rc = transactionStore.proxy(rc); } @@ -142,7 +162,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist } public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { - TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit()); + TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit); if (transactionStore != null) { rc = transactionStore.proxy(rc); } @@ -234,6 +254,8 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist } }, cleanupPeriod, cleanupPeriod, TimeUnit.MILLISECONDS); } + + createMessageAudit(); } public synchronized void stop() throws Exception { @@ -625,6 +647,13 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist public void setEnableAudit(boolean enableAudit) { this.enableAudit = enableAudit; } - + + public int getAuditRecoveryDepth() { + return auditRecoveryDepth; + } + + public void setAuditRecoveryDepth(int auditRecoveryDepth) { + this.auditRecoveryDepth = auditRecoveryDepth; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java index a82be3f014..cbbe2acc8b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -152,7 +152,7 @@ public class Statements { // and work back for X if (findAllMessageIdsStatement == null) { findAllMessageIdsStatement = "SELECT ID, MSGID_PROD, MSGID_SEQ FROM " + getFullMessageTableName() - + " WHERE CONTAINER=? ORDER BY ID DESC"; + + " ORDER BY ID DESC"; } return findAllMessageIdsStatement; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index 66449bcf3e..5b11b5d34a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -327,20 +327,16 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } - public void doMessageIdScan(TransactionContext c, ActiveMQDestination destination, long limit, + public void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; try { s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement()); - s.setString(1, destination.getQualifiedName()); - // limit the query. just need the the last few messages that could be replayed - // on recovery. send or commit reply lost so it gets replayed. + s.setMaxRows(limit); rs = s.executeQuery(); while (rs.next()) { - if (!listener.messageId(new MessageId(rs.getString(2), rs.getLong(3)))) { - break; - } + listener.messageId(new MessageId(rs.getString(2), rs.getLong(3))); } } finally { close(rs);