Improved AMQP protocol support. All JORAM Selector tests are now working.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1404158 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-10-31 14:17:32 +00:00
parent cb3392c307
commit 9a6a83fe0d
3 changed files with 21 additions and 9 deletions

View File

@ -41,6 +41,7 @@ import org.fusesource.hawtbuf.ByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.InvalidSelectorException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
@ -626,11 +627,13 @@ class AmqpProtocolConverter {
producerInfo.setDestination(dest);
sendToActiveMQ(producerInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
receiver.open();
if (response.isException()) {
// If the connection attempt fails we close the socket.
receiver.setTarget(null);
Throwable exception = ((ExceptionResponse) response).getException();
((LinkImpl)receiver).setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
receiver.close();
} else {
receiver.open();
}
pumpProtonToSocket();
}
@ -899,11 +902,17 @@ class AmqpProtocolConverter {
sendToActiveMQ(consumerInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
sender.open();
if (response.isException()) {
sender.setSource(null);
Throwable exception = ((ExceptionResponse) response).getException();
exception.printStackTrace();
String name = exception.getClass().getName();
if( exception instanceof InvalidSelectorException ) {
name = "amqp:invalid-field";
}
((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
sender.close();
} else {
sender.open();
}
pumpProtonToSocket();
}

View File

@ -138,9 +138,13 @@ public abstract class InboundTransformer {
if( ma!=null ) {
for (Map.Entry entry : (Set<Map.Entry>)ma.getValue().entrySet()) {
String key = entry.getKey().toString();
if( "x-opt-jms-type".equals(key) ) {
jms.setJMSType(entry.getValue().toString());
} else {
setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
}
}
}
final Properties properties = amqp.getProperties();
if( properties!=null ) {

View File

@ -47,6 +47,9 @@ public class JoramJmsTest extends TestCase {
TestSuite suite = new TestSuite();
// Passing tests
suite.addTestSuite(SelectorSyntaxTest.class);
suite.addTestSuite(QueueSessionTest.class);
suite.addTestSuite(SelectorTest.class);
suite.addTestSuite(TemporaryQueueTest.class);
suite.addTestSuite(ConnectionTest.class);
suite.addTestSuite(SessionTest.class);
@ -58,10 +61,6 @@ public class JoramJmsTest extends TestCase {
if (false ) {
// TODO: Fails due to selectors not being implemented yet.
suite.addTestSuite(SelectorSyntaxTest.class);
suite.addTestSuite(SelectorTest.class);
suite.addTestSuite(QueueSessionTest.class);
// TODO: Fails due to https://issues.apache.org/jira/browse/PROTON-110 and DestinationImpl vs QueueImpl mapping issues
suite.addTestSuite(MessageHeaderTest.class);
// TODO: Fails due to JMS client setup browser before getEnumeration() gets called.