mirror of https://github.com/apache/activemq.git
apply patch for: https://issues.apache.org/jira/browse/AMQ-4696
This commit is contained in:
parent
a63a8320ea
commit
d2f4d76f83
|
@ -60,13 +60,13 @@ import org.apache.activemq.util.LongSequenceGenerator;
|
||||||
import org.apache.qpid.proton.amqp.Binary;
|
import org.apache.qpid.proton.amqp.Binary;
|
||||||
import org.apache.qpid.proton.amqp.DescribedType;
|
import org.apache.qpid.proton.amqp.DescribedType;
|
||||||
import org.apache.qpid.proton.amqp.Symbol;
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
import org.apache.qpid.proton.amqp.UnsignedInteger;
|
|
||||||
import org.apache.qpid.proton.amqp.messaging.Accepted;
|
import org.apache.qpid.proton.amqp.messaging.Accepted;
|
||||||
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Modified;
|
import org.apache.qpid.proton.amqp.messaging.Modified;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Released;
|
import org.apache.qpid.proton.amqp.messaging.Released;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Target;
|
import org.apache.qpid.proton.amqp.messaging.Target;
|
||||||
|
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
|
||||||
import org.apache.qpid.proton.amqp.transaction.Coordinator;
|
import org.apache.qpid.proton.amqp.transaction.Coordinator;
|
||||||
import org.apache.qpid.proton.amqp.transaction.Declare;
|
import org.apache.qpid.proton.amqp.transaction.Declare;
|
||||||
import org.apache.qpid.proton.amqp.transaction.Declared;
|
import org.apache.qpid.proton.amqp.transaction.Declared;
|
||||||
|
@ -115,7 +115,6 @@ class AmqpProtocolConverter {
|
||||||
private static final Symbol COPY = Symbol.getSymbol("copy");
|
private static final Symbol COPY = Symbol.getSymbol("copy");
|
||||||
private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
|
private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
|
||||||
private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
|
private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
|
||||||
private static final UnsignedInteger DURABLE = new UnsignedInteger(2);
|
|
||||||
private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
|
private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
|
||||||
|
|
||||||
int prefetch = 100;
|
int prefetch = 100;
|
||||||
|
@ -1168,7 +1167,7 @@ class AmqpProtocolConverter {
|
||||||
if (source.getDistributionMode() == COPY && dest.isQueue()) {
|
if (source.getDistributionMode() == COPY && dest.isQueue()) {
|
||||||
consumerInfo.setBrowser(true);
|
consumerInfo.setBrowser(true);
|
||||||
}
|
}
|
||||||
if (DURABLE.equals(source.getDurable()) && dest.isTopic()) {
|
if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) && dest.isTopic()) {
|
||||||
consumerInfo.setSubscriptionName(sender.getName());
|
consumerInfo.setSubscriptionName(sender.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,71 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.transport.amqp;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.Topic;
|
||||||
|
import javax.jms.TopicConnection;
|
||||||
|
import javax.jms.TopicConnectionFactory;
|
||||||
|
import javax.jms.TopicSession;
|
||||||
|
import javax.jms.TopicSubscriber;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.jmx.BrokerView;
|
||||||
|
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
|
||||||
|
import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class AMQ4696Test extends AmqpTestSupport {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void simpleDurableTopicTest() throws Exception {
|
||||||
|
String TOPIC_NAME = "topic://AMQ4696Test" + System.currentTimeMillis();
|
||||||
|
String durableClientId = "AMQPDurableTopicTestClient";
|
||||||
|
String durableSubscriberName = "durableSubscriberName";
|
||||||
|
|
||||||
|
BrokerView adminView = this.brokerService.getAdminView();
|
||||||
|
int durableSubscribersAtStart = adminView.getDurableTopicSubscribers().length;
|
||||||
|
int inactiveSubscribersAtStart = adminView.getInactiveDurableTopicSubscribers().length;
|
||||||
|
LOG.debug(">>>> At Start, durable Subscribers " + durableSubscribersAtStart + " inactiveDurableSubscribers " + inactiveSubscribersAtStart);
|
||||||
|
|
||||||
|
TopicConnectionFactory factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
|
||||||
|
Topic topic = new TopicImpl("topic://" + TOPIC_NAME);
|
||||||
|
TopicConnection subscriberConnection = factory.createTopicConnection();
|
||||||
|
subscriberConnection.setClientID(durableClientId);
|
||||||
|
TopicSession subscriberSession = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
TopicSubscriber messageConsumer = subscriberSession.createDurableSubscriber(topic, durableSubscriberName);
|
||||||
|
|
||||||
|
assertNotNull(messageConsumer);
|
||||||
|
|
||||||
|
int durableSubscribers = adminView.getDurableTopicSubscribers().length;
|
||||||
|
int inactiveSubscribers = adminView.getInactiveDurableTopicSubscribers().length;
|
||||||
|
LOG.debug(">>>> durable Subscribers after creation " + durableSubscribers + " inactiveDurableSubscribers " + inactiveSubscribers);
|
||||||
|
assertEquals("Wrong number of durable subscribers after first subscription", 1, (durableSubscribers - durableSubscribersAtStart));
|
||||||
|
assertEquals("Wrong number of inactive durable subscribers after first subscription", 0, (inactiveSubscribers - inactiveSubscribersAtStart));
|
||||||
|
|
||||||
|
subscriberConnection.close();
|
||||||
|
subscriberConnection = null;
|
||||||
|
|
||||||
|
durableSubscribers = adminView.getDurableTopicSubscribers().length;
|
||||||
|
inactiveSubscribers = adminView.getInactiveDurableTopicSubscribers().length;
|
||||||
|
LOG.debug(">>>> durable Subscribers after close " + durableSubscribers + " inactiveDurableSubscribers " + inactiveSubscribers);
|
||||||
|
assertEquals("Wrong number of durable subscribers after close", 0, (durableSubscribersAtStart));
|
||||||
|
assertEquals("Wrong number of inactive durable subscribers after close", 1, (inactiveSubscribers - inactiveSubscribersAtStart));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue