mirror of https://github.com/apache/activemq.git
Need to make sure that the durableDestinations set on the duplex bridge are only topics
This commit is contained in:
parent
e05db7cb5c
commit
9f7d70ba0d
|
@ -81,6 +81,7 @@ import org.apache.activemq.network.DemandForwardingBridge;
|
|||
import org.apache.activemq.network.MBeanNetworkListener;
|
||||
import org.apache.activemq.network.NetworkBridgeConfiguration;
|
||||
import org.apache.activemq.network.NetworkBridgeFactory;
|
||||
import org.apache.activemq.network.NetworkConnector;
|
||||
import org.apache.activemq.security.MessageAuthorizationPolicy;
|
||||
import org.apache.activemq.state.CommandVisitor;
|
||||
import org.apache.activemq.state.ConnectionState;
|
||||
|
@ -1420,11 +1421,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
listener.setCreatedByDuplex(true);
|
||||
duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
|
||||
duplexBridge.setBrokerService(brokerService);
|
||||
Set<ActiveMQDestination> durableDestinations = broker.getDurableDestinations();
|
||||
//Need to set durableDestinations to properly restart subs when dynamicOnly=false
|
||||
if (durableDestinations != null) {
|
||||
duplexBridge.setDurableDestinations(broker.getDurableDestinations().toArray(new ActiveMQDestination[0]));
|
||||
}
|
||||
duplexBridge.setDurableDestinations(NetworkConnector.getDurableTopicDestinations(
|
||||
broker.getDurableDestinations()));
|
||||
|
||||
// now turn duplex off this side
|
||||
info.setDuplexConnection(false);
|
||||
duplexBridge.setCreatedByDuplex(true);
|
||||
|
|
|
@ -135,6 +135,15 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
|
|||
destsList = getStaticallyIncludedDestinations();
|
||||
dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
|
||||
result.setStaticallyIncludedDestinations(dests);
|
||||
result.setDurableDestinations(getDurableTopicDestinations(durableDestinations));
|
||||
return result;
|
||||
}
|
||||
|
||||
protected Transport createLocalTransport() throws Exception {
|
||||
return TransportFactory.connect(localURI);
|
||||
}
|
||||
|
||||
public static ActiveMQDestination[] getDurableTopicDestinations(final Set<ActiveMQDestination> durableDestinations) {
|
||||
if (durableDestinations != null) {
|
||||
|
||||
HashSet<ActiveMQDestination> topics = new HashSet<ActiveMQDestination>();
|
||||
|
@ -146,13 +155,9 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
|
|||
|
||||
ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()];
|
||||
dest = topics.toArray(dest);
|
||||
result.setDurableDestinations(dest);
|
||||
return dest;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
protected Transport createLocalTransport() throws Exception {
|
||||
return TransportFactory.connect(localURI);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue