This closes #390

This commit is contained in:
clebert suconic 2016-02-16 11:52:36 -05:00
commit 8432a3dd8a
14 changed files with 93 additions and 28 deletions

View File

@ -100,7 +100,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
}
AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().
createConnection(connectionCallback, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX);
createConnection(connectionCallback, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, server.getScheduledPool());
Executor executor = server.getExecutorFactory().getExecutor();

View File

@ -16,6 +16,8 @@
*/
package org.proton.plug;
import java.util.concurrent.ScheduledExecutorService;
public abstract class AMQPConnectionContextFactory {
/**
@ -24,10 +26,11 @@ public abstract class AMQPConnectionContextFactory {
public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback,
int idleTimeout,
int maxFrameSize,
int channelMax);
int channelMax,
ScheduledExecutorService scheduledPool);
/**
* @return
*/
public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback);
public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool);
}

View File

@ -18,8 +18,11 @@ package org.proton.plug.context;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
@ -40,28 +43,32 @@ import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME
public abstract class AbstractConnectionContext extends ProtonInitializable implements AMQPConnectionContext {
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
protected ProtonHandler handler = ProtonHandler.Factory.create();
protected AMQPConnectionCallback connectionCallback;
private final ScheduledExecutorService scheduledPool;
private final Map<Session, AbstractProtonSessionContext> sessions = new ConcurrentHashMap<>();
protected LocalListener listener = new LocalListener();
public AbstractConnectionContext(AMQPConnectionCallback connectionCallback) {
this(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX);
public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) {
this(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, scheduledPool);
}
public AbstractConnectionContext(AMQPConnectionCallback connectionCallback,
int idleTimeout,
int maxFrameSize,
int channelMax) {
int channelMax,
ScheduledExecutorService scheduledPool) {
this.connectionCallback = connectionCallback;
this.scheduledPool = scheduledPool;
connectionCallback.setConnection(this);
Transport transport = handler.getTransport();
if (idleTimeout > 0) {
transport.setIdleTimeout(idleTimeout);
transport.tick(idleTimeout / 2);
}
transport.setChannelMax(channelMax);
transport.setMaxFrameSize(maxFrameSize);
@ -172,6 +179,22 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
connection.open();
}
initialise();
if (!connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
long nextKeepAliveTime = handler.tick(true);
flushBytes();
if (nextKeepAliveTime > 0) {
scheduledPool.schedule(new Runnable() {
@Override
public void run() {
long rescheduleAt = (handler.tick(false) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
flushBytes();
if (rescheduleAt > 0) {
scheduledPool.schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
}
}
}, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
}
}
}
@Override

View File

@ -29,17 +29,20 @@ import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.context.ProtonInitializable;
import org.proton.plug.util.FutureRunnable;
import java.util.concurrent.ScheduledExecutorService;
public class ProtonClientConnectionContext extends AbstractConnectionContext implements AMQPClientConnectionContext {
public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback) {
super(connectionCallback);
public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) {
super(connectionCallback, scheduledPool);
}
public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback,
int idleTimeout,
int maxFrameSize,
int channelMax) {
super(connectionCallback, idleTimeout, maxFrameSize, channelMax);
int channelMax,
ScheduledExecutorService scheduledPool) {
super(connectionCallback, idleTimeout, maxFrameSize, channelMax, scheduledPool);
}
// Maybe a client interface?

View File

@ -20,6 +20,8 @@ import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPConnectionContextFactory;
import org.proton.plug.AMQPConnectionCallback;
import java.util.concurrent.ScheduledExecutorService;
public class ProtonClientConnectionContextFactory extends AMQPConnectionContextFactory {
private static final AMQPConnectionContextFactory theInstance = new ProtonClientConnectionContextFactory();
@ -29,15 +31,17 @@ public class ProtonClientConnectionContextFactory extends AMQPConnectionContextF
}
@Override
public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback) {
return new ProtonClientConnectionContext(connectionCallback);
public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) {
return new ProtonClientConnectionContext(connectionCallback, scheduledPool);
}
@Override
public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback,
int idleTimeout,
int maxFrameSize,
int channelMax) {
return new ProtonClientConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax);
int channelMax,
ScheduledExecutorService scheduledPool) {
return new ProtonClientConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax, scheduledPool);
}
}

