Adding hooks so that in the future we can more easily support handling older versions of the AMQP protocol.

This commit is contained in:
Hiram Chirino 2013-09-17 15:31:53 -04:00
parent 97bbd2dfe9
commit 8d5b9a5587
7 changed files with 200 additions and 29 deletions

View File

@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.command.Command;
import java.io.IOException;
import java.util.ArrayList;
/**
* Used to assign the best implementation of a AmqpProtocolConverter to the
* AmqpTransport based on the AmqpHeader that the client sends us.
*/
public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
final private AmqpTransport transport;
interface Discriminator {
boolean matches(AmqpHeader header);
IAmqpProtocolConverter create(AmqpTransport transport);
}
static final private ArrayList<Discriminator> DISCRIMINATORS = new ArrayList<Discriminator>();
static {
DISCRIMINATORS.add(new Discriminator(){
@Override
public IAmqpProtocolConverter create(AmqpTransport transport) {
return new AmqpProtocolConverter(transport);
}
@Override
public boolean matches(AmqpHeader header) {
switch( header.getProtocolId() ) {
case 0:
case 3:
if( header.getMajor() == 1 && header.getMinor()==0 && header.getRevision()==0 )
return true;
}
return false;
}
});
}
static final private ArrayList<Command> pendingCommands = new ArrayList<Command>();
public AMQPProtocolDiscriminator(AmqpTransport transport) {
this.transport = transport;
}
@Override
public void onAMQPData(Object command) throws Exception {
if (command.getClass() == AmqpHeader.class) {
AmqpHeader header = (AmqpHeader) command;
Discriminator match = null;
for (Discriminator discriminator : DISCRIMINATORS) {
if( discriminator.matches(header) ) {
match = discriminator;
}
}
// Lets use first in the list if none are a good match.
if( match == null ) {
match = DISCRIMINATORS.get(0);
}
IAmqpProtocolConverter next = match.create(transport);
transport.setProtocolConverter(next);
for (Command send : pendingCommands) {
next.onActiveMQCommand(send);
}
pendingCommands.clear();
next.onAMQPData(command);
} else {
throw new IllegalStateException();
}
}
@Override
public void onAMQPException(IOException error) {
}
@Override
public void onActiveMQCommand(Command command) throws Exception {
pendingCommands.add(command);
}
@Override
public void updateTracer() {
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
@ -28,6 +29,7 @@ import java.nio.channels.SocketChannel;
import javax.net.SocketFactory;
import org.apache.activemq.transport.nio.NIOInputStream;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
@ -35,6 +37,7 @@ import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.Buffer;
/**
* An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
@ -80,6 +83,8 @@ public class AmqpNioTransport extends TcpTransport {
this.buffOut = outPutStream;
}
boolean magicRead = false;
private void serviceRead() {
try {
@ -100,9 +105,25 @@ public class AmqpNioTransport extends TcpTransport {
receiveCounter += readSize;
inputBuffer.flip();
if( !magicRead ) {
if( inputBuffer.remaining()>= 8 ) {
magicRead = true;
Buffer magic = new Buffer(8);
for (int i = 0; i < 8; i++) {
magic.data[i] = inputBuffer.get();
}
doConsume(new AmqpHeader(magic));
} else {
inputBuffer.flip();
continue;
}
}
doConsume(AmqpSupport.toBuffer(inputBuffer));
// clear the buffer
inputBuffer.clear();
}
} catch (IOException e) {
onException(e);

View File

@ -25,11 +25,9 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempQueue;
@ -86,7 +84,6 @@ import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.EngineFactoryImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
@ -104,7 +101,7 @@ import org.fusesource.hawtbuf.ByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class AmqpProtocolConverter {
class AmqpProtocolConverter implements IAmqpProtocolConverter {
static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
@ -122,18 +119,17 @@ class AmqpProtocolConverter {
int prefetch = 100;
ReentrantLock lock = new ReentrantLock();
EngineFactory engineFactory = new EngineFactoryImpl();
Transport protonTransport = engineFactory.createTransport();
Connection protonConnection = engineFactory.createConnection();
public AmqpProtocolConverter(AmqpTransport transport, BrokerContext brokerContext) {
public AmqpProtocolConverter(AmqpTransport transport) {
this.amqpTransport = transport;
this.protonTransport.bind(this.protonConnection);
updateTracer();
}
void updateTracer() {
public void updateTracer() {
if (amqpTransport.isTrace()) {
((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
@Override
@ -188,6 +184,7 @@ class AmqpProtocolConverter {
/**
* Convert a AMQP command
*/
@Override
public void onAMQPData(Object command) throws Exception {
Buffer frame;
if (command.getClass() == AmqpHeader.class) {
@ -313,7 +310,7 @@ class AmqpProtocolConverter {
closing = true;
sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
protonConnection.close();
if (!closedSocket) {
pumpProtonToSocket();
@ -323,6 +320,7 @@ class AmqpProtocolConverter {
}
}
@Override
public void onAMQPException(IOException error) {
closedSocket = true;
if (!closing) {
@ -335,6 +333,7 @@ class AmqpProtocolConverter {
}
}
@Override
public void onActiveMQCommand(Command command) throws Exception {
if (command.isResponse()) {
Response response = (Response) command;
@ -405,7 +404,7 @@ class AmqpProtocolConverter {
sendToActiveMQ(connectionInfo, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
protonConnection.open();
pumpProtonToSocket();
@ -554,7 +553,7 @@ class AmqpProtocolConverter {
message.onSend();
sendToActiveMQ(message, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (!delivery.remotelySettled()) {
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
@ -649,7 +648,7 @@ class AmqpProtocolConverter {
TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation);
sendToActiveMQ(txinfo, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
Rejected rejected = new Rejected();
@ -706,7 +705,7 @@ class AmqpProtocolConverter {
producerInfo.setDestination(dest);
sendToActiveMQ(producerInfo, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
receiver.setTarget(null);
Throwable exception = ((ExceptionResponse) response).getException();
@ -920,7 +919,7 @@ class AmqpProtocolConverter {
sendToActiveMQ(ack, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
@ -1020,7 +1019,7 @@ class AmqpProtocolConverter {
sendToActiveMQ(pendingTxAck, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
@ -1100,7 +1099,7 @@ class AmqpProtocolConverter {
consumerContext.closed = true;
sendToActiveMQ(rsi, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
sender.setSource(null);
Throwable exception = ((ExceptionResponse) response).getException();
@ -1153,7 +1152,7 @@ class AmqpProtocolConverter {
sendToActiveMQ(consumerInfo, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
sender.setSource(null);
Throwable exception = ((ExceptionResponse) response).getException();

View File

@ -17,7 +17,6 @@
package org.apache.activemq.transport.amqp;
import org.apache.activemq.command.Command;
import org.fusesource.hawtbuf.Buffer;
import java.io.IOException;
import java.security.cert.X509Certificate;
@ -49,4 +48,7 @@ public interface AmqpTransport {
public boolean isTrace();
public IAmqpProtocolConverter getProtocolConverter();
public void setProtocolConverter(IAmqpProtocolConverter protocolConverter);
}

View File

@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.concurrent.locks.ReentrantLock;
/**
* The AMQPTransportFilter normally sits on top of a TcpTransport that has been
@ -41,17 +42,17 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class);
static final Logger TRACE_BYTES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".BYTES");
static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES");
private final AmqpProtocolConverter protocolConverter;
private IAmqpProtocolConverter protocolConverter;
// private AmqpInactivityMonitor monitor;
private AmqpWireFormat wireFormat;
private boolean trace;
private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
private ReentrantLock lock = new ReentrantLock();
public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
super(next);
this.protocolConverter = new AmqpProtocolConverter(this, brokerContext);
this.protocolConverter = new AMQPProtocolDiscriminator(this);
if (wireFormat instanceof AmqpWireFormat) {
this.wireFormat = (AmqpWireFormat) wireFormat;
}
@ -60,11 +61,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
public void oneway(Object o) throws IOException {
try {
final Command command = (Command) o;
protocolConverter.lock.lock();
lock.lock();
try {
protocolConverter.onActiveMQCommand(command);
} finally {
protocolConverter.lock.unlock();
lock.unlock();
}
} catch (Exception e) {
throw IOExceptionSupport.create(e);
@ -73,11 +74,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
@Override
public void onException(IOException error) {
protocolConverter.lock.lock();
lock.lock();
try {
protocolConverter.onAMQPException(error);
} finally {
protocolConverter.lock.unlock();
lock.unlock();
}
}
@ -90,11 +91,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
if (trace) {
TRACE_BYTES.trace("Received: \n{}", command);
}
protocolConverter.lock.lock();
lock.lock();
try {
protocolConverter.onAMQPData(command);
} finally {
protocolConverter.lock.unlock();
lock.unlock();
}
} catch (IOException e) {
handleException(e);
@ -104,7 +105,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
}
public void sendToActiveMQ(Command command) {
assert protocolConverter.lock.isHeldByCurrentThread();
assert lock.isHeldByCurrentThread();
TransportListener l = transportListener;
if (l != null) {
l.onCommand(command);
@ -112,7 +113,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
}
public void sendToAmqp(Object command) throws IOException {
assert protocolConverter.lock.isHeldByCurrentThread();
assert lock.isHeldByCurrentThread();
if (trace) {
TRACE_BYTES.trace("Sending: \n{}", command);
}
@ -167,4 +168,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
public void setTransformer(String transformer) {
this.transformer = transformer;
}
public IAmqpProtocolConverter getProtocolConverter() {
return protocolConverter;
}
public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) {
this.protocolConverter = protocolConverter;
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.command.Command;
import org.fusesource.hawtbuf.Buffer;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;
/**
*/
public interface IAmqpProtocolConverter {
void onAMQPData(Object command) throws Exception;
void onAMQPException(IOException error);
void onActiveMQCommand(Command command) throws Exception;
void updateTracer();
}

View File

@ -25,5 +25,5 @@ import java.io.IOException;
* Interface used by the AMQPProtocolConverter for callbacks.
*/
interface ResponseHandler {
void onResponse(AmqpProtocolConverter converter, Response response) throws IOException;
void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException;
}