ARTEMIS-1056 Improving Proton usage

This commit is contained in:
Clebert Suconic 2017-03-27 22:54:06 -04:00
parent 177480d868
commit 7f91d29564
7 changed files with 281 additions and 351 deletions

View File

@ -22,12 +22,9 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
@ -35,7 +32,6 @@ import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
@ -51,7 +47,6 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL; import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
@ -72,8 +67,6 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
protected AMQPConnectionContext amqpConnection; protected AMQPConnectionContext amqpConnection;
private final ReusableLatch latch = new ReusableLatch(0);
private final Executor closeExecutor; private final Executor closeExecutor;
private String remoteContainerId; private String remoteContainerId;
@ -160,25 +153,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
} }
public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection) { public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection) {
final int size = byteBuf.writerIndex(); connection.write(new ChannelBufferWrapper(byteBuf, true));
latch.countUp();
connection.write(new ChannelBufferWrapper(byteBuf, true), false, false, new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
latch.countDown();
}
});
if (amqpConnection.isSyncOnFlush()) {
try {
latch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
ActiveMQServerLogger.LOGGER.warn("Error during await invocation", e);
}
}
amqpConnection.outputDone(size);
} }
public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) { public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {

View File

@ -109,7 +109,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
} }
String id = server.getConfiguration().getName(); String id = server.getConfiguration().getName();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool()); AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool());
Executor executor = server.getExecutorFactory().getExecutor(); Executor executor = server.getExecutorFactory().getExecutor();

View File

@ -52,7 +52,7 @@ public class AMQPClientConnectionFactory {
Executor executor = server.getExecutorFactory().getExecutor(); Executor executor = server.getExecutorFactory().getExecutor();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, executor, server.getScheduledPool()); AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool());
eventHandler.ifPresent(amqpConnection::addEventHandler); eventHandler.ifPresent(amqpConnection::addEventHandler);
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor); ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);

View File

