Fixin deadlock in amqp impl. Improved message mapping support and commented out the failing Joram test cases.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1394677 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-10-05 17:35:07 +00:00
parent f355b16d38
commit 6baed7a15a
10 changed files with 172 additions and 63 deletions

View File

@ -52,11 +52,16 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
transport = super.serverConfigure(transport, format, options);
MutexTransport mutex = transport.narrow(MutexTransport.class);
if (mutex != null) {
mutex.setSyncOnCommand(true);
// strip off the mutex transport.
if( transport instanceof MutexTransport ) {
transport = ((MutexTransport)transport).getNext();
}
// MutexTransport mutex = transport.narrow(MutexTransport.class);
// if (mutex != null) {
// mutex.setSyncOnCommand(true);
// }
return transport;
}

View File

@ -65,11 +65,16 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
transport = super.serverConfigure(transport, format, options);
MutexTransport mutex = transport.narrow(MutexTransport.class);
if (mutex != null) {
mutex.setSyncOnCommand(true);
// strip off the mutex transport.
if( transport instanceof MutexTransport ) {
transport = ((MutexTransport)transport).getNext();
}
// MutexTransport mutex = transport.narrow(MutexTransport.class);
// if (mutex != null) {
// mutex.setSyncOnCommand(true);
// }
return transport;
}

View File

