This closes #698

This commit is contained in:
Martyn Taylor 2016-08-03 12:03:25 +01:00
commit 8a3155c0b4
8 changed files with 36 additions and 7 deletions

View File

@ -106,8 +106,9 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
ttl = server.getConfiguration().getConnectionTTLOverride(); ttl = server.getConfiguration().getConnectionTTLOverride();
} }
String id = server.getConfiguration().getName();
AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory(). AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().
createConnection(connectionCallback, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool()); createConnection(connectionCallback, id, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
Executor executor = server.getExecutorFactory().getExecutor(); Executor executor = server.getExecutorFactory().getExecutor();

View File

@ -25,6 +25,7 @@ public abstract class AMQPConnectionContextFactory {
* @return * @return
*/ */
public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback,
String containerId,
int idleTimeout, int idleTimeout,
int maxFrameSize, int maxFrameSize,
int channelMax, int channelMax,

View File

@ -17,6 +17,7 @@
package org.proton.plug.context; package org.proton.plug.context;
import java.util.Map; import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -47,10 +48,12 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
private static final Logger log = Logger.getLogger(AbstractConnectionContext.class); private static final Logger log = Logger.getLogger(AbstractConnectionContext.class);
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
public static final String AMQP_CONTAINER_ID = "amqp-container-id";
protected final ProtonHandler handler; protected final ProtonHandler handler;
protected AMQPConnectionCallback connectionCallback; protected AMQPConnectionCallback connectionCallback;
private final String containerId;
private final ScheduledExecutorService scheduledPool; private final ScheduledExecutorService scheduledPool;
private final Map<Session, AbstractProtonSessionContext> sessions = new ConcurrentHashMap<>(); private final Map<Session, AbstractProtonSessionContext> sessions = new ConcurrentHashMap<>();
@ -58,16 +61,18 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
protected LocalListener listener = new LocalListener(); protected LocalListener listener = new LocalListener();
public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) { public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) {
this(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, dispatchExecutor, scheduledPool); this(connectionCallback, null, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, dispatchExecutor, scheduledPool);
} }
public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, public AbstractConnectionContext(AMQPConnectionCallback connectionCallback,
String containerId,
int idleTimeout, int idleTimeout,
int maxFrameSize, int maxFrameSize,
int channelMax, int channelMax,
Executor dispatchExecutor, Executor dispatchExecutor,
ScheduledExecutorService scheduledPool) { ScheduledExecutorService scheduledPool) {
this.connectionCallback = connectionCallback; this.connectionCallback = connectionCallback;
this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
this.scheduledPool = scheduledPool; this.scheduledPool = scheduledPool;
connectionCallback.setConnection(this); connectionCallback.setConnection(this);
this.handler = ProtonHandler.Factory.create(dispatchExecutor); this.handler = ProtonHandler.Factory.create(dispatchExecutor);
@ -190,6 +195,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
public void onRemoteOpen(Connection connection) throws Exception { public void onRemoteOpen(Connection connection) throws Exception {
synchronized (getLock()) { synchronized (getLock()) {
connection.setContext(AbstractConnectionContext.this); connection.setContext(AbstractConnectionContext.this);
connection.setContainer(containerId);
connection.open(); connection.open();
} }
initialise(); initialise();

View File

@ -39,12 +39,13 @@ public class ProtonClientConnectionContext extends AbstractConnectionContext imp
} }
public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback, public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback,
String containerId,
int idleTimeout, int idleTimeout,
int maxFrameSize, int maxFrameSize,
int channelMax, int channelMax,
Executor dispatchExecutor, Executor dispatchExecutor,
ScheduledExecutorService scheduledPool) { ScheduledExecutorService scheduledPool) {
super(connectionCallback, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool); super(connectionCallback, containerId, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool);
} }
// Maybe a client interface? // Maybe a client interface?

View File

@ -39,11 +39,12 @@ public class ProtonClientConnectionContextFactory extends AMQPConnectionContextF
@Override @Override
public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback,
String containerId,
int idleTimeout, int idleTimeout,
int maxFrameSize, int maxFrameSize,
int channelMax, int channelMax,
Executor dispatchExecutor, Executor dispatchExecutor,
ScheduledExecutorService scheduledPool) { ScheduledExecutorService scheduledPool) {
return new ProtonClientConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool); return new ProtonClientConnectionContext(connectionCallback, containerId, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool);
} }
} }

View File

@ -38,12 +38,13 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp
} }
public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP, public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP,
String containerId,
int idleTimeout, int idleTimeout,
int maxFrameSize, int maxFrameSize,
int channelMax, int channelMax,
Executor dispatchExecutor, Executor dispatchExecutor,
ScheduledExecutorService scheduledPool) { ScheduledExecutorService scheduledPool) {
super(connectionSP, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool); super(connectionSP, containerId, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool);
} }
@Override @Override

View File

@ -37,16 +37,17 @@ public class ProtonServerConnectionContextFactory extends AMQPConnectionContextF
@Override @Override
public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) { public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) {
return createConnection(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, dispatchExecutor, scheduledPool); return createConnection(connectionCallback, null, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, dispatchExecutor, scheduledPool);
} }
@Override @Override
public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback, public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback,
String containerId,
int idleTimeout, int idleTimeout,
int maxFrameSize, int maxFrameSize,
int channelMax, int channelMax,
Executor dispatchExecutor, Executor dispatchExecutor,
ScheduledExecutorService scheduledPool) { ScheduledExecutorService scheduledPool) {
return new ProtonServerConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool); return new ProtonServerConnectionContext(connectionCallback, containerId, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool);
} }
} }

View File

@ -95,6 +95,8 @@ public class ProtonTest extends ActiveMQTestBase {
private static final String password = "guest"; private static final String password = "guest";
private static final String brokerName = "my-broker";
// this will ensure that all tests in this class are run twice, // this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false" // once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "{0}") @Parameterized.Parameters(name = "{0}")
@ -137,6 +139,7 @@ public class ProtonTest extends ActiveMQTestBase {
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration); server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
server.getConfiguration().setName(brokerName);
// Default Page // Default Page
AddressSettings addressSettings = new AddressSettings(); AddressSettings addressSettings = new AddressSettings();
@ -186,6 +189,20 @@ public class ProtonTest extends ActiveMQTestBase {
} }
} }
@Test
public void testBrokerContainerId() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
try {
assertTrue(brokerName.equals(amqpConnection.getEndpoint().getRemoteContainer()));
}
finally {
amqpConnection.close();
}
}
@Test @Test
public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception { public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol