Initial rough cut of AMQP protocol support using the QPID proton project.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1393500 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-10-03 14:15:01 +00:00
parent 62bcbaa5d0
commit cdd5150340
30 changed files with 2573 additions and 0 deletions

130
activemq-amqp/pom.xml Normal file
View File

@ -0,0 +1,130 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.8-SNAPSHOT</version>
</parent>
<artifactId>activemq-amqp</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ :: AMQP</name>
<description>ActiveMQ implementaiton of AMQP messaging protocol</description>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-proton</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.fusesource.hawtbuf</groupId>
<artifactId>hawtbuf</artifactId>
<version>${hawtbuf-version}</version>
</dependency>
<!-- Testing Dependencies -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-console</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<!-- profile which is activated is the swiftmq-client-home prop is defined.
Tt tests the amqp broker impl using the swiftmq client libs -->
<id>swiftmq-client</id>
<activation>
<property><name>swiftmq-client-home</name></property>
</activation>
<dependencies>
<dependency>
<groupId>com.swiftmq</groupId>
<artifactId>swiftmq</artifactId>
<version>9.2.0</version>
<scope>system</scope>
<systemPath>${swiftmq-client-home}/jars/swiftmq.jar</systemPath>
</dependency>
<dependency>
<groupId>com.swiftmq</groupId>
<artifactId>swiftmq-amqp</artifactId>
<version>9.2.0</version>
<scope>system</scope>
<systemPath>${swiftmq-client-home}/jars/amqp.jar</systemPath>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>add-test-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>${basedir}/src/test-swiftmq/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class AMQPNativeInboundTransformer extends InboundTransformer {
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
import java.util.HashMap;
import java.util.Map;
/**
* A <a href="http://amqp.org/">AMQP</a> over SSL transport factory
*/
public class AMQPSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
private BrokerContext brokerContext = null;
protected String getDefaultWireFormatType() {
return "amqp";
}
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new AmqpTransportFilter(transport, format, brokerContext);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
@SuppressWarnings("rawtypes")
@Override
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
transport = super.serverConfigure(transport, format, options);
MutexTransport mutex = transport.narrow(MutexTransport.class);
if (mutex != null) {
mutex.setSyncOnCommand(true);
}
return transport;
}
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
}
// protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
//
// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
// filter.setInactivityMonitor(monitor);
//
// return monitor;
// }
@Override
protected boolean isUseInactivityMonitor(Transport transport) {
return false;
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.DataByteArrayInputStream;
import javax.net.SocketFactory;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
public class AmqpNioSslTransport extends NIOSSLTransport {
public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
}
public AmqpNioSslTransport(WireFormat wireFormat, Socket socket) throws IOException {
super(wireFormat, socket);
}
@Override
protected void initializeStreams() throws IOException {
super.initializeStreams();
if (inputBuffer.position() != 0 && inputBuffer.hasRemaining()) {
serviceRead();
}
}
@Override
protected void processCommand(ByteBuffer plain) throws Exception {
doConsume(AmqpSupport.toBuffer(plain));
}
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.WireFormat;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
public class AmqpNioSslTransportFactory extends AmqpNioTransportFactory {
SSLContext context;
@Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpTransportServer(this, location, serverSocketFactory) {
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
AmqpNioSslTransport transport = new AmqpNioSslTransport(format, socket);
if (context != null) {
transport.setSslContext(context);
}
return transport;
}
};
}
@Override
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new AmqpNioSslTransport(wf, socketFactory, location, localLocation);
}
@Override
public TransportServer doBind(URI location) throws IOException {
if (SslContext.getCurrentSslContext() != null) {
try {
context = SslContext.getCurrentSslContext().getSSLContext();
} catch (Exception e) {
throw new IOException(e);
}
}
return super.doBind(location);
}
}

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.DataByteArrayInputStream;
import javax.net.SocketFactory;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
/**
* An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
*/
public class AmqpNioTransport extends TcpTransport {
private SocketChannel channel;
private SelectorSelection selection;
private ByteBuffer inputBuffer;
public AmqpNioTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
}
public AmqpNioTransport(WireFormat wireFormat, Socket socket) throws IOException {
super(wireFormat, socket);
}
protected void initializeStreams() throws IOException {
channel = socket.getChannel();
channel.configureBlocking(false);
// listen for events telling us when the socket is readable.
selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
public void onSelect(SelectorSelection selection) {
if (!isStopped()) {
serviceRead();
}
}
public void onError(SelectorSelection selection, Throwable error) {
if (error instanceof IOException) {
onException((IOException) error);
} else {
onException(IOExceptionSupport.create(error));
}
}
});
inputBuffer = ByteBuffer.allocate(8 * 1024);
NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
this.dataOut = new DataOutputStream(outPutStream);
this.buffOut = outPutStream;
}
private void serviceRead() {
try {
while (isStarted()) {
// read channel
int readSize = channel.read(inputBuffer);
// channel is closed, cleanup
if (readSize == -1) {
onException(new EOFException());
selection.close();
break;
}
// nothing more to read, break
if (readSize == 0) {
break;
}
inputBuffer.flip();
doConsume(AmqpSupport.toBuffer(inputBuffer));
// clear the buffer
inputBuffer.clear();
}
} catch (IOException e) {
onException(e);
} catch (Throwable e) {
onException(IOExceptionSupport.create(e));
}
}
protected void doStart() throws Exception {
connect();
selection.setInterestOps(SelectionKey.OP_READ);
selection.enable();
}
protected void doStop(ServiceStopper stopper) throws Exception {
try {
if (selection != null) {
selection.close();
}
} finally {
super.doStop(stopper);
}
}
}