@ -19,22 +19,23 @@ package org.apache.activemq.transport.amqp;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.*;
import org.apache.activemq.transport.amqp.transform.*;
import org.apache.activemq.util.*;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.fusesource.hawtbuf.*;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.ByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import javax.jms.JMSException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.fusesource.hawtbuf.ByteArrayOutputStream;
import java.util.concurrent.locks.ReentrantLock;
class AmqpProtocolConverter {
@ -50,6 +51,8 @@ class AmqpProtocolConverter {
this.amqpTransport = amqpTransport;
}
ReentrantLock lock = new ReentrantLock();
//
// private static final Buffer PING_RESP_FRAME = new PINGRESP().encode();
//
@ -212,6 +215,8 @@ class AmqpProtocolConverter {
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private ConnectionInfo connectionInfo = new ConnectionInfo();
private long nextSessionId = 0;
private long nextTempDestinationId = 0;
HashMap<Sender, ActiveMQDestination> tempDestinations = new HashMap<Sender, ActiveMQDestination>();
static abstract class AmqpDeliveryListener {
abstract public void onDelivery(Delivery delivery) throws Exception;
@ -219,6 +224,7 @@ class AmqpProtocolConverter {
}
private void onConnectionOpen() throws AmqpProtocolException {
connectionInfo.setResponseRequired(true);
connectionInfo.setConnectionId(connectionId);
// configureInactivityMonitor(connect.keepAlive());
@ -246,7 +252,6 @@ class AmqpProtocolConverter {
sendToActiveMQ(connectionInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
protonConnection.open();
pumpProtonToSocket();
@ -289,7 +294,7 @@ class AmqpProtocolConverter {
}
}
InboundTransformer inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
class ProducerContext extends AmqpDeliveryListener {
private final ProducerId producerId;
@ -355,13 +360,13 @@ class AmqpProtocolConverter {
// Client is producing to this receiver object
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
ActiveMQDestination destination = ActiveMQDestination.createDestination(receiver.getRemoteTargetAddress(), ActiveMQDestination.QUEUE_TYPE);
ProducerContext producerContext = new ProducerContext(producerId, destination);
ActiveMQDestination dest = ActiveMQDestination.createDestination(receiver.getRemoteTargetAddress(), ActiveMQDestination.QUEUE_TYPE);
ProducerContext producerContext = new ProducerContext(producerId, dest);
receiver.setContext(producerContext);
receiver.flow(1024 * 64);
ProducerInfo producerInfo = new ProducerInfo(producerId);
producerInfo.setDestination(destination);
producerInfo.setDestination(dest);
sendToActiveMQ(producerInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
receiver.open();
@ -485,11 +490,30 @@ class AmqpProtocolConverter {
subscriptionsByConsumerId.put(id, consumerContext);
ActiveMQDestination destination = ActiveMQDestination.createDestination(sender.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE);
ActiveMQDestination dest;
if( sender.getRemoteSourceAddress() != null ) {
dest = ActiveMQDestination.createDestination(sender.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE);
} else {
// lets create a temp dest.
// if (topic) {
// dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
// } else {
dest = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
// }
DestinationInfo info = new DestinationInfo();
info.setConnectionId(connectionId);
info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
info.setDestination(dest);
sendToActiveMQ(info, null);
tempDestinations.put(sender, dest);
sender.setLocalSourceAddress(inboundTransformer.getVendor().toAddress(dest));
}
sender.setContext(consumerContext);
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setDestination(destination);
consumerInfo.setDestination(dest);
consumerInfo.setPrefetchSize(100);
consumerInfo.setDispatchAsync(true);

View File

@ -55,11 +55,14 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
transport = super.serverConfigure(transport, format, options);
MutexTransport mutex = transport.narrow(MutexTransport.class);
if (mutex != null) {
mutex.setSyncOnCommand(true);
// strip off the mutex transport.
if( transport instanceof MutexTransport ) {
transport = ((MutexTransport)transport).getNext();
}
// MutexTransport mutex = transport.narrow(MutexTransport.class);
// if (mutex != null) {
// mutex.setSyncOnCommand(true);
// }
return transport;
}

View File

@ -59,8 +59,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
public void oneway(Object o) throws IOException {
try {
final Command command = (Command) o;
synchronized (protocolConverter) {
protocolConverter.lock.lock();
try {
protocolConverter.onActiveMQCommand(command);
} finally {
protocolConverter.lock.unlock();
}
} catch (Exception e) {
throw IOExceptionSupport.create(e);
@ -72,8 +75,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
if (trace) {
TRACE.trace("Received: \n" + command);
}
synchronized (protocolConverter) {
protocolConverter.lock.lock();
try {
protocolConverter.onAMQPData((Buffer) command);
} finally {
protocolConverter.lock.unlock();
}
} catch (IOException e) {
handleException(e);
@ -83,6 +89,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
}
public void sendToActiveMQ(Command command) {
assert protocolConverter.lock.isHeldByCurrentThread();
TransportListener l = transportListener;
if (l != null) {
l.onCommand(command);
@ -90,6 +97,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
}
public void sendToAmqp(Buffer command) throws IOException {
assert protocolConverter.lock.isHeldByCurrentThread();
if (trace) {
TRACE.trace("Sending: \n" + command);
}

View File

@ -20,6 +20,7 @@ import org.apache.qpid.proton.type.Binary;
import org.apache.qpid.proton.type.messaging.*;
import javax.jms.*;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -62,12 +63,14 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
} else if (body instanceof AmqpSequence ) {
AmqpSequence sequence = (AmqpSequence) body;
StreamMessage m = vendor.createStreamMessage();
throw new RuntimeException("not implemented");
// jms = m;
for( Object item : sequence.getValue()) {
m.writeObject(item);
}
rc = m;
} else if (body instanceof AmqpValue) {
Object value = ((AmqpValue) body).getValue();
if( value == null ) {
rc = vendor.createMessage();
rc = vendor.createObjectMessage();
} if( value instanceof String ) {
TextMessage m = vendor.createTextMessage();
m.setText((String) value);
@ -78,19 +81,22 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
rc = m;
} else if( value instanceof List) {
List d = (List) value;
StreamMessage m = vendor.createStreamMessage();
throw new RuntimeException("not implemented");
// jms = m;
for( Object item : (List) value) {
m.writeObject(item);
}
rc = m;
} else if( value instanceof Map) {
Map d = (Map) value;
MapMessage m = vendor.createMapMessage();
throw new RuntimeException("not implemented");
// jms = m;
final Set<Map.Entry<String, Object>> set = ((Map) value).entrySet();
for (Map.Entry<String, Object> entry : set) {
m.setObject(entry.getKey(), entry.getValue());
}
rc = m;
} else {
ObjectMessage m = vendor.createObjectMessage();
throw new RuntimeException("not implemented");
// jms = m;
m.setObject((Serializable) value);
rc = m;
}
} else {
throw new RuntimeException("Unexpected body type: "+body.getClass());

View File

@ -26,8 +26,10 @@ import org.apache.qpid.proton.type.messaging.*;
import javax.jms.*;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
@ -95,11 +97,25 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
} if( msg instanceof TextMessage ) {
body = new AmqpValue(((TextMessage) msg).getText());
} if( msg instanceof MapMessage ) {
throw new RuntimeException("Not implemented");
final HashMap map = new HashMap();
final MapMessage m = (MapMessage) msg;
final Enumeration names = m.getMapNames();
while (names.hasMoreElements()) {
String key = (String) names.nextElement();
map.put(key, m.getObject(key));
}
body = new AmqpValue(map);
} if( msg instanceof StreamMessage ) {
throw new RuntimeException("Not implemented");
ArrayList list = new ArrayList();
final StreamMessage m = (StreamMessage) msg;
try {
while(true) {
list.add(m.readObject());
}
} catch(MessageEOFException e){}
body = new AmqpSequence(list);
} if( msg instanceof ObjectMessage ) {
throw new RuntimeException("Not implemented");
body = new AmqpValue(((ObjectMessage) msg).getObject());
}
header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);

View File

@ -34,7 +34,7 @@ public class JMSClientTest extends AmqpTestSupport {
QueueImpl queue = new QueueImpl("queue://testqueue");
int nMsgs = 100;
final String dataFormat = "%01024d";
final String dataFormat = "%010240d";
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, null, null);

View File

@ -20,7 +20,7 @@ import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.objectweb.jtests.jms.admin.Admin;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
@ -31,6 +31,7 @@ import javax.naming.NamingException;
import java.io.File;
import java.net.URI;
import java.util.Hashtable;
import java.util.logging.*;
/**
*
@ -40,6 +41,7 @@ public class ActiveMQAdmin implements Admin {
Context context;
{
// enableJMSFrameTracing();
try {
// Use the jetty JNDI context since it's mutable.
final Hashtable<String, String> env = new Hashtable<String, String>();
@ -51,6 +53,29 @@ public class ActiveMQAdmin implements Admin {
}
}
private void enableJMSFrameTracing() {
final SimpleFormatter formatter = new SimpleFormatter();
Handler handler = new Handler() {
@Override
public void publish(LogRecord r) {
System.out.println(String.format("%s:%s", r.getLoggerName(), r.getMessage()));
}
@Override
public void flush() {
System.out.flush();
}
@Override
public void close() throws SecurityException {
}
};
Logger log = Logger.getLogger("RAW");
log.addHandler(handler);
log.setLevel(Level.FINEST);
}
protected BrokerService createBroker() throws Exception {
return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
}
@ -59,22 +84,26 @@ public class ActiveMQAdmin implements Admin {
return getClass().getName();
}
BrokerService broker;
int port;
static BrokerService broker;
static int port;
public void startServer() throws Exception {
if( broker!=null ) {
stopServer();
}
if (System.getProperty("basedir") == null) {
File file = new File(".");
System.setProperty("basedir", file.getAbsolutePath());
}
broker = createBroker();
TransportConnector connector = broker.addConnector("amqp://localhost:0");
broker.start();
final TransportConnector connector = broker.addConnector("amqp://localhost:0");
port = connector.getConnectUri().getPort();
}
public void stopServer() throws Exception {
broker.stop();
broker = null;
}
public void start() throws Exception {

View File

@ -45,25 +45,38 @@ public class JoramJmsTest extends TestCase {
public static Test suite() {
TestSuite suite = new TestSuite();
// TODO: figure out why the following tests are failing..
// suite.addTestSuite(ConnectionTest.class);
// suite.addTestSuite(TopicConnectionTest.class);
// suite.addTestSuite(MessageHeaderTest.class);
// suite.addTestSuite(MessageBodyTest.class);
// suite.addTestSuite(MessageDefaultTest.class);
// suite.addTestSuite(MessageTypeTest.class);
// suite.addTestSuite(JMSXPropertyTest.class);
// suite.addTestSuite(MessagePropertyConversionTest.class);
// suite.addTestSuite(TemporaryQueueTest.class);
// suite.addTestSuite(SelectorSyntaxTest.class);
// suite.addTestSuite(QueueSessionTest.class);
// suite.addTestSuite(SessionTest.class);
// suite.addTestSuite(TopicSessionTest.class);
// suite.addTestSuite(TemporaryTopicTest.class);
// suite.addTestSuite(UnifiedSessionTest.class);
// suite.addTestSuite(QueueBrowserTest.class);
// suite.addTestSuite(MessagePropertyTest.class);
// suite.addTestSuite(SelectorTest.class);
// Passing tests
suite.addTestSuite(JMSXPropertyTest.class);
suite.addTestSuite(MessageBodyTest.class);
suite.addTestSuite(MessageDefaultTest.class);
suite.addTestSuite(MessagePropertyConversionTest.class);
suite.addTestSuite(MessagePropertyTest.class);
if (false ) {
// TODO: Fails due to JMS client impl error.
suite.addTestSuite(UnifiedSessionTest.class);
// TODO: Fails due to https://issues.apache.org/jira/browse/PROTON-62: ClassCastException when processing an Attach frame
suite.addTestSuite(QueueSessionTest.class);
suite.addTestSuite(SessionTest.class);
// TODO: Fails due to inconsistent ObjectMessage mapping in the JMS client.
suite.addTestSuite(MessageTypeTest.class);
// TODO: Fails due to temp destinations not being supported yet.
suite.addTestSuite(MessageHeaderTest.class);
suite.addTestSuite(TemporaryQueueTest.class);
suite.addTestSuite(TemporaryTopicTest.class);
// TODO: Fails due to selectors not being implemented yet.
suite.addTestSuite(SelectorSyntaxTest.class);
suite.addTestSuite(SelectorTest.class);
// TODO: Fails due to: javax.jms.IllegalStateException: Cannot set client-id to "publisherConnection"; client-id must be set on connection creation
suite.addTestSuite(TopicConnectionTest.class);
suite.addTestSuite(TopicSessionTest.class);
// TODO: figure out why the following tests fail..
// TODO: figure out why the following tests hang..
suite.addTestSuite(ConnectionTest.class);
suite.addTestSuite(QueueBrowserTest.class);
}
return suite;
}