diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java index 4265f28c5b..29a4df3f55 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java @@ -22,12 +22,9 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; @@ -35,7 +32,6 @@ import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -51,7 +47,6 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASL; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; @@ -72,8 +67,6 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { protected AMQPConnectionContext amqpConnection; - private final ReusableLatch latch = new ReusableLatch(0); - private final Executor closeExecutor; private String remoteContainerId; @@ -160,25 +153,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { } public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection) { - final int size = byteBuf.writerIndex(); - - latch.countUp(); - connection.write(new ChannelBufferWrapper(byteBuf, true), false, false, new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - latch.countDown(); - } - }); - - if (amqpConnection.isSyncOnFlush()) { - try { - latch.await(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - ActiveMQServerLogger.LOGGER.warn("Error during await invocation", e); - } - } - - amqpConnection.outputDone(size); + connection.write(new ChannelBufferWrapper(byteBuf, true)); } public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 03314b24de..a6463fa2cb 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -109,7 +109,7 @@ public class ProtonProtocolManager implements ProtocolManager, Noti } String id = server.getConfiguration().getName(); - AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool()); + AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool()); Executor executor = server.getExecutorFactory().getExecutor(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java index 510fdad73b..441f3a68a6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java @@ -52,7 +52,7 @@ public class AMQPClientConnectionFactory { Executor executor = server.getExecutorFactory().getExecutor(); - AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, executor, server.getScheduledPool()); + AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool()); eventHandler.ifPresent(amqpConnection::addEventHandler); ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 7994be4b22..a884f0da39 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -16,22 +16,16 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME; - import java.net.URI; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; @@ -55,9 +49,13 @@ import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.Transport; import org.jboss.logging.Logger; -import io.netty.buffer.ByteBuf; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME; -public class AMQPConnectionContext extends ProtonInitializable { +public class AMQPConnectionContext extends ProtonInitializable implements EventHandler { private static final Logger log = Logger.getLogger(AMQPConnectionContext.class); @@ -73,8 +71,6 @@ public class AMQPConnectionContext extends ProtonInitializable { private final Map sessions = new ConcurrentHashMap<>(); - protected LocalListener listener = new LocalListener(); - private final ProtonProtocolManager protocolManager; public AMQPConnectionContext(ProtonProtocolManager protocolManager, @@ -83,7 +79,6 @@ public class AMQPConnectionContext extends ProtonInitializable { int idleTimeout, int maxFrameSize, int channelMax, - Executor dispatchExecutor, ScheduledExecutorService scheduledPool) { this.protocolManager = protocolManager; @@ -95,7 +90,8 @@ public class AMQPConnectionContext extends ProtonInitializable { this.scheduledPool = scheduledPool; connectionCallback.setConnection(this); - this.handler = new ProtonHandler(dispatchExecutor); + this.handler = new ProtonHandler(); + handler.addEventHandler(this); Transport transport = handler.getTransport(); transport.setEmitFlowEventOnSend(false); if (idleTimeout > 0) { @@ -103,7 +99,6 @@ public class AMQPConnectionContext extends ProtonInitializable { } transport.setChannelMax(channelMax); transport.setMaxFrameSize(maxFrameSize); - handler.addEventHandler(listener); } protected AMQPSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException { @@ -141,10 +136,6 @@ public class AMQPConnectionContext extends ProtonInitializable { return handler.capacity(); } - public void outputDone(int bytes) { - handler.outputDone(bytes); - } - public void flush() { handler.flush(); } @@ -176,14 +167,6 @@ public class AMQPConnectionContext extends ProtonInitializable { return handler.getCreationTime(); } - protected void flushBytes() { - ByteBuf bytes; - // handler.outputBuffer has the lock - while ((bytes = handler.outputBuffer()) != null) { - connectionCallback.onTransport(bytes, this); - } - } - public String getRemoteContainer() { return handler.getConnection().getRemoteContainer(); } @@ -218,7 +201,7 @@ public class AMQPConnectionContext extends ProtonInitializable { public Symbol[] getConnectionCapabilitiesOffered() { URI tc = connectionCallback.getFailoverList(); if (tc != null) { - Map hostDetails = new HashMap<>(); + Map hostDetails = new HashMap<>(); hostDetails.put(NETWORK_HOST, tc.getHost()); boolean isSSL = tc.getQuery().contains(TransportConstants.SSL_ENABLED_PROP_NAME + "=true"); if (isSSL) { @@ -229,7 +212,7 @@ public class AMQPConnectionContext extends ProtonInitializable { hostDetails.put(HOSTNAME, tc.getHost()); hostDetails.put(PORT, tc.getPort()); - connectionProperties.put(FAILOVER_SERVER_LIST, Arrays.asList(hostDetails)); + connectionProperties.put(FAILOVER_SERVER_LIST, Arrays.asList(hostDetails)); } return ExtCapability.getCapabilities(); } @@ -268,220 +251,218 @@ public class AMQPConnectionContext extends ProtonInitializable { } } - // This listener will perform a bunch of things here - class LocalListener implements EventHandler { + @Override + public void onInit(Connection connection) throws Exception { - @Override - public void onInit(Connection connection) throws Exception { + } + @Override + public void onLocalOpen(Connection connection) throws Exception { + + } + + @Override + public void onLocalClose(Connection connection) throws Exception { + + } + + @Override + public void onFinal(Connection connection) throws Exception { + + } + + @Override + public void onInit(Session session) throws Exception { + + } + + @Override + public void onFinal(Session session) throws Exception { + + } + + @Override + public void onInit(Link link) throws Exception { + + } + + @Override + public void onLocalOpen(Link link) throws Exception { + + } + + @Override + public void onLocalClose(Link link) throws Exception { + + } + + @Override + public void onFinal(Link link) throws Exception { + + } + + @Override + public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) { + if (sasl) { + handler.createServerSASL(connectionCallback.getSASLMechnisms()); + } else { + if (!connectionCallback.isSupportsAnonymous()) { + connectionCallback.sendSASLSupported(); + connectionCallback.close(); + handler.close(null); + } } + } - @Override - public void onLocalOpen(Connection connection) throws Exception { + @Override + public void onTransport(Transport transport) { + handler.flushBytes(); + } - } - @Override - public void onLocalClose(Connection connection) throws Exception { + @Override + public void pushBytes(ByteBuf bytes) { + connectionCallback.onTransport(bytes, this); + } - } - - @Override - public void onFinal(Connection connection) throws Exception { - - } - - @Override - public void onInit(Session session) throws Exception { - - } - - @Override - public void onFinal(Session session) throws Exception { - - } - - @Override - public void onInit(Link link) throws Exception { - - } - - @Override - public void onLocalOpen(Link link) throws Exception { - - } - - @Override - public void onLocalClose(Link link) throws Exception { - - } - - @Override - public void onFinal(Link link) throws Exception { - - } - - @Override - public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) { - if (sasl) { - handler.createServerSASL(connectionCallback.getSASLMechnisms()); + @Override + public void onRemoteOpen(Connection connection) throws Exception { + synchronized (getLock()) { + try { + initInternal(); + } catch (Exception e) { + log.error("Error init connection", e); + } + if (!validateConnection(connection)) { + connection.close(); } else { - if (!connectionCallback.isSupportsAnonymous()) { - connectionCallback.sendSASLSupported(); - connectionCallback.close(); - handler.close(null); - } + connection.setContext(AMQPConnectionContext.this); + connection.setContainer(containerId); + connection.setProperties(connectionProperties); + connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); + connection.open(); } } - - @Override - public void onTransport(Transport transport) { - flushBytes(); - } - - @Override - public void onRemoteOpen(Connection connection) throws Exception { - synchronized (getLock()) { - try { - initInternal(); - } catch (Exception e) { - log.error("Error init connection", e); - } - if (!validateConnection(connection)) { - connection.close(); - } else { - connection.setContext(AMQPConnectionContext.this); - connection.setContainer(containerId); - connection.setProperties(connectionProperties); - connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); - connection.open(); - } - } - initialise(); + initialise(); /* * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections * but its here in case we add support for outbound connections. * */ - if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { - long nextKeepAliveTime = handler.tick(true); - flushBytes(); - if (nextKeepAliveTime > 0 && scheduledPool != null) { - scheduledPool.schedule(new Runnable() { - @Override - public void run() { - long rescheduleAt = (handler.tick(false) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); - flushBytes(); - if (rescheduleAt > 0) { - scheduledPool.schedule(this, rescheduleAt, TimeUnit.MILLISECONDS); - } + if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { + long nextKeepAliveTime = handler.tick(true); + if (nextKeepAliveTime > 0 && scheduledPool != null) { + scheduledPool.schedule(new Runnable() { + @Override + public void run() { + long rescheduleAt = (handler.tick(false) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); + if (rescheduleAt > 0) { + scheduledPool.schedule(this, rescheduleAt, TimeUnit.MILLISECONDS); } - }, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS); - } - } - } - - @Override - public void onRemoteClose(Connection connection) { - synchronized (getLock()) { - connection.close(); - connection.free(); - } - - for (AMQPSessionContext protonSession : sessions.values()) { - protonSession.close(); - } - sessions.clear(); - - // We must force write the channel before we actually destroy the connection - onTransport(handler.getTransport()); - destroy(); - } - - @Override - public void onLocalOpen(Session session) throws Exception { - getSessionExtension(session); - } - - @Override - public void onRemoteOpen(Session session) throws Exception { - getSessionExtension(session).initialise(); - synchronized (getLock()) { - session.open(); - } - } - - @Override - public void onLocalClose(Session session) throws Exception { - } - - @Override - public void onRemoteClose(Session session) throws Exception { - synchronized (getLock()) { - session.close(); - session.free(); - } - - AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext(); - if (sessionContext != null) { - sessionContext.close(); - sessions.remove(session); - session.setContext(null); - } - } - - @Override - public void onRemoteOpen(Link link) throws Exception { - remoteLinkOpened(link); - } - - @Override - public void onFlow(Link link) throws Exception { - if (link.getContext() != null) { - ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain()); - } - } - - @Override - public void onRemoteClose(Link link) throws Exception { - synchronized (getLock()) { - link.close(); - link.free(); - } - ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext(); - if (linkContext != null) { - linkContext.close(true); - } - } - - @Override - public void onRemoteDetach(Link link) throws Exception { - synchronized (getLock()) { - link.detach(); - link.free(); - } - - flush(); - } - - @Override - public void onLocalDetach(Link link) throws Exception { - Object context = link.getContext(); - if (context instanceof ProtonServerSenderContext) { - ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context; - senderContext.close(false); - } - } - - @Override - public void onDelivery(Delivery delivery) throws Exception { - ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext(); - if (handler != null) { - handler.onMessage(delivery); - } else { - // TODO: logs - System.err.println("Handler is null, can't delivery " + delivery); + } + }, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS); } } } + + @Override + public void onRemoteClose(Connection connection) { + synchronized (getLock()) { + connection.close(); + connection.free(); + } + + for (AMQPSessionContext protonSession : sessions.values()) { + protonSession.close(); + } + sessions.clear(); + + // We must force write the channel before we actually destroy the connection + handler.flushBytes(); + destroy(); + } + + @Override + public void onLocalOpen(Session session) throws Exception { + getSessionExtension(session); + } + + @Override + public void onRemoteOpen(Session session) throws Exception { + getSessionExtension(session).initialise(); + synchronized (getLock()) { + session.open(); + } + } + + @Override + public void onLocalClose(Session session) throws Exception { + } + + @Override + public void onRemoteClose(Session session) throws Exception { + synchronized (getLock()) { + session.close(); + session.free(); + } + + AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext(); + if (sessionContext != null) { + sessionContext.close(); + sessions.remove(session); + session.setContext(null); + } + } + + @Override + public void onRemoteOpen(Link link) throws Exception { + remoteLinkOpened(link); + } + + @Override + public void onFlow(Link link) throws Exception { + if (link.getContext() != null) { + ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain()); + } + } + + @Override + public void onRemoteClose(Link link) throws Exception { + synchronized (getLock()) { + link.close(); + link.free(); + } + ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext(); + if (linkContext != null) { + linkContext.close(true); + } + } + + @Override + public void onRemoteDetach(Link link) throws Exception { + synchronized (getLock()) { + link.detach(); + link.free(); + } + + } + + @Override + public void onLocalDetach(Link link) throws Exception { + Object context = link.getContext(); + if (context instanceof ProtonServerSenderContext) { + ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context; + senderContext.close(false); + } + } + + @Override + public void onDelivery(Delivery delivery) throws Exception { + ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext(); + if (handler != null) { + handler.onMessage(delivery); + } else { + log.warn("Handler is null, can't delivery " + delivery, new Exception("tracing location")); + } + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java index 00bd27a100..0ed17233fd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton.handler; +import io.netty.buffer.ByteBuf; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Link; @@ -75,4 +76,6 @@ public interface EventHandler { void onTransport(Transport transport) throws Exception; + void pushBytes(ByteBuf bytes); + } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java index 405491af6e..a4d146396c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java @@ -17,14 +17,9 @@ package org.apache.activemq.artemis.protocol.amqp.proton.handler; import org.apache.qpid.proton.engine.Event; -import org.apache.qpid.proton.engine.Transport; public final class Events { - public static void dispatchTransport(Transport transport, EventHandler handler) throws Exception { - handler.onTransport(transport); - } - public static void dispatch(Event event, EventHandler handler) throws Exception { switch (event.getType()) { case CONNECTION_INIT: diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index b5594faee1..91b252b33f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -19,8 +19,8 @@ package org.apache.activemq.artemis.protocol.amqp.proton.handler; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import io.netty.buffer.ByteBuf; @@ -54,16 +54,10 @@ public class ProtonHandler extends ProtonInitializable { private final Collector collector = Proton.collector(); - private final Executor dispatchExecutor; - - private final Runnable dispatchRunnable = () -> dispatch(); - - private ArrayList handlers = new ArrayList<>(); + private List handlers = new ArrayList<>(); private Sasl serverSasl; - private Sasl clientSasl; - private final Object lock = new Object(); private final long creationTime; @@ -76,33 +70,37 @@ public class ProtonHandler extends ProtonInitializable { protected boolean receivedFirstPacket = false; - private int offset = 0; + boolean inDispatch = false; - public ProtonHandler(Executor dispatchExecutor) { - this.dispatchExecutor = dispatchExecutor; + public ProtonHandler() { this.creationTime = System.currentTimeMillis(); transport.bind(connection); connection.collect(collector); } public long tick(boolean firstTick) { - synchronized (lock) { - if (!firstTick) { - try { - if (connection.getLocalState() != EndpointState.CLOSED) { - long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); - if (transport.isClosed()) { - throw new IllegalStateException("Channel was inactive for to long"); + try { + synchronized (lock) { + if (!firstTick) { + try { + if (connection.getLocalState() != EndpointState.CLOSED) { + long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); + if (transport.isClosed()) { + throw new IllegalStateException("Channel was inactive for to long"); + } + return rescheduleAt; } - return rescheduleAt; + } catch (Exception e) { + log.warn(e.getMessage(), e); + transport.close(); + connection.setCondition(new ErrorCondition()); } - } catch (Exception e) { - transport.close(); - connection.setCondition(new ErrorCondition()); + return 0; } - return 0; + return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); } - return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); + } finally { + flushBytes(); } } @@ -143,6 +141,30 @@ public class ProtonHandler extends ProtonInitializable { } + public void flushBytes() { + synchronized (lock) { + while (true) { + int pending = transport.pending(); + + if (pending <= 0) { + break; + } + + // We allocated a Pooled Direct Buffer, that will be sent down the stream + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(pending); + ByteBuffer head = transport.head(); + buffer.writeBytes(head); + + for (EventHandler handler : handlers) { + handler.pushBytes(buffer); + } + + transport.pop(pending); + } + } + } + + public SASLResult getSASLResult() { return saslResult; } @@ -201,57 +223,13 @@ public class ProtonHandler extends ProtonInitializable { return creationTime; } - public void outputDone(int bytes) { - synchronized (lock) { - transport.pop(bytes); - offset -= bytes; - - if (offset < 0) { - throw new IllegalStateException("You called outputDone for more bytes than you actually received. numberOfBytes=" + bytes + - ", outcome result=" + offset); - } - } - - flush(); - } - - public ByteBuf outputBuffer() { - - synchronized (lock) { - int pending = transport.pending(); - - if (pending < 0) { - return null;//throw new IllegalStateException("xxx need to close the connection"); - } - - int size = pending - offset; - - if (size < 0) { - throw new IllegalStateException("negative size: " + pending); - } - - if (size == 0) { - return null; - } - - // For returning PooledBytes - ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size); - ByteBuffer head = transport.head(); - head.position(offset); - head.limit(offset + size); - buffer.writeBytes(head); - offset += size; // incrementing offset for future calls - return buffer; - } - } - public void flush() { synchronized (lock) { transport.process(); checkServerSASL(); } - dispatchExecutor.execute(dispatchRunnable); + dispatch(); } public void close(ErrorCondition errorCondition) { @@ -304,38 +282,36 @@ public class ProtonHandler extends ProtonInitializable { private void dispatch() { Event ev; - // We don't hold a lock on the entire event processing - // because we could have a distributed deadlock - // while processing events (for instance onTransport) - // while a client is also trying to write here synchronized (lock) { - while ((ev = collector.peek()) != null) { - for (EventHandler h : handlers) { - if (log.isTraceEnabled()) { - log.trace("Handling " + ev + " towards " + h); - } - try { - Events.dispatch(ev, h); - } catch (Exception e) { - log.warn(e.getMessage(), e); - connection.setCondition(new ErrorCondition()); + if (inDispatch) { + // Avoid recursion from events + return; + } + try { + inDispatch = true; + while ((ev = collector.peek()) != null) { + for (EventHandler h : handlers) { + if (log.isTraceEnabled()) { + log.trace("Handling " + ev + " towards " + h); + } + try { + Events.dispatch(ev, h); + } catch (Exception e) { + log.warn(e.getMessage(), e); + connection.setCondition(new ErrorCondition()); + } } + + collector.pop(); } - collector.pop(); - } - } - - for (EventHandler h : handlers) { - try { - h.onTransport(transport); - } catch (Exception e) { - log.warn(e.getMessage(), e); - connection.setCondition(new ErrorCondition()); + } finally { + inDispatch = false; } } + flushBytes(); } public void open(String containerId, Map connectionProperties) {