diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java index 952bf12884..cdaf34fd84 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java @@ -100,7 +100,7 @@ public class ProtonProtocolManager implements ProtocolManager, Noti } AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory(). - createConnection(connectionCallback, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX); + createConnection(connectionCallback, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, server.getScheduledPool()); Executor executor = server.getExecutorFactory().getExecutor(); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java index b1ef1855dd..cafe8f1a21 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java @@ -16,6 +16,8 @@ */ package org.proton.plug; +import java.util.concurrent.ScheduledExecutorService; + public abstract class AMQPConnectionContextFactory { /** @@ -24,10 +26,11 @@ public abstract class AMQPConnectionContextFactory { public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, - int channelMax); + int channelMax, + ScheduledExecutorService scheduledPool); /** * @return */ - public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback); + public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool); } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java index 023ee4fb48..9486a1b5f2 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java @@ -18,8 +18,11 @@ package org.proton.plug.context; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import io.netty.buffer.ByteBuf; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Link; @@ -40,28 +43,32 @@ import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME public abstract class AbstractConnectionContext extends ProtonInitializable implements AMQPConnectionContext { + public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); + protected ProtonHandler handler = ProtonHandler.Factory.create(); protected AMQPConnectionCallback connectionCallback; + private final ScheduledExecutorService scheduledPool; private final Map sessions = new ConcurrentHashMap<>(); protected LocalListener listener = new LocalListener(); - public AbstractConnectionContext(AMQPConnectionCallback connectionCallback) { - this(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX); + public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) { + this(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, scheduledPool); } public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, - int channelMax) { + int channelMax, + ScheduledExecutorService scheduledPool) { this.connectionCallback = connectionCallback; + this.scheduledPool = scheduledPool; connectionCallback.setConnection(this); Transport transport = handler.getTransport(); if (idleTimeout > 0) { transport.setIdleTimeout(idleTimeout); - transport.tick(idleTimeout / 2); } transport.setChannelMax(channelMax); transport.setMaxFrameSize(maxFrameSize); @@ -172,6 +179,22 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl connection.open(); } initialise(); + if (!connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { + long nextKeepAliveTime = handler.tick(true); + flushBytes(); + if (nextKeepAliveTime > 0) { + scheduledPool.schedule(new Runnable() { + @Override + public void run() { + long rescheduleAt = (handler.tick(false) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); + flushBytes(); + if (rescheduleAt > 0) { + scheduledPool.schedule(this, rescheduleAt, TimeUnit.MILLISECONDS); + } + } + }, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS); + } + } } @Override diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java index 63e9250044..531b182619 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java @@ -29,17 +29,20 @@ import org.proton.plug.exceptions.ActiveMQAMQPException; import org.proton.plug.context.ProtonInitializable; import org.proton.plug.util.FutureRunnable; +import java.util.concurrent.ScheduledExecutorService; + public class ProtonClientConnectionContext extends AbstractConnectionContext implements AMQPClientConnectionContext { - public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback) { - super(connectionCallback); + public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) { + super(connectionCallback, scheduledPool); } public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, - int channelMax) { - super(connectionCallback, idleTimeout, maxFrameSize, channelMax); + int channelMax, + ScheduledExecutorService scheduledPool) { + super(connectionCallback, idleTimeout, maxFrameSize, channelMax, scheduledPool); } // Maybe a client interface? diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java index 8bc54c5545..2beb95c841 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java @@ -20,6 +20,8 @@ import org.proton.plug.AMQPConnectionContext; import org.proton.plug.AMQPConnectionContextFactory; import org.proton.plug.AMQPConnectionCallback; +import java.util.concurrent.ScheduledExecutorService; + public class ProtonClientConnectionContextFactory extends AMQPConnectionContextFactory { private static final AMQPConnectionContextFactory theInstance = new ProtonClientConnectionContextFactory(); @@ -29,15 +31,17 @@ public class ProtonClientConnectionContextFactory extends AMQPConnectionContextF } @Override - public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback) { - return new ProtonClientConnectionContext(connectionCallback); + public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) { + return new ProtonClientConnectionContext(connectionCallback, scheduledPool); } + @Override public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, - int channelMax) { - return new ProtonClientConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax); + int channelMax, + ScheduledExecutorService scheduledPool) { + return new ProtonClientConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax, scheduledPool); } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java index e4e554de86..606a3a30df 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java @@ -28,17 +28,20 @@ import org.proton.plug.context.AbstractConnectionContext; import org.proton.plug.context.AbstractProtonSessionContext; import org.proton.plug.exceptions.ActiveMQAMQPException; +import java.util.concurrent.ScheduledExecutorService; + public class ProtonServerConnectionContext extends AbstractConnectionContext implements AMQPServerConnectionContext { - public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP) { - super(connectionSP); + public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP, ScheduledExecutorService scheduledPool) { + super(connectionSP, scheduledPool); } public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP, int idleTimeout, int maxFrameSize, - int channelMax) { - super(connectionSP, idleTimeout, maxFrameSize, channelMax); + int channelMax, + ScheduledExecutorService scheduledPool) { + super(connectionSP, idleTimeout, maxFrameSize, channelMax, scheduledPool); } @Override diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java index 0c5c95fe5f..893c47981c 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java @@ -20,6 +20,8 @@ import org.proton.plug.AMQPConnectionContextFactory; import org.proton.plug.AMQPConnectionCallback; import org.proton.plug.AMQPServerConnectionContext; +import java.util.concurrent.ScheduledExecutorService; + import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT; import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX; import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE; @@ -33,15 +35,16 @@ public class ProtonServerConnectionContextFactory extends AMQPConnectionContextF } @Override - public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback) { - return createConnection(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX); + public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) { + return createConnection(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, scheduledPool); } @Override public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, - int channelMax) { - return new ProtonServerConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax); + int channelMax, + ScheduledExecutorService scheduledPool) { + return new ProtonServerConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax, scheduledPool); } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java index 366663c260..1ae0dffd4c 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java @@ -29,6 +29,8 @@ import org.proton.plug.handler.impl.ProtonHandlerImpl; */ public interface ProtonHandler { + long tick(boolean firstTick); + public static final class Factory { public static ProtonHandler create() { diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java index 1fac0dde3e..cd5d1572e3 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; @@ -27,6 +28,7 @@ import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Sasl; import org.apache.qpid.proton.engine.Transport; @@ -45,6 +47,7 @@ import org.proton.plug.util.DebugInfo; */ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHandler { + private final Transport transport = Proton.transport(); private final Connection connection = Proton.connection(); @@ -82,6 +85,27 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand connection.collect(collector); } + @Override + public long tick(boolean firstTick) { + if (!firstTick) { + try { + if (connection.getLocalState() != EndpointState.CLOSED) { + long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); + if (transport.isClosed()) { + throw new IllegalStateException("Channel was inactive for to long"); + } + return rescheduleAt; + } + } + catch (Exception e) { + transport.close(); + connection.setCondition(new ErrorCondition()); + } + return 0; + } + return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); + } + @Override public int capacity() { synchronized (lock) { diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java index 10be09f92c..46d7c6418f 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java @@ -48,7 +48,7 @@ public class AbstractConnectionContextTest { private class TestConnectionContext extends AbstractConnectionContext { public TestConnectionContext(AMQPConnectionCallback connectionCallback) { - super(connectionCallback); + super(connectionCallback, null); } @Override diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java index 3085b40d24..6a84a9565c 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java @@ -32,6 +32,6 @@ public class InVMTestConnector implements Connector { @Override public AMQPClientConnectionContext connect(String host, int port) throws Exception { - return new ProtonClientConnectionContext(new ProtonINVMSPI()); + return new ProtonClientConnectionContext(new ProtonINVMSPI(), null); } } diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java index aff0fa1e82..68a27893cc 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java @@ -35,7 +35,7 @@ public class ProtonINVMSPI implements AMQPConnectionCallback { AMQPConnectionContext returningConnection; - ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI()); + ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), null); final ExecutorService mainExecutor = Executors.newSingleThreadExecutor(); diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java index bb7d0d46b4..1e12410340 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java @@ -59,7 +59,7 @@ public class SimpleAMQPConnector implements Connector { AMQPClientSPI clientConnectionSPI = new AMQPClientSPI(future.channel()); - final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI); + final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI, null); future.channel().pipeline().addLast(new ChannelDuplexHandler() { @Override diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java index 1ed3474522..a1b1462f5c 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java @@ -124,7 +124,7 @@ public class MinimalServer { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); - connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel())); + connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel()), null); //ctx.read(); }