git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@889167 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-12-10 10:05:11 +00:00
parent e80bf00b5b
commit c3801c4bc4
5 changed files with 67 additions and 20 deletions

View File

@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -48,22 +49,24 @@ public class JDBCMessageStore extends AbstractMessageStore {
protected final JDBCAdapter adapter;
protected final JDBCPersistenceAdapter persistenceAdapter;
protected AtomicLong lastMessageId = new AtomicLong(-1);
protected Map<ProducerId, Long> addedMessages = new HashMap<ProducerId, Long>();
protected ActiveMQMessageAudit audit;
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination) {
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) {
super(destination);
this.persistenceAdapter = persistenceAdapter;
this.adapter = adapter;
this.wireFormat = wireFormat;
this.audit = audit;
}
public void addMessage(ConnectionContext context, Message message) throws IOException {
MessageId messageId = message.getMessageId();
Long lastAddedMessage = addedMessages.get(messageId.getProducerId());
if (lastAddedMessage != null && lastAddedMessage >= messageId.getProducerSequenceId()) {
if (audit != null && audit.isDuplicate(message)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Message " + message + " already added to the database. Skipping.");
LOG.debug(destination.getPhysicalName()
+ " ignoring duplicated (add) message, already stored: "
+ messageId);
}
return;
}
@ -81,7 +84,6 @@ public class JDBCMessageStore extends AbstractMessageStore {
TransactionContext c = persistenceAdapter.getTransactionContext(context);
try {
adapter.doAddMessage(c, messageId, destination, data, message.getExpiration());
addedMessages.put(messageId.getProducerId(), messageId.getProducerSequenceId());
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);

View File

@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
@ -85,6 +86,11 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
private boolean createTablesOnStartup = true;
private DataSource lockDataSource;
private int transactionIsolation;
protected int maxProducersToAudit=1024;
protected int maxAuditDepth=1000;
protected boolean enableAudit=true;
protected ActiveMQMessageAudit audit;
public JDBCPersistenceAdapter() {
}
@ -119,9 +125,16 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
private Set<ActiveMQDestination> emptyDestinationSet() {
return Collections.EMPTY_SET;
}
protected ActiveMQMessageAudit createMessageAudit() {
if (enableAudit && audit == null) {
audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
}
return audit;
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination);
MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit());
if (transactionStore != null) {
rc = transactionStore.proxy(rc);
}
@ -129,7 +142,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination);
TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit());
if (transactionStore != null) {
rc = transactionStore.proxy(rc);
}
@ -588,4 +601,30 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
public void setTransactionIsolation(int transactionIsolation) {
this.transactionIsolation = transactionIsolation;
}
public int getMaxProducersToAudit() {
return maxProducersToAudit;
}
public void setMaxProducersToAudit(int maxProducersToAudit) {
this.maxProducersToAudit = maxProducersToAudit;
}
public int getMaxAuditDepth() {
return maxAuditDepth;
}
public void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
}
public boolean isEnableAudit() {
return enableAudit;
}
public void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
}
}

View File

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
@ -40,8 +41,8 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
private Map<String, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<String, AtomicLong>();
public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic) {
super(persistenceAdapter, adapter, wireFormat, topic);
public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
super(persistenceAdapter, adapter, wireFormat, topic, audit);
}
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {

View File

@ -32,7 +32,7 @@ import org.apache.activemq.command.MessageId;
*/
abstract public class PersistenceAdapterTestSupport extends TestCase {
private PersistenceAdapter pa;
protected PersistenceAdapter pa;
abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;

View File

@ -20,19 +20,10 @@ import java.io.IOException;
import junit.framework.AssertionFailedError;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterTestSupport;
import org.apache.derby.jdbc.EmbeddedDataSource;
/**
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class JDBCPersistenceAdapterTest extends PersistenceAdapterTestSupport {
protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException {
@ -47,4 +38,18 @@ public class JDBCPersistenceAdapterTest extends PersistenceAdapterTestSupport {
return jdbc;
}
public void testAuditOff() throws Exception {
((JDBCPersistenceAdapter)pa).setEnableAudit(false);
boolean failed = true;
try {
testStoreCanHandleDupMessages();
failed = false;
} catch (AssertionFailedError e) {
}
if (!failed) {
fail("Should have failed with audit turned off");
}
}
}