This closes #40 Idle timeout on Proton

This commit is contained in:
Clebert Suconic 2015-06-22 09:00:41 -04:00
commit 517ca68cb0
8 changed files with 101 additions and 6 deletions

View File

@ -40,6 +40,9 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.proton.plug.AMQPServerConnectionContext;
import org.proton.plug.context.server.ProtonServerConnectionContextFactory;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
/**
* A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ Artemis resources
*/
@ -93,8 +96,17 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection)
{
ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection);
long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;
AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().createConnection(connectionCallback);
if (server.getConfiguration().getConnectionTTLOverride() != -1)
{
ttl = server.getConfiguration().getConnectionTTLOverride();
}
AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().createConnection(
connectionCallback,
(int) ttl,
DEFAULT_MAX_FRAME_SIZE,
DEFAULT_CHANNEL_MAX);
Executor executor = server.getExecutorFactory().getExecutor();
@ -103,7 +115,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
connectionCallback.setProtonConnectionDelegate(delegate);
ConnectionEntry entry = new ConnectionEntry(delegate, executor,
System.currentTimeMillis(), ActiveMQClient.DEFAULT_CONNECTION_TTL);
System.currentTimeMillis(), ttl);
return entry;
}

View File

@ -18,6 +18,11 @@ package org.proton.plug;
public abstract class AMQPConnectionContextFactory
{
/**
* @return
*/
public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback,int idleTimeout, int maxFrameSize, int channelMax);
/**
* @return
*/

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.proton.plug.context;
/**
* Constants derived from the AMQP spec
*/
public class AMQPConstants
{
/*
* Connection Properties
* http://docs.oasis-open.org/amqp/core/v1.0/amqp-core-complete-v1.0.pdf#subsection.2.7.1
* */
public static class Connection
{
public static final int DEFAULT_IDLE_TIMEOUT = -1;
public static final int DEFAULT_MAX_FRAME_SIZE = -1;//it should be according to the spec 4294967295l;
public static final int DEFAULT_CHANNEL_MAX = 65535;
}
}

View File

@ -34,10 +34,13 @@ import org.proton.plug.handler.impl.DefaultEventHandler;
import org.proton.plug.util.ByteUtil;
import org.proton.plug.util.DebugInfo;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
public abstract class AbstractConnectionContext extends ProtonInitializable implements AMQPConnectionContext
{
protected ProtonHandler handler = ProtonHandler.Factory.create();
protected AMQPConnectionCallback connectionCallback;
@ -47,9 +50,22 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
protected LocalListener listener = new LocalListener();
public AbstractConnectionContext(AMQPConnectionCallback connectionCallback)
{
this(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX);
}
public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, int channelMax)
{
this.connectionCallback = connectionCallback;
connectionCallback.setConnection(this);
Transport transport = handler.getTransport();
if (idleTimeout > 0)
{
transport.setIdleTimeout(idleTimeout);
transport.tick(idleTimeout / 2);
}
transport.setChannelMax(channelMax);
transport.setMaxFrameSize(maxFrameSize);
handler.addEventHandler(listener);
}

View File

@ -36,6 +36,11 @@ public class ProtonClientConnectionContext extends AbstractConnectionContext imp
super(connectionCallback);
}
public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, int channelMax)
{
super(connectionCallback, idleTimeout, maxFrameSize, channelMax);
}
// Maybe a client interface?
public void clientOpen(ClientSASL sasl) throws Exception
{

View File

@ -34,5 +34,9 @@ public class ProtonClientConnectionContextFactory extends AMQPConnectionContextF
return new ProtonClientConnectionContext(connectionCallback);
}
@Override
public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, int channelMax)
{
return new ProtonClientConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax);
}
}

View File

@ -35,6 +35,11 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp
super(connectionSP);
}
public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP, int idleTimeout, int maxFrameSize, int channelMax)
{
super(connectionSP, idleTimeout, maxFrameSize, channelMax);
}
protected AbstractProtonSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException
{
AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this);

View File

@ -20,6 +20,10 @@ import org.proton.plug.AMQPConnectionContextFactory;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPServerConnectionContext;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
public class ProtonServerConnectionContextFactory extends AMQPConnectionContextFactory
{
private static final ProtonServerConnectionContextFactory theInstance = new ProtonServerConnectionContextFactory();
@ -31,7 +35,15 @@ public class ProtonServerConnectionContextFactory extends AMQPConnectionContextF
public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback)
{
ProtonServerConnectionContext connection = new ProtonServerConnectionContext(connectionCallback);
return connection;
return createConnection(connectionCallback,DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX);
}
@Override
public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, int channelMax)
{
return new ProtonServerConnectionContext(connectionCallback,
idleTimeout,
maxFrameSize,
channelMax);
}
}