diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java index 6950ca5d76..662fb9ada8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java @@ -23,6 +23,7 @@ import org.apache.activemq.broker.InsertableMutableBrokerFilter; import org.apache.activemq.broker.MutableBrokerFilter; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; @@ -30,6 +31,7 @@ import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; @@ -131,6 +133,17 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ super.removeProducer(context, info); sendAsyncToSlave(new RemoveInfo(info.getProducerId())); } + + public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Throwable { + super.addConsumer(context, info); + sendAsyncToSlave(info); + } + + + public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Throwable { + super.removeSubscription(context, info); + sendAsyncToSlave(info); + } @@ -163,6 +176,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ super.rollbackTransaction(context, xid); TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK); sendAsyncToSlave(info); + } /** @@ -174,7 +188,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Throwable{ super.commitTransaction(context, xid,onePhase); TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE); - sendAsyncToSlave(info); + sendSyncToSlave(info); } /** @@ -205,26 +219,40 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ } public void send(ConnectionContext context, Message message) throws Throwable{ + /** + * A message can be dispatched before the super.send() method returns + * so - here the order is switched to avoid problems on the slave + * with receiving acks for messages not received yey + */ + sendToSlave(message); super.send(context,message); - sendAsyncToSlave(message); } public void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable{ super.acknowledge(context, ack); - sendAsyncToSlave(ack); + sendToSlave(ack); } protected void sendToSlave(Message message){ - /* - if (message.isPersistent()){ + + if (message.isPersistent() && !message.isInTransaction()){ sendSyncToSlave(message); }else{ sendAsyncToSlave(message); } - */ - sendAsyncToSlave(message); + + + } + + protected void sendToSlave(MessageAck ack){ + + if (ack.isInTransaction()){ + sendAsyncToSlave(ack); + }else{ + sendSyncToSlave(ack); + } } protected void sendAsyncToSlave(Command command){ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java index 7e12b202fc..bdaf12494b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java @@ -233,6 +233,6 @@ public class MasterConnector implements Service{ private void shutDown(){ masterActive.set(false); broker.masterFailed(); - //ServiceSupport.dispose(this); + ServiceSupport.dispose(this); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java index 7bbace21ba..83f2f04ea8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java @@ -57,6 +57,8 @@ public class IndirectMessageReference implements MessageReference { private Message message; /** The number of times the message has requested being hardened */ private int referenceCount; + /** the size of the message **/ + private int cachedSize = 0; /** * Only used by the END_OF_BROWSE_MARKER singleton @@ -69,6 +71,7 @@ public class IndirectMessageReference implements MessageReference { this.groupID = null; this.groupSequence = 0; this.targetConsumerId=null; + this.cachedSize = message != null ? message.getSize() : 0; } public IndirectMessageReference(Destination destination, Message message) { @@ -81,7 +84,8 @@ public class IndirectMessageReference implements MessageReference { this.targetConsumerId=message.getTargetConsumerId(); this.referenceCount=1; - message.incrementReferenceCount(); + message.incrementReferenceCount(); + this.cachedSize = message != null ? message.getSize() : 0; } synchronized public Message getMessageHardRef() { @@ -202,4 +206,12 @@ public class IndirectMessageReference implements MessageReference { public ConsumerId getTargetConsumerId() { return targetConsumerId; } + + public int getSize(){ + Message msg = message; + if (msg != null){ + return msg.getSize(); + } + return cachedSize; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java index 79a0dad8ad..b8c48f397d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java @@ -46,5 +46,6 @@ public interface MessageReference { public int incrementReferenceCount(); public int decrementReferenceCount(); public ConsumerId getTargetConsumerId(); + public int getSize(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 16b85854a0..d36c847107 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -1,18 +1,15 @@ /** - * + * * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.activemq.broker.region; @@ -29,217 +26,186 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.transaction.Synchronization; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import javax.jms.InvalidSelectorException; import javax.jms.JMSException; - import java.io.IOException; import java.util.Iterator; import java.util.LinkedList; - /** * A subscription that honors the pre-fetch option of the ConsumerInfo. * * @version $Revision: 1.15 $ */ -abstract public class PrefetchSubscription extends AbstractSubscription { - static private final Log log = LogFactory.getLog(PrefetchSubscription.class); - - final protected LinkedList matched = new LinkedList(); - final protected LinkedList dispatched = new LinkedList(); - +abstract public class PrefetchSubscription extends AbstractSubscription{ + static private final Log log=LogFactory.getLog(PrefetchSubscription.class); + final protected LinkedList matched=new LinkedList(); + final protected LinkedList dispatched=new LinkedList(); protected int delivered=0; - int preLoadLimit=1024*100; int preLoadSize=0; boolean dispatching=false; - - public PrefetchSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { - super(broker,context, info); + + public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info) + throws InvalidSelectorException{ + super(broker,context,info); } - synchronized public void add(MessageReference node) throws Throwable { - if( !isFull() && !isSlaveBroker()) { + synchronized public void add(MessageReference node) throws Throwable{ + if(!isFull()&&!isSlaveBroker()){ dispatch(node); - } else { + }else{ synchronized(matched){ matched.addLast(node); } } - } - - public void processMessageDispatchNotification(MessageDispatchNotification mdn){ + + public void processMessageDispatchNotification(MessageDispatchNotification mdn){ synchronized(matched){ - for (Iterator i = matched.iterator(); i.hasNext();){ - MessageReference node = (MessageReference)i.next(); - if (node.getMessageId().equals(mdn.getMessageId())){ + for(Iterator i=matched.iterator();i.hasNext();){ + MessageReference node=(MessageReference) i.next(); + if(node.getMessageId().equals(mdn.getMessageId())){ i.remove(); - try { - MessageDispatch md = createMessageDispatch(node, node.getMessage()); - dispatched.addLast(node); - - incrementPreloadSize(node.getMessage().getSize()); - node.decrementReferenceCount(); + try{ + MessageDispatch md=createMessageDispatch(node,node.getMessage()); + dispatched.addLast(node); + incrementPreloadSize(node.getSize()); + node.decrementReferenceCount(); }catch(Exception e){ - log.error("Problem processing MessageDispatchNotification: " + mdn,e); + log.error("Problem processing MessageDispatchNotification: "+mdn,e); } break; } } } } - - synchronized public void acknowledge(final ConnectionContext context, final MessageAck ack) throws Throwable { - + + synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Throwable{ // Handle the standard acknowledgment case. - boolean wasFull = isFull(); - if( ack.isStandardAck() ) { - + boolean wasFull=isFull(); + if(ack.isStandardAck()){ // Acknowledge all dispatched messages up till the message id of the acknowledgment. int index=0; boolean inAckRange=false; - for (Iterator iter = dispatched.iterator(); iter.hasNext();) { - final MessageReference node = (MessageReference)iter.next(); - MessageId messageId = node.getMessageId(); - - if( ack.getFirstMessageId()==null || ack.getFirstMessageId().equals(messageId)) { - inAckRange = true; + for(Iterator iter=dispatched.iterator();iter.hasNext();){ + final MessageReference node=(MessageReference) iter.next(); + MessageId messageId=node.getMessageId(); + if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){ + inAckRange=true; } - - if( inAckRange ) { - + if(inAckRange){ // Don't remove the nodes until we are committed. - if ( !context.isInTransaction() ) { + if(!context.isInTransaction()){ iter.remove(); - } else { + }else{ // setup a Synchronization to remove nodes from the dispatched list. context.getTransaction().addSynchronization(new Synchronization(){ - public void afterCommit() throws Throwable { - synchronized(PrefetchSubscription.this) { - + public void afterCommit() throws Throwable{ + synchronized(PrefetchSubscription.this){ // Now that we are committed, we can remove the nodes. boolean inAckRange=false; int index=0; - for (Iterator iter = dispatched.iterator(); iter.hasNext();) { - final MessageReference node = (MessageReference)iter.next(); - MessageId messageId = node.getMessageId(); - if( ack.getFirstMessageId()==null || ack.getFirstMessageId().equals(messageId)) { - inAckRange = true; + for(Iterator iter=dispatched.iterator();iter.hasNext();){ + final MessageReference node=(MessageReference) iter.next(); + MessageId messageId=node.getMessageId(); + if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){ + inAckRange=true; } - if( inAckRange ) { + if(inAckRange){ index++; iter.remove(); - if( ack.getLastMessageId().equals(messageId)) { - delivered = Math.max(0, delivered - (index+1)); + if(ack.getLastMessageId().equals(messageId)){ + delivered=Math.max(0,delivered-(index+1)); return; } } } - } } - }); + }); } - index++; - acknowledge(context, ack, node); - if( ack.getLastMessageId().equals(messageId)) { - if ( context.isInTransaction() ) - delivered = Math.max(delivered,index+1); - else - delivered = Math.max(0, delivered - (index+1)); - - if( wasFull && !isFull() ) { + acknowledge(context,ack,node); + if(ack.getLastMessageId().equals(messageId)){ + if(context.isInTransaction()) + delivered=Math.max(delivered,index+1); + else + delivered=Math.max(0,delivered-(index+1)); + if(wasFull&&!isFull()){ dispatchMatched(); } return; - } else { -// System.out.println("no match: "+ack.getLastMessageId()+","+messageId); + }else{ + // System.out.println("no match: "+ack.getLastMessageId()+","+messageId); } } - } log.info("Could not correlate acknowledgment with dispatched message: "+ack); - - } else if( ack.isDeliveredAck() ) { - + }else if(ack.isDeliveredAck()){ // Message was delivered but not acknowledged: update pre-fetch counters. // Acknowledge all dispatched messages up till the message id of the acknowledgment. int index=0; - for (Iterator iter = dispatched.iterator(); iter.hasNext();index++) { - final MessageReference node = (MessageReference)iter.next(); - if( ack.getLastMessageId().equals(node.getMessageId()) ) { - delivered = Math.max(delivered,index+1); - if( wasFull && !isFull() ) { + for(Iterator iter=dispatched.iterator();iter.hasNext();index++){ + final MessageReference node=(MessageReference) iter.next(); + if(ack.getLastMessageId().equals(node.getMessageId())){ + delivered=Math.max(delivered,index+1); + if(wasFull&&!isFull()){ dispatchMatched(); } return; } } throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); - - } else if( ack.isPoisonAck() ) { - + }else if(ack.isPoisonAck()){ // TODO: what if the message is already in a DLQ??? - - // Handle the poison ACK case: we need to send the message to a DLQ - if( ack.isInTransaction() ) + // Handle the poison ACK case: we need to send the message to a DLQ + if(ack.isInTransaction()) throw new JMSException("Poison ack cannot be transacted: "+ack); - // Acknowledge all dispatched messages up till the message id of the acknowledgment. int index=0; boolean inAckRange=false; - for (Iterator iter = dispatched.iterator(); iter.hasNext();) { - final MessageReference node = (MessageReference)iter.next(); - MessageId messageId = node.getMessageId(); - - if( ack.getFirstMessageId()==null || ack.getFirstMessageId().equals(messageId)) { - inAckRange = true; + for(Iterator iter=dispatched.iterator();iter.hasNext();){ + final MessageReference node=(MessageReference) iter.next(); + MessageId messageId=node.getMessageId(); + if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){ + inAckRange=true; } - - if( inAckRange ) { - + if(inAckRange){ // Send the message to the DLQ node.incrementReferenceCount(); - try { - Message message = node.getMessage(); - if( message !=null ) { - - // The original destination and transaction id do not get filled when the message is first sent, + try{ + Message message=node.getMessage(); + if(message!=null){ + // The original destination and transaction id do not get filled when the message is first + // sent, // it is only populated if the message is routed to another destination like the DLQ - if( message.getOriginalDestination()!=null ) + if(message.getOriginalDestination()!=null) message.setOriginalDestination(message.getDestination()); - if( message.getOriginalTransactionId()!=null ) + if(message.getOriginalTransactionId()!=null) message.setOriginalTransactionId(message.getTransactionId()); - - DeadLetterStrategy deadLetterStrategy = node.getRegionDestination().getDeadLetterStrategy(); - ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message.getDestination()); + DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy(); + ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message + .getDestination()); message.setDestination(deadLetterDestination); message.setTransactionId(null); message.evictMarshlledForm(); - - boolean originalFlowControl = context.isProducerFlowControl(); - try { + boolean originalFlowControl=context.isProducerFlowControl(); + try{ context.setProducerFlowControl(false); - context.getBroker().send(context, message); - } finally { + context.getBroker().send(context,message); + }finally{ context.setProducerFlowControl(originalFlowControl); } - - } - } finally { + } + }finally{ node.decrementReferenceCount(); } - iter.remove(); index++; - acknowledge(context, ack, node); - if( ack.getLastMessageId().equals(messageId)) { - - delivered = Math.max(0, delivered - (index+1)); - - if( wasFull && !isFull() ) { + acknowledge(context,ack,node); + if(ack.getLastMessageId().equals(messageId)){ + delivered=Math.max(0,delivered-(index+1)); + if(wasFull&&!isFull()){ dispatchMatched(); } return; @@ -248,128 +214,115 @@ abstract public class PrefetchSubscription extends AbstractSubscription { } throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); } - throw new JMSException("Invalid acknowledgment: "+ack); } - - protected boolean isFull() { - return dispatched.size()-delivered >= info.getPrefetchSize() || preLoadSize > preLoadLimit; + + protected boolean isFull(){ + return dispatched.size()-delivered>=info.getPrefetchSize()||preLoadSize>preLoadLimit; } - - protected void dispatchMatched() throws IOException { - if(!dispatching) { - dispatching = true; - try { - for (Iterator iter = matched.iterator(); iter.hasNext() && !isFull();) { - MessageReference node = (MessageReference) iter.next(); + + protected void dispatchMatched() throws IOException{ + if(!dispatching){ + dispatching=true; + try{ + for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){ + MessageReference node=(MessageReference) iter.next(); iter.remove(); dispatch(node); } - } finally { + }finally{ dispatching=false; } } } - - - private void dispatch(final MessageReference node) throws IOException { + private void dispatch(final MessageReference node) throws IOException{ node.incrementReferenceCount(); - - final Message message = node.getMessage(); - if( message == null ) { + final Message message=node.getMessage(); + if(message==null){ return; - } - + } // Make sure we can dispatch a message. - if( canDispatch(node) && !isSlaveBroker()) { - - MessageDispatch md = createMessageDispatch(node, message); + if(canDispatch(node)&&!isSlaveBroker()){ + MessageDispatch md=createMessageDispatch(node,message); dispatched.addLast(node); - - incrementPreloadSize(node.getMessage().getSize()); - - if( info.isDispatchAsync() ) { + incrementPreloadSize(node.getMessage().getSize()); + if(info.isDispatchAsync()){ md.setConsumer(new Runnable(){ - public void run() { - // Since the message gets queued up in async dispatch, we don't want to + public void run(){ + // Since the message gets queued up in async dispatch, we don't want to // decrease the reference count until it gets put on the wire. - onDispatch(node, message); + onDispatch(node,message); } }); context.getConnection().dispatchAsync(md); - } else { + }else{ context.getConnection().dispatchSync(md); - onDispatch(node, message); + onDispatch(node,message); } // The onDispatch() does the node.decrementReferenceCount(); - } else { + }else{ // We were not allowed to dispatch that message (an other consumer grabbed it before we did) node.decrementReferenceCount(); } - } - synchronized private void onDispatch(final MessageReference node, final Message message) { - - boolean wasFull = isFull(); - decrementPreloadSize(message.getSize()); + synchronized private void onDispatch(final MessageReference node,final Message message){ + boolean wasFull=isFull(); + decrementPreloadSize(message.getSize()); node.decrementReferenceCount(); - - if( node.getRegionDestination() !=null ) { + if(node.getRegionDestination()!=null){ node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message); context.getConnection().getStatistics().onMessageDequeue(message); - - if( wasFull && !isFull() ) { - try { + if(wasFull&&!isFull()){ + try{ dispatchMatched(); - } catch (IOException e) { + }catch(IOException e){ context.getConnection().serviceException(e); } } } - } - - private int incrementPreloadSize(int size) { - preLoadSize += size; + + private int incrementPreloadSize(int size){ + preLoadSize+=size; return preLoadSize; } - - private int decrementPreloadSize(int size) { - preLoadSize -= size; + + private int decrementPreloadSize(int size){ + preLoadSize-=size; return preLoadSize; } - /** * @param node - * @param message TODO + * @param message + * TODO * @return */ - protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { - MessageDispatch md = new MessageDispatch(); - md.setConsumerId( info.getConsumerId() ); - md.setDestination( node.getRegionDestination().getActiveMQDestination() ); + protected MessageDispatch createMessageDispatch(MessageReference node,Message message){ + MessageDispatch md=new MessageDispatch(); + md.setConsumerId(info.getConsumerId()); + md.setDestination(node.getRegionDestination().getActiveMQDestination()); md.setMessage(message); - md.setRedeliveryCounter( node.getRedeliveryCounter() ); + md.setRedeliveryCounter(node.getRedeliveryCounter()); return md; } - + /** * Use when a matched message is about to be dispatched to the client. * * @param node - * @return false if the message should not be dispatched to the client (another sub may have already dispatched it for example). + * @return false if the message should not be dispatched to the client (another sub may have already dispatched it + * for example). */ abstract protected boolean canDispatch(MessageReference node); - + /** * Used during acknowledgment to remove the message. - * @throws IOException + * + * @throws IOException */ - protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException { - } - - + protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node) + throws IOException{} } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java deleted file mode 100755 index 70606cffa8..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.ft; - -import java.net.URI; - -import javax.jms.*; -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.xbean.BrokerFactoryBean; -import org.springframework.context.support.AbstractApplicationContext; -import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.core.io.ClassPathResource; - -public class FTBrokerTest extends TestCase { - - protected static final int MESSAGE_COUNT = 10; - protected BrokerService master; - protected BrokerService slave; - protected Connection connection; - protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false"; - //protected String uriString = "tcp://localhost:62001"; - - protected void setUp() throws Exception { - BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/master.xml")); - brokerFactory.afterPropertiesSet(); - master = brokerFactory.getBroker(); - brokerFactory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/slave.xml")); - brokerFactory.afterPropertiesSet(); - slave = brokerFactory.getBroker(); - //uriString = "failover://(" + master.getVmConnectorURI() + "," + slave.getVmConnectorURI() + ")?randomize=false"; - //uriString = "failover://(" + master.getVmConnectorURI() + "," + slave.getVmConnectorURI() + ")"; - System.out.println("URI = " + uriString); - URI uri = new URI(uriString); - ConnectionFactory fac = new ActiveMQConnectionFactory(uri); - connection = fac.createConnection(); - master.start(); - slave.start(); - //wait for thing to connect - Thread.sleep(1000); - connection.start(); - super.setUp(); - - - - } - - - - - protected void tearDown() throws Exception { - try { - connection.close(); - slave.stop(); - master.stop(); - }catch(Throwable e){ - e.printStackTrace(); - } - - super.tearDown(); - } - - public void testFTBroker() throws Exception{ - - Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(getClass().toString()); - MessageProducer producer = session.createProducer(queue); - for (int i = 0; i < MESSAGE_COUNT; i++){ - Message msg = session.createTextMessage("test: " + i); - producer.send(msg); - } - master.stop(); - session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(queue); - for (int i = 0; i < MESSAGE_COUNT; i++){ - System.out.println("GOT MSG: " + consumer.receive(1000)); - } - - } - -} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java new file mode 100644 index 0000000000..a95487543e --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java @@ -0,0 +1,76 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.ft; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.xbean.BrokerFactoryBean; +import org.springframework.core.io.ClassPathResource; + +/** + *Test failover for Queues + * + */ +public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsTest{ + + + + protected BrokerService master; + protected BrokerService slave; + protected int inflightMessageCount = 0; + protected int failureCount = 50; + protected String uriString="failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false"; + + protected void setUp() throws Exception{ + failureCount = super.messageCount/2; + super.topic = isTopic(); + BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/master.xml")); + brokerFactory.afterPropertiesSet(); + master=brokerFactory.getBroker(); + brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/slave.xml")); + brokerFactory.afterPropertiesSet(); + slave=brokerFactory.getBroker(); + master.start(); + slave.start(); + // wait for thing to connect + Thread.sleep(1000); + super.setUp(); + + } + + protected void tearDown() throws Exception{ + super.tearDown(); + slave.stop(); + master.stop(); + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{ + return new ActiveMQConnectionFactory(uriString); + } + + protected void messageSent() throws Exception{ + if (++inflightMessageCount >= failureCount){ + inflightMessageCount = 0; + master.stop(); + } + } + + protected boolean isTopic(){ + return false; + } +}