Refactoring of the AMQP protocol stack to allow for more flexibility in
adding support for some additional AMQP semantics and group together
common functionality handling to avoid having to fix simillar issues in
multiple places.
This commit is contained in:
Timothy Bish 2015-03-24 18:09:28 -04:00
parent e33b3f5939
commit 3306467a64
24 changed files with 2601 additions and 1729 deletions

View File

@ -19,6 +19,8 @@ package org.apache.activemq.transport.amqp;
import org.fusesource.hawtbuf.Buffer;
/**
* Represents the AMQP protocol handshake packet that is sent during the
* initial exchange with a remote peer.
*/
public class AmqpHeader {

View File

@ -44,7 +44,7 @@ public class AmqpInactivityMonitor extends TransportFilter {
private static Timer ACTIVITY_CHECK_TIMER;
private final AtomicBoolean failed = new AtomicBoolean(false);
private IAmqpProtocolConverter protocolConverter;
private AmqpProtocolConverter protocolConverter;
private long connectionTimeout = AmqpWireFormat.DEFAULT_CONNECTION_TIMEOUT;
private SchedulerTimerTask connectCheckerTask;
@ -98,15 +98,15 @@ public class AmqpInactivityMonitor extends TransportFilter {
}
}
public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) {
public void setProtocolConverter(AmqpProtocolConverter protocolConverter) {
this.protocolConverter = protocolConverter;
}
public IAmqpProtocolConverter getProtocolConverter() {
public AmqpProtocolConverter getProtocolConverter() {
return protocolConverter;
}
synchronized void startConnectChecker(long connectionTimeout) {
public synchronized void startConnectChecker(long connectionTimeout) {
this.connectionTimeout = connectionTimeout;
if (connectionTimeout > 0 && connectCheckerTask == null) {
connectCheckerTask = new SchedulerTimerTask(connectChecker);
@ -124,7 +124,7 @@ public class AmqpInactivityMonitor extends TransportFilter {
}
}
synchronized void stopConnectChecker() {
public synchronized void stopConnectChecker() {
if (connectCheckerTask != null) {
connectCheckerTask.cancel();
connectCheckerTask = null;

View File

@ -21,24 +21,23 @@ import java.util.ArrayList;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.amqp.protocol.AmqpConnection;
/**
* Used to assign the best implementation of a AmqpProtocolConverter to the
* AmqpTransport based on the AmqpHeader that the client sends us.
*/
public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
public class AmqpProtocolDiscriminator implements AmqpProtocolConverter {
public static final int DEFAULT_PREFETCH = 1000;
private final AmqpTransport transport;
private final BrokerService brokerService;
private int producerCredit = DEFAULT_PREFETCH;
interface Discriminator {
boolean matches(AmqpHeader header);
IAmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService);
AmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService);
}
static final private ArrayList<Discriminator> DISCRIMINATORS = new ArrayList<Discriminator>();
@ -46,8 +45,8 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
DISCRIMINATORS.add(new Discriminator() {
@Override
public IAmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService) {
return new AmqpProtocolConverter(transport, brokerService);
public AmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService) {
return new AmqpConnection(transport, brokerService);
}
@Override
@ -66,7 +65,7 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
final private ArrayList<Command> pendingCommands = new ArrayList<Command>();
public AMQPProtocolDiscriminator(AmqpTransport transport, BrokerService brokerService) {
public AmqpProtocolDiscriminator(AmqpTransport transport, BrokerService brokerService) {
this.transport = transport;
this.brokerService = brokerService;
}
@ -88,8 +87,7 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
match = DISCRIMINATORS.get(0);
}
IAmqpProtocolConverter next = match.create(transport, brokerService);
next.setProducerCredit(producerCredit);
AmqpProtocolConverter next = match.create(transport, brokerService);
transport.setProtocolConverter(next);
for (Command send : pendingCommands) {
next.onActiveMQCommand(send);
@ -113,9 +111,4 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
@Override
public void updateTracer() {
}
@Override
public void setProducerCredit(int producerCredit) {
this.producerCredit = producerCredit;
}
}

View File

@ -30,7 +30,7 @@ import org.apache.activemq.wireformat.WireFormat;
/**
* A <a href="http://amqp.org/">AMQP</a> over SSL transport factory
*/
public class AMQPSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
public class AmqpSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
private BrokerService brokerService = null;

View File

@ -20,10 +20,12 @@ import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.Map;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.fusesource.hawtbuf.Buffer;
/**
@ -53,7 +55,7 @@ public class AmqpSupport {
public static final Symbol COPY = Symbol.getSymbol("copy");
// Lifetime policy symbols
public static final Symbol DYNAMIC_NODE_LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
/**
* Search for a given Symbol in a given array of Symbol object.
@ -169,4 +171,39 @@ public class AmqpSupport {
Buffer buffer = new Buffer(value.getArray(), value.getArrayOffset(), value.getLength());
return buffer.bigEndianEditor().readLong();
}
/**
* Given an AMQP endpoint, deduce the appropriate ActiveMQDestination type and create
* a new instance. By default if the endpoint address does not carry the standard prefix
* value then we default to a Queue type destination. If the endpoint is null or is an
* AMQP Coordinator type endpoint this method returns null to indicate no destination
* can be mapped.
*
* @param endpoint
* the AMQP endpoint to construct an ActiveMQDestination from.
*
* @return a new ActiveMQDestination that best matches the address of the given endpoint
*
* @throws AmqpProtocolException if an error occurs while deducing the destination type.
*/
public static ActiveMQDestination createDestination(Object endpoint) throws AmqpProtocolException {
if (endpoint == null) {
return null;
} else if (endpoint instanceof Coordinator) {
return null;
} else if (endpoint instanceof org.apache.qpid.proton.amqp.messaging.Terminus) {
org.apache.qpid.proton.amqp.messaging.Terminus terminus = (org.apache.qpid.proton.amqp.messaging.Terminus) endpoint;
if (terminus.getAddress() == null || terminus.getAddress().length() == 0) {
if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
throw new AmqpProtocolException("amqp:invalid-field", "source address not set");
} else {
throw new AmqpProtocolException("amqp:invalid-field", "target address not set");
}
}
return ActiveMQDestination.createDestination(terminus.getAddress(), ActiveMQDestination.QUEUE_TYPE);
} else {
throw new RuntimeException("Unexpected terminus type: " + endpoint);
}
}
}

View File

@ -46,11 +46,12 @@ public interface AmqpTransport {
public boolean isTrace();
public IAmqpProtocolConverter getProtocolConverter();
public AmqpProtocolConverter getProtocolConverter();
public void setProtocolConverter(IAmqpProtocolConverter protocolConverter);
public void setProtocolConverter(AmqpProtocolConverter protocolConverter);
public void setInactivityMonitor(AmqpInactivityMonitor monitor);
public AmqpInactivityMonitor getInactivityMonitor();
}

View File

@ -25,7 +25,6 @@ import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.amqp.message.InboundTransformer;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
@ -41,18 +40,17 @@ import org.slf4j.LoggerFactory;
public class AmqpTransportFilter extends TransportFilter implements AmqpTransport {
private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class);
static final Logger TRACE_BYTES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".BYTES");
static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES");
private IAmqpProtocolConverter protocolConverter;
public static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES");
private AmqpProtocolConverter protocolConverter;
private AmqpWireFormat wireFormat;
private AmqpInactivityMonitor monitor;
private boolean trace;
private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
private final ReentrantLock lock = new ReentrantLock();
public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) {
super(next);
this.protocolConverter = new AMQPProtocolDiscriminator(this, brokerService);
this.protocolConverter = new AmqpProtocolDiscriminator(this, brokerService);
if (wireFormat instanceof AmqpWireFormat) {
this.wireFormat = (AmqpWireFormat) wireFormat;
}
@ -170,20 +168,20 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
@Override
public String getTransformer() {
return transformer;
return wireFormat.getTransformer();
}
public void setTransformer(String transformer) {
this.transformer = transformer;
wireFormat.setTransformer(transformer);
}
@Override
public IAmqpProtocolConverter getProtocolConverter() {
public AmqpProtocolConverter getProtocolConverter() {
return protocolConverter;
}
@Override
public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) {
public void setProtocolConverter(AmqpProtocolConverter protocolConverter) {
this.protocolConverter = protocolConverter;
}
@ -195,7 +193,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
}
public void setProducerCredit(int producerCredit) {
protocolConverter.setProducerCredit(producerCredit);
wireFormat.setProducerCredit(producerCredit);
}
public int getProducerCredit() {
return wireFormat.getProducerCredit();
}
@Override

View File

@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import org.apache.activemq.transport.amqp.message.InboundTransformer;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
@ -37,6 +38,7 @@ public class AmqpWireFormat implements WireFormat {
public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
public static final int NO_AMQP_MAX_FRAME_SIZE = -1;
public static final long DEFAULT_CONNECTION_TIMEOUT = 30000L;
public static final int DEFAULT_PRODUCER_CREDIT = 1000;
private static final int SASL_PROTOCOL = 3;
@ -44,6 +46,8 @@ public class AmqpWireFormat implements WireFormat {
private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
private long connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT;
private int producerCredit = DEFAULT_PRODUCER_CREDIT;
private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
private boolean magicRead = false;
private ResetListener resetListener;
@ -207,4 +211,20 @@ public class AmqpWireFormat implements WireFormat {
public void setConnectAttemptTimeout(long connectAttemptTimeout) {
this.connectAttemptTimeout = connectAttemptTimeout;
}
public void setProducerCredit(int producerCredit) {
this.producerCredit = producerCredit;
}
public int getProducerCredit() {
return producerCredit;
}
public String getTransformer() {
return transformer;
}
public void setTransformer(String transformer) {
this.transformer = transformer;
}
}

View File

@ -20,7 +20,7 @@ import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
/**
* Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
* Creates the default AMQP WireFormat object used to configure the protocol support.
*/
public class AmqpWireFormatFactory implements WireFormatFactory {

View File

@ -21,8 +21,21 @@ import java.io.IOException;
import org.apache.activemq.command.Response;
/**
* Interface used by the AMQPProtocolConverter for callbacks.
* Interface used by the AmqpProtocolConverter for callbacks from the broker.
*/
interface ResponseHandler {
void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException;
public interface ResponseHandler {
/**
* Called when the Broker has handled a previously issued request and
* has a response ready.
*
* @param converter
* the protocol converter that is awaiting the response.
* @param response
* the response from the broker.
*
* @throws IOException if an error occurs while processing the response.
*/
void onResponse(AmqpProtocolConverter converter, Response response) throws IOException;
}

View File

@ -0,0 +1,167 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.protocol;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
/**
* Abstract AmqpLink implementation that provide basic Link services.
*/
public abstract class AmqpAbstractLink<LINK_TYPE extends Link> implements AmqpLink {
protected final AmqpSession session;
protected final LINK_TYPE endpoint;
protected boolean closed;
protected boolean opened;
protected List<Runnable> closeActions = new ArrayList<Runnable>();
/**
* Creates a new AmqpLink type.
*
* @param session
* the AmqpSession that servers as the parent of this Link.
* @param endpoint
* the link endpoint this object represents.
*/
public AmqpAbstractLink(AmqpSession session, LINK_TYPE endpoint) {
this.session = session;
this.endpoint = endpoint;
}
@Override
public void open() {
if (!opened) {
getEndpoint().setContext(this);
getEndpoint().open();
opened = true;
}
}
@Override
public void detach() {
if (!closed) {
if (getEndpoint() != null) {
getEndpoint().setContext(null);
getEndpoint().detach();
getEndpoint().free();
}
}
}
@Override
public void close(ErrorCondition error) {
if (!closed) {
if (getEndpoint() != null) {
if (getEndpoint() instanceof Sender) {
getEndpoint().setSource(null);
} else {
getEndpoint().setTarget(null);
}
getEndpoint().setCondition(error);
}
close();
}
}
@Override
public void close() {
if (!closed) {
if (getEndpoint() != null) {
getEndpoint().setContext(null);
getEndpoint().close();
getEndpoint().free();
}
for (Runnable action : closeActions) {
action.run();
}
closeActions.clear();
opened = false;
closed = true;
}
}
/**
* @return true if this link has already been opened.
*/
public boolean isOpened() {
return opened;
}
/**
* @return true if this link has already been closed.
*/
public boolean isClosed() {
return closed;
}
/**
* @return the Proton Link type this link represents.
*/
public LINK_TYPE getEndpoint() {
return endpoint;
}
/**
* @return the parent AmqpSession for this Link instance.
*/
public AmqpSession getSession() {
return session;
}
@Override
public void addCloseAction(Runnable action) {
closeActions.add(action);
}
/**
* Shorcut method to hand off an ActiveMQ Command to the broker and assign
* a ResponseHandler to deal with any reply from the broker.
*
* @param command
* the Command object to send to the Broker.
*/
protected void sendToActiveMQ(Command command) {
session.getConnection().sendToActiveMQ(command, null);
}
/**
* Shorcut method to hand off an ActiveMQ Command to the broker and assign
* a ResponseHandler to deal with any reply from the broker.
*
* @param command
* the Command object to send to the Broker.
* @param handler
* the ResponseHandler that will handle the Broker's response.
*/
protected void sendToActiveMQ(Command command, ResponseHandler handler) {
session.getConnection().sendToActiveMQ(command, handler);
}
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.protocol;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.ByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Abstract base that provides common services for AMQP Receiver types.
*/
public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
private static final Logger LOG = LoggerFactory.getLogger(AmqpAbstractReceiver.class);
protected ByteArrayOutputStream current = new ByteArrayOutputStream();
protected final byte[] recvBuffer = new byte[1024 * 8];
/**
* Handle create of new AMQP Receiver instance.
*
* @param session
* the AmqpSession that servers as the parent of this Link.
* @param endpoint
* the Receiver endpoint being managed by this class.
*/
public AmqpAbstractReceiver(AmqpSession session, Receiver endpoint) {
super(session, endpoint);
}
@Override
public void detach() {
}
@Override
public void flow() throws Exception {
}
/**
* Provide the receiver endpoint with the given amount of credits.
*
* @param credits
* the credit value to pass on to the wrapped Receiver.
*/
public void flow(int credits) {
getEndpoint().flow(credits);
}
@Override
public void commit() throws Exception {
}
@Override
public void rollback() throws Exception {
}
@Override
public void delivery(Delivery delivery) throws Exception {
if (!delivery.isReadable()) {
LOG.debug("Delivery was not readable!");
return;
}
if (current == null) {
current = new ByteArrayOutputStream();
}
int count;
while ((count = getEndpoint().recv(recvBuffer, 0, recvBuffer.length)) > 0) {
current.write(recvBuffer, 0, count);
}
// Expecting more deliveries..
if (count == 0) {
return;
}
try {
processDelivery(delivery, current.toBuffer());
} finally {
getEndpoint().advance();
current = null;
}
}
protected abstract void processDelivery(Delivery delivery, Buffer deliveryBytes) throws Exception;
}

View File

@ -0,0 +1,742 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.protocol;
import static org.apache.activemq.transport.amqp.AmqpSupport.ANONYMOUS_RELAY;
import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
import static org.apache.activemq.transport.amqp.AmqpSupport.QUEUE_PREFIX;
import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
import static org.apache.activemq.transport.amqp.AmqpSupport.TOPIC_PREFIX;
import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.Principal;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.InvalidClientIDException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.security.AuthenticationBroker;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.transport.amqp.AmqpHeader;
import org.apache.activemq.transport.amqp.AmqpInactivityMonitor;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.activemq.transport.amqp.AmqpTransport;
import org.apache.activemq.transport.amqp.AmqpTransportFilter;
import org.apache.activemq.transport.amqp.AmqpWireFormat;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.CollectorImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.framing.TransportFrame;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements the mechanics of managing a single remote peer connection.
*/
public class AmqpConnection implements AmqpProtocolConverter {
private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
private static final int CHANNEL_MAX = 32767;
private final Transport protonTransport = Proton.transport();
private final Connection protonConnection = Proton.connection();
private final Collector eventCollector = new CollectorImpl();
private final AmqpTransport amqpTransport;
private final AmqpWireFormat amqpWireFormat;
private final BrokerService brokerService;
private AuthenticationBroker authenticator;
private Sasl sasl;
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private final AtomicInteger lastCommandId = new AtomicInteger();
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private final ConnectionInfo connectionInfo = new ConnectionInfo();
private long nextSessionId = 0;
private long nextTempDestinationId = 0;
private boolean closing = false;
private boolean closedSocket = false;
private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentMap<ConsumerId, AmqpSender> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, AmqpSender>();
public AmqpConnection(AmqpTransport transport, BrokerService brokerService) {
this.amqpTransport = transport;
AmqpInactivityMonitor monitor = transport.getInactivityMonitor();
if (monitor != null) {
monitor.setProtocolConverter(this);
}
this.amqpWireFormat = transport.getWireFormat();
this.brokerService = brokerService;
// the configured maxFrameSize on the URI.
int maxFrameSize = amqpWireFormat.getMaxAmqpFrameSize();
if (maxFrameSize > AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE) {
this.protonTransport.setMaxFrameSize(maxFrameSize);
}
this.protonTransport.bind(this.protonConnection);
this.protonTransport.setChannelMax(CHANNEL_MAX);
this.protonConnection.collect(eventCollector);
updateTracer();
}
/**
* Load and return a <code>[]Symbol</code> that contains the connection capabilities
* offered to new connections
*
* @return the capabilities that are offered to new clients on connect.
*/
protected Symbol[] getConnectionCapabilitiesOffered() {
return new Symbol[]{ ANONYMOUS_RELAY };
}
/**
* Load and return a <code>Map<Symbol, Object></code> that contains the properties
* that this connection supplies to incoming connections.
*
* @return the properties that are offered to the incoming connection.
*/
protected Map<Symbol, Object> getConnetionProperties() {
Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
properties.put(QUEUE_PREFIX, "queue://");
properties.put(TOPIC_PREFIX, "topic://");
return properties;
}
/**
* Load and return a <code>Map<Symbol, Object></code> that contains the properties
* that this connection supplies to incoming connections when the open has failed
* and the remote should expect a close to follow.
*
* @return the properties that are offered to the incoming connection.
*/
protected Map<Symbol, Object> getFailedConnetionProperties() {
Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
properties.put(CONNECTION_OPEN_FAILED, true);
return properties;
}
@Override
public void updateTracer() {
if (amqpTransport.isTrace()) {
((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
@Override
public void receivedFrame(TransportFrame transportFrame) {
TRACE_FRAMES.trace("{} | RECV: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
}
@Override
public void sentFrame(TransportFrame transportFrame) {
TRACE_FRAMES.trace("{} | SENT: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
}
});
}
}
//----- Connection Properties Accessors ----------------------------------//
/**
* @return the amount of credit assigned to AMQP receiver links created from
* sender links on the remote peer.
*/
public int getConfiguredReceiverCredit() {
return amqpWireFormat.getProducerCredit();
}
/**
* @return the transformer type that was configured for this AMQP transport.
*/
public String getConfiguredTransformer() {
return amqpWireFormat.getTransformer();
}
/**
* @return the ActiveMQ ConnectionId that identifies this AMQP Connection.
*/
public ConnectionId getConnectionId() {
return connectionId;
}
/**
* @return the Client ID used to create the connection with ActiveMQ
*/
public String getClientId() {
return connectionInfo.getClientId();
}
//----- Proton Event handling and IO support -----------------------------//
void pumpProtonToSocket() {
try {
boolean done = false;
while (!done) {
ByteBuffer toWrite = protonTransport.getOutputBuffer();
if (toWrite != null && toWrite.hasRemaining()) {
LOG.trace("Sending {} bytes out", toWrite.limit());
amqpTransport.sendToAmqp(toWrite);
protonTransport.outputConsumed();
} else {
done = true;
}
}
} catch (IOException e) {
amqpTransport.onException(e);
}
}
@Override
public void onAMQPData(Object command) throws Exception {
Buffer frame;
if (command.getClass() == AmqpHeader.class) {
AmqpHeader header = (AmqpHeader) command;
if (amqpWireFormat.isHeaderValid(header)) {
LOG.trace("Connection from an AMQP v1.0 client initiated. {}", header);
} else {
LOG.warn("Connection attempt from non AMQP v1.0 client. {}", header);
AmqpHeader reply = amqpWireFormat.getMinimallySupportedHeader();
amqpTransport.sendToAmqp(reply.getBuffer());
handleException(new AmqpProtocolException(
"Connection from client using unsupported AMQP attempted", true));
}
switch (header.getProtocolId()) {
case 0:
break; // nothing to do..
case 3: // Client will be using SASL for auth..
sasl = protonTransport.sasl();
sasl.setMechanisms(new String[] { "ANONYMOUS", "PLAIN" });
sasl.server();
break;
default:
}
frame = header.getBuffer();
} else {
frame = (Buffer) command;
}
onFrame(frame);
}
public void onFrame(Buffer frame) throws Exception {
while (frame.length > 0) {
try {
int count = protonTransport.input(frame.data, frame.offset, frame.length);
frame.moveHead(count);
} catch (Throwable e) {
handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
return;
}
try {
if (sasl != null) {
// Lets try to complete the sasl handshake.
if (sasl.getRemoteMechanisms().length > 0) {
if ("PLAIN".equals(sasl.getRemoteMechanisms()[0])) {
byte[] data = new byte[sasl.pending()];
sasl.recv(data, 0, data.length);
Buffer[] parts = new Buffer(data).split((byte) 0);
if (parts.length > 0) {
connectionInfo.setUserName(parts[0].utf8().toString());
}
if (parts.length > 1) {
connectionInfo.setPassword(parts[1].utf8().toString());
}
if (tryAuthenticate(connectionInfo, amqpTransport.getPeerCertificates())) {
sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
} else {
sasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
}
amqpTransport.getWireFormat().resetMagicRead();
sasl = null;
LOG.debug("SASL [PLAIN] Handshake complete.");
} else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0])) {
if (tryAuthenticate(connectionInfo, amqpTransport.getPeerCertificates())) {
sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
} else {
sasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
}
amqpTransport.getWireFormat().resetMagicRead();
sasl = null;
LOG.debug("SASL [ANONYMOUS] Handshake complete.");
}
}
}
Event event = null;
while ((event = eventCollector.peek()) != null) {
if (amqpTransport.isTrace()) {
LOG.trace("Processing event: {}", event.getType());
}
switch (event.getType()) {
case CONNECTION_REMOTE_OPEN:
processConnectionOpen(event.getConnection());
break;
case CONNECTION_REMOTE_CLOSE:
processConnectionClose(event.getConnection());
break;
case SESSION_REMOTE_OPEN:
processSessionOpen(event.getSession());
break;
case SESSION_REMOTE_CLOSE:
processSessionClose(event.getSession());
break;
case LINK_REMOTE_OPEN:
processLinkOpen(event.getLink());
break;
case LINK_REMOTE_DETACH:
processLinkDetach(event.getLink());
break;
case LINK_REMOTE_CLOSE:
processLinkClose(event.getLink());
break;
case LINK_FLOW:
processLinkFlow(event.getLink());
break;
case DELIVERY:
processDelivery(event.getDelivery());
break;
default:
break;
}
eventCollector.pop();
}
} catch (Throwable e) {
handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
}
pumpProtonToSocket();
}
}
protected void processConnectionOpen(Connection connection) throws Exception {
connectionInfo.setResponseRequired(true);
connectionInfo.setConnectionId(connectionId);
configureInactivityMonitor();
String clientId = protonConnection.getRemoteContainer();
if (clientId != null && !clientId.isEmpty()) {
connectionInfo.setClientId(clientId);
}
connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
sendToActiveMQ(connectionInfo, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
Throwable exception = null;
try {
if (response.isException()) {
protonConnection.setProperties(getFailedConnetionProperties());
protonConnection.open();
exception = ((ExceptionResponse) response).getException();
if (exception instanceof SecurityException) {
protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
} else if (exception instanceof InvalidClientIDException) {
protonConnection.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage()));
} else {
protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, exception.getMessage()));
}
protonConnection.close();
} else {
protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
protonConnection.setProperties(getConnetionProperties());
protonConnection.open();
}
} finally {
pumpProtonToSocket();
if (response.isException()) {
amqpTransport.onException(IOExceptionSupport.create(exception));
}
}
}
});
}
protected void processConnectionClose(Connection connection) throws Exception {
if (!closing) {
closing = true;
sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
protonConnection.close();
protonConnection.free();
if (!closedSocket) {
pumpProtonToSocket();
}
}
});
sendToActiveMQ(new ShutdownInfo(), null);
}
}
protected void processSessionOpen(Session protonSession) throws Exception {
new AmqpSession(this, getNextSessionId(), protonSession).open();
}
protected void processSessionClose(Session protonSession) throws Exception {
if (protonSession.getContext() != null) {
((AmqpResource) protonSession.getContext()).close();
} else {
protonSession.close();
protonSession.free();
}
}
protected void processLinkOpen(Link link) throws Exception {
link.setSource(link.getRemoteSource());
link.setTarget(link.getRemoteTarget());
AmqpSession session = (AmqpSession) link.getSession().getContext();
if (link instanceof Receiver) {
if (link.getRemoteTarget() instanceof Coordinator) {
session.createCoordinator((Receiver) link);
} else {
session.createReceiver((Receiver) link);
}
} else {
session.createSender((Sender) link);
}
}
protected void processLinkDetach(Link link) throws Exception {
Object context = link.getContext();
if (context instanceof AmqpLink) {
((AmqpLink) context).detach();
} else {
link.detach();
link.free();
}
}
protected void processLinkClose(Link link) throws Exception {
Object context = link.getContext();
if (context instanceof AmqpLink) {
((AmqpLink) context).close();;
} else {
link.close();
link.free();
}
}
protected void processLinkFlow(Link link) throws Exception {
Object context = link.getContext();
if (context instanceof AmqpLink) {
((AmqpLink) context).flow();
}
}
protected void processDelivery(Delivery delivery) throws Exception {
if (!delivery.isPartial()) {
Object context = delivery.getLink().getContext();
if (context instanceof AmqpLink) {
AmqpLink amqpLink = (AmqpLink) context;
amqpLink.delivery(delivery);
}
}
}
//----- Event entry points for ActiveMQ commands and errors --------------//
@Override
public void onAMQPException(IOException error) {
closedSocket = true;
if (!closing) {
amqpTransport.sendToActiveMQ(error);
} else {
try {
amqpTransport.stop();
} catch (Exception ignore) {
}
}
}
@Override
public void onActiveMQCommand(Command command) throws Exception {
if (command.isResponse()) {
Response response = (Response) command;
ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
if (rh != null) {
rh.onResponse(this, response);
} else {
// Pass down any unexpected errors. Should this close the connection?
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
handleException(exception);
}
}
} else if (command.isMessageDispatch()) {
MessageDispatch dispatch = (MessageDispatch) command;
AmqpSender sender = subscriptionsByConsumerId.get(dispatch.getConsumerId());
if (sender != null) {
// End of Queue Browse will have no Message object.
if (dispatch.getMessage() != null) {
LOG.trace("Dispatching MessageId: {} to consumer", dispatch.getMessage().getMessageId());
} else {
LOG.trace("Dispatching End of Browse Command to consumer {}", dispatch.getConsumerId());
}
sender.onMessageDispatch(dispatch);
if (dispatch.getMessage() != null) {
LOG.trace("Finished Dispatch of MessageId: {} to consumer", dispatch.getMessage().getMessageId());
}
}
} else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
// Pass down any unexpected async errors. Should this close the connection?
Throwable exception = ((ConnectionError) command).getException();
handleException(exception);
} else if (command.isBrokerInfo()) {
// ignore
} else {
LOG.debug("Do not know how to process ActiveMQ Command {}", command);
}
}
//----- Utility methods for connection resources to use ------------------//
void regosterSender(ConsumerId consumerId, AmqpSender sender) {
subscriptionsByConsumerId.put(consumerId, sender);
}
void unregosterSender(ConsumerId consumerId) {
subscriptionsByConsumerId.remove(consumerId);
}
ActiveMQDestination lookupSubscription(String subscriptionName) throws AmqpProtocolException {
ActiveMQDestination result = null;
RegionBroker regionBroker;
try {
regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
} catch (Exception e) {
throw new AmqpProtocolException("Error finding subscription: " + subscriptionName + ": " + e.getMessage(), false, e);
}
final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
DurableTopicSubscription subscription = topicRegion.lookupSubscription(subscriptionName, connectionInfo.getClientId());
if (subscription != null) {
result = subscription.getActiveMQDestination();
}
return result;
}
ActiveMQDestination createTemporaryDestination(final Link link, Symbol[] capabilities) {
ActiveMQDestination rc = null;
if (contains(capabilities, TEMP_TOPIC_CAPABILITY)) {
rc = new ActiveMQTempTopic(connectionId, nextTempDestinationId++);
} else if (contains(capabilities, TEMP_QUEUE_CAPABILITY)) {
rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
} else {
LOG.debug("Dynamic link request with no type capability, defaults to Temporary Queue");
rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
}
DestinationInfo info = new DestinationInfo();
info.setConnectionId(connectionId);
info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
info.setDestination(rc);
sendToActiveMQ(info, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
link.setSource(null);
Throwable exception = ((ExceptionResponse) response).getException();
if (exception instanceof SecurityException) {
link.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
} else {
link.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
}
link.close();
link.free();
}
}
});
return rc;
}
void deleteTemporaryDestination(ActiveMQTempDestination destination) {
DestinationInfo info = new DestinationInfo();
info.setConnectionId(connectionId);
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
info.setDestination(destination);
sendToActiveMQ(info, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
LOG.debug("Error during temp destination removeal: {}", exception.getMessage());
}
}
});
}
void sendToActiveMQ(Command command) {
sendToActiveMQ(command, null);
}
void sendToActiveMQ(Command command, ResponseHandler handler) {
command.setCommandId(lastCommandId.incrementAndGet());
if (handler != null) {
command.setResponseRequired(true);
resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
}
amqpTransport.sendToActiveMQ(command);
}
void handleException(Throwable exception) {
exception.printStackTrace();
LOG.debug("Exception detail", exception);
try {
amqpTransport.stop();
} catch (Throwable e) {
LOG.error("Failed to stop AMQP Transport ", e);
}
}
//----- Internal implementation ------------------------------------------//
private SessionId getNextSessionId() {
return new SessionId(connectionId, nextSessionId++);
}
private void configureInactivityMonitor() {
AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
if (monitor == null) {
return;
}
monitor.stopConnectChecker();
}
private boolean tryAuthenticate(ConnectionInfo info, X509Certificate[] peerCertificates) {
try {
if (getAuthenticator().authenticate(info.getUserName(), info.getPassword(), peerCertificates) != null) {
return true;
}
return false;
} catch (Throwable error) {
return false;
}
}
private AuthenticationBroker getAuthenticator() {
if (authenticator == null) {
try {
authenticator = (AuthenticationBroker) brokerService.getBroker().getAdaptor(AuthenticationBroker.class);
} catch (Exception e) {
LOG.debug("Failed to lookup AuthenticationBroker from Broker, will use a default Noop version.");
}
if (authenticator == null) {
authenticator = new DefaultAuthenticationBroker();
}
}
return authenticator;
}
private class DefaultAuthenticationBroker implements AuthenticationBroker {
@Override
public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException {
return new SecurityContext(username) {
@Override
public Set<Principal> getPrincipals() {
return null;
}
};
}
}
}

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.protocol;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
/**
* Interface used to define the operations needed to implement an AMQP
* Link based endpoint, i.e. Sender, Receiver or Coordinator.
*/
public interface AmqpLink extends AmqpResource {
/**
* Close the Link with an error indicating the reson for the close.
*
* @param error
* the error that prompted the close.
*/
void close(ErrorCondition error);
/**
* Request from the remote peer to detach this resource.
*/
void detach();
/**
* Handles an incoming flow control.
*
* @throws Excption if an error occurs during the flow processing.
*/
void flow() throws Exception;
/**
* Called when a new Delivery arrives for the given Link.
*
* @param delivery
* the newly arrived delivery on this link.
*
* @throws Exception if an error occurs while processing the new Delivery.
*/
void delivery(Delivery delivery) throws Exception;
/**
* Handle work necessary on commit of transacted resources associated with
* this Link instance.
*
* @throws Exception if an error occurs while performing the commit.
*/
void commit() throws Exception;
/**
* Handle work necessary on rollback of transacted resources associated with
* this Link instance.
*
* @throws Exception if an error occurs while performing the rollback.
*/
void rollback() throws Exception;
/**
* @return the ActiveMQDestination that this link is servicing.
*/
public ActiveMQDestination getDestination();
/**
* Sets the ActiveMQDestination that this link will be servicing.
*
* @param destination
* the ActiveMQDestination that this link services.
*/
public void setDestination(ActiveMQDestination destination);
/**
* Adds a new Runnable that is called on close of this link.
*
* @param action
* a Runnable that will be executed when the link closes or detaches.
*/
public void addCloseAction(Runnable action);
}

