Clean up a bit, remove commented out code from other transports.

This commit is contained in:
Timothy Bish 2014-05-28 15:27:34 -04:00
parent b5c6c1eaeb
commit f2653e6936
17 changed files with 189 additions and 201 deletions

View File

@ -16,11 +16,11 @@
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.command.Command;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.activemq.command.Command;
/**
* Used to assign the best implementation of a AmqpProtocolConverter to the
* AmqpTransport based on the AmqpHeader that the client sends us.
@ -31,12 +31,13 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
interface Discriminator {
boolean matches(AmqpHeader header);
IAmqpProtocolConverter create(AmqpTransport transport);
}
static final private ArrayList<Discriminator> DISCRIMINATORS = new ArrayList<Discriminator>();
static {
DISCRIMINATORS.add(new Discriminator(){
DISCRIMINATORS.add(new Discriminator() {
@Override
public IAmqpProtocolConverter create(AmqpTransport transport) {
@ -45,11 +46,12 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
@Override
public boolean matches(AmqpHeader header) {
switch( header.getProtocolId() ) {
switch (header.getProtocolId()) {
case 0:
case 3:
if( header.getMajor() == 1 && header.getMinor()==0 && header.getRevision()==0 )
if (header.getMajor() == 1 && header.getMinor() == 0 && header.getRevision() == 0) {
return true;
}
}
return false;
}
@ -70,12 +72,12 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
Discriminator match = null;
for (Discriminator discriminator : DISCRIMINATORS) {
if( discriminator.matches(header) ) {
if (discriminator.matches(header)) {
match = discriminator;
}
}
// Lets use first in the list if none are a good match.
if( match == null ) {
if (match == null) {
match = DISCRIMINATORS.get(0);
}
IAmqpProtocolConverter next = match.create(transport);

View File

@ -58,11 +58,6 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok
transport = ((MutexTransport) transport).getNext();
}
// MutexTransport mutex = transport.narrow(MutexTransport.class);
// if (mutex != null) {
// mutex.setSyncOnCommand(true);
// }
return transport;
}
@ -71,17 +66,6 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok
this.brokerContext = brokerService.getBrokerContext();
}
// protected Transport createInactivityMonitor(Transport transport,
// WireFormat format) {
// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport,
// format);
//
// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
// filter.setInactivityMonitor(monitor);
//
// return monitor;
// }
@Override
protected boolean isUseInactivityMonitor(Transport transport) {
return false;

View File

@ -42,13 +42,13 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.qpid.proton.jms.JMSVendor;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class ActiveMQJMSVendor extends JMSVendor {
final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor();
private ActiveMQJMSVendor() {}
private ActiveMQJMSVendor() {
}
@Override
public BytesMessage createBytesMessage() {
@ -87,16 +87,16 @@ public class ActiveMQJMSVendor extends JMSVendor {
@Override
public <T extends Destination> T createDestination(String name, Class<T> kind) {
if( kind == Queue.class ) {
if (kind == Queue.class) {
return kind.cast(new ActiveMQQueue(name));
}
if( kind == Topic.class ) {
if (kind == Topic.class) {
return kind.cast(new ActiveMQTopic(name));
}
if( kind == TemporaryQueue.class ) {
if (kind == TemporaryQueue.class) {
return kind.cast(new ActiveMQTempQueue(name));
}
if( kind == TemporaryTopic.class ) {
if (kind == TemporaryTopic.class) {
return kind.cast(new ActiveMQTempTopic(name));
}
return kind.cast(ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE));
@ -104,26 +104,26 @@ public class ActiveMQJMSVendor extends JMSVendor {
@Override
public void setJMSXUserID(Message msg, String value) {
((ActiveMQMessage)msg).setUserID(value);
((ActiveMQMessage) msg).setUserID(value);
}
@Override
public void setJMSXGroupID(Message msg, String value) {
((ActiveMQMessage)msg).setGroupID(value);
((ActiveMQMessage) msg).setGroupID(value);
}
@Override
public void setJMSXGroupSequence(Message msg, int value) {
((ActiveMQMessage)msg).setGroupSequence(value);
((ActiveMQMessage) msg).setGroupSequence(value);
}
@Override
public void setJMSXDeliveryCount(Message msg, long value) {
((ActiveMQMessage)msg).setRedeliveryCounter((int) value);
((ActiveMQMessage) msg).setRedeliveryCounter((int) value);
}
@Override
public String toAddress(Destination dest) {
return ((ActiveMQDestination)dest).getQualifiedName();
return ((ActiveMQDestination) dest).getQualifiedName();
}
}

View File

@ -19,65 +19,64 @@ package org.apache.activemq.transport.amqp;
import org.fusesource.hawtbuf.Buffer;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class AmqpHeader {
static final Buffer PREFIX = new Buffer(new byte[]{
'A', 'M', 'Q', 'P'
});
static final Buffer PREFIX = new Buffer(new byte[] { 'A', 'M', 'Q', 'P' });
private Buffer buffer;
public AmqpHeader(){
this(new Buffer(new byte[]{
'A', 'M', 'Q', 'P', 0, 1, 0, 0
}));
public AmqpHeader() {
this(new Buffer(new byte[] { 'A', 'M', 'Q', 'P', 0, 1, 0, 0 }));
}
public AmqpHeader(Buffer buffer){
public AmqpHeader(Buffer buffer) {
setBuffer(buffer);
}
public int getProtocolId() {
return buffer.get(4) & 0xFF;
}
public void setProtocolId(int value) {
buffer.data[buffer.offset+4] = (byte) value;
buffer.data[buffer.offset + 4] = (byte) value;
}
public int getMajor() {
return buffer.get(5) & 0xFF;
}
public void setMajor(int value) {
buffer.data[buffer.offset+5] = (byte) value;
buffer.data[buffer.offset + 5] = (byte) value;
}
public int getMinor() {
return buffer.get(6) & 0xFF;
}
public void setMinor(int value) {
buffer.data[buffer.offset+6] = (byte) value;
buffer.data[buffer.offset + 6] = (byte) value;
}
public int getRevision() {
return buffer.get(7) & 0xFF;
}
public void setRevision(int value) {
buffer.data[buffer.offset+7] = (byte) value;
buffer.data[buffer.offset + 7] = (byte) value;
}
public Buffer getBuffer() {
return buffer;
}
public void setBuffer(Buffer value) {
if( !value.startsWith(PREFIX) || value.length()!=8 ) {
if (!value.startsWith(PREFIX) || value.length() != 8) {
throw new IllegalArgumentException("Not an AMQP header buffer");
}
buffer = value.buffer();
}
@Override
public String toString() {
return buffer.toString();

View File

@ -16,18 +16,20 @@
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.wireformat.WireFormat;
import javax.net.SocketFactory;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import javax.net.SocketFactory;
import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.wireformat.WireFormat;
public class AmqpNioSslTransport extends NIOSSLTransport {
private AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this);
private final AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this);
public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);

View File

@ -35,11 +35,12 @@ import org.apache.activemq.wireformat.WireFormat;
public class AmqpNioSslTransportFactory extends AmqpNioTransportFactory {
SSLContext context;
protected SSLContext context;
@Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpTransportServer(this, location, serverSocketFactory) {
@Override
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
AmqpNioSslTransport transport = new AmqpNioSslTransport(format, socket);
if (context != null) {
@ -71,5 +72,4 @@ public class AmqpNioSslTransportFactory extends AmqpNioTransportFactory {
}
return super.doBind(location);
}
}

View File

@ -16,17 +16,6 @@
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.SocketFactory;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
@ -37,14 +26,28 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import javax.net.SocketFactory;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
*/
public class AmqpNioTransport extends TcpTransport {
private static final Logger LOG = LoggerFactory.getLogger(AmqpNioTransport.class);
private SocketChannel channel;
private SelectorSelection selection;
private AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this);
private final AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this);
private ByteBuffer inputBuffer;
@ -71,6 +74,7 @@ public class AmqpNioTransport extends TcpTransport {
@Override
public void onError(SelectorSelection selection, Throwable error) {
LOG.trace("Error detected: {}", error.getMessage());
if (error instanceof IOException) {
onException((IOException) error);
} else {

View File

@ -16,6 +16,17 @@
*/
package org.apache.activemq.transport.amqp;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@ -27,16 +38,6 @@ import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
/**
* A <a href="http://amqp.org/">AMQP</a> over NIO transport factory
*/
@ -44,18 +45,22 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok
private BrokerContext brokerContext = null;
@Override
protected String getDefaultWireFormatType() {
return "amqp";
}
@Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpTransportServer(this, location, serverSocketFactory) {
@Override
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
return new AmqpNioTransport(format, socket);
}
};
}
@Override
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new AmqpNioTransport(wf, socketFactory, location, localLocation);
}
@ -70,14 +75,10 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok
transport = ((MutexTransport)transport).getNext();
}
// MutexTransport mutex = transport.narrow(MutexTransport.class);
// if (mutex != null) {
// mutex.setSyncOnCommand(true);
// }
return transport;
}
@Override
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new AmqpTransportFilter(transport, format, brokerContext);
@ -85,17 +86,11 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok
return super.compositeConfigure(transport, format, options);
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
}
// protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
// filter.setInactivityMonitor(monitor);
// return monitor;
// }
@Override
protected boolean isUseInactivityMonitor(Transport transport) {
return false;

View File

@ -16,24 +16,25 @@
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.transport.TransportSupport;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.activemq.transport.TransportSupport;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AmqpNioTransportHelper {
private final DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'}));
private final DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[] { 'A', 'M', 'Q', 'P' }));
private final Integer AMQP_HEADER_VALUE;
private static final Logger LOG = LoggerFactory.getLogger(AmqpNioTransportHelper.class);
protected int nextFrameSize = -1;
protected ByteBuffer currentBuffer;
private boolean magicConsumed = false;
private TransportSupport transportSupport;
private final TransportSupport transportSupport;
public AmqpNioTransportHelper(TransportSupport transportSupport) throws IOException {
AMQP_HEADER_VALUE = amqpHeaderValue.readInt();
@ -41,10 +42,11 @@ public class AmqpNioTransportHelper {
}
protected void processCommand(ByteBuffer plain) throws Exception {
// Are we waiting for the next Command or building on the current one? The frame size is in the first 4 bytes.
// Are we waiting for the next Command or building on the current one?
// The frame size is in the first 4 bytes.
if (nextFrameSize == -1) {
// We can get small packets that don't give us enough for the frame size
// so allocate enough for the initial size value and
// We can get small packets that don't give us enough for the frame
// size so allocate enough for the initial size value and
if (plain.remaining() < 4) {
if (currentBuffer == null) {
currentBuffer = ByteBuffer.allocate(4);
@ -63,10 +65,11 @@ public class AmqpNioTransportHelper {
nextFrameSize = currentBuffer.getInt();
}
} else {
// Either we are completing a previous read of the next frame size or its
// fully contained in plain already.
// Either we are completing a previous read of the next frame
// size or its fully contained in plain already.
if (currentBuffer != null) {
// Finish the frame size integer read and get from the current buffer.
// Finish the frame size integer read and get from the
// current buffer.
while (currentBuffer.hasRemaining()) {
currentBuffer.put(plain.get());
}
@ -79,8 +82,8 @@ public class AmqpNioTransportHelper {
}
}
// There are three possibilities when we get here. We could have a partial frame,
// a full frame, or more than 1 frame
// There are three possibilities when we get here. We could have a
// partial frame, a full frame, or more than 1 frame
while (true) {
// handle headers, which start with 'A','M','Q','P' rather than size
if (nextFrameSize == AMQP_HEADER_VALUE) {
@ -91,8 +94,10 @@ public class AmqpNioTransportHelper {
}
validateFrameSize(nextFrameSize);
// now we have the data, let's reallocate and try to fill it, (currentBuffer.putInt() is called TODO update
// because we need to put back the 4 bytes we read to determine the size)
// now we have the data, let's reallocate and try to fill it,
// (currentBuffer.putInt() is called TODO update
// because we need to put back the 4 bytes we read to determine the
// size)
if (currentBuffer == null || (currentBuffer.limit() == 4)) {
currentBuffer = ByteBuffer.allocate(nextFrameSize);
currentBuffer.putInt(nextFrameSize);
@ -106,8 +111,9 @@ public class AmqpNioTransportHelper {
currentBuffer.put(fill);
}
// Either we have enough data for a new command or we have to wait for some more. If hasRemaining is true,
// we have not filled the buffer yet, i.e. we haven't received the full frame.
// Either we have enough data for a new command or we have to wait for some more.
// If hasRemaining is true, we have not filled the buffer yet, i.e. we haven't
// received the full frame.
if (currentBuffer.hasRemaining()) {
return;
} else {
@ -137,8 +143,7 @@ public class AmqpNioTransportHelper {
private void validateFrameSize(int frameSize) throws IOException {
if (nextFrameSize > AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE) {
throw new IOException("Frame size of " + nextFrameSize +
"larger than max allowed " + AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE);
throw new IOException("Frame size of " + nextFrameSize + "larger than max allowed " + AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE);
}
}
@ -152,7 +157,7 @@ public class AmqpNioTransportHelper {
currentBuffer.put(plain.get());
}
currentBuffer.flip();
if (!magicConsumed) { // The first case we see is special and has to be handled differently
if (!magicConsumed) { // The first case we see is special and has to be handled differently
transportSupport.doConsume(new AmqpHeader(new Buffer(currentBuffer)));
magicConsumed = true;
} else {
@ -172,5 +177,4 @@ public class AmqpNioTransportHelper {
return nextFrameSize;
}
}

View File

@ -110,36 +110,38 @@ import org.slf4j.LoggerFactory;
class AmqpProtocolConverter implements IAmqpProtocolConverter {
static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
static final public byte[] EMPTY_BYTE_ARRAY = new byte[] {};
private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
private final AmqpTransport amqpTransport;
private static final Symbol COPY = Symbol.getSymbol("copy");
private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
private static final ProtonFactoryLoader<MessageFactory> messageFactoryLoader =
new ProtonFactoryLoader<MessageFactory>(MessageFactory.class);
private static final ProtonFactoryLoader<MessageFactory> messageFactoryLoader = new ProtonFactoryLoader<MessageFactory>(MessageFactory.class);
int prefetch = 100;
EngineFactory engineFactory = new EngineFactoryImpl();
Transport protonTransport = engineFactory.createTransport();
Connection protonConnection = engineFactory.createConnection();
MessageFactory messageFactory = messageFactoryLoader.loadFactory();
Collector eventCollector = new CollectorImpl();
protected int prefetch = 100;
protected EngineFactory engineFactory = new EngineFactoryImpl();
protected Transport protonTransport = engineFactory.createTransport();
protected Connection protonConnection = engineFactory.createConnection();
protected MessageFactory messageFactory = messageFactoryLoader.loadFactory();
protected Collector eventCollector = new CollectorImpl();
public AmqpProtocolConverter(AmqpTransport transport) {
this.amqpTransport = transport;
int maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE;
// AMQ-4914 - Setting the max frame size to large stalls out the QPid client on sends or
// consume due to no session credit. Once fixed we should set this value using
// the configured maxFrameSize on the URI.
//int maxFrameSize = transport.getWireFormat().getMaxFrameSize() > Integer.MAX_VALUE ?
// Integer.MAX_VALUE : (int) transport.getWireFormat().getMaxFrameSize();
// AMQ-4914 - Setting the max frame size to large stalls out the QPid
// client on sends or
// consume due to no session credit. Once fixed we should set this value
// using
// the configured maxFrameSize on the URI.
// int maxFrameSize = transport.getWireFormat().getMaxFrameSize() >
// Integer.MAX_VALUE ?
// Integer.MAX_VALUE : (int)
// transport.getWireFormat().getMaxFrameSize();
this.protonTransport.setMaxFrameSize(maxFrameSize);
this.protonTransport.bind(this.protonConnection);
@ -245,7 +247,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
if (parts.length > 1) {
connectionInfo.setPassword(parts[1].utf8().toString());
}
// We can't really auth at this point since we don't know the client id yet.. :(
// We can't really auth at this point since we don't
// know the client id yet.. :(
sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
amqpTransport.getWireFormat().magicRead = false;
sasl = null;
@ -371,7 +374,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
if (rh != null) {
rh.onResponse(this, response);
} else {
// Pass down any unexpected errors. Should this close the connection?
// Pass down any unexpected errors. Should this close the
// connection?
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
handleException(exception);
@ -393,7 +397,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
}
} else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
// Pass down any unexpected async errors. Should this close the connection?
// Pass down any unexpected async errors. Should this close the
// connection?
Throwable exception = ((ConnectionError) command).getException();
handleException(exception);
} else if (command.isBrokerInfo()) {
@ -413,9 +418,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
abstract public void onDelivery(Delivery delivery) throws Exception;
public void onClose() throws Exception {}
public void onClose() throws Exception {
}
public void drainCheck() {}
public void drainCheck() {
}
abstract void doCommit() throws Exception;
@ -544,10 +551,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
@Override
void doCommit() throws Exception {}
void doCommit() throws Exception {
}
@Override
void doRollback() throws Exception {}
void doRollback() throws Exception {
}
abstract protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception;
}
@ -575,7 +584,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
message.setProducerId(producerId);
// Always override the AMQP client's MessageId with our own. Preserve the
// Always override the AMQP client's MessageId with our own.
// Preserve the
// original in the TextView property for later Ack.
MessageId messageId = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
@ -599,8 +609,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
message.setTransactionId(new LocalTransactionId(connectionId, txid));
}
// Lets handle the case where the expiration was set, but the timestamp
// was not set by the client. Lets assign the timestamp now, and adjust the
// Lets handle the case where the expiration was set, but the
// timestamp
// was not set by the client. Lets assign the timestamp now, and
// adjust the
// expiration.
if (message.getExpiration() != 0) {
if (message.getTimestamp() == 0) {
@ -624,8 +636,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
delivery.disposition(rejected);
} else {
if (receiver.getCredit() <= (prefetch * .2)) {
LOG.trace("Sending more credit ({}) to producer: {}",
prefetch - receiver.getCredit(), producerId);
LOG.trace("Sending more credit ({}) to producer: {}", prefetch - receiver.getCredit(), producerId);
receiver.flow(prefetch - receiver.getCredit());
}
@ -638,8 +649,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
});
} else {
if (receiver.getCredit() <= (prefetch * .2)) {
LOG.trace("Sending more credit ({}) to producer: {}",
prefetch - receiver.getCredit(), producerId);
LOG.trace("Sending more credit ({}) to producer: {}", prefetch - receiver.getCredit(), producerId);
receiver.flow(prefetch - receiver.getCredit());
pumpProtonToSocket();
}
@ -942,8 +952,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
ActiveMQMessage temp = null;
if (md.getMessage() != null) {
// Topics can dispatch the same Message to more than one consumer
// so we must copy to prevent concurrent read / write to the same
// Topics can dispatch the same Message to more than one
// consumer
// so we must copy to prevent concurrent read / write to
// the same
// message object.
if (md.getDestination().isTopic()) {
synchronized (md.getMessage()) {
@ -993,7 +1005,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
if (ackType == -1) {
// we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
// we are going to settle, but redeliver.. we we won't yet ack
// to ActiveMQ
delivery.settle();
onMessageDispatch((MessageDispatch) delivery.getContext());
} else {
@ -1013,7 +1026,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
LocalTransactionId localTxId = new LocalTransactionId(connectionId, txid);
ack.setTransactionId(localTxId);
// Store the message sent in this TX we might need to re-send on rollback
// Store the message sent in this TX we might need to
// re-send on rollback
md.getMessage().setTransactionId(localTxId);
dispatchedInTx.addFirst(md);
}
@ -1042,7 +1056,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
public void drainCheck() {
// If we are a browser.. lets not say we are drained until
// we hit the end of browse message.
if( info.isBrowser() && !endOfBrowse)
if (info.isBrowser() && !endOfBrowse)
return;
if (outbound.isEmpty()) {
@ -1058,11 +1072,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
if (state instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) state;
if (txState.getOutcome() instanceof DeliveryState) {
LOG.trace("onDelivery: TX delivery state = {}", state);
state = (DeliveryState) txState.getOutcome();
if (state instanceof Accepted) {
if (!delivery.remotelySettled()) {
delivery.disposition(new Accepted());
@ -1073,7 +1084,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} else {
if (state instanceof Accepted) {
LOG.trace("onDelivery: accepted state = {}", state);
if (!delivery.remotelySettled()) {
delivery.disposition(new Accepted());
}

View File

@ -18,7 +18,6 @@ package org.apache.activemq.transport.amqp;
import java.io.IOException;
public class AmqpProtocolException extends IOException {
private static final long serialVersionUID = -2869735532997332242L;

View File

@ -16,28 +16,26 @@
*/
package org.apache.activemq.transport.amqp;
import org.fusesource.hawtbuf.Buffer;
import java.nio.ByteBuffer;
import org.fusesource.hawtbuf.Buffer;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class AmqpSupport {
static public Buffer toBuffer(ByteBuffer data) {
if( data == null ) {
if (data == null) {
return null;
}
Buffer rc;
if( data.isDirect() ) {
if (data.isDirect()) {
rc = new Buffer(data.remaining());
data.get(rc.data);
} else {
rc = new Buffer(data);
data.position(data.position()+data.remaining());
data.position(data.position() + data.remaining());
}
return rc;
}
}

View File

@ -16,11 +16,11 @@
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.command.Command;
import java.io.IOException;
import java.security.cert.X509Certificate;
import org.apache.activemq.command.Command;
/**
* Basic interface that mediates between protocol converter and transport
*/
@ -36,8 +36,6 @@ public interface AmqpTransport {
public void onException(IOException error);
// public AmqpInactivityMonitor getInactivityMonitor();
public AmqpWireFormat getWireFormat();
public void stop() throws Exception;
@ -49,6 +47,7 @@ public interface AmqpTransport {
public boolean isTrace();
public IAmqpProtocolConverter getProtocolConverter();
public void setProtocolConverter(IAmqpProtocolConverter protocolConverter);
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.transport.amqp;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@ -25,9 +28,6 @@ import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
import java.util.HashMap;
import java.util.Map;
/**
* A <a href="http://amqp.org/">AMQP</a> transport factory
*/
@ -35,10 +35,12 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS
private BrokerContext brokerContext = null;
@Override
protected String getDefaultWireFormatType() {
return "amqp";
}
@Override
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new AmqpTransportFilter(transport, format, brokerContext);
@ -46,6 +48,7 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS
return super.compositeConfigure(transport, format, options);
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
}
@ -56,26 +59,13 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS
transport = super.serverConfigure(transport, format, options);
// strip off the mutex transport.
if( transport instanceof MutexTransport ) {
transport = ((MutexTransport)transport).getNext();
if (transport instanceof MutexTransport) {
transport = ((MutexTransport) transport).getNext();
}
// MutexTransport mutex = transport.narrow(MutexTransport.class);
// if (mutex != null) {
// mutex.setSyncOnCommand(true);
// }
return transport;
}
// @Override
// protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
//
// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
// filter.setInactivityMonitor(monitor);
//
// return monitor;
// }
@Override
protected boolean isUseInactivityMonitor(Transport transport) {
return false;

View File

@ -16,22 +16,22 @@
*/
package org.apache.activemq.transport.amqp;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
import org.apache.qpid.proton.jms.InboundTransformer;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.qpid.proton.jms.InboundTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.concurrent.locks.ReentrantLock;
/**
* The AMQPTransportFilter normally sits on top of a TcpTransport that has been
* configured with the AmqpWireFormat and is used to convert AMQP commands to
@ -43,12 +43,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
static final Logger TRACE_BYTES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".BYTES");
static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES");
private IAmqpProtocolConverter protocolConverter;
// private AmqpInactivityMonitor monitor;
private AmqpWireFormat wireFormat;
private boolean trace;
private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
private ReentrantLock lock = new ReentrantLock();
private final ReentrantLock lock = new ReentrantLock();
public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
super(next);
@ -58,6 +57,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
}
}
@Override
public void oneway(Object o) throws IOException {
try {
final Command command = (Command) o;
@ -82,10 +82,12 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
}
}
@Override
public void sendToActiveMQ(IOException error) {
super.onException(error);
}
@Override
public void onCommand(Object command) {
try {
if (trace) {
@ -104,6 +106,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
}
}
@Override
public void sendToActiveMQ(Command command) {
assert lock.isHeldByCurrentThread();
TransportListener l = transportListener;
@ -112,6 +115,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
}
}
@Override
public void sendToAmqp(Object command) throws IOException {
assert lock.isHeldByCurrentThread();
if (trace) {
@ -123,6 +127,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
}
}
@Override
public X509Certificate[] getPeerCertificates() {
if (next instanceof SslTransport) {
X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
@ -134,6 +139,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
return null;
}
@Override
public boolean isTrace() {
return trace;
}
@ -143,15 +149,6 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
this.protocolConverter.updateTracer();
}
// @Override
// public AmqpInactivityMonitor getInactivityMonitor() {
// return monitor;
// }
//
// public void setInactivityMonitor(AmqpInactivityMonitor monitor) {
// this.monitor = monitor;
// }
@Override
public AmqpWireFormat getWireFormat() {
return this.wireFormat;
@ -161,6 +158,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
super.onException(e);
}
@Override
public String getTransformer() {
return transformer;
}
@ -168,10 +166,13 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
public void setTransformer(String transformer) {
this.transformer = transformer;
}
@Override
public IAmqpProtocolConverter getProtocolConverter() {
return protocolConverter;
}
@Override
public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) {
this.protocolConverter = protocolConverter;
}

View File

@ -23,6 +23,8 @@ import org.apache.activemq.wireformat.WireFormatFactory;
* Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
public class AmqpWireFormatFactory implements WireFormatFactory {
@Override
public WireFormat createWireFormat() {
return new AmqpWireFormat();
}

View File

@ -16,10 +16,9 @@
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.command.Response;
import java.io.IOException;
import org.apache.activemq.command.Response;
/**
* Interface used by the AMQPProtocolConverter for callbacks.