@ -16,22 +16,16 @@
*/ */
package org.apache.activemq.artemis.protocol.amqp.proton; package org.apache.activemq.artemis.protocol.amqp.proton;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
import java.net.URI; import java.net.URI;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@ -55,9 +49,13 @@ import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.Transport;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf; import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
public class AMQPConnectionContext extends ProtonInitializable { public class AMQPConnectionContext extends ProtonInitializable implements EventHandler {
private static final Logger log = Logger.getLogger(AMQPConnectionContext.class); private static final Logger log = Logger.getLogger(AMQPConnectionContext.class);
@ -73,8 +71,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
private final Map<Session, AMQPSessionContext> sessions = new ConcurrentHashMap<>(); private final Map<Session, AMQPSessionContext> sessions = new ConcurrentHashMap<>();
protected LocalListener listener = new LocalListener();
private final ProtonProtocolManager protocolManager; private final ProtonProtocolManager protocolManager;
public AMQPConnectionContext(ProtonProtocolManager protocolManager, public AMQPConnectionContext(ProtonProtocolManager protocolManager,
@ -83,7 +79,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
int idleTimeout, int idleTimeout,
int maxFrameSize, int maxFrameSize,
int channelMax, int channelMax,
Executor dispatchExecutor,
ScheduledExecutorService scheduledPool) { ScheduledExecutorService scheduledPool) {
this.protocolManager = protocolManager; this.protocolManager = protocolManager;
@ -95,7 +90,8 @@ public class AMQPConnectionContext extends ProtonInitializable {
this.scheduledPool = scheduledPool; this.scheduledPool = scheduledPool;
connectionCallback.setConnection(this); connectionCallback.setConnection(this);
this.handler = new ProtonHandler(dispatchExecutor); this.handler = new ProtonHandler();
handler.addEventHandler(this);
Transport transport = handler.getTransport(); Transport transport = handler.getTransport();
transport.setEmitFlowEventOnSend(false); transport.setEmitFlowEventOnSend(false);
if (idleTimeout > 0) { if (idleTimeout > 0) {
@ -103,7 +99,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
} }
transport.setChannelMax(channelMax); transport.setChannelMax(channelMax);
transport.setMaxFrameSize(maxFrameSize); transport.setMaxFrameSize(maxFrameSize);
handler.addEventHandler(listener);
} }
protected AMQPSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException { protected AMQPSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException {
@ -141,10 +136,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
return handler.capacity(); return handler.capacity();
} }
public void outputDone(int bytes) {
handler.outputDone(bytes);
}
public void flush() { public void flush() {
handler.flush(); handler.flush();
} }
@ -176,14 +167,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
return handler.getCreationTime(); return handler.getCreationTime();
} }
protected void flushBytes() {
ByteBuf bytes;
// handler.outputBuffer has the lock
while ((bytes = handler.outputBuffer()) != null) {
connectionCallback.onTransport(bytes, this);
}
}
public String getRemoteContainer() { public String getRemoteContainer() {
return handler.getConnection().getRemoteContainer(); return handler.getConnection().getRemoteContainer();
} }
@ -218,7 +201,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
public Symbol[] getConnectionCapabilitiesOffered() { public Symbol[] getConnectionCapabilitiesOffered() {
URI tc = connectionCallback.getFailoverList(); URI tc = connectionCallback.getFailoverList();
if (tc != null) { if (tc != null) {
Map<Symbol,Object> hostDetails = new HashMap<>(); Map<Symbol, Object> hostDetails = new HashMap<>();
hostDetails.put(NETWORK_HOST, tc.getHost()); hostDetails.put(NETWORK_HOST, tc.getHost());
boolean isSSL = tc.getQuery().contains(TransportConstants.SSL_ENABLED_PROP_NAME + "=true"); boolean isSSL = tc.getQuery().contains(TransportConstants.SSL_ENABLED_PROP_NAME + "=true");
if (isSSL) { if (isSSL) {
@ -268,9 +251,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
} }
} }
// This listener will perform a bunch of things here
class LocalListener implements EventHandler {
@Override @Override
public void onInit(Connection connection) throws Exception { public void onInit(Connection connection) throws Exception {
@ -336,7 +316,13 @@ public class AMQPConnectionContext extends ProtonInitializable {
@Override @Override
public void onTransport(Transport transport) { public void onTransport(Transport transport) {
flushBytes(); handler.flushBytes();
}
@Override
public void pushBytes(ByteBuf bytes) {
connectionCallback.onTransport(bytes, this);
} }
@Override @Override
@ -365,13 +351,11 @@ public class AMQPConnectionContext extends ProtonInitializable {
* */ * */
if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
long nextKeepAliveTime = handler.tick(true); long nextKeepAliveTime = handler.tick(true);
flushBytes();
if (nextKeepAliveTime > 0 && scheduledPool != null) { if (nextKeepAliveTime > 0 && scheduledPool != null) {
scheduledPool.schedule(new Runnable() { scheduledPool.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
long rescheduleAt = (handler.tick(false) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); long rescheduleAt = (handler.tick(false) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
flushBytes();
if (rescheduleAt > 0) { if (rescheduleAt > 0) {
scheduledPool.schedule(this, rescheduleAt, TimeUnit.MILLISECONDS); scheduledPool.schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
} }
@ -394,7 +378,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
sessions.clear(); sessions.clear();
// We must force write the channel before we actually destroy the connection // We must force write the channel before we actually destroy the connection
onTransport(handler.getTransport()); handler.flushBytes();
destroy(); destroy();
} }
@ -461,7 +445,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
link.free(); link.free();
} }
flush();
} }
@Override @Override
@ -479,9 +462,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
if (handler != null) { if (handler != null) {
handler.onMessage(delivery); handler.onMessage(delivery);
} else { } else {
// TODO: logs log.warn("Handler is null, can't delivery " + delivery, new Exception("tracing location"));
System.err.println("Handler is null, can't delivery " + delivery);
}
} }
} }
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.protocol.amqp.proton.handler; package org.apache.activemq.artemis.protocol.amqp.proton.handler;
import io.netty.buffer.ByteBuf;
import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Link;
@ -75,4 +76,6 @@ public interface EventHandler {
void onTransport(Transport transport) throws Exception; void onTransport(Transport transport) throws Exception;
void pushBytes(ByteBuf bytes);
} }

View File

@ -17,14 +17,9 @@
package org.apache.activemq.artemis.protocol.amqp.proton.handler; package org.apache.activemq.artemis.protocol.amqp.proton.handler;
import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Transport;
public final class Events { public final class Events {
public static void dispatchTransport(Transport transport, EventHandler handler) throws Exception {
handler.onTransport(transport);
}
public static void dispatch(Event event, EventHandler handler) throws Exception { public static void dispatch(Event event, EventHandler handler) throws Exception {
switch (event.getType()) { switch (event.getType()) {
case CONNECTION_INIT: case CONNECTION_INIT:

View File

@ -19,8 +19,8 @@ package org.apache.activemq.artemis.protocol.amqp.proton.handler;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -54,16 +54,10 @@ public class ProtonHandler extends ProtonInitializable {
private final Collector collector = Proton.collector(); private final Collector collector = Proton.collector();
private final Executor dispatchExecutor; private List<EventHandler> handlers = new ArrayList<>();
private final Runnable dispatchRunnable = () -> dispatch();
private ArrayList<EventHandler> handlers = new ArrayList<>();
private Sasl serverSasl; private Sasl serverSasl;
private Sasl clientSasl;
private final Object lock = new Object(); private final Object lock = new Object();
private final long creationTime; private final long creationTime;
@ -76,16 +70,16 @@ public class ProtonHandler extends ProtonInitializable {
protected boolean receivedFirstPacket = false; protected boolean receivedFirstPacket = false;
private int offset = 0; boolean inDispatch = false;
public ProtonHandler(Executor dispatchExecutor) { public ProtonHandler() {
this.dispatchExecutor = dispatchExecutor;
this.creationTime = System.currentTimeMillis(); this.creationTime = System.currentTimeMillis();
transport.bind(connection); transport.bind(connection);
connection.collect(collector); connection.collect(collector);
} }
public long tick(boolean firstTick) { public long tick(boolean firstTick) {
try {
synchronized (lock) { synchronized (lock) {
if (!firstTick) { if (!firstTick) {
try { try {
@ -97,6 +91,7 @@ public class ProtonHandler extends ProtonInitializable {
return rescheduleAt; return rescheduleAt;
} }
} catch (Exception e) { } catch (Exception e) {
log.warn(e.getMessage(), e);
transport.close(); transport.close();
connection.setCondition(new ErrorCondition()); connection.setCondition(new ErrorCondition());
} }
@ -104,6 +99,9 @@ public class ProtonHandler extends ProtonInitializable {
} }
return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
} }
} finally {
flushBytes();
}
} }
public int capacity() { public int capacity() {
@ -143,6 +141,30 @@ public class ProtonHandler extends ProtonInitializable {
} }
public void flushBytes() {
synchronized (lock) {
while (true) {
int pending = transport.pending();
if (pending <= 0) {
break;
}
// We allocated a Pooled Direct Buffer, that will be sent down the stream
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(pending);
ByteBuffer head = transport.head();
buffer.writeBytes(head);
for (EventHandler handler : handlers) {
handler.pushBytes(buffer);
}
transport.pop(pending);
}
}
}
public SASLResult getSASLResult() { public SASLResult getSASLResult() {
return saslResult; return saslResult;
} }
@ -201,57 +223,13 @@ public class ProtonHandler extends ProtonInitializable {
return creationTime; return creationTime;
} }
public void outputDone(int bytes) {
synchronized (lock) {
transport.pop(bytes);
offset -= bytes;
if (offset < 0) {
throw new IllegalStateException("You called outputDone for more bytes than you actually received. numberOfBytes=" + bytes +
", outcome result=" + offset);
}
}
flush();
}
public ByteBuf outputBuffer() {
synchronized (lock) {
int pending = transport.pending();
if (pending < 0) {
return null;//throw new IllegalStateException("xxx need to close the connection");
}
int size = pending - offset;
if (size < 0) {
throw new IllegalStateException("negative size: " + pending);
}
if (size == 0) {
return null;
}
// For returning PooledBytes
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
ByteBuffer head = transport.head();
head.position(offset);
head.limit(offset + size);
buffer.writeBytes(head);
offset += size; // incrementing offset for future calls
return buffer;
}
}
public void flush() { public void flush() {
synchronized (lock) { synchronized (lock) {
transport.process(); transport.process();
checkServerSASL(); checkServerSASL();
} }
dispatchExecutor.execute(dispatchRunnable); dispatch();
} }
public void close(ErrorCondition errorCondition) { public void close(ErrorCondition errorCondition) {
@ -304,12 +282,14 @@ public class ProtonHandler extends ProtonInitializable {
private void dispatch() { private void dispatch() {
Event ev; Event ev;
// We don't hold a lock on the entire event processing
// because we could have a distributed deadlock
// while processing events (for instance onTransport)
// while a client is also trying to write here
synchronized (lock) { synchronized (lock) {
if (inDispatch) {
// Avoid recursion from events
return;
}
try {
inDispatch = true;
while ((ev = collector.peek()) != null) { while ((ev = collector.peek()) != null) {
for (EventHandler h : handlers) { for (EventHandler h : handlers) {
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
@ -325,17 +305,13 @@ public class ProtonHandler extends ProtonInitializable {
collector.pop(); collector.pop();
} }
}
for (EventHandler h : handlers) { } finally {
try { inDispatch = false;
h.onTransport(transport);
} catch (Exception e) {
log.warn(e.getMessage(), e);
connection.setCondition(new ErrorCondition());
} }
} }
flushBytes();
} }
public void open(String containerId, Map<Symbol, Object> connectionProperties) { public void open(String containerId, Map<Symbol, Object> connectionProperties) {