View File

@ -0,0 +1,254 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.protocol;
import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
import java.io.IOException;
import javax.jms.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer;
import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
import org.apache.activemq.transport.amqp.message.EncodedMessage;
import org.apache.activemq.transport.amqp.message.InboundTransformer;
import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An AmqpReceiver wraps the AMQP Receiver end of a link from the remote peer
* which holds the corresponding Sender which transfers message accross the
* link. The AmqpReceiver handles all incoming deliveries by converting them
* or wrapping them into an ActiveMQ message object and forwarding that message
* on to the appropriate ActiveMQ Destination.
*/
public class AmqpReceiver extends AmqpAbstractReceiver {
private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class);
private final ProducerInfo producerInfo;
private final int configuredCredit;
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private InboundTransformer inboundTransformer;
/**
* Create a new instance of an AmqpReceiver
*
* @param session
* the Session that is the parent of this AmqpReceiver instance.
* @param endpoint
* the AMQP receiver endpoint that the class manages.
* @param producerInfo
* the ProducerInfo instance that contains this sender's configuration.
*/
public AmqpReceiver(AmqpSession session, Receiver endpoint, ProducerInfo producerInfo) {
super(session, endpoint);
this.producerInfo = producerInfo;
this.configuredCredit = session.getConnection().getConfiguredReceiverCredit();
}
@Override
public void close() {
if (!isClosed() && isOpened()) {
sendToActiveMQ(new RemoveInfo(getProducerId()));
}
super.close();
}
//----- Configuration accessors ------------------------------------------//
/**
* @return the ActiveMQ ProducerId used to register this Receiver on the Broker.
*/
public ProducerId getProducerId() {
return producerInfo.getProducerId();
}
@Override
public ActiveMQDestination getDestination() {
return producerInfo.getDestination();
}
@Override
public void setDestination(ActiveMQDestination destination) {
producerInfo.setDestination(destination);
}
/**
* If the Sender that initiated this Receiver endpoint did not define an address
* then it is using anonymous mode and message are to be routed to the address
* that is defined in the AMQP message 'To' field.
*
* @return true if this Receiver should operate in anonymous mode.
*/
public boolean isAnonymous() {
return producerInfo.getDestination() == null;
}
/**
* Returns the amount of receiver credit that has been configured for this AMQP
* transport. If no value was configured on the TransportConnector URI then a
* sensible default is used.
*
* @return the configured receiver credit to grant.
*/
public int getConfiguredReceiverCredit() {
return configuredCredit;
}
//----- Internal Implementation ------------------------------------------//
protected InboundTransformer getInboundTransformer() {
if (inboundTransformer == null) {
String transformer = session.getConnection().getConfiguredTransformer();
if (transformer.equals(InboundTransformer.TRANSFORMER_JMS)) {
inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
} else if (transformer.equals(InboundTransformer.TRANSFORMER_NATIVE)) {
inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
} else if (transformer.equals(InboundTransformer.TRANSFORMER_RAW)) {
inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE);
} else {
LOG.warn("Unknown transformer type {} using native one instead", transformer);
inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
}
}
return inboundTransformer;
}
@Override
protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception {
if (!isClosed()) {
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length);
final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
current = null;
if (isAnonymous()) {
Destination toDestination = message.getJMSDestination();
if (toDestination == null || !(toDestination instanceof ActiveMQDestination)) {
Rejected rejected = new Rejected();
ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.valueOf("failed"));
condition.setDescription("Missing to field for message sent to an anonymous producer");
rejected.setError(condition);
delivery.disposition(rejected);
return;
}
} else {
message.setJMSDestination(getDestination());
}
message.setProducerId(getProducerId());
// Always override the AMQP client's MessageId with our own. Preserve
// the original in the TextView property for later Ack.
MessageId messageId = new MessageId(getProducerId(), messageIdGenerator.getNextSequenceId());
MessageId amqpMessageId = message.getMessageId();
if (amqpMessageId != null) {
if (amqpMessageId.getTextView() != null) {
messageId.setTextView(amqpMessageId.getTextView());
} else {
messageId.setTextView(amqpMessageId.toString());
}
}
message.setMessageId(messageId);
LOG.trace("Inbound Message:{} from Producer:{}",
message.getMessageId(), getProducerId() + ":" + messageId.getProducerSequenceId());
final DeliveryState remoteState = delivery.getRemoteState();
if (remoteState != null && remoteState instanceof TransactionalState) {
TransactionalState s = (TransactionalState) remoteState;
long txid = toLong(s.getTxnId());
message.setTransactionId(new LocalTransactionId(session.getConnection().getConnectionId(), txid));
}
message.onSend();
if (!delivery.remotelySettled()) {
sendToActiveMQ(message, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
Rejected rejected = new Rejected();
ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.valueOf("failed"));
condition.setDescription(er.getException().getMessage());
rejected.setError(condition);
delivery.disposition(rejected);
} else {
if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) {
LOG.trace("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId());
getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
}
if (remoteState != null && remoteState instanceof TransactionalState) {
TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance());
txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId());
delivery.disposition(txAccepted);
} else {
delivery.disposition(Accepted.getInstance());
}
delivery.settle();
}
session.pumpProtonToSocket();
}
});
} else {
if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) {
LOG.trace("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId());
getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
session.pumpProtonToSocket();
}
sendToActiveMQ(message);
}
}
}
}

