This closes #1141
This commit is contained in:
commit
bf7a6ef10b
|
@ -22,12 +22,9 @@ import java.util.Map;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||
|
@ -35,7 +32,6 @@ import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
|
|||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
|
@ -51,7 +47,6 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASL;
|
|||
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
|
||||
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
|
@ -72,8 +67,6 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
|
|||
|
||||
protected AMQPConnectionContext amqpConnection;
|
||||
|
||||
private final ReusableLatch latch = new ReusableLatch(0);
|
||||
|
||||
private final Executor closeExecutor;
|
||||
|
||||
private String remoteContainerId;
|
||||
|
@ -160,25 +153,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
|
|||
}
|
||||
|
||||
public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection) {
|
||||
final int size = byteBuf.writerIndex();
|
||||
|
||||
latch.countUp();
|
||||
connection.write(new ChannelBufferWrapper(byteBuf, true), false, false, new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
if (amqpConnection.isSyncOnFlush()) {
|
||||
try {
|
||||
latch.await(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
ActiveMQServerLogger.LOGGER.warn("Error during await invocation", e);
|
||||
}
|
||||
}
|
||||
|
||||
amqpConnection.outputDone(size);
|
||||
connection.write(new ChannelBufferWrapper(byteBuf, true));
|
||||
}
|
||||
|
||||
public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
|
||||
|
|
|
@ -109,7 +109,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
|
|||
}
|
||||
|
||||
String id = server.getConfiguration().getName();
|
||||
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
|
||||
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool());
|
||||
|
||||
Executor executor = server.getExecutorFactory().getExecutor();
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ public class AMQPClientConnectionFactory {
|
|||
|
||||
Executor executor = server.getExecutorFactory().getExecutor();
|
||||
|
||||
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, executor, server.getScheduledPool());
|
||||
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool());
|
||||
eventHandler.ifPresent(amqpConnection::addEventHandler);
|
||||
|
||||
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
|
||||
|
|
|
@ -16,22 +16,16 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.protocol.amqp.proton;
|
||||
|
||||
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
|
||||
|
@ -55,9 +49,13 @@ import org.apache.qpid.proton.engine.Session;
|
|||
import org.apache.qpid.proton.engine.Transport;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
|
||||
|
||||
public class AMQPConnectionContext extends ProtonInitializable {
|
||||
public class AMQPConnectionContext extends ProtonInitializable implements EventHandler {
|
||||
|
||||
private static final Logger log = Logger.getLogger(AMQPConnectionContext.class);
|
||||
|
||||
|
@ -73,8 +71,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
|
|||
|
||||
private final Map<Session, AMQPSessionContext> sessions = new ConcurrentHashMap<>();
|
||||
|
||||
protected LocalListener listener = new LocalListener();
|
||||
|
||||
private final ProtonProtocolManager protocolManager;
|
||||
|
||||
public AMQPConnectionContext(ProtonProtocolManager protocolManager,
|
||||
|
@ -83,7 +79,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
|
|||
int idleTimeout,
|
||||
int maxFrameSize,
|
||||
int channelMax,
|
||||
Executor dispatchExecutor,
|
||||
ScheduledExecutorService scheduledPool) {
|
||||
|
||||
this.protocolManager = protocolManager;
|
||||
|
@ -95,7 +90,8 @@ public class AMQPConnectionContext extends ProtonInitializable {
|
|||
|
||||
this.scheduledPool = scheduledPool;
|
||||
connectionCallback.setConnection(this);
|
||||
this.handler = new ProtonHandler(dispatchExecutor);
|
||||
this.handler = new ProtonHandler();
|
||||
handler.addEventHandler(this);
|
||||
Transport transport = handler.getTransport();
|
||||
transport.setEmitFlowEventOnSend(false);
|
||||
if (idleTimeout > 0) {
|
||||
|
@ -103,7 +99,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
|
|||
}
|
||||
transport.setChannelMax(channelMax);
|
||||
transport.setMaxFrameSize(maxFrameSize);
|
||||
handler.addEventHandler(listener);
|
||||
}
|
||||
|
||||
protected AMQPSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException {
|
||||
|
@ -141,10 +136,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
|
|||
return handler.capacity();
|
||||
}
|
||||
|
||||
public void outputDone(int bytes) {
|
||||
handler.outputDone(bytes);
|
||||
}
|
||||
|
||||
public void flush() {
|
||||
handler.flush();
|
||||
}
|
||||
|
@ -176,14 +167,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
|
|||
return handler.getCreationTime();
|
||||
}
|
||||
|
||||
protected void flushBytes() {
|
||||
ByteBuf bytes;
|
||||
// handler.outputBuffer has the lock
|
||||
while ((bytes = handler.outputBuffer()) != null) {
|
||||
connectionCallback.onTransport(bytes, this);
|
||||
}
|
||||
}
|
||||
|
||||
public String getRemoteContainer() {
|
||||
return handler.getConnection().getRemoteContainer();
|
||||
}
|
||||
|
@ -218,7 +201,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
|
|||
public Symbol[] getConnectionCapabilitiesOffered() {
|
||||
URI tc = connectionCallback.getFailoverList();
|
||||
if (tc != null) {
|
||||
Map<Symbol,Object> hostDetails = new HashMap<>();
|
||||
Map<Symbol, Object> hostDetails = new HashMap<>();
|
||||
hostDetails.put(NETWORK_HOST, tc.getHost());
|
||||
boolean isSSL = tc.getQuery().contains(TransportConstants.SSL_ENABLED_PROP_NAME + "=true");
|
||||
if (isSSL) {
|
||||
|
@ -229,7 +212,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
|
|||
hostDetails.put(HOSTNAME, tc.getHost());
|
||||
hostDetails.put(PORT, tc.getPort());
|
||||
|
||||
connectionProperties.put(FAILOVER_SERVER_LIST, Arrays.asList(hostDetails));
|
||||
connectionProperties.put(FAILOVER_SERVER_LIST, Arrays.asList(hostDetails));
|
||||
}
|
||||
return ExtCapability.getCapabilities();
|
||||
}
|
||||
|
@ -268,220 +251,218 @@ public class AMQPConnectionContext extends ProtonInitializable {
|
|||
}
|
||||
}
|
||||
|
||||
// This listener will perform a bunch of things here
|
||||
class LocalListener implements EventHandler {
|
||||
@Override
|
||||
public void onInit(Connection connection) throws Exception {
|
||||
|
||||
@Override
|
||||
public void onInit(Connection connection) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLocalOpen(Connection connection) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLocalClose(Connection connection) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFinal(Connection connection) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInit(Session session) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFinal(Session session) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInit(Link link) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLocalOpen(Link link) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLocalClose(Link link) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFinal(Link link) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) {
|
||||
if (sasl) {
|
||||
handler.createServerSASL(connectionCallback.getSASLMechnisms());
|
||||
} else {
|
||||
if (!connectionCallback.isSupportsAnonymous()) {
|
||||
connectionCallback.sendSASLSupported();
|
||||
connectionCallback.close();
|
||||
handler.close(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLocalOpen(Connection connection) throws Exception {
|
||||
@Override
|
||||
public void onTransport(Transport transport) {
|
||||
handler.flushBytes();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLocalClose(Connection connection) throws Exception {
|
||||
@Override
|
||||
public void pushBytes(ByteBuf bytes) {
|
||||
connectionCallback.onTransport(bytes, this);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFinal(Connection connection) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInit(Session session) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFinal(Session session) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInit(Link link) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLocalOpen(Link link) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLocalClose(Link link) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFinal(Link link) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) {
|
||||
if (sasl) {
|
||||
handler.createServerSASL(connectionCallback.getSASLMechnisms());
|
||||
@Override
|
||||
public void onRemoteOpen(Connection connection) throws Exception {
|
||||
synchronized (getLock()) {
|
||||
try {
|
||||
initInternal();
|
||||
} catch (Exception e) {
|
||||
log.error("Error init connection", e);
|
||||
}
|
||||
if (!validateConnection(connection)) {
|
||||
connection.close();
|
||||
} else {
|
||||
if (!connectionCallback.isSupportsAnonymous()) {
|
||||
connectionCallback.sendSASLSupported();
|
||||
connectionCallback.close();
|
||||
handler.close(null);
|
||||
}
|
||||
connection.setContext(AMQPConnectionContext.this);
|
||||
connection.setContainer(containerId);
|
||||
connection.setProperties(connectionProperties);
|
||||
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
|
||||
connection.open();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTransport(Transport transport) {
|
||||
flushBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteOpen(Connection connection) throws Exception {
|
||||
synchronized (getLock()) {
|
||||
try {
|
||||
initInternal();
|
||||
} catch (Exception e) {
|
||||
log.error("Error init connection", e);
|
||||
}
|
||||
if (!validateConnection(connection)) {
|
||||
connection.close();
|
||||
} else {
|
||||
connection.setContext(AMQPConnectionContext.this);
|
||||
connection.setContainer(containerId);
|
||||
connection.setProperties(connectionProperties);
|
||||
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
|
||||
connection.open();
|
||||
}
|
||||
}
|
||||
initialise();
|
||||
initialise();
|
||||
|
||||
/*
|
||||
* This can be null which is in effect an empty map, also we really don't need to check this for in bound connections
|
||||
* but its here in case we add support for outbound connections.
|
||||
* */
|
||||
if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
|
||||
long nextKeepAliveTime = handler.tick(true);
|
||||
flushBytes();
|
||||
if (nextKeepAliveTime > 0 && scheduledPool != null) {
|
||||
scheduledPool.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
long rescheduleAt = (handler.tick(false) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
|
||||
flushBytes();
|
||||
if (rescheduleAt > 0) {
|
||||
scheduledPool.schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
|
||||
long nextKeepAliveTime = handler.tick(true);
|
||||
if (nextKeepAliveTime > 0 && scheduledPool != null) {
|
||||
scheduledPool.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
long rescheduleAt = (handler.tick(false) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
|
||||
if (rescheduleAt > 0) {
|
||||
scheduledPool.schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteClose(Connection connection) {
|
||||
synchronized (getLock()) {
|
||||
connection.close();
|
||||
connection.free();
|
||||
}
|
||||
|
||||
for (AMQPSessionContext protonSession : sessions.values()) {
|
||||
protonSession.close();
|
||||
}
|
||||
sessions.clear();
|
||||
|
||||
// We must force write the channel before we actually destroy the connection
|
||||
onTransport(handler.getTransport());
|
||||
destroy();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLocalOpen(Session session) throws Exception {
|
||||
getSessionExtension(session);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteOpen(Session session) throws Exception {
|
||||
getSessionExtension(session).initialise();
|
||||
synchronized (getLock()) {
|
||||
session.open();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLocalClose(Session session) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteClose(Session session) throws Exception {
|
||||
synchronized (getLock()) {
|
||||
session.close();
|
||||
session.free();
|
||||
}
|
||||
|
||||
AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
|
||||
if (sessionContext != null) {
|
||||
sessionContext.close();
|
||||
sessions.remove(session);
|
||||
session.setContext(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteOpen(Link link) throws Exception {
|
||||
remoteLinkOpened(link);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFlow(Link link) throws Exception {
|
||||
if (link.getContext() != null) {
|
||||
((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteClose(Link link) throws Exception {
|
||||
synchronized (getLock()) {
|
||||
link.close();
|
||||
link.free();
|
||||
}
|
||||
ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
|
||||
if (linkContext != null) {
|
||||
linkContext.close(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteDetach(Link link) throws Exception {
|
||||
synchronized (getLock()) {
|
||||
link.detach();
|
||||
link.free();
|
||||
}
|
||||
|
||||
flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLocalDetach(Link link) throws Exception {
|
||||
Object context = link.getContext();
|
||||
if (context instanceof ProtonServerSenderContext) {
|
||||
ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
|
||||
senderContext.close(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDelivery(Delivery delivery) throws Exception {
|
||||
ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext();
|
||||
if (handler != null) {
|
||||
handler.onMessage(delivery);
|
||||
} else {
|
||||
// TODO: logs
|
||||
System.err.println("Handler is null, can't delivery " + delivery);
|
||||
}
|
||||
}, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteClose(Connection connection) {
|
||||
synchronized (getLock()) {
|
||||
connection.close();
|
||||
connection.free();
|
||||
}
|
||||
|
||||
for (AMQPSessionContext protonSession : sessions.values()) {
|
||||
protonSession.close();
|
||||
}
|
||||
sessions.clear();
|
||||
|
||||
// We must force write the channel before we actually destroy the connection
|
||||
handler.flushBytes();
|
||||
destroy();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLocalOpen(Session session) throws Exception {
|
||||
getSessionExtension(session);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteOpen(Session session) throws Exception {
|
||||
getSessionExtension(session).initialise();
|
||||
synchronized (getLock()) {
|
||||
session.open();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLocalClose(Session session) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteClose(Session session) throws Exception {
|
||||
synchronized (getLock()) {
|
||||
session.close();
|
||||
session.free();
|
||||
}
|
||||
|
||||
AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
|
||||
if (sessionContext != null) {
|
||||
sessionContext.close();
|
||||
sessions.remove(session);
|
||||
session.setContext(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteOpen(Link link) throws Exception {
|
||||
remoteLinkOpened(link);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFlow(Link link) throws Exception {
|
||||
if (link.getContext() != null) {
|
||||
((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteClose(Link link) throws Exception {
|
||||
synchronized (getLock()) {
|
||||
link.close();
|
||||
link.free();
|
||||
}
|
||||
ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
|
||||
if (linkContext != null) {
|
||||
linkContext.close(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteDetach(Link link) throws Exception {
|
||||
synchronized (getLock()) {
|
||||
link.detach();
|
||||
link.free();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLocalDetach(Link link) throws Exception {
|
||||
Object context = link.getContext();
|
||||
if (context instanceof ProtonServerSenderContext) {
|
||||
ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
|
||||
senderContext.close(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDelivery(Delivery delivery) throws Exception {
|
||||
ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext();
|
||||
if (handler != null) {
|
||||
handler.onMessage(delivery);
|
||||
} else {
|
||||
log.warn("Handler is null, can't delivery " + delivery, new Exception("tracing location"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.protocol.amqp.proton.handler;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.qpid.proton.engine.Connection;
|
||||
import org.apache.qpid.proton.engine.Delivery;
|
||||
import org.apache.qpid.proton.engine.Link;
|
||||
|
@ -75,4 +76,6 @@ public interface EventHandler {
|
|||
|
||||
void onTransport(Transport transport) throws Exception;
|
||||
|
||||
void pushBytes(ByteBuf bytes);
|
||||
|
||||
}
|
||||
|
|
|
@ -17,14 +17,9 @@
|
|||
package org.apache.activemq.artemis.protocol.amqp.proton.handler;
|
||||
|
||||
import org.apache.qpid.proton.engine.Event;
|
||||
import org.apache.qpid.proton.engine.Transport;
|
||||
|
||||
public final class Events {
|
||||
|
||||
public static void dispatchTransport(Transport transport, EventHandler handler) throws Exception {
|
||||
handler.onTransport(transport);
|
||||
}
|
||||
|
||||
public static void dispatch(Event event, EventHandler handler) throws Exception {
|
||||
switch (event.getType()) {
|
||||
case CONNECTION_INIT:
|
||||
|
|
|
@ -19,8 +19,8 @@ package org.apache.activemq.artemis.protocol.amqp.proton.handler;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
@ -54,16 +54,10 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
|
||||
private final Collector collector = Proton.collector();
|
||||
|
||||
private final Executor dispatchExecutor;
|
||||
|
||||
private final Runnable dispatchRunnable = () -> dispatch();
|
||||
|
||||
private ArrayList<EventHandler> handlers = new ArrayList<>();
|
||||
private List<EventHandler> handlers = new ArrayList<>();
|
||||
|
||||
private Sasl serverSasl;
|
||||
|
||||
private Sasl clientSasl;
|
||||
|
||||
private final Object lock = new Object();
|
||||
|
||||
private final long creationTime;
|
||||
|
@ -76,33 +70,37 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
|
||||
protected boolean receivedFirstPacket = false;
|
||||
|
||||
private int offset = 0;
|
||||
boolean inDispatch = false;
|
||||
|
||||
public ProtonHandler(Executor dispatchExecutor) {
|
||||
this.dispatchExecutor = dispatchExecutor;
|
||||
public ProtonHandler() {
|
||||
this.creationTime = System.currentTimeMillis();
|
||||
transport.bind(connection);
|
||||
connection.collect(collector);
|
||||
}
|
||||
|
||||
public long tick(boolean firstTick) {
|
||||
synchronized (lock) {
|
||||
if (!firstTick) {
|
||||
try {
|
||||
if (connection.getLocalState() != EndpointState.CLOSED) {
|
||||
long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
|
||||
if (transport.isClosed()) {
|
||||
throw new IllegalStateException("Channel was inactive for to long");
|
||||
try {
|
||||
synchronized (lock) {
|
||||
if (!firstTick) {
|
||||
try {
|
||||
if (connection.getLocalState() != EndpointState.CLOSED) {
|
||||
long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
|
||||
if (transport.isClosed()) {
|
||||
throw new IllegalStateException("Channel was inactive for to long");
|
||||
}
|
||||
return rescheduleAt;
|
||||
}
|
||||
return rescheduleAt;
|
||||
} catch (Exception e) {
|
||||
log.warn(e.getMessage(), e);
|
||||
transport.close();
|
||||
connection.setCondition(new ErrorCondition());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
transport.close();
|
||||
connection.setCondition(new ErrorCondition());
|
||||
return 0;
|
||||
}
|
||||
return 0;
|
||||
return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
|
||||
}
|
||||
return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
|
||||
} finally {
|
||||
flushBytes();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -143,6 +141,30 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
|
||||
}
|
||||
|
||||
public void flushBytes() {
|
||||
synchronized (lock) {
|
||||
while (true) {
|
||||
int pending = transport.pending();
|
||||
|
||||
if (pending <= 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
// We allocated a Pooled Direct Buffer, that will be sent down the stream
|
||||
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(pending);
|
||||
ByteBuffer head = transport.head();
|
||||
buffer.writeBytes(head);
|
||||
|
||||
for (EventHandler handler : handlers) {
|
||||
handler.pushBytes(buffer);
|
||||
}
|
||||
|
||||
transport.pop(pending);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public SASLResult getSASLResult() {
|
||||
return saslResult;
|
||||
}
|
||||
|
@ -201,57 +223,13 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
return creationTime;
|
||||
}
|
||||
|
||||
public void outputDone(int bytes) {
|
||||
synchronized (lock) {
|
||||
transport.pop(bytes);
|
||||
offset -= bytes;
|
||||
|
||||
if (offset < 0) {
|
||||
throw new IllegalStateException("You called outputDone for more bytes than you actually received. numberOfBytes=" + bytes +
|
||||
", outcome result=" + offset);
|
||||
}
|
||||
}
|
||||
|
||||
flush();
|
||||
}
|
||||
|
||||
public ByteBuf outputBuffer() {
|
||||
|
||||
synchronized (lock) {
|
||||
int pending = transport.pending();
|
||||
|
||||
if (pending < 0) {
|
||||
return null;//throw new IllegalStateException("xxx need to close the connection");
|
||||
}
|
||||
|
||||
int size = pending - offset;
|
||||
|
||||
if (size < 0) {
|
||||
throw new IllegalStateException("negative size: " + pending);
|
||||
}
|
||||
|
||||
if (size == 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// For returning PooledBytes
|
||||
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
|
||||
ByteBuffer head = transport.head();
|
||||
head.position(offset);
|
||||
head.limit(offset + size);
|
||||
buffer.writeBytes(head);
|
||||
offset += size; // incrementing offset for future calls
|
||||
return buffer;
|
||||
}
|
||||
}
|
||||
|
||||
public void flush() {
|
||||
synchronized (lock) {
|
||||
transport.process();
|
||||
checkServerSASL();
|
||||
}
|
||||
|
||||
dispatchExecutor.execute(dispatchRunnable);
|
||||
dispatch();
|
||||
}
|
||||
|
||||
public void close(ErrorCondition errorCondition) {
|
||||
|
@ -304,38 +282,36 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
|
||||
private void dispatch() {
|
||||
Event ev;
|
||||
// We don't hold a lock on the entire event processing
|
||||
// because we could have a distributed deadlock
|
||||
// while processing events (for instance onTransport)
|
||||
// while a client is also trying to write here
|
||||
|
||||
synchronized (lock) {
|
||||
while ((ev = collector.peek()) != null) {
|
||||
for (EventHandler h : handlers) {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Handling " + ev + " towards " + h);
|
||||
}
|
||||
try {
|
||||
Events.dispatch(ev, h);
|
||||
} catch (Exception e) {
|
||||
log.warn(e.getMessage(), e);
|
||||
connection.setCondition(new ErrorCondition());
|
||||
if (inDispatch) {
|
||||
// Avoid recursion from events
|
||||
return;
|
||||
}
|
||||
try {
|
||||
inDispatch = true;
|
||||
while ((ev = collector.peek()) != null) {
|
||||
for (EventHandler h : handlers) {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Handling " + ev + " towards " + h);
|
||||
}
|
||||
try {
|
||||
Events.dispatch(ev, h);
|
||||
} catch (Exception e) {
|
||||
log.warn(e.getMessage(), e);
|
||||
connection.setCondition(new ErrorCondition());
|
||||
}
|
||||
}
|
||||
|
||||
collector.pop();
|
||||
}
|
||||
|
||||
collector.pop();
|
||||
}
|
||||
}
|
||||
|
||||
for (EventHandler h : handlers) {
|
||||
try {
|
||||
h.onTransport(transport);
|
||||
} catch (Exception e) {
|
||||
log.warn(e.getMessage(), e);
|
||||
connection.setCondition(new ErrorCondition());
|
||||
} finally {
|
||||
inDispatch = false;
|
||||
}
|
||||
}
|
||||
|
||||
flushBytes();
|
||||
}
|
||||
|
||||
public void open(String containerId, Map<Symbol, Object> connectionProperties) {
|
||||
|
|
Loading…
Reference in New Issue