git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@892759 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-12-21 10:53:44 +00:00
parent 5ffcaf4052
commit ae840c4b04
6 changed files with 42 additions and 37 deletions

View File

@ -82,5 +82,5 @@ public interface JDBCAdapter {
long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
void doMessageIdScan(TransactionContext c, ActiveMQDestination destination, long limit, JDBCMessageIdScanListener listener) throws SQLException, IOException;
void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException;
}

View File

@ -19,5 +19,5 @@ package org.apache.activemq.store.jdbc;
import org.apache.activemq.command.MessageId;
public interface JDBCMessageIdScanListener {
boolean messageId(MessageId id);
void messageId(MessageId id);
}

View File

@ -53,26 +53,6 @@ public class JDBCMessageStore extends AbstractMessageStore {
this.adapter = adapter;
this.wireFormat = wireFormat;
this.audit = audit;
initAudit();
}
/*
* revisit: This can be destination agnostic and back in the jdbc persistence adapter start
*/
public void initAudit() {
if (audit != null) {
try {
TransactionContext c = persistenceAdapter.getTransactionContext(null);
adapter.doMessageIdScan(c, destination, 100, new JDBCMessageIdScanListener() {
public boolean messageId(MessageId id) {
audit.isDuplicate(id);
return true;
}
});
} catch (Exception e) {
LOG.error("Failed to reload store message audit for queue store " + destination);
}
}
}
public void addMessage(ConnectionContext context, Message message) throws IOException {

View File

@ -35,6 +35,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@ -90,6 +91,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
protected int maxProducersToAudit=1024;
protected int maxAuditDepth=1000;
protected boolean enableAudit=true;
protected int auditRecoveryDepth = 1024;
protected ActiveMQMessageAudit audit;
public JDBCPersistenceAdapter() {
@ -126,15 +128,33 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
return Collections.EMPTY_SET;
}
protected ActiveMQMessageAudit createMessageAudit() {
if (enableAudit && audit == null) {
audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
protected void createMessageAudit() {
if (enableAudit && audit == null) {
audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
TransactionContext c = null;
try {
c = getTransactionContext();
getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
public void messageId(MessageId id) {
audit.isDuplicate(id);
}
});
} catch (Exception e) {
LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
} finally {
if (c != null) {
try {
c.close();
} catch (Throwable e) {
}
}
}
}
return audit;
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit());
MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
if (transactionStore != null) {
rc = transactionStore.proxy(rc);
}
@ -142,7 +162,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit());
TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
if (transactionStore != null) {
rc = transactionStore.proxy(rc);
}
@ -234,6 +254,8 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
}
}, cleanupPeriod, cleanupPeriod, TimeUnit.MILLISECONDS);
}
createMessageAudit();
}
public synchronized void stop() throws Exception {
@ -625,6 +647,13 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
public void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
}
public int getAuditRecoveryDepth() {
return auditRecoveryDepth;
}
public void setAuditRecoveryDepth(int auditRecoveryDepth) {
this.auditRecoveryDepth = auditRecoveryDepth;
}
}

View File

@ -152,7 +152,7 @@ public class Statements {
// and work back for X
if (findAllMessageIdsStatement == null) {
findAllMessageIdsStatement = "SELECT ID, MSGID_PROD, MSGID_SEQ FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=? ORDER BY ID DESC";
+ " ORDER BY ID DESC";
}
return findAllMessageIdsStatement;
}

View File

@ -327,20 +327,16 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
public void doMessageIdScan(TransactionContext c, ActiveMQDestination destination, long limit,
public void doMessageIdScan(TransactionContext c, int limit,
JDBCMessageIdScanListener listener) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
s.setString(1, destination.getQualifiedName());
// limit the query. just need the the last few messages that could be replayed
// on recovery. send or commit reply lost so it gets replayed.
s.setMaxRows(limit);
rs = s.executeQuery();
while (rs.next()) {
if (!listener.messageId(new MessageId(rs.getString(2), rs.getLong(3)))) {
break;
}
listener.messageId(new MessageId(rs.getString(2), rs.getLong(3)));
}
} finally {
close(rs);