View File

@ -14,23 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import java.io.IOException;
import org.apache.activemq.command.Command;
package org.apache.activemq.transport.amqp.protocol;
/**
* Root interface for all endpoint objects.
*/
public interface IAmqpProtocolConverter {
public interface AmqpResource {
void onAMQPData(Object command) throws Exception;
/**
* Request from the remote peer to open this resource.
*/
void open();
void onAMQPException(IOException error);
/**
* Request from the remote peer to close this resource.
*/
void close();
void onActiveMQCommand(Command command) throws Exception;
void updateTracer();
void setProducerCredit(int producerCredit);
}

View File

@ -0,0 +1,451 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.protocol;
import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
import java.io.IOException;
import java.util.LinkedList;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
import org.apache.activemq.transport.amqp.message.EncodedMessage;
import org.apache.activemq.transport.amqp.message.OutboundTransformer;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An AmqpSender wraps the AMQP Sender end of a link from the remote peer
* which holds the corresponding Receiver which receives messages transfered
* across the link from the Broker.
*
* An AmqpSender is in turn a message consumer subscribed to some destination
* on the broker. As messages are dispatched to this sender that are sent on
* to the remote Receiver end of the lin.
*/
public class AmqpSender extends AmqpAbstractLink<Sender> {
private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class);
private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
private final ConsumerInfo consumerInfo;
private final boolean presettle;
private boolean closed;
private boolean endOfBrowse;
private int currentCredit;
private long lastDeliveredSequenceId;
private Buffer currentBuffer;
private Delivery currentDelivery;
/**
* Creates a new AmqpSender instance that manages the given Sender
*
* @param session
* the AmqpSession object that is the parent of this instance.
* @param endpoint
* the AMQP Sender instance that this class manages.
* @param consumerInfo
* the ConsumerInfo instance that holds configuration for this sender.
*/
public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) {
super(session, endpoint);
this.currentCredit = endpoint.getRemoteCredit();
this.consumerInfo = consumerInfo;
this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
}
@Override
public void open() {
if (!closed) {
session.regosterSender(getConsumerId(), this);
}
super.open();
}
@Override
public void detach() {
if (!isClosed() && isOpened()) {
RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
sendToActiveMQ(removeCommand, null);
session.unregisterSender(getConsumerId());
}
super.detach();
}
@Override
public void close() {
if (!isClosed() && isOpened()) {
RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
sendToActiveMQ(removeCommand, null);
if (consumerInfo.isDurable()) {
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(session.getConnection().getConnectionId());
rsi.setSubscriptionName(getEndpoint().getName());
rsi.setClientId(session.getConnection().getClientId());
sendToActiveMQ(rsi, null);
session.unregisterSender(getConsumerId());
}
}
super.close();
}
@Override
public void flow() throws Exception {
int updatedCredit = getEndpoint().getCredit();
if (updatedCredit != currentCredit) {
currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
ConsumerControl control = new ConsumerControl();
control.setConsumerId(getConsumerId());
control.setDestination(getDestination());
control.setPrefetch(currentCredit);
sendToActiveMQ(control, null);
}
drainCheck();
}
@Override
public void delivery(Delivery delivery) throws Exception {
MessageDispatch md = (MessageDispatch) delivery.getContext();
DeliveryState state = delivery.getRemoteState();
if (state instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) state;
LOG.trace("onDelivery: TX delivery state = {}", state);
if (txState.getOutcome() != null) {
Outcome outcome = txState.getOutcome();
if (outcome instanceof Accepted) {
if (!delivery.remotelySettled()) {
TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance());
txAccepted.setTxnId(((TransactionalState) state).getTxnId());
delivery.disposition(txAccepted);
}
settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
}
}
} else {
if (state instanceof Accepted) {
LOG.trace("onDelivery: accepted state = {}", state);
if (!delivery.remotelySettled()) {
delivery.disposition(new Accepted());
}
settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
} else if (state instanceof Rejected) {
// re-deliver /w incremented delivery counter.
md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", state, md.getRedeliveryCounter());
settle(delivery, -1);
} else if (state instanceof Released) {
LOG.trace("onDelivery: Released state = {}", state);
// re-deliver && don't increment the counter.
settle(delivery, -1);
} else if (state instanceof Modified) {
Modified modified = (Modified) state;
if (modified.getDeliveryFailed()) {
// increment delivery counter..
md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
}
LOG.trace("onDelivery: Modified state = {}, delivery count now {}", state, md.getRedeliveryCounter());
byte ackType = -1;
Boolean undeliverableHere = modified.getUndeliverableHere();
if (undeliverableHere != null && undeliverableHere) {
// receiver does not want the message..
// perhaps we should DLQ it?
ackType = MessageAck.POSION_ACK_TYPE;
}
settle(delivery, ackType);
}
}
pumpOutbound();
}
@Override
public void commit() throws Exception {
if (!dispatchedInTx.isEmpty()) {
for (MessageDispatch md : dispatchedInTx) {
MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
pendingTxAck.setFirstMessageId(md.getMessage().getMessageId());
pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
sendToActiveMQ(pendingTxAck, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
exception.printStackTrace();
getEndpoint().close();
}
}
session.pumpProtonToSocket();
}
});
}
dispatchedInTx.clear();
}
}
@Override
public void rollback() throws Exception {
synchronized (outbound) {
LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size());
for (MessageDispatch dispatch : dispatchedInTx) {
dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1);
dispatch.getMessage().setTransactionId(null);
outbound.addFirst(dispatch);
}
dispatchedInTx.clear();
}
}
/**
* Event point for incoming message from ActiveMQ on this Sender's
* corresponding subscription.
*
* @param dispatch
* the MessageDispatch to process and send across the link.
*
* @throws Exception if an error occurs while encoding the message for send.
*/
public void onMessageDispatch(MessageDispatch dispatch) throws Exception {
if (!isClosed()) {
// Lock to prevent stepping on TX redelivery
synchronized (outbound) {
outbound.addLast(dispatch);
}
pumpOutbound();
session.pumpProtonToSocket();
}
}
@Override
public String toString() {
return "AmqpSender {" + getConsumerId() + "}";
}
//----- Property getters and setters -------------------------------------//
public ConsumerId getConsumerId() {
return consumerInfo.getConsumerId();
}
@Override
public ActiveMQDestination getDestination() {
return consumerInfo.getDestination();
}
@Override
public void setDestination(ActiveMQDestination destination) {
consumerInfo.setDestination(destination);
}
//----- Internal Implementation ------------------------------------------//
public void pumpOutbound() throws Exception {
while (!closed) {
while (currentBuffer != null) {
int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
if (sent > 0) {
currentBuffer.moveHead(sent);
if (currentBuffer.length == 0) {
if (presettle) {
settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE);
} else {
getEndpoint().advance();
}
currentBuffer = null;
currentDelivery = null;
}
} else {
return;
}
}
if (outbound.isEmpty()) {
return;
}
final MessageDispatch md = outbound.removeFirst();
try {
ActiveMQMessage temp = null;
if (md.getMessage() != null) {
// Topics can dispatch the same Message to more than one consumer
// so we must copy to prevent concurrent read / write to the same
// message object.
if (md.getDestination().isTopic()) {
synchronized (md.getMessage()) {
temp = (ActiveMQMessage) md.getMessage().copy();
}
} else {
temp = (ActiveMQMessage) md.getMessage();
}
if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
temp.setProperty(MESSAGE_FORMAT_KEY, 0);
}
}
final ActiveMQMessage jms = temp;
if (jms == null) {
// It's the end of browse signal.
endOfBrowse = true;
drainCheck();
} else {
jms.setRedeliveryCounter(md.getRedeliveryCounter());
jms.setReadOnlyBody(true);
final EncodedMessage amqp = outboundTransformer.transform(jms);
if (amqp != null && amqp.getLength() > 0) {
currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
if (presettle) {
currentDelivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
} else {
final byte[] tag = tagCache.getNextTag();
currentDelivery = getEndpoint().delivery(tag, 0, tag.length);
}
currentDelivery.setContext(md);
} else {
// TODO: message could not be generated what now?
}
}
} catch (Exception e) {
LOG.warn("Error detected while flushing outbound messages: {}", e.getMessage());
}
}
}
private void settle(final Delivery delivery, final int ackType) throws Exception {
byte[] tag = delivery.getTag();
if (tag != null && tag.length > 0 && delivery.remotelySettled()) {
tagCache.returnTag(tag);
}
if (ackType == -1) {
// we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
delivery.settle();
onMessageDispatch((MessageDispatch) delivery.getContext());
} else {
MessageDispatch md = (MessageDispatch) delivery.getContext();
lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
MessageAck ack = new MessageAck();
ack.setConsumerId(getConsumerId());
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setLastMessageId(md.getMessage().getMessageId());
ack.setMessageCount(1);
ack.setAckType((byte) ackType);
ack.setDestination(md.getDestination());
DeliveryState remoteState = delivery.getRemoteState();
if (remoteState != null && remoteState instanceof TransactionalState) {
TransactionalState s = (TransactionalState) remoteState;
long txid = toLong(s.getTxnId());
LocalTransactionId localTxId = new LocalTransactionId(session.getConnection().getConnectionId(), txid);
ack.setTransactionId(localTxId);
// Store the message sent in this TX we might need to
// re-send on rollback
md.getMessage().setTransactionId(localTxId);
dispatchedInTx.addFirst(md);
}
LOG.trace("Sending Ack to ActiveMQ: {}", ack);
sendToActiveMQ(ack, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
exception.printStackTrace();
getEndpoint().close();
}
} else {
delivery.settle();
}
session.pumpProtonToSocket();
}
});
}
}
private void drainCheck() {
// If we are a browser.. lets not say we are drained until
// we hit the end of browse message.
if (consumerInfo.isBrowser() && !endOfBrowse) {
return;
}
if (outbound.isEmpty()) {
getEndpoint().drained();
}
}
}

