Ensure that the sender is closed on error and add some tests for
unsubscribe failures.
This commit is contained in:
Timothy Bish 2014-10-17 14:58:56 -04:00
parent 226e012d88
commit 004568234b
2 changed files with 61 additions and 1 deletions

View File

@ -1277,8 +1277,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} else {
sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
}
}
sender.close();
} else {
sender.open();
}
pumpProtonToSocket();
}
});

View File

@ -49,6 +49,7 @@ import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.objectweb.jtests.jms.framework.TestConfig;
import org.slf4j.Logger;
@ -894,4 +895,61 @@ public class JMSClientTest extends JMSClientTestSupport {
}
}));
}
@Test(timeout=30000)
public void testDurableConsumerUnsubscribeWhileNoSubscription() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
final BrokerViewMBean broker = getProxyToBroker();
connection = createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return broker.getInactiveDurableTopicSubscribers().length == 0 &&
broker.getDurableTopicSubscribers().length == 0;
}
}));
try {
session.unsubscribe("DurbaleTopic");
fail("Should have thrown as subscription is in use.");
} catch (JMSException ex) {
}
}
@Ignore("Requires version 0.30 or higher to work.") // TODO
@Test(timeout=30000)
public void testDurableConsumerUnsubscribeWhileActive() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
final BrokerViewMBean broker = getProxyToBroker();
connection = createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getDestinationName());
session.createDurableSubscriber(topic, "DurbaleTopic");
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return broker.getInactiveDurableTopicSubscribers().length == 0 &&
broker.getDurableTopicSubscribers().length == 1;
}
}));
try {
session.unsubscribe("DurbaleTopic");
fail("Should have thrown as subscription is in use.");
} catch (JMSException ex) {
}
}
}