Fixes for Master-Slave functionality

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@372050 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-01-24 23:08:29 +00:00
parent 32446c1f1d
commit 8b04e15826
7 changed files with 277 additions and 304 deletions

View File

@ -23,6 +23,7 @@ import org.apache.activemq.broker.InsertableMutableBrokerFilter;
import org.apache.activemq.broker.MutableBrokerFilter; import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
@ -30,6 +31,7 @@ import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
@ -131,6 +133,17 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
super.removeProducer(context, info); super.removeProducer(context, info);
sendAsyncToSlave(new RemoveInfo(info.getProducerId())); sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
} }
public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Throwable {
super.addConsumer(context, info);
sendAsyncToSlave(info);
}
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Throwable {
super.removeSubscription(context, info);
sendAsyncToSlave(info);
}
@ -163,6 +176,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
super.rollbackTransaction(context, xid); super.rollbackTransaction(context, xid);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK); TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK);
sendAsyncToSlave(info); sendAsyncToSlave(info);
} }
/** /**
@ -174,7 +188,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Throwable{ public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Throwable{
super.commitTransaction(context, xid,onePhase); super.commitTransaction(context, xid,onePhase);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE); TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE);
sendAsyncToSlave(info); sendSyncToSlave(info);
} }
/** /**
@ -205,26 +219,40 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
} }
public void send(ConnectionContext context, Message message) throws Throwable{ public void send(ConnectionContext context, Message message) throws Throwable{
/**
* A message can be dispatched before the super.send() method returns
* so - here the order is switched to avoid problems on the slave
* with receiving acks for messages not received yey
*/
sendToSlave(message);
super.send(context,message); super.send(context,message);
sendAsyncToSlave(message);
} }
public void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable{ public void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable{
super.acknowledge(context, ack); super.acknowledge(context, ack);
sendAsyncToSlave(ack); sendToSlave(ack);
} }
protected void sendToSlave(Message message){ protected void sendToSlave(Message message){
/*
if (message.isPersistent()){ if (message.isPersistent() && !message.isInTransaction()){
sendSyncToSlave(message); sendSyncToSlave(message);
}else{ }else{
sendAsyncToSlave(message); sendAsyncToSlave(message);
} }
*/
sendAsyncToSlave(message);
}
protected void sendToSlave(MessageAck ack){
if (ack.isInTransaction()){
sendAsyncToSlave(ack);
}else{
sendSyncToSlave(ack);
}
} }
protected void sendAsyncToSlave(Command command){ protected void sendAsyncToSlave(Command command){

View File

@ -233,6 +233,6 @@ public class MasterConnector implements Service{
private void shutDown(){ private void shutDown(){
masterActive.set(false); masterActive.set(false);
broker.masterFailed(); broker.masterFailed();
//ServiceSupport.dispose(this); ServiceSupport.dispose(this);
} }
} }

View File

@ -57,6 +57,8 @@ public class IndirectMessageReference implements MessageReference {
private Message message; private Message message;
/** The number of times the message has requested being hardened */ /** The number of times the message has requested being hardened */
private int referenceCount; private int referenceCount;
/** the size of the message **/
private int cachedSize = 0;
/** /**
* Only used by the END_OF_BROWSE_MARKER singleton * Only used by the END_OF_BROWSE_MARKER singleton
@ -69,6 +71,7 @@ public class IndirectMessageReference implements MessageReference {
this.groupID = null; this.groupID = null;
this.groupSequence = 0; this.groupSequence = 0;
this.targetConsumerId=null; this.targetConsumerId=null;
this.cachedSize = message != null ? message.getSize() : 0;
} }
public IndirectMessageReference(Destination destination, Message message) { public IndirectMessageReference(Destination destination, Message message) {
@ -81,7 +84,8 @@ public class IndirectMessageReference implements MessageReference {
this.targetConsumerId=message.getTargetConsumerId(); this.targetConsumerId=message.getTargetConsumerId();
this.referenceCount=1; this.referenceCount=1;
message.incrementReferenceCount(); message.incrementReferenceCount();
this.cachedSize = message != null ? message.getSize() : 0;
} }
synchronized public Message getMessageHardRef() { synchronized public Message getMessageHardRef() {
@ -202,4 +206,12 @@ public class IndirectMessageReference implements MessageReference {
public ConsumerId getTargetConsumerId() { public ConsumerId getTargetConsumerId() {
return targetConsumerId; return targetConsumerId;
} }
public int getSize(){
Message msg = message;
if (msg != null){
return msg.getSize();
}
return cachedSize;
}
} }

View File

@ -46,5 +46,6 @@ public interface MessageReference {
public int incrementReferenceCount(); public int incrementReferenceCount();
public int decrementReferenceCount(); public int decrementReferenceCount();
public ConsumerId getTargetConsumerId(); public ConsumerId getTargetConsumerId();
public int getSize();
} }

View File

@ -1,18 +1,15 @@
/** /**
* *
* Copyright 2005-2006 The Apache Software Foundation * Copyright 2005-2006 The Apache Software Foundation
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* you may not use this file except in compliance with the License. * the License. You may obtain a copy of the License at
* You may obtain a copy of the License at *
*
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
@ -29,217 +26,186 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
/** /**
* A subscription that honors the pre-fetch option of the ConsumerInfo. * A subscription that honors the pre-fetch option of the ConsumerInfo.
* *
* @version $Revision: 1.15 $ * @version $Revision: 1.15 $
*/ */
abstract public class PrefetchSubscription extends AbstractSubscription { abstract public class PrefetchSubscription extends AbstractSubscription{
static private final Log log = LogFactory.getLog(PrefetchSubscription.class); static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
final protected LinkedList matched=new LinkedList();
final protected LinkedList matched = new LinkedList(); final protected LinkedList dispatched=new LinkedList();
final protected LinkedList dispatched = new LinkedList();
protected int delivered=0; protected int delivered=0;
int preLoadLimit=1024*100; int preLoadLimit=1024*100;
int preLoadSize=0; int preLoadSize=0;
boolean dispatching=false; boolean dispatching=false;
public PrefetchSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
super(broker,context, info); throws InvalidSelectorException{
super(broker,context,info);
} }
synchronized public void add(MessageReference node) throws Throwable { synchronized public void add(MessageReference node) throws Throwable{
if( !isFull() && !isSlaveBroker()) { if(!isFull()&&!isSlaveBroker()){
dispatch(node); dispatch(node);
} else { }else{
synchronized(matched){ synchronized(matched){
matched.addLast(node); matched.addLast(node);
} }
} }
} }
public void processMessageDispatchNotification(MessageDispatchNotification mdn){ public void processMessageDispatchNotification(MessageDispatchNotification mdn){
synchronized(matched){ synchronized(matched){
for (Iterator i = matched.iterator(); i.hasNext();){ for(Iterator i=matched.iterator();i.hasNext();){
MessageReference node = (MessageReference)i.next(); MessageReference node=(MessageReference) i.next();
if (node.getMessageId().equals(mdn.getMessageId())){ if(node.getMessageId().equals(mdn.getMessageId())){
i.remove(); i.remove();
try { try{
MessageDispatch md = createMessageDispatch(node, node.getMessage()); MessageDispatch md=createMessageDispatch(node,node.getMessage());
dispatched.addLast(node); dispatched.addLast(node);
incrementPreloadSize(node.getSize());
incrementPreloadSize(node.getMessage().getSize()); node.decrementReferenceCount();
node.decrementReferenceCount();
}catch(Exception e){ }catch(Exception e){
log.error("Problem processing MessageDispatchNotification: " + mdn,e); log.error("Problem processing MessageDispatchNotification: "+mdn,e);
} }
break; break;
} }
} }
} }
} }
synchronized public void acknowledge(final ConnectionContext context, final MessageAck ack) throws Throwable { synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Throwable{
// Handle the standard acknowledgment case. // Handle the standard acknowledgment case.
boolean wasFull = isFull(); boolean wasFull=isFull();
if( ack.isStandardAck() ) { if(ack.isStandardAck()){
// Acknowledge all dispatched messages up till the message id of the acknowledgment. // Acknowledge all dispatched messages up till the message id of the acknowledgment.
int index=0; int index=0;
boolean inAckRange=false; boolean inAckRange=false;
for (Iterator iter = dispatched.iterator(); iter.hasNext();) { for(Iterator iter=dispatched.iterator();iter.hasNext();){
final MessageReference node = (MessageReference)iter.next(); final MessageReference node=(MessageReference) iter.next();
MessageId messageId = node.getMessageId(); MessageId messageId=node.getMessageId();
if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
if( ack.getFirstMessageId()==null || ack.getFirstMessageId().equals(messageId)) { inAckRange=true;
inAckRange = true;
} }
if(inAckRange){
if( inAckRange ) {
// Don't remove the nodes until we are committed. // Don't remove the nodes until we are committed.
if ( !context.isInTransaction() ) { if(!context.isInTransaction()){
iter.remove(); iter.remove();
} else { }else{
// setup a Synchronization to remove nodes from the dispatched list. // setup a Synchronization to remove nodes from the dispatched list.
context.getTransaction().addSynchronization(new Synchronization(){ context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Throwable { public void afterCommit() throws Throwable{
synchronized(PrefetchSubscription.this) { synchronized(PrefetchSubscription.this){
// Now that we are committed, we can remove the nodes. // Now that we are committed, we can remove the nodes.
boolean inAckRange=false; boolean inAckRange=false;
int index=0; int index=0;
for (Iterator iter = dispatched.iterator(); iter.hasNext();) { for(Iterator iter=dispatched.iterator();iter.hasNext();){
final MessageReference node = (MessageReference)iter.next(); final MessageReference node=(MessageReference) iter.next();
MessageId messageId = node.getMessageId(); MessageId messageId=node.getMessageId();
if( ack.getFirstMessageId()==null || ack.getFirstMessageId().equals(messageId)) { if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
inAckRange = true; inAckRange=true;
} }
if( inAckRange ) { if(inAckRange){
index++; index++;
iter.remove(); iter.remove();
if( ack.getLastMessageId().equals(messageId)) { if(ack.getLastMessageId().equals(messageId)){
delivered = Math.max(0, delivered - (index+1)); delivered=Math.max(0,delivered-(index+1));
return; return;
} }
} }
} }
} }
} }
}); });
} }
index++; index++;
acknowledge(context, ack, node); acknowledge(context,ack,node);
if( ack.getLastMessageId().equals(messageId)) { if(ack.getLastMessageId().equals(messageId)){
if ( context.isInTransaction() ) if(context.isInTransaction())
delivered = Math.max(delivered,index+1); delivered=Math.max(delivered,index+1);
else else
delivered = Math.max(0, delivered - (index+1)); delivered=Math.max(0,delivered-(index+1));
if(wasFull&&!isFull()){
if( wasFull && !isFull() ) {
dispatchMatched(); dispatchMatched();
} }
return; return;
} else { }else{
// System.out.println("no match: "+ack.getLastMessageId()+","+messageId); // System.out.println("no match: "+ack.getLastMessageId()+","+messageId);
} }
} }
} }
log.info("Could not correlate acknowledgment with dispatched message: "+ack); log.info("Could not correlate acknowledgment with dispatched message: "+ack);
}else if(ack.isDeliveredAck()){
} else if( ack.isDeliveredAck() ) {
// Message was delivered but not acknowledged: update pre-fetch counters. // Message was delivered but not acknowledged: update pre-fetch counters.
// Acknowledge all dispatched messages up till the message id of the acknowledgment. // Acknowledge all dispatched messages up till the message id of the acknowledgment.
int index=0; int index=0;
for (Iterator iter = dispatched.iterator(); iter.hasNext();index++) { for(Iterator iter=dispatched.iterator();iter.hasNext();index++){
final MessageReference node = (MessageReference)iter.next(); final MessageReference node=(MessageReference) iter.next();
if( ack.getLastMessageId().equals(node.getMessageId()) ) { if(ack.getLastMessageId().equals(node.getMessageId())){
delivered = Math.max(delivered,index+1); delivered=Math.max(delivered,index+1);
if( wasFull && !isFull() ) { if(wasFull&&!isFull()){
dispatchMatched(); dispatchMatched();
} }
return; return;
} }
} }
throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
}else if(ack.isPoisonAck()){
} else if( ack.isPoisonAck() ) {
// TODO: what if the message is already in a DLQ??? // TODO: what if the message is already in a DLQ???
// Handle the poison ACK case: we need to send the message to a DLQ
// Handle the poison ACK case: we need to send the message to a DLQ if(ack.isInTransaction())
if( ack.isInTransaction() )
throw new JMSException("Poison ack cannot be transacted: "+ack); throw new JMSException("Poison ack cannot be transacted: "+ack);
// Acknowledge all dispatched messages up till the message id of the acknowledgment. // Acknowledge all dispatched messages up till the message id of the acknowledgment.
int index=0; int index=0;
boolean inAckRange=false; boolean inAckRange=false;
for (Iterator iter = dispatched.iterator(); iter.hasNext();) { for(Iterator iter=dispatched.iterator();iter.hasNext();){
final MessageReference node = (MessageReference)iter.next(); final MessageReference node=(MessageReference) iter.next();
MessageId messageId = node.getMessageId(); MessageId messageId=node.getMessageId();
if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
if( ack.getFirstMessageId()==null || ack.getFirstMessageId().equals(messageId)) { inAckRange=true;
inAckRange = true;
} }
if(inAckRange){
if( inAckRange ) {
// Send the message to the DLQ // Send the message to the DLQ
node.incrementReferenceCount(); node.incrementReferenceCount();
try { try{
Message message = node.getMessage(); Message message=node.getMessage();
if( message !=null ) { if(message!=null){
// The original destination and transaction id do not get filled when the message is first
// The original destination and transaction id do not get filled when the message is first sent, // sent,
// it is only populated if the message is routed to another destination like the DLQ // it is only populated if the message is routed to another destination like the DLQ
if( message.getOriginalDestination()!=null ) if(message.getOriginalDestination()!=null)
message.setOriginalDestination(message.getDestination()); message.setOriginalDestination(message.getDestination());
if( message.getOriginalTransactionId()!=null ) if(message.getOriginalTransactionId()!=null)
message.setOriginalTransactionId(message.getTransactionId()); message.setOriginalTransactionId(message.getTransactionId());
DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
DeadLetterStrategy deadLetterStrategy = node.getRegionDestination().getDeadLetterStrategy(); ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message
ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message.getDestination()); .getDestination());
message.setDestination(deadLetterDestination); message.setDestination(deadLetterDestination);
message.setTransactionId(null); message.setTransactionId(null);
message.evictMarshlledForm(); message.evictMarshlledForm();
boolean originalFlowControl=context.isProducerFlowControl();
boolean originalFlowControl = context.isProducerFlowControl(); try{
try {
context.setProducerFlowControl(false); context.setProducerFlowControl(false);
context.getBroker().send(context, message); context.getBroker().send(context,message);
} finally { }finally{
context.setProducerFlowControl(originalFlowControl); context.setProducerFlowControl(originalFlowControl);
} }
}
} }finally{
} finally {
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
iter.remove(); iter.remove();
index++; index++;
acknowledge(context, ack, node); acknowledge(context,ack,node);
if( ack.getLastMessageId().equals(messageId)) { if(ack.getLastMessageId().equals(messageId)){
delivered=Math.max(0,delivered-(index+1));
delivered = Math.max(0, delivered - (index+1)); if(wasFull&&!isFull()){
if( wasFull && !isFull() ) {
dispatchMatched(); dispatchMatched();
} }
return; return;
@ -248,128 +214,115 @@ abstract public class PrefetchSubscription extends AbstractSubscription {
} }
throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
} }
throw new JMSException("Invalid acknowledgment: "+ack); throw new JMSException("Invalid acknowledgment: "+ack);
} }
protected boolean isFull() { protected boolean isFull(){
return dispatched.size()-delivered >= info.getPrefetchSize() || preLoadSize > preLoadLimit; return dispatched.size()-delivered>=info.getPrefetchSize()||preLoadSize>preLoadLimit;
} }
protected void dispatchMatched() throws IOException { protected void dispatchMatched() throws IOException{
if(!dispatching) { if(!dispatching){
dispatching = true; dispatching=true;
try { try{
for (Iterator iter = matched.iterator(); iter.hasNext() && !isFull();) { for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){
MessageReference node = (MessageReference) iter.next(); MessageReference node=(MessageReference) iter.next();
iter.remove(); iter.remove();
dispatch(node); dispatch(node);
} }
} finally { }finally{
dispatching=false; dispatching=false;
} }
} }
} }
private void dispatch(final MessageReference node) throws IOException { private void dispatch(final MessageReference node) throws IOException{
node.incrementReferenceCount(); node.incrementReferenceCount();
final Message message=node.getMessage();
final Message message = node.getMessage(); if(message==null){
if( message == null ) {
return; return;
} }
// Make sure we can dispatch a message. // Make sure we can dispatch a message.
if( canDispatch(node) && !isSlaveBroker()) { if(canDispatch(node)&&!isSlaveBroker()){
MessageDispatch md=createMessageDispatch(node,message);
MessageDispatch md = createMessageDispatch(node, message);
dispatched.addLast(node); dispatched.addLast(node);
incrementPreloadSize(node.getMessage().getSize());
incrementPreloadSize(node.getMessage().getSize()); if(info.isDispatchAsync()){
if( info.isDispatchAsync() ) {
md.setConsumer(new Runnable(){ md.setConsumer(new Runnable(){
public void run() { public void run(){
// Since the message gets queued up in async dispatch, we don't want to // Since the message gets queued up in async dispatch, we don't want to
// decrease the reference count until it gets put on the wire. // decrease the reference count until it gets put on the wire.
onDispatch(node, message); onDispatch(node,message);
} }
}); });
context.getConnection().dispatchAsync(md); context.getConnection().dispatchAsync(md);
} else { }else{
context.getConnection().dispatchSync(md); context.getConnection().dispatchSync(md);
onDispatch(node, message); onDispatch(node,message);
} }
// The onDispatch() does the node.decrementReferenceCount(); // The onDispatch() does the node.decrementReferenceCount();
} else { }else{
// We were not allowed to dispatch that message (an other consumer grabbed it before we did) // We were not allowed to dispatch that message (an other consumer grabbed it before we did)
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
} }
synchronized private void onDispatch(final MessageReference node, final Message message) { synchronized private void onDispatch(final MessageReference node,final Message message){
boolean wasFull=isFull();
boolean wasFull = isFull(); decrementPreloadSize(message.getSize());
decrementPreloadSize(message.getSize());
node.decrementReferenceCount(); node.decrementReferenceCount();
if(node.getRegionDestination()!=null){
if( node.getRegionDestination() !=null ) {
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message); node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
context.getConnection().getStatistics().onMessageDequeue(message); context.getConnection().getStatistics().onMessageDequeue(message);
if(wasFull&&!isFull()){
if( wasFull && !isFull() ) { try{
try {
dispatchMatched(); dispatchMatched();
} catch (IOException e) { }catch(IOException e){
context.getConnection().serviceException(e); context.getConnection().serviceException(e);
} }
} }
} }
} }
private int incrementPreloadSize(int size) { private int incrementPreloadSize(int size){
preLoadSize += size; preLoadSize+=size;
return preLoadSize; return preLoadSize;
} }
private int decrementPreloadSize(int size) { private int decrementPreloadSize(int size){
preLoadSize -= size; preLoadSize-=size;
return preLoadSize; return preLoadSize;
} }
/** /**
* @param node * @param node
* @param message TODO * @param message
* TODO
* @return * @return
*/ */
protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
MessageDispatch md = new MessageDispatch(); MessageDispatch md=new MessageDispatch();
md.setConsumerId( info.getConsumerId() ); md.setConsumerId(info.getConsumerId());
md.setDestination( node.getRegionDestination().getActiveMQDestination() ); md.setDestination(node.getRegionDestination().getActiveMQDestination());
md.setMessage(message); md.setMessage(message);
md.setRedeliveryCounter( node.getRedeliveryCounter() ); md.setRedeliveryCounter(node.getRedeliveryCounter());
return md; return md;
} }
/** /**
* Use when a matched message is about to be dispatched to the client. * Use when a matched message is about to be dispatched to the client.
* *
* @param node * @param node
* @return false if the message should not be dispatched to the client (another sub may have already dispatched it for example). * @return false if the message should not be dispatched to the client (another sub may have already dispatched it
* for example).
*/ */
abstract protected boolean canDispatch(MessageReference node); abstract protected boolean canDispatch(MessageReference node);
/** /**
* Used during acknowledgment to remove the message. * Used during acknowledgment to remove the message.
* @throws IOException *
* @throws IOException
*/ */
protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException { protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node)
} throws IOException{}
} }

