resolve https://issues.apache.org/activemq/browse/AMQ-2800, https://issues.apache.org/activemq/browse/AMQ-2542, https://issues.apache.org/activemq/browse/AMQ-2803 - implement duplicate checker in transport for a failover: reconnect, uses last seqid from store. iimplemented for kahaDB and JDBC

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@961783 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-07-08 14:23:21 +00:00
parent 7a7a876045
commit a6a6a708ff
37 changed files with 1007 additions and 259 deletions

View File

@ -16,35 +16,23 @@
*/
package org.apache.activemq;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.util.BitArrayBin;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LRUCache;
/**
* Provides basic audit functions for Messages
*
* @version $Revision: 1.1.1.1 $
*/
public class ActiveMQMessageAudit {
public class ActiveMQMessageAudit extends ActiveMQMessageAuditNoSync {
public static final int DEFAULT_WINDOW_SIZE = 2048;
public static final int MAXIMUM_PRODUCER_COUNT = 64;
private int auditDepth;
private int maximumNumberOfProducersToTrack;
private LRUCache<Object, BitArrayBin> map;
private static final long serialVersionUID = 1L;
/**
* Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
* 64
*/
public ActiveMQMessageAudit() {
this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
super();
}
/**
@ -55,198 +43,41 @@ public class ActiveMQMessageAudit {
* the system
*/
public ActiveMQMessageAudit(int auditDepth, final int maximumNumberOfProducersToTrack) {
this.auditDepth = auditDepth;
this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
super(auditDepth, maximumNumberOfProducersToTrack);
}
/**
* @return the auditDepth
*/
public int getAuditDepth() {
return auditDepth;
}
/**
* @param auditDepth the auditDepth to set
*/
public void setAuditDepth(int auditDepth) {
this.auditDepth = auditDepth;
}
/**
* @return the maximumNumberOfProducersToTrack
*/
public int getMaximumNumberOfProducersToTrack() {
return maximumNumberOfProducersToTrack;
}
/**
* @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
*/
public void setMaximumNumberOfProducersToTrack(
int maximumNumberOfProducersToTrack) {
this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
}
/**
* Checks if this message has been seen before
*
* @param message
* @return true if the message is a duplicate
* @throws JMSException
*/
public boolean isDuplicate(Message message) throws JMSException {
return isDuplicate(message.getJMSMessageID());
}
/**
* checks whether this messageId has been seen before and adds this
* messageId to the list
*
* @param id
* @return true if the message is a duplicate
*/
public synchronized boolean isDuplicate(String id) {
boolean answer = false;
String seed = IdGenerator.getSeedFromId(id);
if (seed != null) {
BitArrayBin bab = map.get(seed);
if (bab == null) {
bab = new BitArrayBin(auditDepth);
map.put(seed, bab);
}
long index = IdGenerator.getSequenceFromId(id);
if (index >= 0) {
answer = bab.setBit(index, true);
}
@Override
public boolean isDuplicate(String id) {
synchronized (this) {
return super.isDuplicate(id);
}
return answer;
}
/**
* Checks if this message has been seen before
*
* @param message
* @return true if the message is a duplicate
*/
public boolean isDuplicate(final MessageReference message) {
MessageId id = message.getMessageId();
return isDuplicate(id);
}
/**
* Checks if this messageId has been seen before
*
* @param id
* @return true if the message is a duplicate
*/
public synchronized boolean isDuplicate(final MessageId id) {
boolean answer = false;
if (id != null) {
ProducerId pid = id.getProducerId();
if (pid != null) {
BitArrayBin bab = map.get(pid);
if (bab == null) {
bab = new BitArrayBin(auditDepth);
map.put(pid, bab);
}
answer = bab.setBit(id.getProducerSequenceId(), true);
}
@Override
public boolean isDuplicate(final MessageId id) {
synchronized (this) {
return super.isDuplicate(id);
}
return answer;
}
/**
* mark this message as being received
*
* @param message
*/
public void rollback(final MessageReference message) {
MessageId id = message.getMessageId();
rollback(id);
}
/**
* mark this message as being received
*
* @param id
*/
public synchronized void rollback(final MessageId id) {
if (id != null) {
ProducerId pid = id.getProducerId();
if (pid != null) {
BitArrayBin bab = map.get(pid);
if (bab != null) {
bab.setBit(id.getProducerSequenceId(), false);
}
}
@Override
public void rollback(final MessageId id) {
synchronized (this) {
super.rollback(id);
}
}
/**
* Check the message is in order
* @param msg
* @return
* @throws JMSException
*/
public boolean isInOrder(Message msg) throws JMSException {
return isInOrder(msg.getJMSMessageID());
}
/**
* Check the message id is in order
* @param id
* @return
*/
public synchronized boolean isInOrder(final String id) {
boolean answer = true;
if (id != null) {
String seed = IdGenerator.getSeedFromId(id);
if (seed != null) {
BitArrayBin bab = map.get(seed);
if (bab != null) {
long index = IdGenerator.getSequenceFromId(id);
answer = bab.isInOrder(index);
}
}
@Override
public boolean isInOrder(final String id) {
synchronized (this) {
return super.isInOrder(id);
}
return answer;
}
/**
* Check the MessageId is in order
* @param message
* @return
*/
public synchronized boolean isInOrder(final MessageReference message) {
return isInOrder(message.getMessageId());
}
/**
* Check the MessageId is in order
* @param id
* @return
*/
public synchronized boolean isInOrder(final MessageId id) {
boolean answer = false;
if (id != null) {
ProducerId pid = id.getProducerId();
if (pid != null) {
BitArrayBin bab = map.get(pid);
if (bab == null) {
bab = new BitArrayBin(auditDepth);
map.put(pid, bab);
}
answer = bab.isInOrder(id.getProducerSequenceId());
}
@Override
public boolean isInOrder(final MessageId id) {
synchronized (this) {
return isInOrder(id);
}
return answer;
}
}

View File

@ -0,0 +1,265 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.util.BitArrayBin;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LRUCache;
/**
* Provides basic audit functions for Messages without sync
*
* @version $Revision$
*/
public class ActiveMQMessageAuditNoSync implements Serializable {
private static final long serialVersionUID = 1L;
public static final int DEFAULT_WINDOW_SIZE = 2048;
public static final int MAXIMUM_PRODUCER_COUNT = 64;
private int auditDepth;
private int maximumNumberOfProducersToTrack;
private LRUCache<Object, BitArrayBin> map;
/**
* Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
* 64
*/
public ActiveMQMessageAuditNoSync() {
this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
}
/**
* Construct a MessageAudit
*
* @param auditDepth range of ids to track
* @param maximumNumberOfProducersToTrack number of producers expected in
* the system
*/
public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) {
this.auditDepth = auditDepth;
this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
}
/**
* @return the auditDepth
*/
public int getAuditDepth() {
return auditDepth;
}
/**
* @param auditDepth the auditDepth to set
*/
public void setAuditDepth(int auditDepth) {
this.auditDepth = auditDepth;
}
/**
* @return the maximumNumberOfProducersToTrack
*/
public int getMaximumNumberOfProducersToTrack() {
return maximumNumberOfProducersToTrack;
}
/**
* @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
*/
public void setMaximumNumberOfProducersToTrack(
int maximumNumberOfProducersToTrack) {
this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
}
/**
* Checks if this message has been seen before
*
* @param message
* @return true if the message is a duplicate
* @throws JMSException
*/
public boolean isDuplicate(Message message) throws JMSException {
return isDuplicate(message.getJMSMessageID());
}
/**
* checks whether this messageId has been seen before and adds this
* messageId to the list
*
* @param id
* @return true if the message is a duplicate
*/
public boolean isDuplicate(String id) {
boolean answer = false;
String seed = IdGenerator.getSeedFromId(id);
if (seed != null) {
BitArrayBin bab = map.get(seed);
if (bab == null) {
bab = new BitArrayBin(auditDepth);
map.put(seed, bab);
}
long index = IdGenerator.getSequenceFromId(id);
if (index >= 0) {
answer = bab.setBit(index, true);
}
}
return answer;
}
/**
* Checks if this message has been seen before
*
* @param message
* @return true if the message is a duplicate
*/
public boolean isDuplicate(final MessageReference message) {
MessageId id = message.getMessageId();
return isDuplicate(id);
}
/**
* Checks if this messageId has been seen before
*
* @param id
* @return true if the message is a duplicate
*/
public boolean isDuplicate(final MessageId id) {
boolean answer = false;
if (id != null) {
ProducerId pid = id.getProducerId();
if (pid != null) {
BitArrayBin bab = map.get(pid);
if (bab == null) {
bab = new BitArrayBin(auditDepth);
map.put(pid, bab);
}
answer = bab.setBit(id.getProducerSequenceId(), true);
}
}
return answer;
}
/**
* mark this message as being received
*
* @param message
*/
public void rollback(final MessageReference message) {
MessageId id = message.getMessageId();
rollback(id);
}
/**
* mark this message as being received
*
* @param id
*/
public void rollback(final MessageId id) {
if (id != null) {
ProducerId pid = id.getProducerId();
if (pid != null) {
BitArrayBin bab = map.get(pid);
if (bab != null) {
bab.setBit(id.getProducerSequenceId(), false);
}
}
}
}
/**
* Check the message is in order
* @param msg
* @return
* @throws JMSException
*/
public boolean isInOrder(Message msg) throws JMSException {
return isInOrder(msg.getJMSMessageID());
}
/**
* Check the message id is in order
* @param id
* @return
*/
public boolean isInOrder(final String id) {
boolean answer = true;
if (id != null) {
String seed = IdGenerator.getSeedFromId(id);
if (seed != null) {
BitArrayBin bab = map.get(seed);
if (bab != null) {
long index = IdGenerator.getSequenceFromId(id);
answer = bab.isInOrder(index);
}
}
}
return answer;
}
/**
* Check the MessageId is in order
* @param message
* @return
*/
public boolean isInOrder(final MessageReference message) {
return isInOrder(message.getMessageId());
}
/**
* Check the MessageId is in order
* @param id
* @return
*/
public boolean isInOrder(final MessageId id) {
boolean answer = false;
if (id != null) {
ProducerId pid = id.getProducerId();
if (pid != null) {
BitArrayBin bab = map.get(pid);
if (bab == null) {
bab = new BitArrayBin(auditDepth);
map.put(pid, bab);
}
answer = bab.isInOrder(id.getProducerSequenceId());
}
}
return answer;
}
public long getLastSeqId(ProducerId id) {
long result = -1;
BitArrayBin bab = map.get(id.toString() + ":");
if (bab != null) {
result = bab.getLastSetIndex();
}
return result;
}
}

View File

@ -47,7 +47,7 @@ public class ConnectionContext {
private ConnectionId connectionId;
private String clientId;
private String userName;
private boolean haAware;
private boolean reconnect;
private WireFormatInfo wireFormatInfo;
private Object longTermStoreContext;
private boolean producerFlowControl = true;
@ -86,7 +86,7 @@ public class ConnectionContext {
rc.connectionId = this.connectionId;
rc.clientId = this.clientId;
rc.userName = this.userName;
rc.haAware = this.haAware;
rc.reconnect = this.reconnect;
rc.wireFormatInfo = this.wireFormatInfo;
rc.longTermStoreContext = this.longTermStoreContext;
rc.producerFlowControl = this.producerFlowControl;
@ -212,12 +212,12 @@ public class ConnectionContext {
this.clientId = clientId;
}
public boolean isHaAware() {
return haAware;
public boolean isReconnect() {
return reconnect;
}
public void setHaAware(boolean haAware) {
this.haAware = haAware;
public void setReconnect(boolean reconnect) {
this.reconnect = reconnect;
}
public WireFormatInfo getWireFormatInfo() {

View File

@ -18,7 +18,10 @@ package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.command.Message;
import org.apache.activemq.state.ProducerState;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Holds internal state in the broker for a MessageProducer
@ -27,11 +30,13 @@ import org.apache.activemq.state.ProducerState;
*/
public class ProducerBrokerExchange {
private static final Log LOG = LogFactory.getLog(ProducerBrokerExchange.class);
private ConnectionContext connectionContext;
private Destination regionDestination;
private Region region;
private ProducerState producerState;
private boolean mutable = true;
private long lastSendSequenceNumber = -1;
public ProducerBrokerExchange() {
}
@ -117,4 +122,25 @@ public class ProducerBrokerExchange {
this.producerState = producerState;
}
/**
* Enforce duplicate suppression using info from persistence adapter
* @param messageSend
* @return false if message should be ignored as a duplicate
*/
public boolean canDispatch(Message messageSend) {
boolean canDispatch = true;
if (lastSendSequenceNumber > 0) {
if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber) {
canDispatch = false;
LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId ["
+ messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: " + lastSendSequenceNumber);
}
}
return canDispatch;
}
public void setLastStoredSequenceId(long l) {
lastSendSequenceNumber = l;
LOG.debug("last stored sequence id set: " + l);
}
}

View File

@ -453,7 +453,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public Response processMessage(Message messageSend) throws Exception {
ProducerId producerId = messageSend.getProducerId();
ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
broker.send(producerExchange, messageSend);
if (producerExchange.canDispatch(messageSend)) {
broker.send(producerExchange, messageSend);
}
return null;
}
@ -680,6 +682,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
context.setUserName(info.getUserName());
context.setWireFormatInfo(wireFormatInfo);
context.setReconnect(info.isFailoverReconnect());
this.manageable = info.isManageable();
state.setContext(context);
state.setConnection(this);
@ -1249,13 +1252,16 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
}
private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) {
private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
ProducerBrokerExchange result = producerExchanges.get(id);
if (result == null) {
synchronized (producerExchanges) {
result = new ProducerBrokerExchange();
TransportConnectionState state = lookupConnectionState(id);
TransportConnectionState state = lookupConnectionState(id);
context = state.getContext();
if (context.isReconnect()) {
result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
}
result.setConnectionContext(context);
SessionState ss = state.getSessionState(id.getParentId());
if (ss != null) {

View File

@ -374,6 +374,7 @@ public abstract class AbstractRegion implements Region {
LOG.warn("Ack for non existent subscription, ack:" + ack);
throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
} else {
LOG.debug("Ack for non existent subscription in recovery, ack:" + ack);
return;
}
}

View File

@ -37,6 +37,7 @@ public class ConnectionInfo extends BaseCommand {
protected boolean clientMaster = true;
protected boolean faultTolerant = false;
protected transient Object transportContext;
private boolean failoverReconnect;
public ConnectionInfo() {
}
@ -216,4 +217,15 @@ public class ConnectionInfo extends BaseCommand {
this.faultTolerant = faultTolerant;
}
/**
* @openwire:property version=6 cache=false
* @return failoverReconnect true if this is a reconnect
*/
public boolean isFailoverReconnect() {
return this.failoverReconnect;
}
public void setFailoverReconnect(boolean failoverReconnect) {
this.failoverReconnect = failoverReconnect;
}
}

View File

@ -86,6 +86,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller {
info.setManageable(bs.readBoolean());
info.setClientMaster(bs.readBoolean());
info.setFaultTolerant(bs.readBoolean());
info.setFailoverReconnect(bs.readBoolean());
}
@ -107,6 +108,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller {
bs.writeBoolean(info.isManageable());
bs.writeBoolean(info.isClientMaster());
bs.writeBoolean(info.isFaultTolerant());
bs.writeBoolean(info.isFailoverReconnect());
return rc + 0;
}
@ -131,6 +133,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller {
bs.readBoolean();
bs.readBoolean();
bs.readBoolean();
bs.readBoolean();
}
@ -165,6 +168,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller {
info.setManageable(dataIn.readBoolean());
info.setClientMaster(dataIn.readBoolean());
info.setFaultTolerant(dataIn.readBoolean());
info.setFailoverReconnect(dataIn.readBoolean());
}
@ -186,6 +190,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller {
dataOut.writeBoolean(info.isManageable());
dataOut.writeBoolean(info.isClientMaster());
dataOut.writeBoolean(info.isFaultTolerant());
dataOut.writeBoolean(info.isFailoverReconnect());
}
}

View File

@ -140,6 +140,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
// Restore the connections.
for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
ConnectionState connectionState = iter.next();
connectionState.getInfo().setFailoverReconnect(true);
if (LOG.isDebugEnabled()) {
LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
}
@ -156,6 +157,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
}
//now flush messages
for (Message msg:messageCache.values()) {
if (LOG.isDebugEnabled()) {
LOG.debug("message: " + msg.getMessageId());
}
transport.oneway(msg);
}
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.usage.SystemUsage;
/**
@ -157,4 +158,13 @@ public interface PersistenceAdapter extends Service {
* @return disk space used in bytes of 0 if not implemented
*/
long size();
/**
* return the last stored producer sequenceId for this producer Id
* used to suppress duplicate sends on failover reconnect at the transport
* when a reconnect occurs
* @param id the producerId to find a sequenceId for
* @return the last stored sequence id or -1 if no suppression needed
*/
long getLastProducerSequenceId(ProducerId id) throws IOException;
}

View File

@ -43,6 +43,7 @@ import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
@ -1117,4 +1118,10 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
+ ".DisableLocking",
"false"));
}
public long getLastProducerSequenceId(ProducerId id) {
// reference store send has adequate duplicate suppression
return -1;
}
}

View File

@ -21,6 +21,7 @@ import java.sql.SQLException;
import java.util.Set;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
/**
@ -60,7 +61,7 @@ public interface JDBCAdapter {
SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException;
long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException;
long getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException;
void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException;
@ -85,4 +86,6 @@ public interface JDBCAdapter {
long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException;
long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException, IOException;
}

View File

@ -285,7 +285,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
long result = -1;
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
result = adapter.getStoreSequenceId(c, messageId);
result = adapter.getStoreSequenceId(c, destination, messageId);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);

View File

@ -37,6 +37,7 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@ -93,7 +94,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
protected int maxProducersToAudit=1024;
protected int maxAuditDepth=1000;
protected boolean enableAudit=true;
protected boolean enableAudit=false;
protected int auditRecoveryDepth = 1024;
protected ActiveMQMessageAudit audit;
@ -245,6 +246,19 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
c.close();
}
}
public long getLastProducerSequenceId(ProducerId id) throws IOException {
TransactionContext c = getTransactionContext();
try {
return getAdapter().doGetLastProducerSequenceId(c, id);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
} finally {
c.close();
}
}
public void start() throws Exception {
getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
@ -699,6 +713,5 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
synchronized(sequenceGenerator) {
return sequenceGenerator.getNextSequenceId();
}
}
}
}

View File

@ -49,7 +49,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
// Get a connection and insert the message into the DB.
TransactionContext c = persistenceAdapter.getTransactionContext(context);
try {
long seq = adapter.getStoreSequenceId(c, messageId);
long seq = adapter.getStoreSequenceId(c, destination, messageId);
adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);

View File

@ -67,6 +67,7 @@ public class Statements {
private String findNextMessagesStatement;
private boolean useLockCreateWhereClause;
private String findAllMessageIdsStatement;
private String lastProducerSequenceIdStatement;
public String[] getCreateSchemaStatements() {
if (createSchemaStatements == null) {
@ -128,7 +129,7 @@ public class Statements {
public String getFindMessageSequenceIdStatement() {
if (findMessageSequenceIdStatement == null) {
findMessageSequenceIdStatement = "SELECT ID FROM " + getFullMessageTableName()
+ " WHERE MSGID_PROD=? AND MSGID_SEQ=?";
+ " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
}
return findMessageSequenceIdStatement;
}
@ -172,6 +173,15 @@ public class Statements {
return findLastSequenceIdInMsgsStatement;
}
public String getLastProducerSequenceIdStatement() {
if (lastProducerSequenceIdStatement == null) {
lastProducerSequenceIdStatement = "SELECT MAX(MSGID_SEQ) FROM " + getFullMessageTableName()
+ " WHERE MSGID_PROD=?";
}
return lastProducerSequenceIdStatement;
}
public String getFindLastSequenceIdInAcksStatement() {
if (findLastSequenceIdInAcksStatement == null) {
findLastSequenceIdInAcksStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + getFullAckTableName();
@ -656,4 +666,9 @@ public class Statements {
this.lastAckedDurableSubscriberMessageStatement = lastAckedDurableSubscriberMessageStatement;
}
public void setLastProducerSequenceIdStatement(String lastProducerSequenceIdStatement) {
this.lastProducerSequenceIdStatement = lastProducerSequenceIdStatement;
}
}

View File

@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
@ -246,13 +247,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
public long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException {
public long getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
s.setString(1, messageID.getProducerId().toString());
s.setLong(2, messageID.getProducerSequenceId());
s.setString(3, destination.getQualifiedName());
rs = s.executeQuery();
if (!rs.next()) {
return 0;
@ -819,4 +821,23 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
* try { s.close(); } catch (Throwable ignore) {} } }
*/
public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
s.setString(1, id.toString());
rs = s.executeQuery();
long seq = -1;
if (rs.next()) {
seq = rs.getLong(1);
}
return seq;
} finally {
close(rs);
close(s);
}
}
}

View File

@ -50,6 +50,7 @@ import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
@ -745,4 +746,8 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
}
}
public long getLastProducerSequenceId(ProducerId id) {
return -1;
}
}

View File

@ -32,6 +32,7 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
@ -369,6 +370,11 @@ public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerService
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
public long getLastProducerSequenceId(ProducerId id) {
// reference store send has adequate duplicate suppression
return -1;
}
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
@ -124,6 +125,10 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
return this.letter.getLastMessageBrokerSequenceId();
}
public long getLastProducerSequenceId(ProducerId id) throws IOException {
return this.letter.getLastProducerSequenceId(id);
}
/**
* @param destination
* @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
@ -208,6 +213,29 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
this.letter.setJournalMaxFileLength(journalMaxFileLength);
}
/**
* Set the max number of producers (LRU cache) to track for duplicate sends
*/
public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
}
public int getMaxFailoverProducersToTrack() {
return this.letter.getMaxFailoverProducersToTrack();
}
/**
* set the audit window depth for duplicate suppression (should exceed the max transaction
* batch)
*/
public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
}
public int getFailoverProducersAuditDepth() {
return this.getFailoverProducersAuditDepth();
}
/**
* Get the checkpointInterval
*
@ -477,4 +505,5 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET";
return "KahaDBPersistenceAdapter[" + path + "]";
}
}

View File

@ -48,6 +48,7 @@ import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
@ -363,6 +364,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
@ -901,6 +903,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
public long getLastProducerSequenceId(ProducerId id) {
indexLock.readLock().lock();
try {
return metadata.producerSequenceIdTracker.getLastSeqId(id);
} finally {
indexLock.readLock().unlock();
}
}
public long size() {
if (!isStarted()) {

View File

@ -16,11 +16,15 @@
*/
package org.apache.activemq.store.kahadb;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
@ -35,22 +39,25 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
@ -93,9 +100,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
private static final Log LOG = LogFactory.getLog(MessageDatabase.class);
private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
static final int CLOSED_STATE = 1;
static final int OPEN_STATE = 2;
static final long NOT_ACKED = -1;
static final int CLOSED_STATE = 1;
static final int OPEN_STATE = 2;
static final long NOT_ACKED = -1;
protected class Metadata {
@ -104,6 +111,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
protected BTreeIndex<String, StoredDestination> destinations;
protected Location lastUpdate;
protected Location firstInProgressTransactionLocation;
protected Location producerSequenceIdTrackerLocation = null;
protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
public void read(DataInput is) throws IOException {
state = is.readInt();
@ -118,6 +127,14 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
} else {
firstInProgressTransactionLocation = null;
}
try {
if (is.readBoolean()) {
producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
} else {
producerSequenceIdTrackerLocation = null;
}
} catch (EOFException expectedOnUpgrade) {
}
}
public void write(DataOutput os) throws IOException {
@ -137,6 +154,13 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
} else {
os.writeBoolean(false);
}
if (producerSequenceIdTrackerLocation != null) {
os.writeBoolean(true);
LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
} else {
os.writeBoolean(false);
}
}
}
@ -154,7 +178,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
protected PageFile pageFile;
protected Journal journal;
protected Metadata metadata = new Metadata();
protected Metadata metadata = new Metadata();
protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
@ -171,7 +195,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
boolean enableIndexWriteAsync = false;
int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
protected AtomicBoolean opened = new AtomicBoolean();
private LockFile lockFile;
@ -381,15 +406,15 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
private Location getFirstInProgressTxLocation() {
Location l = null;
synchronized (inflightTransactions) {
if (!inflightTransactions.isEmpty()) {
l = inflightTransactions.values().iterator().next().get(0).getLocation();
}
if (!preparedTransactions.isEmpty()) {
Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
if (l==null || t.compareTo(l) <= 0) {
l = t;
if (!inflightTransactions.isEmpty()) {
l = inflightTransactions.values().iterator().next().get(0).getLocation();
}
if (!preparedTransactions.isEmpty()) {
Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
if (l==null || t.compareTo(l) <= 0) {
l = t;
}
}
}
}
return l;
}
@ -407,21 +432,25 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
try {
long start = System.currentTimeMillis();
Location recoveryPosition = getRecoveryPosition();
if( recoveryPosition!=null ) {
int redoCounter = 0;
LOG.info("Recoverying from the journal ...");
while (recoveryPosition != null) {
JournalCommand message = load(recoveryPosition);
metadata.lastUpdate = recoveryPosition;
process(message, recoveryPosition);
redoCounter++;
recoveryPosition = journal.getNextLocation(recoveryPosition);
}
long end = System.currentTimeMillis();
LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
Location producerAuditPosition = recoverProducerAudit();
Location lastIndoubtPosition = getRecoveryPosition();
Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
if (recoveryPosition != null) {
int redoCounter = 0;
LOG.info("Recoverying from the journal ...");
while (recoveryPosition != null) {
JournalCommand<?> message = load(recoveryPosition);
metadata.lastUpdate = recoveryPosition;
process(message, recoveryPosition, lastIndoubtPosition);
redoCounter++;
recoveryPosition = journal.getNextLocation(recoveryPosition);
}
long end = System.currentTimeMillis();
LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
}
// We may have to undo some index updates.
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
@ -433,7 +462,39 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
}
protected void recoverIndex(Transaction tx) throws IOException {
private Location minimum(Location producerAuditPosition,
Location lastIndoubtPosition) {
Location min = null;
if (producerAuditPosition != null) {
min = producerAuditPosition;
if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
min = lastIndoubtPosition;
}
} else {
min = lastIndoubtPosition;
}
return min;
}
private Location recoverProducerAudit() throws IOException {
if (metadata.producerSequenceIdTrackerLocation != null) {
KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
try {
ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
} catch (ClassNotFoundException cfe) {
IOException ioe = new IOException("Failed to read producerAudit: " + cfe);
ioe.initCause(cfe);
throw ioe;
}
return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
} else {
// got no audit stored so got to recreate via replay from start of the journal
return journal.getNextLocation(null);
}
}
protected void recoverIndex(Transaction tx) throws IOException {
long start = System.currentTimeMillis();
// It is possible index updates got applied before the journal updates..
// in that case we need to removed references to messages that are not in the journal
@ -457,6 +518,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
sd.locationIndex.remove(tx, keys.location);
sd.messageIdIndex.remove(tx, keys.messageId);
metadata.producerSequenceIdTracker.rollback(new MessageId(keys.messageId));
undoCounter++;
// TODO: do we need to modify the ack positions for the pub sub case?
}
@ -588,7 +650,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
while (nextRecoveryPosition != null) {
lastRecoveryPosition = nextRecoveryPosition;
metadata.lastUpdate = lastRecoveryPosition;
JournalCommand message = load(lastRecoveryPosition);
JournalCommand<?> message = load(lastRecoveryPosition);
process(message, lastRecoveryPosition);
nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
}
@ -601,8 +663,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
return metadata.lastUpdate;
}
private Location getRecoveryPosition() throws IOException {
private Location getRecoveryPosition() throws IOException {
// If we need to recover the transactions..
if (metadata.firstInProgressTransactionLocation != null) {
return metadata.firstInProgressTransactionLocation;
@ -613,7 +675,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
// Start replay at the record after the last one recorded in the index file.
return journal.getNextLocation(metadata.lastUpdate);
}
// This loads the first position.
return journal.getNextLocation(null);
}
@ -658,7 +720,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
// /////////////////////////////////////////////////////////////////
// Methods call by the broker to update and query the store.
// /////////////////////////////////////////////////////////////////
public Location store(JournalCommand data) throws IOException {
public Location store(JournalCommand<?> data) throws IOException {
return store(data, false, null,null);
}
@ -669,7 +731,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
* during a recovery process.
* @param done
*/
public Location store(JournalCommand data, boolean sync, Runnable before,Runnable after) throws IOException {
public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
if (before != null) {
before.run();
}
@ -716,7 +778,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
* @return
* @throws IOException
*/
public JournalCommand load(Location location) throws IOException {
public JournalCommand<?> load(Location location) throws IOException {
ByteSequence data = journal.read(location);
DataByteArrayInputStream is = new DataByteArrayInputStream(data);
byte readByte = is.readByte();
@ -724,10 +786,30 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
if( type == null ) {
throw new IOException("Could not load journal record. Invalid location: "+location);
}
JournalCommand message = (JournalCommand)type.createMessage();
JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
message.mergeFramed(is);
return message;
}
/**
* do minimal recovery till we reach the last inDoubtLocation
* @param data
* @param location
* @param inDoubtlocation
* @throws IOException
*/
void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
process(data, location);
} else {
// just recover producer audit
data.visit(new Visitor() {
public void visit(KahaAddMessageCommand command) throws IOException {
metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
}
});
}
}
// /////////////////////////////////////////////////////////////////
// Journaled record processing methods. Once the record is journaled,
@ -735,7 +817,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
// from the recovery method too so they need to be idempotent
// /////////////////////////////////////////////////////////////////
void process(JournalCommand data, final Location location) throws IOException {
void process(JournalCommand<?> data, final Location location) throws IOException {
data.visit(new Visitor() {
@Override
public void visit(KahaAddMessageCommand command) throws IOException {
@ -911,7 +993,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
if( previous == null ) {
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
if( previous == null ) {
sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
} else {
// If the message ID as indexed, then the broker asked us to store a DUP
// message. Bad BOY! Don't do it, and log a warning.
@ -927,7 +1009,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
// TODO: consider just rolling back the tx.
sd.locationIndex.put(tx, location, previous);
}
// record this id in any event, initial send or recovery
metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
}
void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
@ -1025,6 +1108,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
LOG.debug("Checkpoint started.");
metadata.state = OPEN_STATE;
metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
tx.store(metadata.page, metadataMarshaller, true);
pageFile.flush();
@ -1111,6 +1195,15 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
LOG.debug("Checkpoint done.");
}
private Location checkpointProducerAudit() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oout = new ObjectOutputStream(baos);
oout.writeObject(metadata.producerSequenceIdTracker);
oout.flush();
oout.close();
return store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())));
}
public HashSet<Integer> getJournalFilesBeingReplicated() {
return journalFilesBeingReplicated;
}
@ -1580,6 +1673,22 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
return journalMaxFileLength;
}
public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
}
public int getMaxFailoverProducersToTrack() {
return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
}
public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
}
public int getFailoverProducersAuditDepth() {
return this.metadata.producerSequenceIdTracker.getAuditDepth();
}
public PageFile getPageFile() {
if (pageFile == null) {
pageFile = createPageFile();

View File

@ -33,6 +33,7 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
@ -569,5 +570,9 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
throw new IllegalArgumentException("Not in the valid destination format");
}
}
public long getLastProducerSequenceId(ProducerId id) {
return -1;
}
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
@ -52,5 +53,8 @@ public class Visitor {
public void visit(KahaSubscriptionCommand kahaUpdateSubscriptionCommand) throws IOException {
}
public void visit(KahaProducerAuditCommand kahaProducerAuditCommand) throws IOException {
}
}

