fix for AMQ-1849 with test, also master slave pendingdispatch messages were being ignored, test shows this

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@692539 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-09-05 20:40:14 +00:00
parent dad506decb
commit b006b2533b
3 changed files with 79 additions and 3 deletions

View File

@ -28,6 +28,7 @@ import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -176,7 +177,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter {
/**
* Removes a producer.
*
* @param context the enviorment the operation is being executed under.
* @param context the environment the operation is being executed under.
* @param info
* @throws Exception
*/
@ -190,7 +191,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter {
*
* @param context
* @param info
* @return the assocated subscription
* @return the associated subscription
* @throws Exception
*/
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
@ -211,6 +212,13 @@ public class MasterBroker extends InsertableMutableBrokerFilter {
sendAsyncToSlave(info);
}
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
super.removeDestinationInfo(context, info);
if (info.getDestination().isTemporary()) {
sendAsyncToSlave(info);
}
}
/**
* begin a transaction
*

View File

@ -429,7 +429,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
* @return
*/
public boolean isFull() {
return isSlave() || dispatched.size() - prefetchExtension >= info.getPrefetchSize();
return dispatched.size() - prefetchExtension >= info.getPrefetchSize();
}
/**

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.advisory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.RegionBroker;
public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
String masterBindAddress = "tcp://localhost:61616";
String slaveBindAddress = "tcp://localhost:62616";
BrokerService slave;
/*
* add a slave broker
* @see org.apache.activemq.EmbeddedBrokerTestSupport#createBroker()
*/
@Override
protected BrokerService createBroker() throws Exception {
// bindAddress is used by super.createBroker
bindAddress = masterBindAddress;
BrokerService master = super.createBroker();
master.setBrokerName("master");
bindAddress = slaveBindAddress;
slave = super.createBroker();
slave.setBrokerName("slave");
slave.setMasterConnectorURI(masterBindAddress);
bindAddress = masterBindAddress;
return master;
}
@Override
protected void startBroker() throws Exception {
super.startBroker();
slave.start();
}
@Override
public void testLoadRequestReply() throws Exception {
super.testLoadRequestReply();
// some checks on the slave
AdvisoryBroker ab = (AdvisoryBroker) slave.getBroker().getAdaptor(
AdvisoryBroker.class);
assertEquals("the temp queues should not be visible as they are removed", 1, ab.getAdvisoryDestinations().size());
RegionBroker rb = (RegionBroker) slave.getBroker().getAdaptor(
RegionBroker.class);
//serverDestination +
assertEquals(6, rb.getDestinationMap().size());
}
}