fix some issues with assembly tests

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@378968 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-02-19 22:14:33 +00:00
parent 97fff421f3
commit b593aca5f8
4 changed files with 69 additions and 33 deletions

View File

@ -45,6 +45,14 @@ public class ConduitBridge extends DemandForwardingBridge{
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info){
if (addToAlreadyInterestedConsumers(info)){
return null; //don't want this subscription added
}
return doCreateDemandSubscription(info);
}
protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info){
//search through existing subscriptions and see if we have a match
boolean matched = false;
DestinationFilter filter=DestinationFilter.parseFilter(info.getDestination());
@ -57,18 +65,7 @@ public class ConduitBridge extends DemandForwardingBridge{
//continue - we want interest to any existing DemandSubscriptions
}
}
if (matched){
return null; //don't want this subscription added
}
//not matched so create a new one
//but first, if it's durable - changed set the
//ConsumerId here - so it won't be removed if the
//durable subscriber goes away on the other end
if (info.isDurable() || (info.getDestination().isQueue() && !info.getDestination().isTemporary())){
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
.getNextSequenceId()));
}
return super.createDemandSubscription(info);
return matched;
}
protected void removeDemandSubscription(ConsumerId id) throws IOException{

View File

@ -40,7 +40,6 @@ public class DemandSubscription{
remoteInfo=info;
localInfo=info.copy();
localInfo.setBrokerPath(info.getBrokerPath());
localInfo.setNetworkSubscription(true);
remoteSubsIds.add(info.getConsumerId());
}

View File

@ -1,28 +1,27 @@
/**
*
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.network;
import java.io.IOException;
import java.util.Iterator;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.Transport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Consolidates subscriptions
*
@ -30,29 +29,37 @@ import org.apache.commons.logging.LogFactory;
*/
public class DurableConduitBridge extends ConduitBridge{
static final private Log log=LogFactory.getLog(DurableConduitBridge.class);
/**
* Constructor
*
* @param localBroker
* @param remoteBroker
*/
public DurableConduitBridge(Transport localBroker,Transport remoteBroker){
super(localBroker,remoteBroker);
}
/**
* Subscriptions for these desitnations are always created
* @throws IOException
*
*
*/
protected void setupStaticDestinations() throws IOException{
protected void setupStaticDestinations(){
super.setupStaticDestinations();
ActiveMQDestination[] dests=durableDestinations;
if(dests!=null){
for(int i=0;i<dests.length;i++){
ActiveMQDestination dest=dests[i];
if(isPermissableDestination(dest)){
if(isPermissableDestination(dest) && !doesConsumerExist(dest)){
DemandSubscription sub=createDemandSubscription(dest);
addSubscription(sub);
if(dest.isTopic()){
sub.getLocalInfo().setSubcriptionName(getLocalBrokerName());
}
try{
addSubscription(sub);
}catch(IOException e){
log.error("Failed to add static destination "+dest,e);
}
if(log.isTraceEnabled())
log.trace("Forwarding messages for durable destination: "+dest);
}
@ -60,4 +67,32 @@ public class DurableConduitBridge extends ConduitBridge{
}
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info){
if(addToAlreadyInterestedConsumers(info)){
return null; // don't want this subscription added
}
// not matched so create a new one
// but first, if it's durable - changed set the
// ConsumerId here - so it won't be removed if the
// durable subscriber goes away on the other end
if(info.isDurable()||(info.getDestination().isQueue()&&!info.getDestination().isTemporary())){
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator.getNextSequenceId()));
}
if(info.isDurable()){
// set the subscriber name to something reproducable
info.setSubcriptionName(getLocalBrokerName());
}
return doCreateDemandSubscription(info);
}
protected boolean doesConsumerExist(ActiveMQDestination dest){
DestinationFilter filter=DestinationFilter.parseFilter(dest);
for(Iterator i=subscriptionMapByLocalId.values().iterator();i.hasNext();){
DemandSubscription ds=(DemandSubscription) i.next();
if(filter.matches(ds.getLocalInfo().getDestination())){
return true;
}
}
return false;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.network;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.Set;
import org.apache.activemq.Service;
@ -82,6 +83,10 @@ public class NetworkConnector implements Service, DiscoveryListener {
public void stop() throws Exception {
this.discoveryAgent.stop();
for (Iterator i = bridges.values().iterator();i.hasNext();){
Bridge bridge = (Bridge)i.next();
bridge.stop();
}
}
public void onServiceAdd(DiscoveryEvent event) {