View File

@ -28,17 +28,20 @@ import org.proton.plug.context.AbstractConnectionContext;
import org.proton.plug.context.AbstractProtonSessionContext;
import org.proton.plug.exceptions.ActiveMQAMQPException;
import java.util.concurrent.ScheduledExecutorService;
public class ProtonServerConnectionContext extends AbstractConnectionContext implements AMQPServerConnectionContext {
public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP) {
super(connectionSP);
public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP, ScheduledExecutorService scheduledPool) {
super(connectionSP, scheduledPool);
}
public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP,
int idleTimeout,
int maxFrameSize,
int channelMax) {
super(connectionSP, idleTimeout, maxFrameSize, channelMax);
int channelMax,
ScheduledExecutorService scheduledPool) {
super(connectionSP, idleTimeout, maxFrameSize, channelMax, scheduledPool);
}
@Override

View File

@ -20,6 +20,8 @@ import org.proton.plug.AMQPConnectionContextFactory;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPServerConnectionContext;
import java.util.concurrent.ScheduledExecutorService;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
@ -33,15 +35,16 @@ public class ProtonServerConnectionContextFactory extends AMQPConnectionContextF
}
@Override
public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback) {
return createConnection(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX);
public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) {
return createConnection(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, scheduledPool);
}
@Override
public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback,
int idleTimeout,
int maxFrameSize,
int channelMax) {
return new ProtonServerConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax);
int channelMax,
ScheduledExecutorService scheduledPool) {
return new ProtonServerConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax, scheduledPool);
}
}

View File

@ -29,6 +29,8 @@ import org.proton.plug.handler.impl.ProtonHandlerImpl;
*/
public interface ProtonHandler {
long tick(boolean firstTick);
public static final class Factory {
public static ProtonHandler create() {

View File

@ -20,6 +20,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
@ -27,6 +28,7 @@ import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport;
@ -45,6 +47,7 @@ import org.proton.plug.util.DebugInfo;
*/
public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHandler {
private final Transport transport = Proton.transport();
private final Connection connection = Proton.connection();
@ -82,6 +85,27 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
connection.collect(collector);
}
@Override
public long tick(boolean firstTick) {
if (!firstTick) {
try {
if (connection.getLocalState() != EndpointState.CLOSED) {
long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
if (transport.isClosed()) {
throw new IllegalStateException("Channel was inactive for to long");
}
return rescheduleAt;
}
}
catch (Exception e) {
transport.close();
connection.setCondition(new ErrorCondition());
}
return 0;
}
return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
}
@Override
public int capacity() {
synchronized (lock) {

View File

@ -48,7 +48,7 @@ public class AbstractConnectionContextTest {
private class TestConnectionContext extends AbstractConnectionContext {
public TestConnectionContext(AMQPConnectionCallback connectionCallback) {
super(connectionCallback);
super(connectionCallback, null);
}
@Override

View File

@ -32,6 +32,6 @@ public class InVMTestConnector implements Connector {
@Override
public AMQPClientConnectionContext connect(String host, int port) throws Exception {
return new ProtonClientConnectionContext(new ProtonINVMSPI());
return new ProtonClientConnectionContext(new ProtonINVMSPI(), null);
}
}

View File

@ -35,7 +35,7 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
AMQPConnectionContext returningConnection;
ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI());
ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), null);
final ExecutorService mainExecutor = Executors.newSingleThreadExecutor();

View File

@ -59,7 +59,7 @@ public class SimpleAMQPConnector implements Connector {
AMQPClientSPI clientConnectionSPI = new AMQPClientSPI(future.channel());
final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI);
final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI, null);
future.channel().pipeline().addLast(new ChannelDuplexHandler() {
@Override

View File

@ -124,7 +124,7 @@ public class MinimalServer {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel()));
connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel()), null);
//ctx.read();
}