Swithc to using Proton's Evet collector for processing engine state
changes.  All tests passing locally with this change.
This commit is contained in:
Timothy Bish 2014-05-12 15:23:42 -04:00
parent 1dd34a13b2
commit 38a86b470f

View File

@ -19,7 +19,6 @@ package org.apache.activemq.transport.amqp;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -78,16 +77,19 @@ import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.EngineFactory;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.CollectorImpl;
import org.apache.qpid.proton.engine.impl.EngineFactoryImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
@ -109,11 +111,6 @@ import org.slf4j.LoggerFactory;
class AmqpProtocolConverter implements IAmqpProtocolConverter {
static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
public static final EnumSet<EndpointState> INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET);
public static final EnumSet<EndpointState> ACTIVE_STATE = EnumSet.of(EndpointState.ACTIVE);
public static final EnumSet<EndpointState> CLOSED_STATE = EnumSet.of(EndpointState.CLOSED);
public static final EnumSet<EndpointState> ALL_STATES = EnumSet.of(EndpointState.CLOSED, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
static final public byte[] EMPTY_BYTE_ARRAY = new byte[] {};
private final AmqpTransport amqpTransport;
@ -131,6 +128,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
Transport protonTransport = engineFactory.createTransport();
Connection protonConnection = engineFactory.createConnection();
MessageFactory messageFactory = messageFactoryLoader.loadFactory();
Collector eventCollector = new CollectorImpl();
public AmqpProtocolConverter(AmqpTransport transport) {
this.amqpTransport = transport;
@ -145,6 +143,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
this.protonTransport.setMaxFrameSize(maxFrameSize);
this.protonTransport.bind(this.protonConnection);
this.protonConnection.collect(eventCollector);
updateTracer();
}
@ -171,14 +170,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
while (!done) {
ByteBuffer toWrite = protonTransport.getOutputBuffer();
if (toWrite != null && toWrite.hasRemaining()) {
// // System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
amqpTransport.sendToAmqp(toWrite);
protonTransport.outputConsumed();
} else {
done = true;
}
}
// System.out.println("write done");
} catch (IOException e) {
amqpTransport.onException(e);
}
@ -208,7 +205,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
AmqpHeader header = (AmqpHeader) command;
switch (header.getProtocolId()) {
case 0:
// amqpTransport.sendToAmqp(new AmqpHeader());
break; // nothing to do..
case 3: // Client will be using SASL for auth..
sasl = protonTransport.sasl();
@ -225,7 +221,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
public void onFrame(Buffer frame) throws Exception {
// System.out.println("read: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
while (frame.length > 0) {
try {
int count = protonTransport.input(frame.data, frame.offset, frame.length);
@ -263,54 +258,30 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
}
// Handle the amqp open..
if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED) {
onConnectionOpen();
}
// Lets map amqp sessions to openwire sessions..
Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
while (session != null) {
onSessionOpen(session);
session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
}
Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
while (link != null) {
onLinkOpen(link);
link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
}
Delivery delivery = protonConnection.getWorkHead();
while (delivery != null) {
AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext();
if (listener != null) {
listener.onDelivery(delivery);
Event event = null;
while ((event = eventCollector.peek()) != null) {
switch (event.getType()) {
case CONNECTION_REMOTE_STATE:
processConnectionEvent(event.getConnection());
break;
case SESSION_REMOTE_STATE:
processSessionEvent(event.getSession());
break;
case LINK_REMOTE_STATE:
processLinkEvent(event.getLink());
break;
case LINK_FLOW:
Link link = event.getLink();
((AmqpDeliveryListener) link.getContext()).drainCheck();
break;
case DELIVERY:
processDelivery(event.getDelivery());
break;
default:
break;
}
delivery = delivery.getWorkNext();
}
link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
while (link != null) {
((AmqpDeliveryListener) link.getContext()).onClose();
link.close();
link = link.next(ACTIVE_STATE, CLOSED_STATE);
}
link = protonConnection.linkHead(ACTIVE_STATE, ALL_STATES);
while (link != null) {
((AmqpDeliveryListener) link.getContext()).drainCheck();
link = link.next(ACTIVE_STATE, ALL_STATES);
}
session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
while (session != null) {
// TODO - close links?
onSessionClose(session);
session = session.next(ACTIVE_STATE, CLOSED_STATE);
}
if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
doClose();
eventCollector.pop();
}
} catch (Throwable e) {
@ -321,6 +292,44 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
}
protected void processConnectionEvent(Connection connection) throws Exception {
EndpointState remoteState = connection.getRemoteState();
if (remoteState == EndpointState.ACTIVE) {
onConnectionOpen();
} else if (remoteState == EndpointState.CLOSED) {
doClose();
}
}
protected void processLinkEvent(Link link) throws Exception {
EndpointState remoteState = link.getRemoteState();
if (remoteState == EndpointState.ACTIVE) {
onLinkOpen(link);
} else if (remoteState == EndpointState.CLOSED) {
((AmqpDeliveryListener) link.getContext()).onClose();
link.close();
}
}
protected void processSessionEvent(Session session) throws Exception {
EndpointState remoteState = session.getRemoteState();
if (remoteState == EndpointState.ACTIVE) {
onSessionOpen(session);
} else if (remoteState == EndpointState.CLOSED) {
// TODO - close links?
onSessionClose(session);
}
}
protected void processDelivery(Delivery delivery) throws Exception {
if (!delivery.isPartial()) {
AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext();
if (listener != null) {
listener.onDelivery(delivery);
}
}
}
boolean closing = false;
boolean closedSocket = false;