From ddfa96028c4c803467b0c074b8c1b4d3e5f9687d Mon Sep 17 00:00:00 2001 From: gtully Date: Tue, 25 Jul 2017 13:02:01 +0100 Subject: [PATCH] [AMQ-6778] fix and test - no longer gate firing advisory on broker start flag b/c consumers may be present once transport connectors have started --- .../activemq/advisory/AdvisoryBroker.java | 68 ++++++------ .../advisory/AdvisoryDuringStartTest.java | 105 ++++++++++++++++++ 2 files changed, 138 insertions(+), 35 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryDuringStartTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 4118dbe91d..1acd524664 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -832,43 +832,41 @@ public class AdvisoryBroker extends BrokerFilter { } public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception { - if (getBrokerService().isStarted()) { - //set properties - advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName()); - String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET"; - advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); + //set properties + advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName()); + String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET"; + advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); - String url = getBrokerService().getVmConnectorURI().toString(); - //try and find the URL on the transport connector and use if it exists else - //try and find a default URL - if (context.getConnector() instanceof TransportConnector - && ((TransportConnector) context.getConnector()).getPublishableConnectString() != null) { - url = ((TransportConnector) context.getConnector()).getPublishableConnectString(); - } else if (getBrokerService().getDefaultSocketURIString() != null) { - url = getBrokerService().getDefaultSocketURIString(); - } - advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); + String url = getBrokerService().getVmConnectorURI().toString(); + //try and find the URL on the transport connector and use if it exists else + //try and find a default URL + if (context.getConnector() instanceof TransportConnector + && ((TransportConnector) context.getConnector()).getPublishableConnectString() != null) { + url = ((TransportConnector) context.getConnector()).getPublishableConnectString(); + } else if (getBrokerService().getDefaultSocketURIString() != null) { + url = getBrokerService().getDefaultSocketURIString(); + } + advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); - //set the data structure - advisoryMessage.setDataStructure(command); - advisoryMessage.setPersistent(false); - advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); - advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId())); - advisoryMessage.setTargetConsumerId(targetConsumerId); - advisoryMessage.setDestination(topic); - advisoryMessage.setResponseRequired(false); - advisoryMessage.setProducerId(advisoryProducerId); - boolean originalFlowControl = context.isProducerFlowControl(); - final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); - producerExchange.setConnectionContext(context); - producerExchange.setMutable(true); - producerExchange.setProducerState(new ProducerState(new ProducerInfo())); - try { - context.setProducerFlowControl(false); - next.send(producerExchange, advisoryMessage); - } finally { - context.setProducerFlowControl(originalFlowControl); - } + //set the data structure + advisoryMessage.setDataStructure(command); + advisoryMessage.setPersistent(false); + advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); + advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId())); + advisoryMessage.setTargetConsumerId(targetConsumerId); + advisoryMessage.setDestination(topic); + advisoryMessage.setResponseRequired(false); + advisoryMessage.setProducerId(advisoryProducerId); + boolean originalFlowControl = context.isProducerFlowControl(); + final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); + producerExchange.setConnectionContext(context); + producerExchange.setMutable(true); + producerExchange.setProducerState(new ProducerState(new ProducerInfo())); + try { + context.setProducerFlowControl(false); + next.send(producerExchange, advisoryMessage); + } finally { + context.setProducerFlowControl(originalFlowControl); } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryDuringStartTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryDuringStartTest.java new file mode 100644 index 0000000000..503a9b8328 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryDuringStartTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.advisory; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.util.ServiceStopper; +import org.junit.After; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +public class AdvisoryDuringStartTest { + + BrokerService brokerService; + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + } + + @Test + public void testConsumerAdvisoryDuringSlowStart() throws Exception { + + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setUseJmx(false); + brokerService.addConnector("tcp://localhost:0"); + + final CountDownLatch resumeStart = new CountDownLatch(1); + brokerService.addNetworkConnector(new DiscoveryNetworkConnector() { + @Override + protected void handleStart() throws Exception { + // delay broker started flag + resumeStart.await(5, TimeUnit.SECONDS); + } + + @Override + protected void handleStop(ServiceStopper s) throws Exception {} + }); + Executors.newCachedThreadPool().submit(new Runnable() { + @Override + public void run() { + try { + brokerService.start(); + } catch (Exception e) { + e.printStackTrace(); + fail("error on start: " + e.toString()); + } + } + }); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString() + ")"); + Connection advisoryConnection = connectionFactory.createConnection(); + advisoryConnection.start(); + Session advisorySession = advisoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer advisoryConsumer = advisorySession.createConsumer(advisorySession.createTopic("ActiveMQ.Advisory.Consumer.>")); + + Connection consumerConnection = connectionFactory.createConnection(); + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumerConnection.start(); + ActiveMQTopic dest = new ActiveMQTopic("SomeTopic"); + + // real consumer + consumerSession.createConsumer(dest); + + resumeStart.countDown(); + + + ActiveMQMessage advisory = (ActiveMQMessage)advisoryConsumer.receive(4000); + assertNotNull(advisory); + assertTrue(advisory.getDataStructure() instanceof ConsumerInfo); + assertTrue(((ConsumerInfo)advisory.getDataStructure()).getDestination().equals(dest)); + advisoryConnection.close(); + + consumerConnection.close(); + } + +}