View File

@ -1,97 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import java.net.URI;
import javax.jms.*;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.io.ClassPathResource;
public class FTBrokerTest extends TestCase {
protected static final int MESSAGE_COUNT = 10;
protected BrokerService master;
protected BrokerService slave;
protected Connection connection;
protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false";
//protected String uriString = "tcp://localhost:62001";
protected void setUp() throws Exception {
BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/master.xml"));
brokerFactory.afterPropertiesSet();
master = brokerFactory.getBroker();
brokerFactory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/slave.xml"));
brokerFactory.afterPropertiesSet();
slave = brokerFactory.getBroker();
//uriString = "failover://(" + master.getVmConnectorURI() + "," + slave.getVmConnectorURI() + ")?randomize=false";
//uriString = "failover://(" + master.getVmConnectorURI() + "," + slave.getVmConnectorURI() + ")";
System.out.println("URI = " + uriString);
URI uri = new URI(uriString);
ConnectionFactory fac = new ActiveMQConnectionFactory(uri);
connection = fac.createConnection();
master.start();
slave.start();
//wait for thing to connect
Thread.sleep(1000);
connection.start();
super.setUp();
}
protected void tearDown() throws Exception {
try {
connection.close();
slave.stop();
master.stop();
}catch(Throwable e){
e.printStackTrace();
}
super.tearDown();
}
public void testFTBroker() throws Exception{
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getClass().toString());
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < MESSAGE_COUNT; i++){
Message msg = session.createTextMessage("test: " + i);
producer.send(msg);
}
master.stop();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < MESSAGE_COUNT; i++){
System.out.println("GOT MSG: " + consumer.receive(1000));
}
}
}

View File

@ -0,0 +1,76 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.core.io.ClassPathResource;
/**
*Test failover for Queues
*
*/
public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsTest{
protected BrokerService master;
protected BrokerService slave;
protected int inflightMessageCount = 0;
protected int failureCount = 50;
protected String uriString="failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false";
protected void setUp() throws Exception{
failureCount = super.messageCount/2;
super.topic = isTopic();
BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/master.xml"));
brokerFactory.afterPropertiesSet();
master=brokerFactory.getBroker();
brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/slave.xml"));
brokerFactory.afterPropertiesSet();
slave=brokerFactory.getBroker();
master.start();
slave.start();
// wait for thing to connect
Thread.sleep(1000);
super.setUp();
}
protected void tearDown() throws Exception{
super.tearDown();
slave.stop();
master.stop();
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
return new ActiveMQConnectionFactory(uriString);
}
protected void messageSent() throws Exception{
if (++inflightMessageCount >= failureCount){
inflightMessageCount = 0;
master.stop();
}
}
protected boolean isTopic(){
return false;
}
}