From 70057cb9ce4924cb6d484833f5eb757cbec1ae8e Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Wed, 2 Nov 2011 17:23:15 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3576 - producer exchange sync last seq id git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1196709 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/ProducerBrokerExchange.java | 13 ++-- .../transport/SoWriteTimeoutTest.java | 78 +++++++++++++++---- 2 files changed, 72 insertions(+), 19 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java b/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java index dcd579f139..a4c6ac04af 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java @@ -23,6 +23,8 @@ import org.apache.activemq.state.ProducerState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicLong; + /** * Holds internal state in the broker for a MessageProducer * @@ -36,7 +38,7 @@ public class ProducerBrokerExchange { private Region region; private ProducerState producerState; private boolean mutable = true; - private long lastSendSequenceNumber = -1; + private AtomicLong lastSendSequenceNumber = new AtomicLong(-1); public ProducerBrokerExchange() { } @@ -129,18 +131,19 @@ public class ProducerBrokerExchange { */ public boolean canDispatch(Message messageSend) { boolean canDispatch = true; - if (lastSendSequenceNumber > 0) { - if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber) { + if (lastSendSequenceNumber.get() > 0) { + if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber.get()) { canDispatch = false; - LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId [" + LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId [" + messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: " + lastSendSequenceNumber); } } + lastSendSequenceNumber.set(messageSend.getMessageId().getProducerSequenceId()); return canDispatch; } public void setLastStoredSequenceId(long l) { - lastSendSequenceNumber = l; + lastSendSequenceNumber.set(l); LOG.debug("last stored sequence id set: " + l); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java index ce9c808565..f49e559f39 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java @@ -16,23 +16,12 @@ */ package org.apache.activemq.transport; -import java.net.Socket; -import java.net.SocketException; -import java.net.URI; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; - import junit.framework.Test; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.JmsTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.transport.stomp.Stomp; import org.apache.activemq.transport.stomp.StompConnection; import org.apache.activemq.util.SocketProxy; @@ -40,15 +29,27 @@ import org.apache.activemq.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.*; +import java.net.Socket; +import java.net.SocketException; +import java.net.URI; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + public class SoWriteTimeoutTest extends JmsTestSupport { private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutTest.class); final int receiveBufferSize = 16*1024; - public String brokerTransportScheme = "nio"; + public String brokerTransportScheme = "tcp"; protected BrokerService createBroker() throws Exception { BrokerService broker = super.createBroker(); - broker.addConnector(brokerTransportScheme + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize="+ receiveBufferSize); + broker.setPersistent(true); + KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + adapter.setConcurrentStoreAndDispatchQueues(false); + broker.setPersistenceAdapter(adapter); + broker.addConnector(brokerTransportScheme + "://localhost:0?wireFormat.maxInactivityDuration=0"); if ("nio".equals(brokerTransportScheme)) { broker.addConnector("stomp+" + brokerTransportScheme + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize=" + receiveBufferSize + "&trace=true"); } @@ -147,6 +148,55 @@ public class SoWriteTimeoutTest extends JmsTestSupport { } } + public void testClientWriteTimeout() throws Exception { + final ActiveMQQueue dest = new ActiveMQQueue("testClientWriteTimeout"); + messageTextPrefix = initMessagePrefix(80*1024); + + URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri()); + LOG.info("consuming using uri: " + tcpBrokerUri); + + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri); + Connection c = factory.createConnection(); + c.start(); + Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(dest); + + SocketProxy proxy = new SocketProxy(); + proxy.setTarget(tcpBrokerUri); + proxy.open(); + + ActiveMQConnectionFactory pFactory = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl() + "?soWriteTimeout=500)?jms.useAsyncSend=true&trackMessages=true"); + final Connection pc = pFactory.createConnection(); + pc.start(); + System.out.println("Pausing proxy"); + proxy.pause(); + + final int messageCount = 20; + ExecutorService executorService = Executors.newCachedThreadPool(); + executorService.execute(new Runnable() { + @Override + public void run() { + try { + System.out.println("sending messages"); + sendMessages(pc, dest, messageCount); + System.out.println("messages sent"); + } catch (Exception ignored) { + ignored.printStackTrace(); + } + } + }); + + // wait for timeout and reconnect + TimeUnit.SECONDS.sleep(7); + System.out.println("go on"); + proxy.goOn(); + for (int i=0; i