ARTEMIS-143 - expose idle-timeout

https://issues.apache.org/jira/browse/ARTEMIS-143

Set the idle-timeout on the proton Transport object and also call tick so we send our own heartbeats.
This commit is contained in:
Andy Taylor 2015-06-22 10:10:08 +01:00
parent 7c873a065e
commit 0e2779ceee
8 changed files with 101 additions and 6 deletions

View File

@ -40,6 +40,9 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.proton.plug.AMQPServerConnectionContext; import org.proton.plug.AMQPServerConnectionContext;
import org.proton.plug.context.server.ProtonServerConnectionContextFactory; import org.proton.plug.context.server.ProtonServerConnectionContextFactory;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
/** /**
* A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ Artemis resources * A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ Artemis resources
*/ */
@ -93,8 +96,17 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection)
{ {
ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection); ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection);
long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;
AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().createConnection(connectionCallback); if (server.getConfiguration().getConnectionTTLOverride() != -1)
{
ttl = server.getConfiguration().getConnectionTTLOverride();
}
AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().createConnection(
connectionCallback,
(int) ttl,
DEFAULT_MAX_FRAME_SIZE,
DEFAULT_CHANNEL_MAX);
Executor executor = server.getExecutorFactory().getExecutor(); Executor executor = server.getExecutorFactory().getExecutor();
@ -103,7 +115,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
connectionCallback.setProtonConnectionDelegate(delegate); connectionCallback.setProtonConnectionDelegate(delegate);
ConnectionEntry entry = new ConnectionEntry(delegate, executor, ConnectionEntry entry = new ConnectionEntry(delegate, executor,
System.currentTimeMillis(), ActiveMQClient.DEFAULT_CONNECTION_TTL); System.currentTimeMillis(), ttl);
return entry; return entry;
} }

View File

@ -18,6 +18,11 @@ package org.proton.plug;
public abstract class AMQPConnectionContextFactory public abstract class AMQPConnectionContextFactory
{ {
/**
* @return
*/
public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback,int idleTimeout, int maxFrameSize, int channelMax);
/** /**
* @return * @return
*/ */

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.proton.plug.context;
/**
* Constants derived from the AMQP spec
*/
public class AMQPConstants
{
/*
* Connection Properties
* http://docs.oasis-open.org/amqp/core/v1.0/amqp-core-complete-v1.0.pdf#subsection.2.7.1
* */
public static class Connection
{
public static final int DEFAULT_IDLE_TIMEOUT = -1;
public static final int DEFAULT_MAX_FRAME_SIZE = -1;//it should be according to the spec 4294967295l;
public static final int DEFAULT_CHANNEL_MAX = 65535;
}
}

View File

@ -34,10 +34,13 @@ import org.proton.plug.handler.impl.DefaultEventHandler;
import org.proton.plug.util.ByteUtil; import org.proton.plug.util.ByteUtil;
import org.proton.plug.util.DebugInfo; import org.proton.plug.util.DebugInfo;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
public abstract class AbstractConnectionContext extends ProtonInitializable implements AMQPConnectionContext public abstract class AbstractConnectionContext extends ProtonInitializable implements AMQPConnectionContext
{ {
protected ProtonHandler handler = ProtonHandler.Factory.create(); protected ProtonHandler handler = ProtonHandler.Factory.create();
protected AMQPConnectionCallback connectionCallback; protected AMQPConnectionCallback connectionCallback;
@ -47,9 +50,22 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
protected LocalListener listener = new LocalListener(); protected LocalListener listener = new LocalListener();
public AbstractConnectionContext(AMQPConnectionCallback connectionCallback) public AbstractConnectionContext(AMQPConnectionCallback connectionCallback)
{
this(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX);
}
public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, int channelMax)
{ {
this.connectionCallback = connectionCallback; this.connectionCallback = connectionCallback;
connectionCallback.setConnection(this); connectionCallback.setConnection(this);
Transport transport = handler.getTransport();
if (idleTimeout > 0)
{
transport.setIdleTimeout(idleTimeout);
transport.tick(idleTimeout / 2);
}
transport.setChannelMax(channelMax);
transport.setMaxFrameSize(maxFrameSize);
handler.addEventHandler(listener); handler.addEventHandler(listener);
} }

View File

@ -36,6 +36,11 @@ public class ProtonClientConnectionContext extends AbstractConnectionContext imp
super(connectionCallback); super(connectionCallback);
} }
public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, int channelMax)
{
super(connectionCallback, idleTimeout, maxFrameSize, channelMax);
}
// Maybe a client interface? // Maybe a client interface?
public void clientOpen(ClientSASL sasl) throws Exception public void clientOpen(ClientSASL sasl) throws Exception
{ {

View File

@ -34,5 +34,9 @@ public class ProtonClientConnectionContextFactory extends AMQPConnectionContextF
return new ProtonClientConnectionContext(connectionCallback); return new ProtonClientConnectionContext(connectionCallback);
} }
@Override
public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, int channelMax)
{
return new ProtonClientConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax);
}
} }

View File

@ -35,6 +35,11 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp
super(connectionSP); super(connectionSP);
} }
public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP, int idleTimeout, int maxFrameSize, int channelMax)
{
super(connectionSP, idleTimeout, maxFrameSize, channelMax);
}
protected AbstractProtonSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException protected AbstractProtonSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException
{ {
AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this); AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this);

View File

@ -20,6 +20,10 @@ import org.proton.plug.AMQPConnectionContextFactory;
import org.proton.plug.AMQPConnectionCallback; import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPServerConnectionContext; import org.proton.plug.AMQPServerConnectionContext;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
public class ProtonServerConnectionContextFactory extends AMQPConnectionContextFactory public class ProtonServerConnectionContextFactory extends AMQPConnectionContextFactory
{ {
private static final ProtonServerConnectionContextFactory theInstance = new ProtonServerConnectionContextFactory(); private static final ProtonServerConnectionContextFactory theInstance = new ProtonServerConnectionContextFactory();
@ -31,7 +35,15 @@ public class ProtonServerConnectionContextFactory extends AMQPConnectionContextF
public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback) public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback)
{ {
ProtonServerConnectionContext connection = new ProtonServerConnectionContext(connectionCallback); return createConnection(connectionCallback,DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX);
return connection; }
@Override
public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, int channelMax)
{
return new ProtonServerConnectionContext(connectionCallback,
idleTimeout,
maxFrameSize,
channelMax);
} }
} }