diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java index 24368a0fe7..f50d3e4c5b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java @@ -40,6 +40,9 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.proton.plug.AMQPServerConnectionContext; import org.proton.plug.context.server.ProtonServerConnectionContextFactory; +import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX; +import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE; + /** * A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ Artemis resources */ @@ -93,8 +96,17 @@ public class ProtonProtocolManager implements ProtocolManager, Noti public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) { ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection); + long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL; - AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().createConnection(connectionCallback); + if (server.getConfiguration().getConnectionTTLOverride() != -1) + { + ttl = server.getConfiguration().getConnectionTTLOverride(); + } + AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().createConnection( + connectionCallback, + (int) ttl, + DEFAULT_MAX_FRAME_SIZE, + DEFAULT_CHANNEL_MAX); Executor executor = server.getExecutorFactory().getExecutor(); @@ -103,7 +115,7 @@ public class ProtonProtocolManager implements ProtocolManager, Noti connectionCallback.setProtonConnectionDelegate(delegate); ConnectionEntry entry = new ConnectionEntry(delegate, executor, - System.currentTimeMillis(), ActiveMQClient.DEFAULT_CONNECTION_TTL); + System.currentTimeMillis(), ttl); return entry; } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java index 3424e1de34..46aa0ddf5a 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java @@ -18,6 +18,11 @@ package org.proton.plug; public abstract class AMQPConnectionContextFactory { + /** + * @return + */ + public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback,int idleTimeout, int maxFrameSize, int channelMax); + /** * @return */ diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AMQPConstants.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AMQPConstants.java new file mode 100644 index 0000000000..485431560c --- /dev/null +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AMQPConstants.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.proton.plug.context; + +/** + * Constants derived from the AMQP spec + */ +public class AMQPConstants +{ + /* + * Connection Properties + * http://docs.oasis-open.org/amqp/core/v1.0/amqp-core-complete-v1.0.pdf#subsection.2.7.1 + * */ + public static class Connection + { + public static final int DEFAULT_IDLE_TIMEOUT = -1; + + public static final int DEFAULT_MAX_FRAME_SIZE = -1;//it should be according to the spec 4294967295l; + + public static final int DEFAULT_CHANNEL_MAX = 65535; + } +} diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java index 051d8fe337..72858fb147 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java @@ -34,10 +34,13 @@ import org.proton.plug.handler.impl.DefaultEventHandler; import org.proton.plug.util.ByteUtil; import org.proton.plug.util.DebugInfo; +import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT; +import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX; +import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE; + public abstract class AbstractConnectionContext extends ProtonInitializable implements AMQPConnectionContext { - protected ProtonHandler handler = ProtonHandler.Factory.create(); protected AMQPConnectionCallback connectionCallback; @@ -47,9 +50,22 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl protected LocalListener listener = new LocalListener(); public AbstractConnectionContext(AMQPConnectionCallback connectionCallback) + { + this(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX); + } + + public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, int channelMax) { this.connectionCallback = connectionCallback; connectionCallback.setConnection(this); + Transport transport = handler.getTransport(); + if (idleTimeout > 0) + { + transport.setIdleTimeout(idleTimeout); + transport.tick(idleTimeout / 2); + } + transport.setChannelMax(channelMax); + transport.setMaxFrameSize(maxFrameSize); handler.addEventHandler(listener); } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java index 948bfbb6dc..8dce7ec081 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java @@ -36,6 +36,11 @@ public class ProtonClientConnectionContext extends AbstractConnectionContext imp super(connectionCallback); } + public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, int channelMax) + { + super(connectionCallback, idleTimeout, maxFrameSize, channelMax); + } + // Maybe a client interface? public void clientOpen(ClientSASL sasl) throws Exception { diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java index 66eed6f53a..ed4379cbe8 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java @@ -34,5 +34,9 @@ public class ProtonClientConnectionContextFactory extends AMQPConnectionContextF return new ProtonClientConnectionContext(connectionCallback); } - + @Override + public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, int channelMax) + { + return new ProtonClientConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax); + } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java index c19777d363..77cb170644 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java @@ -35,6 +35,11 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp super(connectionSP); } + public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP, int idleTimeout, int maxFrameSize, int channelMax) + { + super(connectionSP, idleTimeout, maxFrameSize, channelMax); + } + protected AbstractProtonSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException { AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java index 202855784b..3d538572d1 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java @@ -20,6 +20,10 @@ import org.proton.plug.AMQPConnectionContextFactory; import org.proton.plug.AMQPConnectionCallback; import org.proton.plug.AMQPServerConnectionContext; +import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT; +import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX; +import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE; + public class ProtonServerConnectionContextFactory extends AMQPConnectionContextFactory { private static final ProtonServerConnectionContextFactory theInstance = new ProtonServerConnectionContextFactory(); @@ -31,7 +35,15 @@ public class ProtonServerConnectionContextFactory extends AMQPConnectionContextF public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback) { - ProtonServerConnectionContext connection = new ProtonServerConnectionContext(connectionCallback); - return connection; + return createConnection(connectionCallback,DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX); + } + + @Override + public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, int channelMax) + { + return new ProtonServerConnectionContext(connectionCallback, + idleTimeout, + maxFrameSize, + channelMax); } }