View File

@ -0,0 +1,365 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.protocol;
import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination;
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Wraps the AMQP Session and provides the services needed to manage the remote
* peer requests for link establishment.
*/
public class AmqpSession implements AmqpResource {
private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class);
private final Map<ConsumerId, AmqpSender> consumers = new HashMap<ConsumerId, AmqpSender>();
private final AmqpConnection connection;
private final Session protonSession;
private final SessionId sessionId;
private long nextProducerId = 0;
private long nextConsumerId = 0;
/**
* Create new AmqpSession instance whose parent is the given AmqpConnection.
*
* @param connection
* the parent connection for this session.
* @param sessionId
* the ActiveMQ SessionId that is used to identify this session.
* @param session
* the AMQP Session that this class manages.
*/
public AmqpSession(AmqpConnection connection, SessionId sessionId, Session session) {
this.connection = connection;
this.sessionId = sessionId;
this.protonSession = session;
}
@Override
public void open() {
LOG.trace("Session {} opened", getSessionId());
getEndpoint().setContext(this);
getEndpoint().setIncomingCapacity(Integer.MAX_VALUE);
getEndpoint().open();
connection.sendToActiveMQ(new SessionInfo(getSessionId()));
}
@Override
public void close() {
LOG.trace("Session {} closed", getSessionId());
getEndpoint().setContext(null);
getEndpoint().close();
getEndpoint().free();
connection.sendToActiveMQ(new RemoveInfo(getSessionId()));
}
/**
* Commits all pending work for all resources managed under this session.
*
* @throws Exception if an error occurs while attempting to commit work.
*/
public void commit() throws Exception {
for (AmqpSender consumer : consumers.values()) {
consumer.commit();
}
}
/**
* Rolls back any pending work being down under this session.
*
* @throws Exception if an error occurs while attempting to roll back work.
*/
public void rollback() throws Exception {
for (AmqpSender consumer : consumers.values()) {
consumer.rollback();
}
}
/**
* Used to direct all Session managed Senders to push any queued Messages
* out to the remote peer.
*
* @throws Exception if an error occurs while flushing the messages.
*/
public void flushPendingMessages() throws Exception {
for (AmqpSender consumer : consumers.values()) {
consumer.pumpOutbound();
}
}
public void createCoordinator(final Receiver protonReceiver) throws Exception {
AmqpTransactionCoordinator txCoordinator = new AmqpTransactionCoordinator(this, protonReceiver);
txCoordinator.flow(connection.getConfiguredReceiverCredit());
txCoordinator.open();
}
public void createReceiver(final Receiver protonReceiver) throws Exception {
org.apache.qpid.proton.amqp.transport.Target remoteTarget = protonReceiver.getRemoteTarget();
ProducerInfo producerInfo = new ProducerInfo(getNextProducerId());
final AmqpReceiver receiver = new AmqpReceiver(this, protonReceiver, producerInfo);
try {
Target target = (Target) remoteTarget;
ActiveMQDestination destination = null;
String targetNodeName = target.getAddress();
if (target.getDynamic()) {
destination = connection.createTemporaryDestination(protonReceiver, target.getCapabilities());
Target actualTarget = new Target();
actualTarget.setAddress(destination.getQualifiedName());
actualTarget.setDynamic(true);
protonReceiver.setTarget(actualTarget);
receiver.addCloseAction(new Runnable() {
@Override
public void run() {
connection.deleteTemporaryDestination((ActiveMQTempDestination) receiver.getDestination());
}
});
} else if (targetNodeName != null && !targetNodeName.isEmpty()) {
destination = createDestination(remoteTarget);
}
receiver.setDestination(destination);
connection.sendToActiveMQ(producerInfo, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
ErrorCondition error = null;
Throwable exception = ((ExceptionResponse) response).getException();
if (exception instanceof SecurityException) {
error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage());
} else {
error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage());
}
receiver.close(error);
} else {
receiver.flow(connection.getConfiguredReceiverCredit());
receiver.open();
}
pumpProtonToSocket();
}
});
} catch (AmqpProtocolException exception) {
receiver.close(new ErrorCondition(Symbol.getSymbol(exception.getSymbolicName()), exception.getMessage()));
}
}
@SuppressWarnings("unchecked")
public void createSender(final Sender protonSender) throws Exception {
org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) protonSender.getRemoteSource();
ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
final AmqpSender sender = new AmqpSender(this, protonSender, consumerInfo);
try {
final Map<Symbol, Object> supportedFilters = new HashMap<Symbol, Object>();
protonSender.setContext(sender);
boolean noLocal = false;
String selector = null;
if (source != null) {
Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
if (filter != null) {
selector = filter.getValue().getDescribed().toString();
// Validate the Selector.
try {
SelectorParser.parse(selector);
} catch (InvalidSelectorException e) {
sender.close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
return;
}
supportedFilters.put(filter.getKey(), filter.getValue());
}
filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
if (filter != null) {
noLocal = true;
supportedFilters.put(filter.getKey(), filter.getValue());
}
}
ActiveMQDestination destination;
if (source == null) {
// Attempt to recover previous subscription
destination = connection.lookupSubscription(protonSender.getName());
if (destination != null) {
source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress(destination.getQualifiedName());
source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
source.setDistributionMode(COPY);
} else {
sender.close(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + protonSender.getName()));
return;
}
} else if (source.getDynamic()) {
// lets create a temp dest.
destination = connection.createTemporaryDestination(protonSender, source.getCapabilities());
source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress(destination.getQualifiedName());
source.setDynamic(true);
sender.addCloseAction(new Runnable() {
@Override
public void run() {
connection.deleteTemporaryDestination((ActiveMQTempDestination) sender.getDestination());
}
});
} else {
destination = createDestination(source);
}
source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
protonSender.setSource(source);
int senderCredit = protonSender.getRemoteCredit();
consumerInfo.setSelector(selector);
consumerInfo.setNoRangeAcks(true);
consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0);
consumerInfo.setDispatchAsync(true);
consumerInfo.setNoLocal(noLocal);
if (source.getDistributionMode() == COPY && destination.isQueue()) {
consumerInfo.setBrowser(true);
}
if ((TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
TerminusDurability.CONFIGURATION.equals(source.getDurable())) && destination.isTopic()) {
consumerInfo.setSubscriptionName(protonSender.getName());
}
connection.sendToActiveMQ(consumerInfo, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
ErrorCondition error = null;
Throwable exception = ((ExceptionResponse) response).getException();
if (exception instanceof SecurityException) {
error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage());
} else if (exception instanceof InvalidSelectorException) {
error = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage());
} else {
error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage());
}
sender.close(error);
} else {
sender.open();
}
pumpProtonToSocket();
}
});
} catch (AmqpProtocolException e) {
sender.close(new ErrorCondition(Symbol.getSymbol(e.getSymbolicName()), e.getMessage()));
}
}
/**
* Send all pending work out to the remote peer.
*/
public void pumpProtonToSocket() {
connection.pumpProtonToSocket();
}
public void regosterSender(ConsumerId consumerId, AmqpSender sender) {
consumers.put(consumerId, sender);
connection.regosterSender(consumerId, sender);
}
public void unregisterSender(ConsumerId consumerId) {
consumers.remove(consumerId);
connection.unregosterSender(consumerId);
}
//----- Configuration accessors ------------------------------------------//
public AmqpConnection getConnection() {
return connection;
}
public SessionId getSessionId() {
return sessionId;
}
public Session getEndpoint() {
return protonSession;
}
//----- Internal Implementation ------------------------------------------//
protected ConsumerId getNextConsumerId() {
return new ConsumerId(sessionId, nextConsumerId++);
}
protected ProducerId getNextProducerId() {
return new ProducerId(sessionId, nextProducerId++);
}
}

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.protocol;
import static org.apache.activemq.transport.amqp.AmqpSupport.toBytes;
import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
import java.io.IOException;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements the AMQP Transaction Coordinator support to manage local
* transactions between an AMQP client and the broker.
*/
public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class);
private long nextTransactionId;
/**
* Creates a new Transaction coordinator used to manage AMQP transactions.
*
* @param session
* the AmqpSession under which the coordinator was created.
* @param receiver
* the AMQP receiver link endpoint for this coordinator.
*/
public AmqpTransactionCoordinator(AmqpSession session, Receiver endpoint) {
super(session, endpoint);
}
@Override
protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception {
Message message = Proton.message();
int offset = deliveryBytes.offset;
int len = deliveryBytes.length;
while (len > 0) {
final int decoded = message.decode(deliveryBytes.data, offset, len);
assert decoded > 0 : "Make progress decoding the message";
offset += decoded;
len -= decoded;
}
final AmqpSession session = (AmqpSession) getEndpoint().getSession().getContext();
ConnectionId connectionId = session.getConnection().getConnectionId();
final Object action = ((AmqpValue) message.getBody()).getValue();
LOG.debug("COORDINATOR received: {}, [{}]", action, deliveryBytes);
if (action instanceof Declare) {
Declare declare = (Declare) action;
if (declare.getGlobalId() != null) {
throw new Exception("don't know how to handle a declare /w a set GlobalId");
}
long txid = getNextTransactionId();
TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN);
sendToActiveMQ(txinfo, null);
LOG.trace("started transaction {}", txid);
Declared declared = new Declared();
declared.setTxnId(new Binary(toBytes(txid)));
delivery.disposition(declared);
delivery.settle();
} else if (action instanceof Discharge) {
Discharge discharge = (Discharge) action;
long txid = toLong(discharge.getTxnId());
final byte operation;
if (discharge.getFail()) {
LOG.trace("rollback transaction {}", txid);
operation = TransactionInfo.ROLLBACK;
} else {
LOG.trace("commit transaction {}", txid);
operation = TransactionInfo.COMMIT_ONE_PHASE;
}
if (operation == TransactionInfo.ROLLBACK) {
session.rollback();
} else {
session.commit();
}
TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation);
sendToActiveMQ(txinfo, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
Rejected rejected = new Rejected();
rejected.setError(new ErrorCondition(Symbol.valueOf("failed"), er.getException().getMessage()));
delivery.disposition(rejected);
} else {
delivery.disposition(Accepted.getInstance());
}
LOG.debug("TX: {} settling {}", operation, action);
delivery.settle();
session.pumpProtonToSocket();
}
});
if (operation == TransactionInfo.ROLLBACK) {
session.flushPendingMessages();
}
} else {
throw new Exception("Expected coordinator message type: " + action.getClass());
}
}
private long getNextTransactionId() {
return ++nextTransactionId;
}
@Override
public ActiveMQDestination getDestination() {
return null;
}
@Override
public void setDestination(ActiveMQDestination destination) {
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.protocol;
import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
/**
* Utility class that can generate and if enabled pool the binary tag values
* used to identify transfers over an AMQP link.
*/
public final class AmqpTransferTagGenerator {
public static final int DEFAULT_TAG_POOL_SIZE = 1024;
private long nextTagId;
private int maxPoolSize = DEFAULT_TAG_POOL_SIZE;
private final Set<byte[]> tagPool;
public AmqpTransferTagGenerator() {
this(false);
}
public AmqpTransferTagGenerator(boolean pool) {
if (pool) {
this.tagPool = new LinkedHashSet<byte[]>();
} else {
this.tagPool = null;
}
}
/**
* Retrieves the next available tag.
*
* @return a new or unused tag depending on the pool option.
*/
public byte[] getNextTag() {
byte[] rc;
if (tagPool != null && !tagPool.isEmpty()) {
final Iterator<byte[]> iterator = tagPool.iterator();
rc = iterator.next();
iterator.remove();
} else {
try {
rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
// This should never happen since we control the input.
throw new RuntimeException(e);
}
}
return rc;
}
/**
* When used as a pooled cache of tags the unused tags should always be returned once
* the transfer has been settled.
*
* @param data
* a previously borrowed tag that is no longer in use.
*/
public void returnTag(byte[] data) {
if (tagPool != null && tagPool.size() < maxPoolSize) {
tagPool.add(data);
}
}
/**
* Gets the current max pool size value.
*
* @return the current max tag pool size.
*/
public int getMaxPoolSize() {
return maxPoolSize;
}
/**
* Sets the max tag pool size. If the size is smaller than the current number
* of pooled tags the pool will drain over time until it matches the max.
*
* @param maxPoolSize
* the maximum number of tags to hold in the pool.
*/
public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
}

View File

@ -5,13 +5,13 @@
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
##
## http://www.apache.org/licenses/LICENSE-2.0
##
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.amqp.AMQPSslTransportFactory
class=org.apache.activemq.transport.amqp.AmqpSslTransportFactory

View File

@ -46,6 +46,7 @@ import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.spring.SpringSslContext;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.transport.amqp.protocol.AmqpConnection;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -111,7 +112,7 @@ public class AmqpTestSupport {
SSLContext.setDefault(ctx);
// Setup SSL context...
final File classesDir = new File(AmqpProtocolConverter.class.getProtectionDomain().getCodeSource().getLocation().getFile());
final File classesDir = new File(AmqpConnection.class.getProtectionDomain().getCodeSource().getLocation().getFile());
File keystore = new File(classesDir, "../../src/test/resources/keystore");
final SpringSslContext sslContext = new SpringSslContext();
sslContext.setKeyStore(keystore.getCanonicalPath());

View File

@ -16,7 +16,7 @@
*/
package org.apache.activemq.transport.amqp.interop;
import static org.apache.activemq.transport.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY;
import static org.apache.activemq.transport.amqp.AmqpSupport.LIFETIME_POLICY;
import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
import static org.junit.Assert.assertEquals;
@ -194,7 +194,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
// Set the dynamic node lifetime-policy
Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
dynamicNodeProperties.put(DYNAMIC_NODE_LIFETIME_POLICY, DeleteOnClose.getInstance());
dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
source.setDynamicNodeProperties(dynamicNodeProperties);
// Set the capability to indicate the node type being created
@ -216,7 +216,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
// Set the dynamic node lifetime-policy
Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
dynamicNodeProperties.put(DYNAMIC_NODE_LIFETIME_POLICY, DeleteOnClose.getInstance());
dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
target.setDynamicNodeProperties(dynamicNodeProperties);
// Set the capability to indicate the node type being created