diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java index 44f9ac29ec..d6cf3f5fb3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java @@ -16,35 +16,23 @@ */ package org.apache.activemq; -import javax.jms.JMSException; -import javax.jms.Message; - -import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.util.BitArrayBin; -import org.apache.activemq.util.IdGenerator; -import org.apache.activemq.util.LRUCache; /** * Provides basic audit functions for Messages * * @version $Revision: 1.1.1.1 $ */ -public class ActiveMQMessageAudit { +public class ActiveMQMessageAudit extends ActiveMQMessageAuditNoSync { - public static final int DEFAULT_WINDOW_SIZE = 2048; - public static final int MAXIMUM_PRODUCER_COUNT = 64; - private int auditDepth; - private int maximumNumberOfProducersToTrack; - private LRUCache map; + private static final long serialVersionUID = 1L; /** * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack = * 64 */ public ActiveMQMessageAudit() { - this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT); + super(); } /** @@ -55,198 +43,41 @@ public class ActiveMQMessageAudit { * the system */ public ActiveMQMessageAudit(int auditDepth, final int maximumNumberOfProducersToTrack) { - this.auditDepth = auditDepth; - this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack; - this.map = new LRUCache(0, maximumNumberOfProducersToTrack, 0.75f, true); + super(auditDepth, maximumNumberOfProducersToTrack); } - /** - * @return the auditDepth - */ - public int getAuditDepth() { - return auditDepth; - } - - /** - * @param auditDepth the auditDepth to set - */ - public void setAuditDepth(int auditDepth) { - this.auditDepth = auditDepth; - } - - /** - * @return the maximumNumberOfProducersToTrack - */ - public int getMaximumNumberOfProducersToTrack() { - return maximumNumberOfProducersToTrack; - } - - /** - * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set - */ - public void setMaximumNumberOfProducersToTrack( - int maximumNumberOfProducersToTrack) { - this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack; - this.map.setMaxCacheSize(maximumNumberOfProducersToTrack); - } - - /** - * Checks if this message has been seen before - * - * @param message - * @return true if the message is a duplicate - * @throws JMSException - */ - public boolean isDuplicate(Message message) throws JMSException { - return isDuplicate(message.getJMSMessageID()); - } - - /** - * checks whether this messageId has been seen before and adds this - * messageId to the list - * - * @param id - * @return true if the message is a duplicate - */ - public synchronized boolean isDuplicate(String id) { - boolean answer = false; - String seed = IdGenerator.getSeedFromId(id); - if (seed != null) { - BitArrayBin bab = map.get(seed); - if (bab == null) { - bab = new BitArrayBin(auditDepth); - map.put(seed, bab); - } - long index = IdGenerator.getSequenceFromId(id); - if (index >= 0) { - answer = bab.setBit(index, true); - } + @Override + public boolean isDuplicate(String id) { + synchronized (this) { + return super.isDuplicate(id); } - return answer; } - /** - * Checks if this message has been seen before - * - * @param message - * @return true if the message is a duplicate - */ - public boolean isDuplicate(final MessageReference message) { - MessageId id = message.getMessageId(); - return isDuplicate(id); - } - - /** - * Checks if this messageId has been seen before - * - * @param id - * @return true if the message is a duplicate - */ - public synchronized boolean isDuplicate(final MessageId id) { - boolean answer = false; - - if (id != null) { - ProducerId pid = id.getProducerId(); - if (pid != null) { - BitArrayBin bab = map.get(pid); - if (bab == null) { - bab = new BitArrayBin(auditDepth); - map.put(pid, bab); - } - answer = bab.setBit(id.getProducerSequenceId(), true); - } + @Override + public boolean isDuplicate(final MessageId id) { + synchronized (this) { + return super.isDuplicate(id); } - return answer; } - /** - * mark this message as being received - * - * @param message - */ - public void rollback(final MessageReference message) { - MessageId id = message.getMessageId(); - rollback(id); - } - - /** - * mark this message as being received - * - * @param id - */ - public synchronized void rollback(final MessageId id) { - if (id != null) { - ProducerId pid = id.getProducerId(); - if (pid != null) { - BitArrayBin bab = map.get(pid); - if (bab != null) { - bab.setBit(id.getProducerSequenceId(), false); - } - } + @Override + public void rollback(final MessageId id) { + synchronized (this) { + super.rollback(id); } } - /** - * Check the message is in order - * @param msg - * @return - * @throws JMSException - */ - public boolean isInOrder(Message msg) throws JMSException { - return isInOrder(msg.getJMSMessageID()); - } - - /** - * Check the message id is in order - * @param id - * @return - */ - public synchronized boolean isInOrder(final String id) { - boolean answer = true; - - if (id != null) { - String seed = IdGenerator.getSeedFromId(id); - if (seed != null) { - BitArrayBin bab = map.get(seed); - if (bab != null) { - long index = IdGenerator.getSequenceFromId(id); - answer = bab.isInOrder(index); - } - - } + @Override + public boolean isInOrder(final String id) { + synchronized (this) { + return super.isInOrder(id); } - return answer; } - /** - * Check the MessageId is in order - * @param message - * @return - */ - public synchronized boolean isInOrder(final MessageReference message) { - return isInOrder(message.getMessageId()); - } - - /** - * Check the MessageId is in order - * @param id - * @return - */ - public synchronized boolean isInOrder(final MessageId id) { - boolean answer = false; - - if (id != null) { - ProducerId pid = id.getProducerId(); - if (pid != null) { - BitArrayBin bab = map.get(pid); - if (bab == null) { - bab = new BitArrayBin(auditDepth); - map.put(pid, bab); - } - answer = bab.isInOrder(id.getProducerSequenceId()); - - } + @Override + public boolean isInOrder(final MessageId id) { + synchronized (this) { + return isInOrder(id); } - return answer; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java new file mode 100755 index 0000000000..e4fa093f10 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.io.Serializable; + +import javax.jms.JMSException; +import javax.jms.Message; + +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.util.BitArrayBin; +import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.util.LRUCache; + +/** + * Provides basic audit functions for Messages without sync + * + * @version $Revision$ + */ +public class ActiveMQMessageAuditNoSync implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final int DEFAULT_WINDOW_SIZE = 2048; + public static final int MAXIMUM_PRODUCER_COUNT = 64; + private int auditDepth; + private int maximumNumberOfProducersToTrack; + private LRUCache map; + + /** + * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack = + * 64 + */ + public ActiveMQMessageAuditNoSync() { + this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT); + } + + /** + * Construct a MessageAudit + * + * @param auditDepth range of ids to track + * @param maximumNumberOfProducersToTrack number of producers expected in + * the system + */ + public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) { + this.auditDepth = auditDepth; + this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack; + this.map = new LRUCache(0, maximumNumberOfProducersToTrack, 0.75f, true); + } + + /** + * @return the auditDepth + */ + public int getAuditDepth() { + return auditDepth; + } + + /** + * @param auditDepth the auditDepth to set + */ + public void setAuditDepth(int auditDepth) { + this.auditDepth = auditDepth; + } + + /** + * @return the maximumNumberOfProducersToTrack + */ + public int getMaximumNumberOfProducersToTrack() { + return maximumNumberOfProducersToTrack; + } + + /** + * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set + */ + public void setMaximumNumberOfProducersToTrack( + int maximumNumberOfProducersToTrack) { + this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack; + this.map.setMaxCacheSize(maximumNumberOfProducersToTrack); + } + + /** + * Checks if this message has been seen before + * + * @param message + * @return true if the message is a duplicate + * @throws JMSException + */ + public boolean isDuplicate(Message message) throws JMSException { + return isDuplicate(message.getJMSMessageID()); + } + + /** + * checks whether this messageId has been seen before and adds this + * messageId to the list + * + * @param id + * @return true if the message is a duplicate + */ + public boolean isDuplicate(String id) { + boolean answer = false; + String seed = IdGenerator.getSeedFromId(id); + if (seed != null) { + BitArrayBin bab = map.get(seed); + if (bab == null) { + bab = new BitArrayBin(auditDepth); + map.put(seed, bab); + } + long index = IdGenerator.getSequenceFromId(id); + if (index >= 0) { + answer = bab.setBit(index, true); + } + } + return answer; + } + + /** + * Checks if this message has been seen before + * + * @param message + * @return true if the message is a duplicate + */ + public boolean isDuplicate(final MessageReference message) { + MessageId id = message.getMessageId(); + return isDuplicate(id); + } + + /** + * Checks if this messageId has been seen before + * + * @param id + * @return true if the message is a duplicate + */ + public boolean isDuplicate(final MessageId id) { + boolean answer = false; + + if (id != null) { + ProducerId pid = id.getProducerId(); + if (pid != null) { + BitArrayBin bab = map.get(pid); + if (bab == null) { + bab = new BitArrayBin(auditDepth); + map.put(pid, bab); + } + answer = bab.setBit(id.getProducerSequenceId(), true); + } + } + return answer; + } + + /** + * mark this message as being received + * + * @param message + */ + public void rollback(final MessageReference message) { + MessageId id = message.getMessageId(); + rollback(id); + } + + /** + * mark this message as being received + * + * @param id + */ + public void rollback(final MessageId id) { + if (id != null) { + ProducerId pid = id.getProducerId(); + if (pid != null) { + BitArrayBin bab = map.get(pid); + if (bab != null) { + bab.setBit(id.getProducerSequenceId(), false); + } + } + } + } + + /** + * Check the message is in order + * @param msg + * @return + * @throws JMSException + */ + public boolean isInOrder(Message msg) throws JMSException { + return isInOrder(msg.getJMSMessageID()); + } + + /** + * Check the message id is in order + * @param id + * @return + */ + public boolean isInOrder(final String id) { + boolean answer = true; + + if (id != null) { + String seed = IdGenerator.getSeedFromId(id); + if (seed != null) { + BitArrayBin bab = map.get(seed); + if (bab != null) { + long index = IdGenerator.getSequenceFromId(id); + answer = bab.isInOrder(index); + } + + } + } + return answer; + } + + /** + * Check the MessageId is in order + * @param message + * @return + */ + public boolean isInOrder(final MessageReference message) { + return isInOrder(message.getMessageId()); + } + + /** + * Check the MessageId is in order + * @param id + * @return + */ + public boolean isInOrder(final MessageId id) { + boolean answer = false; + + if (id != null) { + ProducerId pid = id.getProducerId(); + if (pid != null) { + BitArrayBin bab = map.get(pid); + if (bab == null) { + bab = new BitArrayBin(auditDepth); + map.put(pid, bab); + } + answer = bab.isInOrder(id.getProducerSequenceId()); + + } + } + return answer; + } + + public long getLastSeqId(ProducerId id) { + long result = -1; + BitArrayBin bab = map.get(id.toString() + ":"); + if (bab != null) { + result = bab.getLastSetIndex(); + } + return result; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java index 3dc530efed..b3151b66fb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java @@ -47,7 +47,7 @@ public class ConnectionContext { private ConnectionId connectionId; private String clientId; private String userName; - private boolean haAware; + private boolean reconnect; private WireFormatInfo wireFormatInfo; private Object longTermStoreContext; private boolean producerFlowControl = true; @@ -86,7 +86,7 @@ public class ConnectionContext { rc.connectionId = this.connectionId; rc.clientId = this.clientId; rc.userName = this.userName; - rc.haAware = this.haAware; + rc.reconnect = this.reconnect; rc.wireFormatInfo = this.wireFormatInfo; rc.longTermStoreContext = this.longTermStoreContext; rc.producerFlowControl = this.producerFlowControl; @@ -212,12 +212,12 @@ public class ConnectionContext { this.clientId = clientId; } - public boolean isHaAware() { - return haAware; + public boolean isReconnect() { + return reconnect; } - public void setHaAware(boolean haAware) { - this.haAware = haAware; + public void setReconnect(boolean reconnect) { + this.reconnect = reconnect; } public WireFormatInfo getWireFormatInfo() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java b/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java index 39710dfcca..eff6bd225e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java @@ -18,7 +18,10 @@ package org.apache.activemq.broker; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Region; +import org.apache.activemq.command.Message; import org.apache.activemq.state.ProducerState; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * Holds internal state in the broker for a MessageProducer @@ -27,11 +30,13 @@ import org.apache.activemq.state.ProducerState; */ public class ProducerBrokerExchange { + private static final Log LOG = LogFactory.getLog(ProducerBrokerExchange.class); private ConnectionContext connectionContext; private Destination regionDestination; private Region region; private ProducerState producerState; private boolean mutable = true; + private long lastSendSequenceNumber = -1; public ProducerBrokerExchange() { } @@ -117,4 +122,25 @@ public class ProducerBrokerExchange { this.producerState = producerState; } + /** + * Enforce duplicate suppression using info from persistence adapter + * @param messageSend + * @return false if message should be ignored as a duplicate + */ + public boolean canDispatch(Message messageSend) { + boolean canDispatch = true; + if (lastSendSequenceNumber > 0) { + if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber) { + canDispatch = false; + LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId [" + + messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: " + lastSendSequenceNumber); + } + } + return canDispatch; + } + + public void setLastStoredSequenceId(long l) { + lastSendSequenceNumber = l; + LOG.debug("last stored sequence id set: " + l); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index f479e391ac..5dea07635d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -453,7 +453,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { public Response processMessage(Message messageSend) throws Exception { ProducerId producerId = messageSend.getProducerId(); ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); - broker.send(producerExchange, messageSend); + if (producerExchange.canDispatch(messageSend)) { + broker.send(producerExchange, messageSend); + } return null; } @@ -680,6 +682,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { context.setTransactions(new ConcurrentHashMap()); context.setUserName(info.getUserName()); context.setWireFormatInfo(wireFormatInfo); + context.setReconnect(info.isFailoverReconnect()); this.manageable = info.isManageable(); state.setContext(context); state.setConnection(this); @@ -1249,13 +1252,16 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } - private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) { + private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException { ProducerBrokerExchange result = producerExchanges.get(id); if (result == null) { synchronized (producerExchanges) { result = new ProducerBrokerExchange(); - TransportConnectionState state = lookupConnectionState(id); + TransportConnectionState state = lookupConnectionState(id); context = state.getContext(); + if (context.isReconnect()) { + result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id)); + } result.setConnectionContext(context); SessionState ss = state.getSessionState(id.getParentId()); if (ss != null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 2435a0f4a0..f03e50c577 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -374,6 +374,7 @@ public abstract class AbstractRegion implements Region { LOG.warn("Ack for non existent subscription, ack:" + ack); throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId()); } else { + LOG.debug("Ack for non existent subscription in recovery, ack:" + ack); return; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java index 73313ef84a..a152d6b51c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java @@ -37,6 +37,7 @@ public class ConnectionInfo extends BaseCommand { protected boolean clientMaster = true; protected boolean faultTolerant = false; protected transient Object transportContext; + private boolean failoverReconnect; public ConnectionInfo() { } @@ -216,4 +217,15 @@ public class ConnectionInfo extends BaseCommand { this.faultTolerant = faultTolerant; } + /** + * @openwire:property version=6 cache=false + * @return failoverReconnect true if this is a reconnect + */ + public boolean isFailoverReconnect() { + return this.failoverReconnect; + } + + public void setFailoverReconnect(boolean failoverReconnect) { + this.failoverReconnect = failoverReconnect; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java index df2727c771..b45b2f2123 100644 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java @@ -86,6 +86,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller { info.setManageable(bs.readBoolean()); info.setClientMaster(bs.readBoolean()); info.setFaultTolerant(bs.readBoolean()); + info.setFailoverReconnect(bs.readBoolean()); } @@ -107,6 +108,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller { bs.writeBoolean(info.isManageable()); bs.writeBoolean(info.isClientMaster()); bs.writeBoolean(info.isFaultTolerant()); + bs.writeBoolean(info.isFailoverReconnect()); return rc + 0; } @@ -131,6 +133,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller { bs.readBoolean(); bs.readBoolean(); bs.readBoolean(); + bs.readBoolean(); } @@ -165,6 +168,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller { info.setManageable(dataIn.readBoolean()); info.setClientMaster(dataIn.readBoolean()); info.setFaultTolerant(dataIn.readBoolean()); + info.setFailoverReconnect(dataIn.readBoolean()); } @@ -186,6 +190,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller { dataOut.writeBoolean(info.isManageable()); dataOut.writeBoolean(info.isClientMaster()); dataOut.writeBoolean(info.isFaultTolerant()); + dataOut.writeBoolean(info.isFailoverReconnect()); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java index 5ff5af4e44..7a7bd49ed5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java @@ -140,6 +140,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { // Restore the connections. for (Iterator iter = connectionStates.values().iterator(); iter.hasNext();) { ConnectionState connectionState = iter.next(); + connectionState.getInfo().setFailoverReconnect(true); if (LOG.isDebugEnabled()) { LOG.debug("conn: " + connectionState.getInfo().getConnectionId()); } @@ -156,6 +157,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { } //now flush messages for (Message msg:messageCache.values()) { + if (LOG.isDebugEnabled()) { + LOG.debug("message: " + msg.getMessageId()); + } transport.oneway(msg); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java index ef4ce54f51..d572039678 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java @@ -26,6 +26,7 @@ import org.apache.activemq.broker.region.Destination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.usage.SystemUsage; /** @@ -157,4 +158,13 @@ public interface PersistenceAdapter extends Service { * @return disk space used in bytes of 0 if not implemented */ long size(); + + /** + * return the last stored producer sequenceId for this producer Id + * used to suppress duplicate sends on failover reconnect at the transport + * when a reconnect occurs + * @param id the producerId to find a sequenceId for + * @return the last stored sequence id or -1 if no suppression needed + */ + long getLastProducerSequenceId(ProducerId id) throws IOException; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java index 68254f903b..640c78f81f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java @@ -43,6 +43,7 @@ import org.apache.activemq.command.JournalTopicAck; import org.apache.activemq.command.JournalTrace; import org.apache.activemq.command.JournalTransaction; import org.apache.activemq.command.Message; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.kaha.impl.async.AsyncDataManager; @@ -1117,4 +1118,10 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, + ".DisableLocking", "false")); } + + + public long getLastProducerSequenceId(ProducerId id) { + // reference store send has adequate duplicate suppression + return -1; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java index e883b08f85..cefebb0885 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java @@ -21,6 +21,7 @@ import java.sql.SQLException; import java.util.Set; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.SubscriptionInfo; /** @@ -60,7 +61,7 @@ public interface JDBCAdapter { SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException; - long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException; + long getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException; void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException; @@ -85,4 +86,6 @@ public interface JDBCAdapter { long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException; void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException; + + long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException, IOException; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 7c38e773a6..e17d70d65b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -285,7 +285,7 @@ public class JDBCMessageStore extends AbstractMessageStore { long result = -1; TransactionContext c = persistenceAdapter.getTransactionContext(); try { - result = adapter.getStoreSequenceId(c, messageId); + result = adapter.getStoreSequenceId(c, destination, messageId); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index 191abca581..ef73038b3b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -37,6 +37,7 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; @@ -93,7 +94,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist protected int maxProducersToAudit=1024; protected int maxAuditDepth=1000; - protected boolean enableAudit=true; + protected boolean enableAudit=false; protected int auditRecoveryDepth = 1024; protected ActiveMQMessageAudit audit; @@ -245,6 +246,19 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist c.close(); } } + + public long getLastProducerSequenceId(ProducerId id) throws IOException { + TransactionContext c = getTransactionContext(); + try { + return getAdapter().doGetLastProducerSequenceId(c, id); + } catch (SQLException e) { + JDBCPersistenceAdapter.log("JDBC Failure: ", e); + throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e); + } finally { + c.close(); + } + } + public void start() throws Exception { getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); @@ -699,6 +713,5 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist synchronized(sequenceGenerator) { return sequenceGenerator.getNextSequenceId(); } - } - + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java index e6f6d70215..b474aef28b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java @@ -49,7 +49,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess // Get a connection and insert the message into the DB. TransactionContext c = persistenceAdapter.getTransactionContext(context); try { - long seq = adapter.getStoreSequenceId(c, messageId); + long seq = adapter.getStoreSequenceId(c, destination, messageId); adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java index e61139f200..c500c44c48 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -67,6 +67,7 @@ public class Statements { private String findNextMessagesStatement; private boolean useLockCreateWhereClause; private String findAllMessageIdsStatement; + private String lastProducerSequenceIdStatement; public String[] getCreateSchemaStatements() { if (createSchemaStatements == null) { @@ -128,7 +129,7 @@ public class Statements { public String getFindMessageSequenceIdStatement() { if (findMessageSequenceIdStatement == null) { findMessageSequenceIdStatement = "SELECT ID FROM " + getFullMessageTableName() - + " WHERE MSGID_PROD=? AND MSGID_SEQ=?"; + + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?"; } return findMessageSequenceIdStatement; } @@ -172,6 +173,15 @@ public class Statements { return findLastSequenceIdInMsgsStatement; } + public String getLastProducerSequenceIdStatement() { + if (lastProducerSequenceIdStatement == null) { + lastProducerSequenceIdStatement = "SELECT MAX(MSGID_SEQ) FROM " + getFullMessageTableName() + + " WHERE MSGID_PROD=?"; + } + return lastProducerSequenceIdStatement; + } + + public String getFindLastSequenceIdInAcksStatement() { if (findLastSequenceIdInAcksStatement == null) { findLastSequenceIdInAcksStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + getFullAckTableName(); @@ -656,4 +666,9 @@ public class Statements { this.lastAckedDurableSubscriberMessageStatement = lastAckedDurableSubscriberMessageStatement; } + + public void setLastProducerSequenceIdStatement(String lastProducerSequenceIdStatement) { + this.lastProducerSequenceIdStatement = lastProducerSequenceIdStatement; + } + } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index ea1128f475..51dd3086df 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.store.jdbc.JDBCAdapter; import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener; @@ -246,13 +247,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } - public long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException { + public long getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; try { s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement()); s.setString(1, messageID.getProducerId().toString()); s.setLong(2, messageID.getProducerSequenceId()); + s.setString(3, destination.getQualifiedName()); rs = s.executeQuery(); if (!rs.next()) { return 0; @@ -819,4 +821,23 @@ public class DefaultJDBCAdapter implements JDBCAdapter { * try { s.close(); } catch (Throwable ignore) {} } } */ + public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) + throws SQLException, IOException { + PreparedStatement s = null; + ResultSet rs = null; + try { + s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement()); + s.setString(1, id.toString()); + rs = s.executeQuery(); + long seq = -1; + if (rs.next()) { + seq = rs.getLong(1); + } + return seq; + } finally { + close(rs); + close(s); + } + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index 77cafae1bb..c763ea7a57 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -50,6 +50,7 @@ import org.apache.activemq.command.JournalTrace; import org.apache.activemq.command.JournalTransaction; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.MessageStore; @@ -745,4 +746,8 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve } } + public long getLastProducerSequenceId(ProducerId id) { + return -1; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java index f006010127..85b67ec381 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java @@ -32,6 +32,7 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.kaha.CommandMarshaller; import org.apache.activemq.kaha.ContainerId; import org.apache.activemq.kaha.ListContainer; @@ -369,6 +370,11 @@ public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerService public void setBrokerService(BrokerService brokerService) { this.brokerService = brokerService; } + + public long getLastProducerSequenceId(ProducerId id) { + // reference store send has adequate duplicate suppression + return -1; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 554ea4adf3..c36d57b7b7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -26,6 +26,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; @@ -124,6 +125,10 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi return this.letter.getLastMessageBrokerSequenceId(); } + public long getLastProducerSequenceId(ProducerId id) throws IOException { + return this.letter.getLastProducerSequenceId(id); + } + /** * @param destination * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) @@ -208,6 +213,29 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi this.letter.setJournalMaxFileLength(journalMaxFileLength); } + /** + * Set the max number of producers (LRU cache) to track for duplicate sends + */ + public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { + this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack); + } + + public int getMaxFailoverProducersToTrack() { + return this.letter.getMaxFailoverProducersToTrack(); + } + + /** + * set the audit window depth for duplicate suppression (should exceed the max transaction + * batch) + */ + public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { + this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth); + } + + public int getFailoverProducersAuditDepth() { + return this.getFailoverProducersAuditDepth(); + } + /** * Get the checkpointInterval * @@ -477,4 +505,5 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET"; return "KahaDBPersistenceAdapter[" + path + "]"; } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 6e2540744f..34bcccc0fb 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -48,6 +48,7 @@ import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; @@ -363,6 +364,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null); + } public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { @@ -901,6 +903,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public long getLastMessageBrokerSequenceId() throws IOException { return 0; } + + public long getLastProducerSequenceId(ProducerId id) { + indexLock.readLock().lock(); + try { + return metadata.producerSequenceIdTracker.getLastSeqId(id); + } finally { + indexLock.readLock().unlock(); + } + } public long size() { if (!isStarted()) { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 55e6f6f6b6..4a40c0e779 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -16,11 +16,15 @@ */ package org.apache.activemq.store.kahadb; +import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; +import java.io.EOFException; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.Collection; @@ -35,22 +39,25 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.activemq.ActiveMQMessageAuditNoSync; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.LocalTransactionId; +import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaCommitCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; import org.apache.activemq.store.kahadb.data.KahaEntryType; import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; +import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; @@ -93,9 +100,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar private static final Log LOG = LogFactory.getLog(MessageDatabase.class); private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; - static final int CLOSED_STATE = 1; - static final int OPEN_STATE = 2; - static final long NOT_ACKED = -1; + static final int CLOSED_STATE = 1; + static final int OPEN_STATE = 2; + static final long NOT_ACKED = -1; protected class Metadata { @@ -104,6 +111,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar protected BTreeIndex destinations; protected Location lastUpdate; protected Location firstInProgressTransactionLocation; + protected Location producerSequenceIdTrackerLocation = null; + protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); public void read(DataInput is) throws IOException { state = is.readInt(); @@ -118,6 +127,14 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } else { firstInProgressTransactionLocation = null; } + try { + if (is.readBoolean()) { + producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is); + } else { + producerSequenceIdTrackerLocation = null; + } + } catch (EOFException expectedOnUpgrade) { + } } public void write(DataOutput os) throws IOException { @@ -137,6 +154,13 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } else { os.writeBoolean(false); } + + if (producerSequenceIdTrackerLocation != null) { + os.writeBoolean(true); + LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os); + } else { + os.writeBoolean(false); + } } } @@ -154,7 +178,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar protected PageFile pageFile; protected Journal journal; - protected Metadata metadata = new Metadata(); + protected Metadata metadata = new Metadata(); protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); @@ -171,7 +195,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; boolean enableIndexWriteAsync = false; - int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; + int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; + protected AtomicBoolean opened = new AtomicBoolean(); private LockFile lockFile; @@ -381,15 +406,15 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar private Location getFirstInProgressTxLocation() { Location l = null; synchronized (inflightTransactions) { - if (!inflightTransactions.isEmpty()) { - l = inflightTransactions.values().iterator().next().get(0).getLocation(); - } - if (!preparedTransactions.isEmpty()) { - Location t = preparedTransactions.values().iterator().next().get(0).getLocation(); - if (l==null || t.compareTo(l) <= 0) { - l = t; + if (!inflightTransactions.isEmpty()) { + l = inflightTransactions.values().iterator().next().get(0).getLocation(); + } + if (!preparedTransactions.isEmpty()) { + Location t = preparedTransactions.values().iterator().next().get(0).getLocation(); + if (l==null || t.compareTo(l) <= 0) { + l = t; + } } - } } return l; } @@ -407,21 +432,25 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar try { long start = System.currentTimeMillis(); - Location recoveryPosition = getRecoveryPosition(); - if( recoveryPosition!=null ) { - int redoCounter = 0; - LOG.info("Recoverying from the journal ..."); - while (recoveryPosition != null) { - JournalCommand message = load(recoveryPosition); - metadata.lastUpdate = recoveryPosition; - process(message, recoveryPosition); - redoCounter++; - recoveryPosition = journal.getNextLocation(recoveryPosition); - } - long end = System.currentTimeMillis(); - LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); + Location producerAuditPosition = recoverProducerAudit(); + Location lastIndoubtPosition = getRecoveryPosition(); + + Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition); + + if (recoveryPosition != null) { + int redoCounter = 0; + LOG.info("Recoverying from the journal ..."); + while (recoveryPosition != null) { + JournalCommand message = load(recoveryPosition); + metadata.lastUpdate = recoveryPosition; + process(message, recoveryPosition, lastIndoubtPosition); + redoCounter++; + recoveryPosition = journal.getNextLocation(recoveryPosition); + } + long end = System.currentTimeMillis(); + LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); } - + // We may have to undo some index updates. pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { @@ -433,7 +462,39 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } } - protected void recoverIndex(Transaction tx) throws IOException { + private Location minimum(Location producerAuditPosition, + Location lastIndoubtPosition) { + Location min = null; + if (producerAuditPosition != null) { + min = producerAuditPosition; + if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) { + min = lastIndoubtPosition; + } + } else { + min = lastIndoubtPosition; + } + return min; + } + + private Location recoverProducerAudit() throws IOException { + if (metadata.producerSequenceIdTrackerLocation != null) { + KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); + try { + ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); + metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); + } catch (ClassNotFoundException cfe) { + IOException ioe = new IOException("Failed to read producerAudit: " + cfe); + ioe.initCause(cfe); + throw ioe; + } + return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation); + } else { + // got no audit stored so got to recreate via replay from start of the journal + return journal.getNextLocation(null); + } + } + + protected void recoverIndex(Transaction tx) throws IOException { long start = System.currentTimeMillis(); // It is possible index updates got applied before the journal updates.. // in that case we need to removed references to messages that are not in the journal @@ -457,6 +518,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); sd.locationIndex.remove(tx, keys.location); sd.messageIdIndex.remove(tx, keys.messageId); + metadata.producerSequenceIdTracker.rollback(new MessageId(keys.messageId)); undoCounter++; // TODO: do we need to modify the ack positions for the pub sub case? } @@ -588,7 +650,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar while (nextRecoveryPosition != null) { lastRecoveryPosition = nextRecoveryPosition; metadata.lastUpdate = lastRecoveryPosition; - JournalCommand message = load(lastRecoveryPosition); + JournalCommand message = load(lastRecoveryPosition); process(message, lastRecoveryPosition); nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); } @@ -601,8 +663,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar return metadata.lastUpdate; } - private Location getRecoveryPosition() throws IOException { - + private Location getRecoveryPosition() throws IOException { + // If we need to recover the transactions.. if (metadata.firstInProgressTransactionLocation != null) { return metadata.firstInProgressTransactionLocation; @@ -613,7 +675,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar // Start replay at the record after the last one recorded in the index file. return journal.getNextLocation(metadata.lastUpdate); } - + // This loads the first position. return journal.getNextLocation(null); } @@ -658,7 +720,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar // ///////////////////////////////////////////////////////////////// // Methods call by the broker to update and query the store. // ///////////////////////////////////////////////////////////////// - public Location store(JournalCommand data) throws IOException { + public Location store(JournalCommand data) throws IOException { return store(data, false, null,null); } @@ -669,7 +731,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar * during a recovery process. * @param done */ - public Location store(JournalCommand data, boolean sync, Runnable before,Runnable after) throws IOException { + public Location store(JournalCommand data, boolean sync, Runnable before,Runnable after) throws IOException { if (before != null) { before.run(); } @@ -716,7 +778,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar * @return * @throws IOException */ - public JournalCommand load(Location location) throws IOException { + public JournalCommand load(Location location) throws IOException { ByteSequence data = journal.read(location); DataByteArrayInputStream is = new DataByteArrayInputStream(data); byte readByte = is.readByte(); @@ -724,10 +786,30 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if( type == null ) { throw new IOException("Could not load journal record. Invalid location: "+location); } - JournalCommand message = (JournalCommand)type.createMessage(); + JournalCommand message = (JournalCommand)type.createMessage(); message.mergeFramed(is); return message; } + + /** + * do minimal recovery till we reach the last inDoubtLocation + * @param data + * @param location + * @param inDoubtlocation + * @throws IOException + */ + void process(JournalCommand data, final Location location, final Location inDoubtlocation) throws IOException { + if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { + process(data, location); + } else { + // just recover producer audit + data.visit(new Visitor() { + public void visit(KahaAddMessageCommand command) throws IOException { + metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); + } + }); + } + } // ///////////////////////////////////////////////////////////////// // Journaled record processing methods. Once the record is journaled, @@ -735,7 +817,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar // from the recovery method too so they need to be idempotent // ///////////////////////////////////////////////////////////////// - void process(JournalCommand data, final Location location) throws IOException { + void process(JournalCommand data, final Location location) throws IOException { data.visit(new Visitor() { @Override public void visit(KahaAddMessageCommand command) throws IOException { @@ -911,7 +993,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if( previous == null ) { previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); if( previous == null ) { - sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location)); + sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location)); } else { // If the message ID as indexed, then the broker asked us to store a DUP // message. Bad BOY! Don't do it, and log a warning. @@ -927,7 +1009,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar // TODO: consider just rolling back the tx. sd.locationIndex.put(tx, location, previous); } - + // record this id in any event, initial send or recovery + metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); } void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { @@ -1025,6 +1108,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar LOG.debug("Checkpoint started."); metadata.state = OPEN_STATE; + metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); tx.store(metadata.page, metadataMarshaller, true); pageFile.flush(); @@ -1111,6 +1195,15 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar LOG.debug("Checkpoint done."); } + private Location checkpointProducerAudit() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oout = new ObjectOutputStream(baos); + oout.writeObject(metadata.producerSequenceIdTracker); + oout.flush(); + oout.close(); + return store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray()))); + } + public HashSet getJournalFilesBeingReplicated() { return journalFilesBeingReplicated; } @@ -1580,6 +1673,22 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar return journalMaxFileLength; } + public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { + this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack); + } + + public int getMaxFailoverProducersToTrack() { + return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack(); + } + + public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { + this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth); + } + + public int getFailoverProducersAuditDepth() { + return this.metadata.producerSequenceIdTracker.getAuditDepth(); + } + public PageFile getPageFile() { if (pageFile == null) { pageFile = createPageFile(); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java index a26efae4f0..7ac9f3ff31 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java @@ -33,6 +33,7 @@ import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; @@ -569,5 +570,9 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA throw new IllegalArgumentException("Not in the valid destination format"); } } + + public long getLastProducerSequenceId(ProducerId id) { + return -1; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java index 902deb15b8..544148a520 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaCommitCommand; import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; +import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; @@ -52,5 +53,8 @@ public class Visitor { public void visit(KahaSubscriptionCommand kahaUpdateSubscriptionCommand) throws IOException { } + + public void visit(KahaProducerAuditCommand kahaProducerAuditCommand) throws IOException { + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java index 9496cff1a8..70ee3d4586 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java @@ -27,6 +27,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.ProxyMessageStore; @@ -201,4 +202,9 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { createTransactionStore(); } } + + public long getLastProducerSequenceId(ProducerId id) { + // memory map does duplicate suppression + return -1; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 91b9b8c1f3..109d077e78 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -727,7 +727,7 @@ public class FailoverTransport implements CompositeTransport { for (Iterator iter2 = tmpMap.values().iterator(); iter2.hasNext();) { Command command = iter2.next(); if (LOG.isTraceEnabled()) { - LOG.trace("restore, replay: " + command); + LOG.trace("restore requestMap, replay: " + command); } t.oneway(command); } diff --git a/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java b/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java index d26f3078c2..d1ca8745f1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java @@ -19,6 +19,7 @@ package org.apache.activemq.util; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.Serializable; /** * Simple BitArray to enable setting multiple boolean values efficently Used @@ -27,7 +28,10 @@ import java.io.IOException; * * @version $Revision: 1.1.1.1 $ */ -public class BitArray { +public class BitArray implements Serializable { + + private static final long serialVersionUID = 1L; + static final int LONG_SIZE = 64; static final int INT_SIZE = 32; static final int SHORT_SIZE = 16; @@ -113,6 +117,14 @@ public class BitArray { this.bits = bits; } + private void writeObject(java.io.ObjectOutputStream out) throws IOException { + writeToStream(out); + } + + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { + readFromStream(in); + } + /** * write the bits to an output stream * diff --git a/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java b/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java index 7ab7339534..e9912904b5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.util; +import java.io.Serializable; import java.util.LinkedList; /** @@ -23,8 +24,9 @@ import java.util.LinkedList; * * @version $Revision: 1.1.1.1 $ */ -public class BitArrayBin { +public class BitArrayBin implements Serializable { + private static final long serialVersionUID = 1L; private LinkedList list; private int maxNumberOfArrays; private int firstIndex = -1; @@ -162,4 +164,22 @@ public class BitArrayBin { } return answer; } + + public long getLastSetIndex() { + long result = -1; + + if (firstIndex >=0) { + result = firstIndex; + BitArray last = null; + for (int lastBitArrayIndex = maxNumberOfArrays -1; lastBitArrayIndex >= 0; lastBitArrayIndex--) { + last = list.get(lastBitArrayIndex); + if (last != null) { + result += last.length() -1; + result += lastBitArrayIndex * BitArray.LONG_SIZE; + break; + } + } + } + return result; + } } diff --git a/activemq-core/src/main/proto/journal-data.proto b/activemq-core/src/main/proto/journal-data.proto index ee0f421a1b..28543ea0d5 100644 --- a/activemq-core/src/main/proto/journal-data.proto +++ b/activemq-core/src/main/proto/journal-data.proto @@ -29,6 +29,7 @@ enum KahaEntryType { KAHA_ROLLBACK_COMMAND = 5; KAHA_REMOVE_DESTINATION_COMMAND = 6; KAHA_SUBSCRIPTION_COMMAND = 7; + KAHA_PRODUCER_AUDIT_COMMAND = 8; } message KahaTraceCommand { @@ -109,6 +110,18 @@ message KahaSubscriptionCommand { optional bytes subscriptionInfo = 4; } +message KahaProducerAuditCommand { + // We make use of the wonky comment style bellow because the following options + // are not valid for protoc, but they are valid for the ActiveMQ proto compiler. + // In the ActiveMQ proto compiler, comments terminate with the pipe character: | + + //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand"; + //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; + //| option java_type_method = "KahaEntryType"; + + required bytes audit = 1; +} + message KahaDestination { enum DestinationType { QUEUE = 0; diff --git a/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java b/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java index a0f296dcad..aee9ceecc0 100644 --- a/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java @@ -152,6 +152,7 @@ public class PerDestinationStoreLimitTest extends JmsTestSupport { Thread.sleep(1000); // the producer is blocked once the done flag stays true if (done.get()) { + LOG.info("Blocked...."); break; } done.set(true); diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java index e1930a151e..3ba15fe640 100644 --- a/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java @@ -66,5 +66,6 @@ public class ConnectionInfoTest extends BaseCommandTestSupport { info.setManageable(false); info.setClientMaster(true); info.setFaultTolerant(false); + info.setFailoverReconnect(true); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java b/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java index 778ccc6f83..0fddfc6a81 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.store; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -119,6 +120,23 @@ public abstract class StoreOrderTest { } } + @Test + public void testCompositeSendReceiveAfterRestart() throws Exception { + destination = new ActiveMQQueue("StoreOrderTest,SecondStoreOrderTest"); + enqueueOneMessage(); + + LOG.info("restart broker"); + stopBroker(); + broker = createRestartedBroker(); + dumpMessages(); + initConnection(); + destination = new ActiveMQQueue("StoreOrderTest"); + assertNotNull("got one message from first dest", receiveOne()); + dumpMessages(); + destination = new ActiveMQQueue("SecondStoreOrderTest"); + assertNotNull("got one message from second dest", receiveOne()); + } + @Test public void validateUnorderedTxCommit() throws Exception { @@ -247,6 +265,7 @@ public abstract class StoreOrderTest { PolicyEntry defaultEntry = new PolicyEntry(); defaultEntry.setMemoryLimit(1024*3); defaultEntry.setCursorMemoryHighWaterMark(68); + defaultEntry.setExpireMessagesPeriod(0); map.setDefaultEntry(defaultEntry); brokerService.setDestinationPolicy(map); } diff --git a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java index c43ff9f0d7..ce539d2381 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java @@ -28,6 +28,11 @@ public class JDBCPersistenceAdapterTest extends PersistenceAdapterTestSupport { protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException { JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); + + // explicitly enable audit as it is now off by default + // due to org.apache.activemq.broker.ProducerBrokerExchange.canDispatch(Message) + jdbc.setEnableAudit(true); + brokerService.setSchedulerSupport(false); brokerService.setPersistenceAdapter(jdbc); jdbc.setBrokerService(brokerService); @@ -56,6 +61,5 @@ public class JDBCPersistenceAdapterTest extends PersistenceAdapterTestSupport { if (!failed) { fail("Should have failed with audit turned off"); } - } - + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java index 062c6cfacf..a010291cb0 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java @@ -43,7 +43,7 @@ public class JDBCStoreOrderTest extends StoreOrderTest { while(result.next()) { long id = result.getLong(1); Message message = (Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2))); - LOG.error("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message); + LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message); } statement.close(); conn.close(); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index 6f31db876a..d18d01e7bf 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -49,10 +50,14 @@ import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConsumerBrokerExchange; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.TransactionId; +import org.apache.activemq.store.amq.AMQPersistenceAdapter; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.SocketProxy; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.After; @@ -86,6 +91,7 @@ public class FailoverTransactionTest { public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { broker = new BrokerService(); broker.setUseJmx(false); + broker.setAdvisorySupport(false); broker.addConnector(url); broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); return broker; @@ -114,7 +120,7 @@ public class FailoverTransactionTest { } @Test - public void testFailoverCommitReplyLost() throws Exception { + public void testFailoverCommitReplyLostAMQ() throws Exception { doTestFailoverCommitReplyLost(0); } @@ -222,15 +228,257 @@ public class FailoverTransactionTest { connection.close(); } + + //@Test not implemented + public void testFailoverSendReplyLostAMQ() throws Exception { + doTestFailoverSendReplyLost(0); + } + + @Test + public void testFailoverSendReplyLostJdbc() throws Exception { + doTestFailoverSendReplyLost(1); + } + + @Test + public void testFailoverSendReplyLostKahaDB() throws Exception { + doTestFailoverSendReplyLost(2); + } + + public void doTestFailoverSendReplyLost(final int adapter) throws Exception { + + broker = createBroker(true); + setPersistenceAdapter(adapter); + + broker.setPlugins(new BrokerPlugin[] { + new BrokerPluginSupport() { + @Override + public void send(ProducerBrokerExchange producerExchange, + org.apache.activemq.command.Message messageSend) + throws Exception { + // so send will hang as if reply is lost + super.send(producerExchange, messageSend); + producerExchange.getConnectionContext().setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("Stopping broker post send..."); + try { + broker.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + } + }); + broker.start(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false"); + Connection connection = cf.createConnection(); + connection.start(); + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue destination = session.createQueue(QUEUE_NAME); + + MessageConsumer consumer = session.createConsumer(destination); + final CountDownLatch sendDoneLatch = new CountDownLatch(1); + // broker will die on send reply so this will hang till restart + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("doing async send..."); + try { + produceMessage(session, destination); + } catch (JMSException e) { + //assertTrue(e instanceof TransactionRolledBackException); + LOG.error("got send exception: ", e); + fail("got unexpected send exception" + e); + } + sendDoneLatch.countDown(); + LOG.info("done async send"); + } + }); + + // will be stopped by the plugin + broker.waitUntilStopped(); + broker = createBroker(false); + setPersistenceAdapter(adapter); + LOG.info("restarting...."); + broker.start(); + + assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS)); + + // new transaction + Message msg = consumer.receive(20000); + LOG.info("Received: " + msg); + assertNotNull("we got the message", msg); + assertNull("we got just one message", consumer.receive(2000)); + consumer.close(); + connection.close(); + + // verify stats + assertEquals("no newly queued messages", 0, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount()); + assertEquals("1 dequeue", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount()); + + // ensure no dangling messages with fresh broker etc + broker.stop(); + broker.waitUntilStopped(); + + LOG.info("Checking for remaining/hung messages with second restart.."); + broker = createBroker(false); + setPersistenceAdapter(adapter); + broker.start(); + + // after restart, ensure no dangling messages + cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + connection = cf.createConnection(); + connection.start(); + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = session2.createConsumer(destination); + msg = consumer.receive(1000); + if (msg == null) { + msg = consumer.receive(5000); + } + LOG.info("Received: " + msg); + assertNull("no messges left dangling but got: " + msg, msg); + connection.close(); + } + + // not implemented.. @Test + public void testFailoverConnectionSendReplyLostAMQ() throws Exception { + doTestFailoverConnectionSendReplyLost(0); + } + + @Test + public void testFailoverConnectionSendReplyLostJdbc() throws Exception { + doTestFailoverConnectionSendReplyLost(1); + } + + @Test + public void testFailoverConnectionSendReplyLostKahaDB() throws Exception { + doTestFailoverConnectionSendReplyLost(2); + } + + public void doTestFailoverConnectionSendReplyLost(final int adapter) throws Exception { + + broker = createBroker(true); + setPersistenceAdapter(adapter); + + final SocketProxy proxy = new SocketProxy(); + + broker.setPlugins(new BrokerPlugin[] { + new BrokerPluginSupport() { + private boolean firstSend = true; + + @Override + public void send(ProducerBrokerExchange producerExchange, + org.apache.activemq.command.Message messageSend) + throws Exception { + // so send will hang as if reply is lost + super.send(producerExchange, messageSend); + if (firstSend) { + firstSend = false; + + producerExchange.getConnectionContext().setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("Stopping connection post send..."); + try { + proxy.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + } + } + }); + broker.start(); + + proxy.setTarget(new URI(url)); + proxy.open(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false"); + Connection connection = cf.createConnection(); + connection.start(); + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue destination = session.createQueue(QUEUE_NAME); + + MessageConsumer consumer = session.createConsumer(destination); + final CountDownLatch sendDoneLatch = new CountDownLatch(1); + // proxy connection will die on send reply so this will hang on failover reconnect till open + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("doing async send..."); + try { + produceMessage(session, destination); + } catch (JMSException e) { + //assertTrue(e instanceof TransactionRolledBackException); + LOG.info("got send exception: ", e); + } + sendDoneLatch.countDown(); + LOG.info("done async send"); + } + }); + + // will be closed by the plugin + assertTrue("proxy was closed", proxy.waitUntilClosed(30)); + LOG.info("restarting proxy"); + proxy.open(); + + assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS)); + + Message msg = consumer.receive(20000); + LOG.info("Received: " + msg); + assertNotNull("we got the message", msg); + assertNull("we got just one message", consumer.receive(2000)); + consumer.close(); + connection.close(); + + // verify stats, connection dup suppression means dups don't get to broker + assertEquals("one queued message", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount()); + + // ensure no dangling messages with fresh broker etc + broker.stop(); + broker.waitUntilStopped(); + + LOG.info("Checking for remaining/hung messages with restart.."); + broker = createBroker(false); + setPersistenceAdapter(adapter); + broker.start(); + + // after restart, ensure no dangling messages + cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + connection = cf.createConnection(); + connection.start(); + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = session2.createConsumer(destination); + msg = consumer.receive(1000); + if (msg == null) { + msg = consumer.receive(5000); + } + LOG.info("Received: " + msg); + assertNull("no messges left dangling but got: " + msg, msg); + connection.close(); + } + + + private void setPersistenceAdapter(int adapter) throws IOException { switch (adapter) { case 0: + broker.setPersistenceAdapter(new AMQPersistenceAdapter()); break; case 1: broker.setPersistenceAdapter(new JDBCPersistenceAdapter()); break; case 2: KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter(); + // duplicate checker not updated on canceled tasks, even it + // it was, reovery of the audit would fail as the message is + // not recorded in the store and the audit may not be up to date. + // So if duplicate are a nono (w.r.t stats), this must be disabled + store.setConcurrentStoreAndDispatchQueues(false); + store.setMaxFailoverProducersToTrack(10); store.setDirectory(new File("target/activemq-data/kahadb/FailoverTransactionTest")); broker.setPersistenceAdapter(store); break; diff --git a/activemq-core/src/test/java/org/apache/activemq/util/BitArrayBinTest.java b/activemq-core/src/test/java/org/apache/activemq/util/BitArrayBinTest.java index 8c856e46ea..0c0d3e7c91 100644 --- a/activemq-core/src/test/java/org/apache/activemq/util/BitArrayBinTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/util/BitArrayBinTest.java @@ -32,21 +32,29 @@ public class BitArrayBinTest extends TestCase { for (int i=0; i <= dataSize; i++) { assertTrue("not already set", !toTest.setBit(i, Boolean.TRUE)); + assertEquals("current is max", i, toTest.getLastSetIndex()); } + assertEquals("last is max", dataSize, toTest.getLastSetIndex()); + int windowOfValidData = roundWindow(dataSize, window); int i=dataSize; for (; i >= dataSize -windowOfValidData; i--) { assertTrue("was already set, id=" + i, toTest.setBit(i, Boolean.TRUE)); } + + assertEquals("last is still max", dataSize, toTest.getLastSetIndex()); for (; i >= 0; i--) { assertTrue("was not already set, id=" + i, !toTest.setBit(i, Boolean.TRUE)); } - for (int j= dataSize +1; j<(2*dataSize); j++) { + for (int j= dataSize +1; j<=(2*dataSize); j++) { assertTrue("not already set: id=" + j, !toTest.setBit(j, Boolean.TRUE)); } + + assertEquals("last still max*2", 2*dataSize, toTest.getLastSetIndex()); + } public void testSetUnsetAroundWindow() throws Exception { @@ -87,6 +95,7 @@ public class BitArrayBinTest extends TestCase { int instance = value +muliplier*BitArray.LONG_SIZE; assertTrue("not already set: id=" + instance, !toTest.setBit(instance, Boolean.TRUE)); assertTrue("not already set: id=" + value, !toTest.setBit(value, Boolean.TRUE)); + assertEquals("max set correct", instance, toTest.getLastSetIndex()); } } } @@ -109,6 +118,22 @@ public class BitArrayBinTest extends TestCase { assertTrue("not already set: id=" + instance, !toTest.setBit(instance, Boolean.TRUE)); } + + public void testLastSeq() throws Exception { + BitArrayBin toTest = new BitArrayBin(512); + assertEquals("last not set", -1, toTest.getLastSetIndex()); + + toTest.setBit(1, Boolean.TRUE); + assertEquals("last correct", 1, toTest.getLastSetIndex()); + + toTest.setBit(64, Boolean.TRUE); + assertEquals("last correct", 64, toTest.getLastSetIndex()); + + toTest.setBit(68, Boolean.TRUE); + assertEquals("last correct", 68, toTest.getLastSetIndex()); + + } + // window moves in increments of BitArray.LONG_SIZE. // valid data window on low end can be larger than window private int roundWindow(int dataSetEnd, int windowSize) { diff --git a/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java b/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java index 090232f507..d012cf0a0a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java +++ b/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; @@ -46,6 +47,8 @@ public class SocketProxy { private Acceptor acceptor; private ServerSocket serverSocket; + + private CountDownLatch closed = new CountDownLatch(1); public List connections = new LinkedList(); @@ -87,6 +90,7 @@ public class SocketProxy { } acceptor = new Acceptor(serverSocket, target); new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start(); + closed = new CountDownLatch(1); } public URI getUrl() { @@ -106,6 +110,11 @@ public class SocketProxy { closeConnection(con); } acceptor.close(); + closed.countDown(); + } + + public boolean waitUntilClosed(long timeoutSeconds) throws InterruptedException { + return closed.await(timeoutSeconds, TimeUnit.SECONDS); } /* @@ -303,10 +312,12 @@ public class SocketProxy { public void close() { try { socket.close(); + closed.countDown(); goOn(); } catch (IOException ignored) { } } } + }