diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 8dd29025b8..fc09b249cd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -28,19 +28,8 @@ import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicSubscription; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.Command; -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.DestinationInfo; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.*; +import org.apache.activemq.network.NetworkBridge; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.usage.Usage; @@ -402,6 +391,44 @@ public class AdvisoryBroker extends BrokerFilter { } } + @Override + public void networkBridgeStarted(BrokerInfo brokerInfo) { + try { + if (brokerInfo != null) { + ActiveMQMessage advisoryMessage = new ActiveMQMessage(); + advisoryMessage.setBooleanProperty("started", true); + + ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); + + ConnectionContext context = new ConnectionContext(); + context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); + context.setBroker(getBrokerService().getBroker()); + fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); + } + } catch (Exception e) { + LOG.warn("Failed to fire network bridge advisory"); + } + } + + @Override + public void networkBridgeStopped(BrokerInfo brokerInfo) { + try { + if (brokerInfo != null) { + ActiveMQMessage advisoryMessage = new ActiveMQMessage(); + advisoryMessage.setBooleanProperty("started", false); + + ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); + + ConnectionContext context = new ConnectionContext(); + context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); + context.setBroker(getBrokerService().getBroker()); + fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); + } + } catch (Exception e) { + LOG.warn("Failed to fire network bridge advisory"); + } + } + protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception { fireAdvisory(context, topic, command, null); } diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java index 822fd1fe92..cc2f534da7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java @@ -47,6 +47,7 @@ public final class AdvisorySupport { public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed."; public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd."; public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker"; + public static final String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge"; public static final String AGENT_TOPIC = "ActiveMQ.Agent"; public static final String ADIVSORY_MESSAGE_TYPE = "Advisory"; public static final String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId"; @@ -201,6 +202,10 @@ public final class AdvisorySupport { return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX); } + public static ActiveMQTopic getNetworkBridgeAdvisoryTopic() { + return new ActiveMQTopic(NETWORK_BRIDGE_TOPIC_PREFIX); + } + public static ActiveMQTopic getFullAdvisoryTopic(Destination destination) throws JMSException { return getFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java index b118c07b31..badac96219 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java @@ -33,6 +33,7 @@ import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; +import org.apache.activemq.network.NetworkBridge; import org.apache.activemq.store.kahadb.plist.PListStore; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.Usage; @@ -379,5 +380,9 @@ public interface Broker extends Region, Service { ThreadPoolExecutor getExecutor(); + void networkBridgeStarted(BrokerInfo brokerInfo); + + void networkBridgeStopped(BrokerInfo brokerInfo); + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java index 8149584704..c7fb20595e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -40,6 +40,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; +import org.apache.activemq.network.NetworkBridge; import org.apache.activemq.store.kahadb.plist.PListStore; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.Usage; @@ -310,4 +311,12 @@ public class BrokerFilter implements Broker { public ThreadPoolExecutor getExecutor() { return next.getExecutor(); } + + public void networkBridgeStarted(BrokerInfo brokerInfo) { + next.networkBridgeStarted(brokerInfo); + } + + public void networkBridgeStopped(BrokerInfo brokerInfo) { + next.networkBridgeStopped(brokerInfo); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java index 12834a85a9..26f3266969 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -282,6 +282,12 @@ public class EmptyBroker implements Broker { public void nowMasterBroker() { } + public void networkBridgeStarted(BrokerInfo brokerInfo) { + } + + public void networkBridgeStopped(BrokerInfo brokerInfo) { + } + public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java index dcface8bbf..8b066944ae 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -312,4 +312,12 @@ public class ErrorBroker implements Broker { public ThreadPoolExecutor getExecutor() { throw new BrokerStoppedException(this.message); } + + public void networkBridgeStarted(BrokerInfo brokerInfo) { + throw new BrokerStoppedException(this.message); + } + + public void networkBridgeStopped(BrokerInfo brokerInfo) { + throw new BrokerStoppedException(this.message); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index 8c120326de..115b6085e0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -322,4 +322,11 @@ public class MutableBrokerFilter implements Broker { return getNext().getExecutor(); } + public void networkBridgeStarted(BrokerInfo brokerInfo) { + getNext().networkBridgeStarted(brokerInfo); + } + + public void networkBridgeStopped(BrokerInfo brokerInfo) { + getNext().networkBridgeStopped(brokerInfo); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 40b83e62d4..d05c97cb50 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -309,6 +309,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br localSessionInfo = new SessionInfo(localConnectionInfo, 1); localBroker.oneway(localSessionInfo); + brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo); LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established."); } else { @@ -419,6 +420,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br ss.throwFirstException(); } } + brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped"); remoteBrokerNameKnownLatch.countDown(); } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java new file mode 100644 index 0000000000..1a1f25e302 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.advisory; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.BrokerInfo; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.net.URI; + +public class AdvisoryNetworkBridgeTest extends TestCase { + + BrokerService broker1; + BrokerService broker2; + + + public void testAdvisory() throws Exception { + broker1 = BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker1.xml")); + broker1.start(); + broker1.waitUntilStarted(); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1"); + Connection conn = factory.createConnection(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + MessageConsumer consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic()); + + Thread.sleep(1000); + + broker2 = BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker2.xml")); + broker2.start(); + broker2.waitUntilStarted(); + + ActiveMQMessage advisory = (ActiveMQMessage)consumer.receive(2000); + assertNotNull(advisory); + assertTrue(advisory.getDataStructure() instanceof BrokerInfo); + assertTrue(advisory.getBooleanProperty("started")); + + broker2.stop(); + broker2.waitUntilStopped(); + + advisory = (ActiveMQMessage)consumer.receive(2000); + assertNotNull(advisory); + assertTrue(advisory.getDataStructure() instanceof BrokerInfo); + assertFalse(advisory.getBooleanProperty("started")); + + } + + @Override + protected void tearDown() throws Exception { + broker1.stop(); + broker1.waitUntilStopped(); + + broker2.stop(); + broker2.waitUntilStopped(); + } +}