View File

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.NIOTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
/**
* A <a href="http://amqp.org/">AMQP</a> over NIO transport factory
*/
public class AmqpNioTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
private BrokerContext brokerContext = null;
protected String getDefaultWireFormatType() {
return "amqp";
}
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpTransportServer(this, location, serverSocketFactory) {
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
return new AmqpNioTransport(format, socket);
}
};
}
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new AmqpNioTransport(wf, socketFactory, location, localLocation);
}
@SuppressWarnings("rawtypes")
@Override
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
transport = super.serverConfigure(transport, format, options);
MutexTransport mutex = transport.narrow(MutexTransport.class);
if (mutex != null) {
mutex.setSyncOnCommand(true);
}
return transport;
}
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new AmqpTransportFilter(transport, format, brokerContext);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
}
// protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
// filter.setInactivityMonitor(monitor);
// return monitor;
// }
@Override
protected boolean isUseInactivityMonitor(Transport transport) {
return false;
}
}

View File

@ -0,0 +1,750 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.*;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.*;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.Inflater;
class AmqpProtocolConverter {
public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
public static final EnumSet<EndpointState> INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET);
public static final EnumSet<EndpointState> ACTIVE_STATE = EnumSet.of(EndpointState.ACTIVE);
public static final EnumSet<EndpointState> CLOSED_STATE = EnumSet.of(EndpointState.CLOSED);
private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
private final AmqpTransport amqpTransport;
public AmqpProtocolConverter(AmqpTransport amqpTransport, BrokerContext brokerContext) {
this.amqpTransport = amqpTransport;
}
//
// private static final Buffer PING_RESP_FRAME = new PINGRESP().encode();
//
//
// private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
// private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
//
// private final ConcurrentHashMap<ConsumerId, AmqpSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, AmqpSubscription>();
// private final ConcurrentHashMap<UTF8Buffer, AmqpSubscription> amqpSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, AmqpSubscription>();
// private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>();
// private final Map<Destination, UTF8Buffer> amqpTopicMap = new LRUCache<Destination, UTF8Buffer>();
// private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>();
// private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>();
//
// private final AtomicBoolean connected = new AtomicBoolean(false);
// private CONNECT connect;
// private String clientId;
// private final String QOS_PROPERTY_NAME = "QoSPropertyName";
TransportImpl protonTransport = new TransportImpl();
ConnectionImpl protonConnection = new ConnectionImpl();
{
this.protonTransport.bind(this.protonConnection);
}
void pumpOut() {
try {
int size = 1024 * 64;
byte data[] = new byte[size];
boolean done = false;
while (!done) {
int count = protonTransport.output(data, 0, size);
if (count > 0) {
final Buffer buffer = new Buffer(data, 0, count);
System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
amqpTransport.sendToAmqp(buffer);
} else {
done = true;
}
}
// System.out.println("write done");
} catch (IOException e) {
amqpTransport.onException(e);
}
}
static class AmqpSessionContext {
private final SessionId sessionId;
long nextProducerId = 0;
long nextConsumerId = 0;
public AmqpSessionContext(ConnectionId connectionId, long id) {
sessionId = new SessionId(connectionId, -1);
}
}
/**
* Convert a AMQP command
*/
public void onAMQPData(Buffer frame) throws IOException, JMSException {
try {
System.out.println("reading: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
protonTransport.input(frame.data, frame.offset, frame.length);
} catch (Throwable e) {
handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
}
try {
// Handle the amqp open..
if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED) {
onConnectionOpen();
}
// Lets map amqp sessions to openwire sessions..
Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
while (session != null) {
onSessionOpen(session);
session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
}
Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
while (link != null) {
onLinkOpen(link);
link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
}
Delivery delivery = protonConnection.getWorkHead();
while (delivery != null) {
AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext();
if (listener != null) {
listener.onDelivery(delivery);
}
delivery = delivery.getWorkNext();
}
link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
while (link != null) {
if (link instanceof Receiver) {
// listener.onReceiverClose((Receiver) link);
} else {
// listener.onSenderClose((Sender) link);
}
link.close();
link = link.next(ACTIVE_STATE, CLOSED_STATE);
}
session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
while (session != null) {
//TODO - close links?
// listener.onSessionClose(session);
session.close();
session = session.next(ACTIVE_STATE, CLOSED_STATE);
}
if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
// listener.onConnectionClose(protonConnection);
protonConnection.close();
}
} catch (Throwable e) {
handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
}
pumpOut();
}
public void onActiveMQCommand(Command command) throws Exception {
if (command.isResponse()) {
Response response = (Response) command;
ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
if (rh != null) {
rh.onResponse(this, response);
} else {
// Pass down any unexpected errors. Should this close the connection?
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
handleException(exception);
}
}
} else if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) command;
ConsumerContext consumerContext = subscriptionsByConsumerId.get(md.getConsumerId());
if (consumerContext != null) {
consumerContext.onMessageDispatch(md);
}
} else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
// Pass down any unexpected async errors. Should this close the connection?
Throwable exception = ((ConnectionError) command).getException();
handleException(exception);
} else if (command.isBrokerInfo()) {
//ignore
} else {
LOG.debug("Do not know how to process ActiveMQ Command " + command);
}
}
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private ConnectionInfo connectionInfo = new ConnectionInfo();
private long nextSessionId = 0;
static abstract class AmqpDeliveryListener {
abstract public void onDelivery(Delivery delivery) throws Exception;
}
private void onConnectionOpen() throws AmqpProtocolException {
connectionInfo.setResponseRequired(true);
connectionInfo.setConnectionId(connectionId);
// configureInactivityMonitor(connect.keepAlive());
String clientId = protonConnection.getRemoteContainer();
if (clientId != null && !clientId.isEmpty()) {
connectionInfo.setClientId(clientId);
} else {
connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
}
// String userName = "";
// if (connect.userName() != null) {
// userName = connect.userName().toString();
// }
// String passswd = "";
// if (connect.password() != null) {
// passswd = connect.password().toString();
// }
// connectionInfo.setUserName(userName);
// connectionInfo.setPassword(passswd);
connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
sendToActiveMQ(connectionInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
protonConnection.open();
pumpOut();
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
// TODO: figure out how to close /w an error.
// protonConnection.setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
protonConnection.close();
pumpOut();
amqpTransport.onException(IOExceptionSupport.create(exception));
return;
}
}
});
}
private void onSessionOpen(Session session) {
AmqpSessionContext sessionContext = new AmqpSessionContext(connectionId, nextSessionId++);
session.setContext(sessionContext);
sendToActiveMQ(new SessionInfo(sessionContext.sessionId), null);
session.open();
}
private void onLinkOpen(Link link) {
link.setLocalSourceAddress(link.getRemoteSourceAddress());
link.setLocalTargetAddress(link.getRemoteTargetAddress());
AmqpSessionContext sessionContext = (AmqpSessionContext) link.getSession().getContext();
if (link instanceof Receiver) {
onReceiverOpen((Receiver) link, sessionContext);
} else {
onSenderOpen((Sender) link, sessionContext);
}
}
class ProducerContext extends AmqpDeliveryListener {
private final ProducerId producerId;
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final ActiveMQDestination destination;
public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
this.producerId = producerId;
this.destination = destination;
}
@Override
public void onDelivery(Delivery delivery) throws JMSException {
// delivery.
ActiveMQMessage message = convertMessage((DeliveryImpl) delivery);
message.setProducerId(producerId);
message.onSend();
// sendToActiveMQ(message, createResponseHandler(command));
sendToActiveMQ(message, null);
}
ActiveMQMessage convertMessage(DeliveryImpl delivery) throws JMSException {
ActiveMQBytesMessage msg = nextMessage(delivery);
final Receiver receiver = (Receiver) delivery.getLink();
byte buff[] = new byte[1024 * 4];
int count = 0;
while ((count = receiver.recv(buff, 0, buff.length)) >= 0) {
msg.writeBytes(buff, 0, count);
}
return msg;
}
ActiveMQBytesMessage current;
private ActiveMQBytesMessage nextMessage(DeliveryImpl delivery) throws JMSException {
if (current == null) {
current = new ActiveMQBytesMessage();
current.setJMSDestination(destination);
current.setProducerId(producerId);
current.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
current.setTimestamp(System.currentTimeMillis());
current.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
// msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
// msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
System.out.println(delivery.getLocalState() + "/" + delivery.getRemoteState());
}
return current;
}
}
void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
// Client is producing to this receiver object
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
ActiveMQDestination destination = ActiveMQDestination.createDestination(receiver.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE);
ProducerContext producerContext = new ProducerContext(producerId, destination);
receiver.setContext(producerContext);
receiver.flow(1024 * 64);
ProducerInfo producerInfo = new ProducerInfo(producerId);
producerInfo.setDestination(destination);
sendToActiveMQ(producerInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
receiver.open();
if (response.isException()) {
// If the connection attempt fails we close the socket.
Throwable exception = ((ExceptionResponse) response).getException();
receiver.close();
}
pumpOut();
}
});
}
class ConsumerContext extends AmqpDeliveryListener {
private final ConsumerId consumerId;
private final Sender sender;
long nextTagId = 0;
HashSet<byte[]> tagCache = new HashSet<byte[]>();
byte[] nextTag() {
byte[] rc;
if (tagCache != null && !tagCache.isEmpty()) {
final Iterator<byte[]> iterator = tagCache.iterator();
rc = iterator.next();
iterator.remove();
} else {
try {
rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
return rc;
}
public ConsumerContext(ConsumerId consumerId, Sender sender) {
this.consumerId = consumerId;
this.sender = sender;
}
// called when the connection receives a JMS message from ActiveMQ
public void onMessageDispatch(MessageDispatch md) throws Exception {
final byte[] tag = nextTag();
final Delivery delivery = sender.delivery(tag, 0, tag.length);
delivery.setContext(md);
// Covert to an AMQP messages.
org.apache.qpid.proton.message.Message msg = convertMessage(md.getMessage());
byte buffer[] = new byte[1024*4];
int c=0;
// And send the AMQP message over the link.
while( (c=msg.encode(buffer, 0 , 0)) >= 0 ) {
sender.send(buffer, 0, c);
}
sender.advance();
}
public org.apache.qpid.proton.message.Message convertMessage(Message message) throws Exception {
// result.setContentEncoding();
// QoS qoS;
// if (message.propertyExists(QOS_PROPERTY_NAME)) {
// int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
// qoS = QoS.values()[ordinal];
//
// } else {
// qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
// }
// result.qos(qoS);
Buffer content = null;
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
msg.setReadOnlyBody(true);
String messageText = msg.getText();
content = new Buffer(messageText.getBytes("UTF-8"));
} else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
msg.setReadOnlyBody(true);
byte[] data = new byte[(int) msg.getBodyLength()];
msg.readBytes(data);
content = new Buffer(data);
} else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
msg.setReadOnlyBody(true);
Map map = msg.getContentMap();
content = new Buffer(map.toString().getBytes("UTF-8"));
} else {
ByteSequence byteSequence = message.getContent();
if (byteSequence != null && byteSequence.getLength() > 0) {
if (message.isCompressed()) {
Inflater inflater = new Inflater();
inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
byte[] data = new byte[4096];
int read;
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
while ((read = inflater.inflate(data)) != 0) {
bytesOut.write(data, 0, read);
}
byteSequence = bytesOut.toByteSequence();
}
content = new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length);
} else {
content = new Buffer(0);
}
}
org.apache.qpid.proton.message.Message result = new org.apache.qpid.proton.message.Message();
return result;
}
@Override
public void onDelivery(Delivery delivery) throws JMSException {
if( delivery.remotelySettled() ) {
MessageDispatch md = (MessageDispatch) delivery.getContext();
}
}
}
private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
ConsumerContext consumerContext = new ConsumerContext(id, sender);
subscriptionsByConsumerId.put(id, consumerContext);
ActiveMQDestination destination = ActiveMQDestination.createDestination(sender.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE);
sender.setContext(consumerContext);
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(100);
consumerInfo.setDispatchAsync(true);
sendToActiveMQ(consumerInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
sender.open();
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
sender.close();
}
pumpOut();
}
});
}
//
// QoS onSubscribe(SUBSCRIBE command, Topic topic) throws AmqpProtocolException {
// ActiveMQDestination destination = new ActiveMQTopic(convertAMQPToActiveMQ(topic.name().toString()));
// if (destination == null) {
// throw new AmqpProtocolException("Invalid Destination.");
// }
//
// ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
// ConsumerInfo consumerInfo = new ConsumerInfo(id);
// consumerInfo.setDestination(destination);
// consumerInfo.setPrefetchSize(1000);
// consumerInfo.setDispatchAsync(true);
// if (!connect.cleanSession() && (connect.clientId() != null)) {
// //by default subscribers are persistent
// consumerInfo.setSubscriptionName(connect.clientId().toString());
// }
//
// AmqpSubscription amqpSubscription = new AmqpSubscription(this, topic.qos(), consumerInfo);
//
//
// amqpSubscriptionByTopic.put(topic.name(), amqpSubscription);
//
// sendToActiveMQ(consumerInfo, null);
// return topic.qos();
// }
//
// void onUnSubscribe(UNSUBSCRIBE command) {
// UTF8Buffer[] topics = command.topics();
// if (topics != null) {
// for (int i = 0; i < topics.length; i++) {
// onUnSubscribe(topics[i]);
// }
// }
// UNSUBACK ack = new UNSUBACK();
// ack.messageId(command.messageId());
// pumpOut(ack.encode());
//
// }
//
// void onUnSubscribe(UTF8Buffer topicName) {
// AmqpSubscription subs = amqpSubscriptionByTopic.remove(topicName);
// if (subs != null) {
// ConsumerInfo info = subs.getConsumerInfo();
// if (info != null) {
// subscriptionsByConsumerId.remove(info.getConsumerId());
// }
// RemoveInfo removeInfo = info.createRemoveCommand();
// sendToActiveMQ(removeInfo, null);
// }
// }
//
//
// /**
// * Dispatch a ActiveMQ command
// */
//
//
//
// void onAMQPPublish(PUBLISH command) throws IOException, JMSException {
// checkConnected();
// }
//
// void onAMQPPubAck(PUBACK command) {
// short messageId = command.messageId();
// MessageAck ack;
// synchronized (consumerAcks) {
// ack = consumerAcks.remove(messageId);
// }
// if (ack != null) {
// amqpTransport.sendToActiveMQ(ack);
// }
// }
//
// void onAMQPPubRec(PUBREC commnand) {
// //from a subscriber - send a PUBREL in response
// PUBREL pubrel = new PUBREL();
// pubrel.messageId(commnand.messageId());
// pumpOut(pubrel.encode());
// }
//
// void onAMQPPubRel(PUBREL command) {
// PUBREC ack;
// synchronized (publisherRecs) {
// ack = publisherRecs.remove(command.messageId());
// }
// if (ack == null) {
// LOG.warn("Unknown PUBREL: " + command.messageId() + " received");
// }
// PUBCOMP pubcomp = new PUBCOMP();
// pubcomp.messageId(command.messageId());
// pumpOut(pubcomp.encode());
// }
//
// void onAMQPPubComp(PUBCOMP command) {
// short messageId = command.messageId();
// MessageAck ack;
// synchronized (consumerAcks) {
// ack = consumerAcks.remove(messageId);
// }
// if (ack != null) {
// amqpTransport.sendToActiveMQ(ack);
// }
// }
//
//
//
//
// public AmqpTransport amqpTransport {
// return amqpTransport;
// }
//
//
//
// void configureInactivityMonitor(short heartBeat) {
// try {
//
// int heartBeatMS = heartBeat * 1000;
// AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
// monitor.setProtocolConverter(this);
// monitor.setReadCheckTime(heartBeatMS);
// monitor.setInitialDelayTime(heartBeatMS);
// monitor.startMonitorThread();
//
// } catch (Exception ex) {
// LOG.warn("Failed to start AMQP InactivityMonitor ", ex);
// }
//
// LOG.debug(getClientId() + " AMQP Connection using heart beat of " + heartBeat + " secs");
// }
//
//
//
// void checkConnected() throws AmqpProtocolException {
// if (!connected.get()) {
// throw new AmqpProtocolException("Not connected.");
// }
// }
//
// private String getClientId() {
// if (clientId == null) {
// if (connect != null && connect.clientId() != null) {
// clientId = connect.clientId().toString();
// }
// } else {
// clientId = "";
// }
// return clientId;
// }
//
// private void stopTransport() {
// try {
// amqpTransport.stop();
// } catch (Throwable e) {
// LOG.debug("Failed to stop AMQP transport ", e);
// }
// }
//
// ResponseHandler createResponseHandler(final PUBLISH command) {
//
// if (command != null) {
// switch (command.qos()) {
// case AT_LEAST_ONCE:
// return new ResponseHandler() {
// public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
// if (response.isException()) {
// LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse) response).getException());
// } else {
// PUBACK ack = new PUBACK();
// ack.messageId(command.messageId());
// converter.amqpTransport.sendToAmqp(ack.encode());
// }
// }
// };
// case EXACTLY_ONCE:
// return new ResponseHandler() {
// public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
// if (response.isException()) {
// LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse) response).getException());
// } else {
// PUBREC ack = new PUBREC();
// ack.messageId(command.messageId());
// synchronized (publisherRecs) {
// publisherRecs.put(command.messageId(), ack);
// }
// converter.amqpTransport.sendToAmqp(ack.encode());
// }
// }
// };
// case AT_MOST_ONCE:
// break;
// }
// }
// return null;
// }
//
// private String convertAMQPToActiveMQ(String name) {
// String result = name.replace('#', '>');
// result = result.replace('+', '*');
// result = result.replace('/', '.');
// return result;
// }
////////////////////////////////////////////////////////////////////////////
//
// Implementation methods
//
////////////////////////////////////////////////////////////////////////////
private final Object commnadIdMutex = new Object();
private int lastCommandId;
int generateCommandId() {
synchronized (commnadIdMutex) {
return lastCommandId++;
}
}
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
void sendToActiveMQ(Command command, ResponseHandler handler) {
command.setCommandId(generateCommandId());
if (handler != null) {
command.setResponseRequired(true);
resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
}
amqpTransport.sendToActiveMQ(command);
}
void handleException(Throwable exception) {
exception.printStackTrace();
if (LOG.isDebugEnabled()) {
LOG.debug("Exception detail", exception);
}
try {
amqpTransport.stop();
} catch (Throwable e) {
LOG.error("Failed to stop AMQP Transport ", e);
}
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import java.io.IOException;
public class AmqpProtocolException extends IOException {
private static final long serialVersionUID = -2869735532997332242L;
private final boolean fatal;
public AmqpProtocolException() {
this(null);
}
public AmqpProtocolException(String s) {
this(s, false);
}
public AmqpProtocolException(String s, boolean fatal) {
this(s, fatal, null);
}
public AmqpProtocolException(String s, boolean fatal, Throwable cause) {
super(s);
this.fatal = fatal;
initCause(cause);
}
public boolean isFatal() {
return fatal;
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.command.*;
import javax.jms.JMSException;
import java.io.IOException;
import java.util.zip.DataFormatException;
/**
* Keeps track of the AMQP client subscription so that acking is correctly done.
*/
class AmqpSubscription {
// private final AmqpProtocolConverter protocolConverter;
//
// private final ConsumerInfo consumerInfo;
// private ActiveMQDestination destination;
// private final QoS qos;
//
// public AmqpSubscription(AmqpProtocolConverter protocolConverter, QoS qos, ConsumerInfo consumerInfo) {
// this.protocolConverter = protocolConverter;
// this.consumerInfo = consumerInfo;
// this.qos = qos;
// }
//
// MessageAck createMessageAck(MessageDispatch md) {
// return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
// }
//
// PUBLISH createPublish(ActiveMQMessage message) throws DataFormatException, IOException, JMSException {
// PUBLISH publish = protocolConverter.convertMessage(message);
// if (publish.qos().ordinal() > this.qos.ordinal()) {
// publish.qos(this.qos);
// }
// return publish;
// }
//
// public boolean expectAck() {
// return qos != QoS.AT_MOST_ONCE;
// }
//
// public void setDestination(ActiveMQDestination destination) {
// this.destination = destination;
// }
//
// public ActiveMQDestination getDestination() {
// return destination;
// }
//
// public ConsumerInfo getConsumerInfo() {
// return consumerInfo;
// }
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.fusesource.hawtbuf.Buffer;
import java.nio.ByteBuffer;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class AmqpSupport {
static public Buffer toBuffer(ByteBuffer data) {
if( data == null ) {
return null;
}
Buffer rc;
if( data.isDirect() ) {
rc = new Buffer(data.remaining());
data.get(rc.data);
} else {
rc = new Buffer(data);
data.position(data.position()+data.remaining());
}
return rc;
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.command.Command;
import org.fusesource.hawtbuf.Buffer;
import java.io.IOException;
import java.security.cert.X509Certificate;
/**
* Basic interface that mediates between protocol converter and transport
*/
public interface AmqpTransport {
public void sendToActiveMQ(Command command);
public void sendToAmqp(Buffer command) throws IOException;
public X509Certificate[] getPeerCertificates();
public void onException(IOException error);
// public AmqpInactivityMonitor getInactivityMonitor();
public AmqpWireFormat getWireFormat();
public void stop() throws Exception;
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
import java.util.HashMap;
import java.util.Map;
/**
* A <a href="http://amqp.org/">AMQP</a> transport factory
*/
public class AmqpTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
private BrokerContext brokerContext = null;
protected String getDefaultWireFormatType() {
return "amqp";
}
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new AmqpTransportFilter(transport, format, brokerContext);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
}
@SuppressWarnings("rawtypes")
@Override
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
transport = super.serverConfigure(transport, format, options);
MutexTransport mutex = transport.narrow(MutexTransport.class);
if (mutex != null) {
mutex.setSyncOnCommand(true);
}
return transport;
}
// @Override
// protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
//
// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
// filter.setInactivityMonitor(monitor);
//
// return monitor;
// }
@Override
protected boolean isUseInactivityMonitor(Transport transport) {
return false;
}
}

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import java.io.IOException;
import java.security.cert.X509Certificate;
/**
* The AMQPTransportFilter normally sits on top of a TcpTransport that has been
* configured with the StompWireFormat and is used to convert AMQP commands to
* ActiveMQ commands. All of the conversion work is done by delegating to the
* AMQPProtocolConverter
*/
public class AmqpTransportFilter extends TransportFilter implements AmqpTransport {
private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class);
private static final Logger TRACE = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".AMQPIO");
private final AmqpProtocolConverter protocolConverter;
// private AmqpInactivityMonitor monitor;
private AmqpWireFormat wireFormat;
private boolean trace;
public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
super(next);
this.protocolConverter = new AmqpProtocolConverter(this, brokerContext);
if (wireFormat instanceof AmqpWireFormat) {
this.wireFormat = (AmqpWireFormat) wireFormat;
}
}
public void oneway(Object o) throws IOException {
try {
final Command command = (Command) o;
protocolConverter.onActiveMQCommand(command);
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
}
public void onCommand(Object command) {
try {
if (trace) {
TRACE.trace("Received: \n" + command);
}
protocolConverter.onAMQPData((Buffer) command);
} catch (IOException e) {
handleException(e);
} catch (JMSException e) {
onException(IOExceptionSupport.create(e));
}
}
public void sendToActiveMQ(Command command) {
TransportListener l = transportListener;
if (l != null) {
l.onCommand(command);
}
}
public void sendToAmqp(Buffer command) throws IOException {
if (trace) {
TRACE.trace("Sending: \n" + command);
}
Transport n = next;
if (n != null) {
n.oneway(command);
}
}
public X509Certificate[] getPeerCertificates() {
if (next instanceof SslTransport) {
X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
if (trace && peerCerts != null) {
LOG.debug("Peer Identity has been verified\n");
}
return peerCerts;
}
return null;
}
public boolean isTrace() {
return trace;
}
public void setTrace(boolean trace) {
this.trace = trace;
}
// @Override
// public AmqpInactivityMonitor getInactivityMonitor() {
// return monitor;
// }
//
// public void setInactivityMonitor(AmqpInactivityMonitor monitor) {
// this.monitor = monitor;
// }
@Override
public AmqpWireFormat getWireFormat() {
return this.wireFormat;
}
public void handleException(IOException e) {
super.onException(e);
}
}

View File

@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.Buffer;
import java.io.*;
/**
*/
public class AmqpWireFormat implements WireFormat {
private int version = 1;
private long maxFrameLength = 1024*1024*100;
public ByteSequence marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
marshal(command, dos);
dos.close();
return baos.toByteSequence();
}
public Object unmarshal(ByteSequence packet) throws IOException {
ByteArrayInputStream stream = new ByteArrayInputStream(packet);
DataInputStream dis = new DataInputStream(stream);
return unmarshal(dis);
}
public void marshal(Object command, DataOutput dataOut) throws IOException {
Buffer frame = (Buffer) command;
frame.writeTo(dataOut);
}
boolean magicRead = false;
public Object unmarshal(DataInput dataIn) throws IOException {
if( !magicRead ) {
Buffer magic = new Buffer(8);
magic.readFrom(dataIn);
magicRead = true;
return magic;
} else {
int size = dataIn.readInt();
if( size > maxFrameLength ) {
throw new AmqpProtocolException("Frame size exceeded max frame length.");
}
Buffer frame = new Buffer(size);
frame.bigEndianEditor().writeInt(size);
frame.readFrom(dataIn);
frame.clear();
return frame;
}
}
/**
*/
public void setVersion(int version) {
this.version = version;
}
/**
* @return the version of the wire format
*/
public int getVersion() {
return this.version;
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
/**
* Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
public class AmqpWireFormatFactory implements WireFormatFactory {
public WireFormat createWireFormat() {
return new AmqpWireFormat();
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class InboundTransformer {
String prefixVendor = "JMS_AMQP_";
int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class JMSMappingInboundTransformer extends InboundTransformer {
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.command.Response;
import java.io.IOException;
/**
* Interface used by the AMQPProtocolConverter for callbacks.
*/
interface ResponseHandler {
void onResponse(AmqpProtocolConverter converter, Response response) throws IOException;
}

View File

@ -0,0 +1,25 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
A Broker side implementation of the AMQP 3.1 protocol - see http://amqp.org/
</body>
</html>

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.amqp.AmqpTransportFactory

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.amqp.AmqpNioTransportFactory

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.amqp.AmqpNioSslTransportFactory

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.amqp.AmqpSslTransportFactory

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.amqp.AmqpWireFormatFactory

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.broker.BrokerService;
public class AmqpNioTest extends AmqpTest {
protected void addAMQPConnector(BrokerService brokerService) throws Exception {
brokerService.addConnector("amqp+nio://localhost:1883?maxInactivityDuration=-1");
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.broker.BrokerService;
import org.junit.Ignore;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
@Ignore("hangs atm, needs investigation")
public class AmqpSslTest extends AmqpTest {
public void startBroker() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
super.startBroker();
}
protected void addAMQPConnector(BrokerService brokerService) throws Exception {
brokerService.addConnector("amqp+ssl://localhost:8883");
}
// protected AMQP createAMQPConnection() throws Exception {
// AMQP amqp = new AMQP();
// amqp.setHost("ssl://localhost:8883");
// SSLContext ctx = SSLContext.getInstance("TLS");
// ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
// amqp.setSslContext(ctx);
// return amqp;
// }
static class DefaultTrustManager implements X509TrustManager {
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}
}

View File

@ -0,0 +1,286 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.ByteSequence;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
public class AmqpTest {
protected static final Logger LOG = LoggerFactory.getLogger(AmqpTest.class);
protected BrokerService brokerService;
protected Vector<Throwable> exceptions = new Vector<Throwable>();
protected int numberOfMessages;
AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
protected int port;
@Before
public void startBroker() throws Exception {
autoFailTestSupport.startAutoFailThread();
exceptions.clear();
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
addAMQPConnector();
brokerService.start();
this.numberOfMessages = 2000;
}
protected void addAMQPConnector() throws Exception {
final TransportConnector connector = brokerService.addConnector("amqp://localhost:0");
port = connector.getConnectUri().getPort();
}
@After
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.stop();
brokerService = null;
}
autoFailTestSupport.stopAutoFailThread();
}
// @Test
// public void testSendAndReceiveAMQP() throws Exception {
// addAMQPConnector(brokerService);
// brokerService.start();
// AMQP amqp = new AMQP();
// final BlockingConnection subscribeConnection = amqp.blockingConnection();
// subscribeConnection.connect();
// Topic topic = new Topic("foo/bah",QoS.AT_MOST_ONCE);
// Topic[] topics = {topic};
// subscribeConnection.subscribe(topics);
// final CountDownLatch latch = new CountDownLatch(numberOfMessages);
//
// Thread thread = new Thread(new Runnable() {
// public void run() {
// for (int i = 0; i < numberOfMessages; i++){
// try {
// Message message = subscribeConnection.receive();
// message.ack();
// latch.countDown();
// } catch (Exception e) {
// e.printStackTrace();
// break;
// }
//
// }
// }
// });
// thread.start();
//
// BlockingConnection publisherConnection = amqp.blockingConnection();
// publisherConnection.connect();
// for (int i = 0; i < numberOfMessages; i++){
// String payload = "Message " + i;
// publisherConnection.publish(topic.name().toString(),payload.getBytes(),QoS.AT_LEAST_ONCE,false);
// }
//
// latch.await(10, TimeUnit.SECONDS);
// assertEquals(0, latch.getCount());
// }
//
// @Test
// public void testSendAndReceiveAtMostOnce() throws Exception {
// addAMQPConnector(brokerService);
// brokerService.start();
// AMQP amqp = createAMQPConnection();
// amqp.setKeepAlive(Short.MAX_VALUE);
// BlockingConnection connection = amqp.blockingConnection();
//
// connection.connect();
//
//
// Topic[] topics = {new Topic(utf8("foo"), QoS.AT_MOST_ONCE)};
// connection.subscribe(topics);
// for (int i = 0; i < numberOfMessages; i++) {
// String payload = "Test Message: " + i;
// connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false);
// Message message = connection.receive();
// assertEquals(payload, new String(message.getPayload()));
// }
// connection.disconnect();
// }
//
// @Test
// public void testSendAndReceiveAtLeastOnce() throws Exception {
// addAMQPConnector(brokerService);
// brokerService.start();
// AMQP amqp = createAMQPConnection();
// amqp.setKeepAlive(Short.MAX_VALUE);
// BlockingConnection connection = amqp.blockingConnection();
//
// connection.connect();
//
// Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
// connection.subscribe(topics);
// for (int i = 0; i < numberOfMessages; i++) {
// String payload = "Test Message: " + i;
// connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
// Message message = connection.receive();
// message.ack();
// assertEquals(payload, new String(message.getPayload()));
// }
// connection.disconnect();
// }
//
// @Test
// public void testSendAndReceiveExactlyOnce() throws Exception {
// addAMQPConnector(brokerService);
// brokerService.start();
// AMQP publisher = createAMQPConnection();
// BlockingConnection pubConnection = publisher.blockingConnection();
//
// pubConnection.connect();
//
// AMQP subscriber = createAMQPConnection();
// BlockingConnection subConnection = subscriber.blockingConnection();
//
// subConnection.connect();
//
// Topic[] topics = {new Topic(utf8("foo"), QoS.EXACTLY_ONCE)};
// subConnection.subscribe(topics);
// for (int i = 0; i < numberOfMessages; i++) {
// String payload = "Test Message: " + i;
// pubConnection.publish("foo", payload.getBytes(), QoS.EXACTLY_ONCE, false);
// Message message = subConnection.receive();
// message.ack();
// assertEquals(payload, new String(message.getPayload()));
// }
// subConnection.disconnect();
// pubConnection.disconnect();
// }
//
// @Test
// public void testSendAndReceiveLargeMessages() throws Exception {
// byte[] payload = new byte[1024 * 32];
// for (int i = 0; i < payload.length; i++){
// payload[i] = '2';
// }
// addAMQPConnector(brokerService);
// brokerService.start();
//
// AMQP publisher = createAMQPConnection();
// BlockingConnection pubConnection = publisher.blockingConnection();
//
// pubConnection.connect();
//
// AMQP subscriber = createAMQPConnection();
// BlockingConnection subConnection = subscriber.blockingConnection();
//
// subConnection.connect();
//
// Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
// subConnection.subscribe(topics);
// for (int i = 0; i < 10; i++) {
// pubConnection.publish("foo", payload, QoS.AT_LEAST_ONCE, false);
// Message message = subConnection.receive();
// message.ack();
// assertArrayEquals(payload, message.getPayload());
// }
// subConnection.disconnect();
// pubConnection.disconnect();
// }
//
//
// @Test
// public void testSendAMQPReceiveJMS() throws Exception {
// addAMQPConnector(brokerService);
// brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
// brokerService.start();
// AMQP amqp = createAMQPConnection();
// BlockingConnection connection = amqp.blockingConnection();
// final String DESTINATION_NAME = "foo.*";
// connection.connect();
//
// ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection();
// activeMQConnection.start();
// Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
// MessageConsumer consumer = s.createConsumer(jmsTopic);
//
// for (int i = 0; i < numberOfMessages; i++) {
// String payload = "Test Message: " + i;
// connection.publish("foo/bah", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
// ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
// ByteSequence bs = message.getContent();
// assertEquals(payload, new String(bs.data, bs.offset, bs.length));
// }
//
//
// activeMQConnection.close();
// connection.disconnect();
// }
//
// @Test
// public void testSendJMSReceiveAMQP() throws Exception {
// addAMQPConnector(brokerService);
// brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
// brokerService.start();
// AMQP amqp = createAMQPConnection();
// amqp.setKeepAlive(Short.MAX_VALUE);
// BlockingConnection connection = amqp.blockingConnection();
// connection.connect();
//
// ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection();
// activeMQConnection.start();
// Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// javax.jms.Topic jmsTopic = s.createTopic("foo.far");
// MessageProducer producer = s.createProducer(jmsTopic);
//
// Topic[] topics = {new Topic(utf8("foo/+"), QoS.AT_MOST_ONCE)};
// connection.subscribe(topics);
// for (int i = 0; i < numberOfMessages; i++) {
// String payload = "This is Test Message: " + i;
// TextMessage sendMessage = s.createTextMessage(payload);
// producer.send(sendMessage);
// Message message = connection.receive();
// message.ack();
// assertEquals(payload, new String(message.getPayload()));
// }
// connection.disconnect();
// }
//
//
}

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import com.swiftmq.amqp.AMQPContext;
import com.swiftmq.amqp.v100.client.*;
import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpValue;
import com.swiftmq.amqp.v100.messaging.AMQPMessage;
import com.swiftmq.amqp.v100.types.AMQPString;
import com.swiftmq.amqp.v100.types.AMQPType;
import org.junit.Test;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class SwiftMQClientTest extends AmqpTest {
@Test
public void testSendReceive() throws Exception {
String queue = "testqueue";
int nMsgs = 1;
int qos = QoS.AT_MOST_ONCE;
AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
try {
Connection connection = new Connection(ctx, "127.0.0.1", port, false);
connection.setContainerId("client");
connection.setIdleTimeout(-1);
connection.setMaxFrameSize(1024 * 4);
connection.setExceptionListener(new ExceptionListener() {
public void onException(Exception e) {
e.printStackTrace();
}
});
connection.connect();
{
String data = String.format("%010d", 0);
Session session = connection.createSession(10, 10);
Producer p = session.createProducer(queue, qos);
for (int i = 0; i < nMsgs; i++) {
AMQPMessage msg = new AMQPMessage();
String s = "Message #" + (i + 1);
System.out.println("Sending " + s);
msg.setAmqpValue(new AmqpValue(new AMQPString(s + ", data: " + data)));
p.send(msg);
}
p.close();
session.close();
}
// {
// Session session = connection.createSession(10, 10);
// Consumer c = session.createConsumer(queue, 100, qos, true, null);
//
// // Receive messages non-transacted
// for (int i = 0; i < nMsgs; i++) {
// AMQPMessage msg = c.receive();
// final AMQPType value = msg.getAmqpValue().getValue();
// if (value instanceof AMQPString) {
// AMQPString s = (AMQPString) value;
// System.out.println("Received: " + s.getValue());
// }
// if (!msg.isSettled())
// msg.accept();
// }
// c.close();
// session.close();
// }
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,36 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# The logging properties used during tests..
#
log4j.rootLogger=WARN, console, file
log4j.logger.org.apache.activemq=INFO
log4j.logger.org.fusesource=INFO
# Console will only display warnnings
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
log4j.appender.console.threshold=TRACE
# File appender will contain all info messages
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
log4j.appender.file.file=target/test.log
log4j.appender.file.append=true