From 1263dd8c4343fbe9412b377e10d84109e230fd5c Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 17 May 2016 09:59:35 -0400 Subject: [PATCH] Fix failing test in CI. Subscribe should request receipts so that the subscription exists prior to initiating the event that triggers the advisory message sent to the Topic. --- .../transport/stomp/StompAdvisoryTest.java | 84 +++++++++++++++++-- 1 file changed, 77 insertions(+), 7 deletions(-) diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java index 4bef5618fc..8290952ab3 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java @@ -90,10 +90,18 @@ public class StompAdvisoryTest extends StompTestSupport { @Test(timeout = 60000) public void testConnectionAdvisory() throws Exception { - stompConnect(); + + HashMap subheaders = new HashMap(1); + subheaders.put("receipt", "id-1"); + stompConnection.connect("system", "manager"); - stompConnection.subscribe("/topic/ActiveMQ.Advisory.Connection", Stomp.Headers.Subscribe.AckModeValues.AUTO); + stompConnection.subscribe("/topic/ActiveMQ.Advisory.Connection", + Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders); + + String frame = stompConnection.receiveFrame(); + LOG.debug("Response to subscribe was: {}", frame); + assertTrue(frame.trim().startsWith("RECEIPT")); // Now connect via openwire and check we get the advisory Connection c = cf.createConnection("system", "manager"); @@ -122,11 +130,16 @@ public class StompAdvisoryTest extends StompTestSupport { HashMap subheaders = new HashMap(1); subheaders.put("transformation", Stomp.Transformations.JMS_JSON.toString()); + subheaders.put("receipt", "id-1"); stompConnection.connect("system", "manager"); stompConnection.subscribe("/topic/ActiveMQ.Advisory.Connection", Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders); + String frame = stompConnection.receiveFrame(); + LOG.debug("Response to subscribe was: {}", frame); + assertTrue(frame.trim().startsWith("RECEIPT")); + // Now connect via openwire and check we get the advisory Connection c = cf.createConnection("system", "manager"); c.start(); @@ -154,11 +167,16 @@ public class StompAdvisoryTest extends StompTestSupport { HashMap subheaders = new HashMap(1); subheaders.put("transformation", Stomp.Transformations.JMS_XML.toString()); + subheaders.put("receipt", "id-1"); stompConnection.connect("system", "manager"); stompConnection.subscribe("/topic/ActiveMQ.Advisory.Connection", Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders); + String frame = stompConnection.receiveFrame(); + LOG.debug("Response to subscribe was: {}", frame); + assertTrue(frame.trim().startsWith("RECEIPT")); + // Now connect via openwire and check we get the advisory Connection c = cf.createConnection("system", "manager"); c.start(); @@ -186,8 +204,16 @@ public class StompAdvisoryTest extends StompTestSupport { Destination dest = new ActiveMQQueue("testConsumerAdvisory"); + HashMap subheaders = new HashMap(1); + subheaders.put("receipt", "id-1"); + stompConnection.connect("system", "manager"); - stompConnection.subscribe("/topic/ActiveMQ.Advisory.Consumer.>", Stomp.Headers.Subscribe.AckModeValues.AUTO); + stompConnection.subscribe("/topic/ActiveMQ.Advisory.Consumer.>", + Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders); + + String frame = stompConnection.receiveFrame(); + LOG.debug("Response to subscribe was: {}", frame); + assertTrue(frame.trim().startsWith("RECEIPT")); // Now connect via openwire and check we get the advisory Connection c = cf.createConnection("system", "manager"); @@ -213,8 +239,16 @@ public class StompAdvisoryTest extends StompTestSupport { Destination dest = new ActiveMQQueue("testProducerAdvisory"); + HashMap subheaders = new HashMap(1); + subheaders.put("receipt", "id-1"); + stompConnection.connect("system", "manager"); - stompConnection.subscribe("/topic/ActiveMQ.Advisory.Producer.>", Stomp.Headers.Subscribe.AckModeValues.AUTO); + stompConnection.subscribe("/topic/ActiveMQ.Advisory.Producer.>", + Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders); + + String frame = stompConnection.receiveFrame(); + LOG.debug("Response to subscribe was: {}", frame); + assertTrue(frame.trim().startsWith("RECEIPT")); // Now connect via openwire and check we get the advisory Connection c = cf.createConnection("system", "manager"); @@ -243,11 +277,16 @@ public class StompAdvisoryTest extends StompTestSupport { HashMap subheaders = new HashMap(1); subheaders.put("transformation", Stomp.Transformations.JMS_ADVISORY_XML.toString()); + subheaders.put("receipt", "id-1"); stompConnection.connect("system", "manager"); stompConnection.subscribe("/topic/ActiveMQ.Advisory.Producer.>", Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders); + String frame = stompConnection.receiveFrame(); + LOG.debug("Response to subscribe was: {}", frame); + assertTrue(frame.trim().startsWith("RECEIPT")); + // Now connect via openwire and check we get the advisory Connection c = cf.createConnection("system", "manager"); c.start(); @@ -275,11 +314,16 @@ public class StompAdvisoryTest extends StompTestSupport { HashMap subheaders = new HashMap(1); subheaders.put("transformation", Stomp.Transformations.JMS_ADVISORY_JSON.toString()); + subheaders.put("receipt", "id-1"); stompConnection.connect("system", "manager"); stompConnection.subscribe("/topic/ActiveMQ.Advisory.Producer.>", Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders); + String frame = stompConnection.receiveFrame(); + LOG.debug("Response to subscribe was: {}", frame); + assertTrue(frame.trim().startsWith("RECEIPT")); + // Now connect via openwire and check we get the advisory Connection c = cf.createConnection("system", "manager"); c.start(); @@ -352,8 +396,17 @@ public class StompAdvisoryTest extends StompTestSupport { cf.setWatchTopicAdvisories(false); stompConnect(); + + HashMap subheaders = new HashMap(1); + subheaders.put("receipt", "id-1"); + stompConnection.connect("system", "manager"); - stompConnection.subscribe("/topic/ActiveMQ.Advisory.TempQueue", Stomp.Headers.Subscribe.AckModeValues.AUTO); + stompConnection.subscribe("/topic/ActiveMQ.Advisory.TempQueue", + Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders); + + String frame = stompConnection.receiveFrame(); + LOG.debug("Response to subscribe was: {}", frame); + assertTrue(frame.trim().startsWith("RECEIPT")); // Now connect via openwire and check we get the advisory Connection connection = cf.createConnection("system", "manager"); @@ -374,8 +427,17 @@ public class StompAdvisoryTest extends StompTestSupport { cf.setWatchTopicAdvisories(false); stompConnect(); + + HashMap subheaders = new HashMap(1); + subheaders.put("receipt", "id-1"); + stompConnection.connect("system", "manager"); - stompConnection.subscribe("/topic/ActiveMQ.Advisory.TempTopic", Stomp.Headers.Subscribe.AckModeValues.AUTO); + stompConnection.subscribe("/topic/ActiveMQ.Advisory.TempTopic", + Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders); + + String frame = stompConnection.receiveFrame(); + LOG.debug("Response to subscribe was: {}", frame); + assertTrue(frame.trim().startsWith("RECEIPT")); // Now connect via openwire and check we get the advisory Connection connection = cf.createConnection("system", "manager"); @@ -395,9 +457,17 @@ public class StompAdvisoryTest extends StompTestSupport { cf.setWatchTopicAdvisories(true); + HashMap subheaders = new HashMap(1); + subheaders.put("receipt", "id-1"); + stompConnect(); stompConnection.connect("system", "manager"); - stompConnection.subscribe("/topic/ActiveMQ.Advisory.TempTopic,/topic/ActiveMQ.Advisory.TempQueue", Stomp.Headers.Subscribe.AckModeValues.AUTO); + stompConnection.subscribe("/topic/ActiveMQ.Advisory.TempTopic,/topic/ActiveMQ.Advisory.TempQueue", + Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders); + + String frame = stompConnection.receiveFrame(); + LOG.debug("Response to subscribe was: {}", frame); + assertTrue(frame.trim().startsWith("RECEIPT")); // Now connect via openwire and check we get the advisory Connection connection = cf.createConnection("system", "manager");