View File

@ -27,6 +27,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyMessageStore;
@ -201,4 +202,9 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
createTransactionStore();
}
}
public long getLastProducerSequenceId(ProducerId id) {
// memory map does duplicate suppression
return -1;
}
}

View File

@ -727,7 +727,7 @@ public class FailoverTransport implements CompositeTransport {
for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) {
Command command = iter2.next();
if (LOG.isTraceEnabled()) {
LOG.trace("restore, replay: " + command);
LOG.trace("restore requestMap, replay: " + command);
}
t.oneway(command);
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.util;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
/**
* Simple BitArray to enable setting multiple boolean values efficently Used
@ -27,7 +28,10 @@ import java.io.IOException;
*
* @version $Revision: 1.1.1.1 $
*/
public class BitArray {
public class BitArray implements Serializable {
private static final long serialVersionUID = 1L;
static final int LONG_SIZE = 64;
static final int INT_SIZE = 32;
static final int SHORT_SIZE = 16;
@ -113,6 +117,14 @@ public class BitArray {
this.bits = bits;
}
private void writeObject(java.io.ObjectOutputStream out) throws IOException {
writeToStream(out);
}
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
readFromStream(in);
}
/**
* write the bits to an output stream
*

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.util;
import java.io.Serializable;
import java.util.LinkedList;
/**
@ -23,8 +24,9 @@ import java.util.LinkedList;
*
* @version $Revision: 1.1.1.1 $
*/
public class BitArrayBin {
public class BitArrayBin implements Serializable {
private static final long serialVersionUID = 1L;
private LinkedList<BitArray> list;
private int maxNumberOfArrays;
private int firstIndex = -1;
@ -162,4 +164,22 @@ public class BitArrayBin {
}
return answer;
}
public long getLastSetIndex() {
long result = -1;
if (firstIndex >=0) {
result = firstIndex;
BitArray last = null;
for (int lastBitArrayIndex = maxNumberOfArrays -1; lastBitArrayIndex >= 0; lastBitArrayIndex--) {
last = list.get(lastBitArrayIndex);
if (last != null) {
result += last.length() -1;
result += lastBitArrayIndex * BitArray.LONG_SIZE;
break;
}
}
}
return result;
}
}

View File

@ -29,6 +29,7 @@ enum KahaEntryType {
KAHA_ROLLBACK_COMMAND = 5;
KAHA_REMOVE_DESTINATION_COMMAND = 6;
KAHA_SUBSCRIPTION_COMMAND = 7;
KAHA_PRODUCER_AUDIT_COMMAND = 8;
}
message KahaTraceCommand {
@ -109,6 +110,18 @@ message KahaSubscriptionCommand {
optional bytes subscriptionInfo = 4;
}
message KahaProducerAuditCommand {
// We make use of the wonky comment style bellow because the following options
// are not valid for protoc, but they are valid for the ActiveMQ proto compiler.
// In the ActiveMQ proto compiler, comments terminate with the pipe character: |
//| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaProducerAuditCommand>";
//| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
//| option java_type_method = "KahaEntryType";
required bytes audit = 1;
}
message KahaDestination {
enum DestinationType {
QUEUE = 0;

View File

@ -152,6 +152,7 @@ public class PerDestinationStoreLimitTest extends JmsTestSupport {
Thread.sleep(1000);
// the producer is blocked once the done flag stays true
if (done.get()) {
LOG.info("Blocked....");
break;
}
done.set(true);

View File

@ -66,5 +66,6 @@ public class ConnectionInfoTest extends BaseCommandTestSupport {
info.setManageable(false);
info.setClientMaster(true);
info.setFaultTolerant(false);
info.setFailoverReconnect(true);
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -119,6 +120,23 @@ public abstract class StoreOrderTest {
}
}
@Test
public void testCompositeSendReceiveAfterRestart() throws Exception {
destination = new ActiveMQQueue("StoreOrderTest,SecondStoreOrderTest");
enqueueOneMessage();
LOG.info("restart broker");
stopBroker();
broker = createRestartedBroker();
dumpMessages();
initConnection();
destination = new ActiveMQQueue("StoreOrderTest");
assertNotNull("got one message from first dest", receiveOne());
dumpMessages();
destination = new ActiveMQQueue("SecondStoreOrderTest");
assertNotNull("got one message from second dest", receiveOne());
}
@Test
public void validateUnorderedTxCommit() throws Exception {
@ -247,6 +265,7 @@ public abstract class StoreOrderTest {
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setMemoryLimit(1024*3);
defaultEntry.setCursorMemoryHighWaterMark(68);
defaultEntry.setExpireMessagesPeriod(0);
map.setDefaultEntry(defaultEntry);
brokerService.setDestinationPolicy(map);
}

View File

@ -28,6 +28,11 @@ public class JDBCPersistenceAdapterTest extends PersistenceAdapterTestSupport {
protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException {
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
// explicitly enable audit as it is now off by default
// due to org.apache.activemq.broker.ProducerBrokerExchange.canDispatch(Message)
jdbc.setEnableAudit(true);
brokerService.setSchedulerSupport(false);
brokerService.setPersistenceAdapter(jdbc);
jdbc.setBrokerService(brokerService);
@ -56,6 +61,5 @@ public class JDBCPersistenceAdapterTest extends PersistenceAdapterTestSupport {
if (!failed) {
fail("Should have failed with audit turned off");
}
}
}
}

View File

@ -43,7 +43,7 @@ public class JDBCStoreOrderTest extends StoreOrderTest {
while(result.next()) {
long id = result.getLong(1);
Message message = (Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
LOG.error("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
}
statement.close();
conn.close();

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
@ -49,10 +50,14 @@ import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.SocketProxy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
@ -86,6 +91,7 @@ public class FailoverTransactionTest {
public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
broker = new BrokerService();
broker.setUseJmx(false);
broker.setAdvisorySupport(false);
broker.addConnector(url);
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
return broker;
@ -114,7 +120,7 @@ public class FailoverTransactionTest {
}
@Test
public void testFailoverCommitReplyLost() throws Exception {
public void testFailoverCommitReplyLostAMQ() throws Exception {
doTestFailoverCommitReplyLost(0);
}
@ -222,15 +228,257 @@ public class FailoverTransactionTest {
connection.close();
}
//@Test not implemented
public void testFailoverSendReplyLostAMQ() throws Exception {
doTestFailoverSendReplyLost(0);
}
@Test
public void testFailoverSendReplyLostJdbc() throws Exception {
doTestFailoverSendReplyLost(1);
}
@Test
public void testFailoverSendReplyLostKahaDB() throws Exception {
doTestFailoverSendReplyLost(2);
}
public void doTestFailoverSendReplyLost(final int adapter) throws Exception {
broker = createBroker(true);
setPersistenceAdapter(adapter);
broker.setPlugins(new BrokerPlugin[] {
new BrokerPluginSupport() {
@Override
public void send(ProducerBrokerExchange producerExchange,
org.apache.activemq.command.Message messageSend)
throws Exception {
// so send will hang as if reply is lost
super.send(producerExchange, messageSend);
producerExchange.getConnectionContext().setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("Stopping broker post send...");
try {
broker.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
});
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false");
Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
final CountDownLatch sendDoneLatch = new CountDownLatch(1);
// broker will die on send reply so this will hang till restart
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async send...");
try {
produceMessage(session, destination);
} catch (JMSException e) {
//assertTrue(e instanceof TransactionRolledBackException);
LOG.error("got send exception: ", e);
fail("got unexpected send exception" + e);
}
sendDoneLatch.countDown();
LOG.info("done async send");
}
});
// will be stopped by the plugin
broker.waitUntilStopped();
broker = createBroker(false);
setPersistenceAdapter(adapter);
LOG.info("restarting....");
broker.start();
assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
// new transaction
Message msg = consumer.receive(20000);
LOG.info("Received: " + msg);
assertNotNull("we got the message", msg);
assertNull("we got just one message", consumer.receive(2000));
consumer.close();
connection.close();
// verify stats
assertEquals("no newly queued messages", 0, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
assertEquals("1 dequeue", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
// ensure no dangling messages with fresh broker etc
broker.stop();
broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages with second restart..");
broker = createBroker(false);
setPersistenceAdapter(adapter);
broker.start();
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
connection = cf.createConnection();
connection.start();
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session2.createConsumer(destination);
msg = consumer.receive(1000);
if (msg == null) {
msg = consumer.receive(5000);
}
LOG.info("Received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
}
// not implemented.. @Test
public void testFailoverConnectionSendReplyLostAMQ() throws Exception {
doTestFailoverConnectionSendReplyLost(0);
}
@Test
public void testFailoverConnectionSendReplyLostJdbc() throws Exception {
doTestFailoverConnectionSendReplyLost(1);
}
@Test
public void testFailoverConnectionSendReplyLostKahaDB() throws Exception {
doTestFailoverConnectionSendReplyLost(2);
}
public void doTestFailoverConnectionSendReplyLost(final int adapter) throws Exception {
broker = createBroker(true);
setPersistenceAdapter(adapter);
final SocketProxy proxy = new SocketProxy();
broker.setPlugins(new BrokerPlugin[] {
new BrokerPluginSupport() {
private boolean firstSend = true;
@Override
public void send(ProducerBrokerExchange producerExchange,
org.apache.activemq.command.Message messageSend)
throws Exception {
// so send will hang as if reply is lost
super.send(producerExchange, messageSend);
if (firstSend) {
firstSend = false;
producerExchange.getConnectionContext().setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("Stopping connection post send...");
try {
proxy.close();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
});
broker.start();
proxy.setTarget(new URI(url));
proxy.open();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false");
Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
final CountDownLatch sendDoneLatch = new CountDownLatch(1);
// proxy connection will die on send reply so this will hang on failover reconnect till open
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async send...");
try {
produceMessage(session, destination);
} catch (JMSException e) {
//assertTrue(e instanceof TransactionRolledBackException);
LOG.info("got send exception: ", e);
}
sendDoneLatch.countDown();
LOG.info("done async send");
}
});
// will be closed by the plugin
assertTrue("proxy was closed", proxy.waitUntilClosed(30));
LOG.info("restarting proxy");
proxy.open();
assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
Message msg = consumer.receive(20000);
LOG.info("Received: " + msg);
assertNotNull("we got the message", msg);
assertNull("we got just one message", consumer.receive(2000));
consumer.close();
connection.close();
// verify stats, connection dup suppression means dups don't get to broker
assertEquals("one queued message", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
// ensure no dangling messages with fresh broker etc
broker.stop();
broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages with restart..");
broker = createBroker(false);
setPersistenceAdapter(adapter);
broker.start();
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
connection = cf.createConnection();
connection.start();
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session2.createConsumer(destination);
msg = consumer.receive(1000);
if (msg == null) {
msg = consumer.receive(5000);
}
LOG.info("Received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
}
private void setPersistenceAdapter(int adapter) throws IOException {
switch (adapter) {
case 0:
broker.setPersistenceAdapter(new AMQPersistenceAdapter());
break;
case 1:
broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
break;
case 2:
KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter();
// duplicate checker not updated on canceled tasks, even it
// it was, reovery of the audit would fail as the message is
// not recorded in the store and the audit may not be up to date.
// So if duplicate are a nono (w.r.t stats), this must be disabled
store.setConcurrentStoreAndDispatchQueues(false);
store.setMaxFailoverProducersToTrack(10);
store.setDirectory(new File("target/activemq-data/kahadb/FailoverTransactionTest"));
broker.setPersistenceAdapter(store);
break;

View File

@ -32,21 +32,29 @@ public class BitArrayBinTest extends TestCase {
for (int i=0; i <= dataSize; i++) {
assertTrue("not already set", !toTest.setBit(i, Boolean.TRUE));
assertEquals("current is max", i, toTest.getLastSetIndex());
}
assertEquals("last is max", dataSize, toTest.getLastSetIndex());
int windowOfValidData = roundWindow(dataSize, window);
int i=dataSize;
for (; i >= dataSize -windowOfValidData; i--) {
assertTrue("was already set, id=" + i, toTest.setBit(i, Boolean.TRUE));
}
assertEquals("last is still max", dataSize, toTest.getLastSetIndex());
for (; i >= 0; i--) {
assertTrue("was not already set, id=" + i, !toTest.setBit(i, Boolean.TRUE));
}
for (int j= dataSize +1; j<(2*dataSize); j++) {
for (int j= dataSize +1; j<=(2*dataSize); j++) {
assertTrue("not already set: id=" + j, !toTest.setBit(j, Boolean.TRUE));
}
assertEquals("last still max*2", 2*dataSize, toTest.getLastSetIndex());
}
public void testSetUnsetAroundWindow() throws Exception {
@ -87,6 +95,7 @@ public class BitArrayBinTest extends TestCase {
int instance = value +muliplier*BitArray.LONG_SIZE;
assertTrue("not already set: id=" + instance, !toTest.setBit(instance, Boolean.TRUE));
assertTrue("not already set: id=" + value, !toTest.setBit(value, Boolean.TRUE));
assertEquals("max set correct", instance, toTest.getLastSetIndex());
}
}
}
@ -109,6 +118,22 @@ public class BitArrayBinTest extends TestCase {
assertTrue("not already set: id=" + instance, !toTest.setBit(instance, Boolean.TRUE));
}
public void testLastSeq() throws Exception {
BitArrayBin toTest = new BitArrayBin(512);
assertEquals("last not set", -1, toTest.getLastSetIndex());
toTest.setBit(1, Boolean.TRUE);
assertEquals("last correct", 1, toTest.getLastSetIndex());
toTest.setBit(64, Boolean.TRUE);
assertEquals("last correct", 64, toTest.getLastSetIndex());
toTest.setBit(68, Boolean.TRUE);
assertEquals("last correct", 68, toTest.getLastSetIndex());
}
// window moves in increments of BitArray.LONG_SIZE.
// valid data window on low end can be larger than window
private int roundWindow(int dataSetEnd, int windowSize) {

View File

@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
@ -46,6 +47,8 @@ public class SocketProxy {
private Acceptor acceptor;
private ServerSocket serverSocket;
private CountDownLatch closed = new CountDownLatch(1);
public List<Connection> connections = new LinkedList<Connection>();
@ -87,6 +90,7 @@ public class SocketProxy {
}
acceptor = new Acceptor(serverSocket, target);
new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start();
closed = new CountDownLatch(1);
}
public URI getUrl() {
@ -106,6 +110,11 @@ public class SocketProxy {
closeConnection(con);
}
acceptor.close();
closed.countDown();
}
public boolean waitUntilClosed(long timeoutSeconds) throws InterruptedException {
return closed.await(timeoutSeconds, TimeUnit.SECONDS);
}
/*
@ -303,10 +312,12 @@ public class SocketProxy {
public void close() {
try {
socket.close();
closed.countDown();
goOn();
} catch (IOException ignored) {
}
}
}
}