This closes #712

This commit is contained in:
Clebert Suconic 2016-08-10 08:56:44 -04:00
commit 155a345d3c
2 changed files with 27 additions and 0 deletions

View File

@ -16,6 +16,7 @@
*/ */
package org.proton.plug.context; package org.proton.plug.context;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -24,6 +25,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Delivery;
@ -54,6 +56,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
protected AMQPConnectionCallback connectionCallback; protected AMQPConnectionCallback connectionCallback;
private final String containerId; private final String containerId;
private final Map<Symbol, Object> connectionProperties = new HashMap<>();
private final ScheduledExecutorService scheduledPool; private final ScheduledExecutorService scheduledPool;
private final Map<Session, AbstractProtonSessionContext> sessions = new ConcurrentHashMap<>(); private final Map<Session, AbstractProtonSessionContext> sessions = new ConcurrentHashMap<>();
@ -73,6 +76,8 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
ScheduledExecutorService scheduledPool) { ScheduledExecutorService scheduledPool) {
this.connectionCallback = connectionCallback; this.connectionCallback = connectionCallback;
this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString(); this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
connectionProperties.put(Symbol.valueOf("product"), "apache-activemq-artemis");
connectionProperties.put(Symbol.valueOf("version"), VersionLoader.getVersion().getFullVersion());
this.scheduledPool = scheduledPool; this.scheduledPool = scheduledPool;
connectionCallback.setConnection(this); connectionCallback.setConnection(this);
this.handler = ProtonHandler.Factory.create(dispatchExecutor); this.handler = ProtonHandler.Factory.create(dispatchExecutor);
@ -196,6 +201,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
synchronized (getLock()) { synchronized (getLock()) {
connection.setContext(AbstractConnectionContext.this); connection.setContext(AbstractConnectionContext.this);
connection.setContainer(containerId); connection.setContainer(containerId);
connection.setProperties(connectionProperties);
connection.open(); connection.open();
} }
initialise(); initialise();

View File

@ -59,6 +59,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpMessage;
@ -66,6 +67,7 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.message.ProtonJMessage; import org.apache.qpid.proton.message.ProtonJMessage;
@ -210,6 +212,25 @@ public class ProtonTest extends ActiveMQTestBase {
} }
} }
@Test
public void testBrokerConnectionProperties() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
try {
Map<Symbol, Object> properties = amqpConnection.getEndpoint().getRemoteProperties();
assertTrue(properties != null);
if (properties != null) {
assertTrue("apache-activemq-artemis".equals(properties.get(Symbol.valueOf("product"))));
assertTrue(VersionLoader.getVersion().getFullVersion().equals(properties.get(Symbol.valueOf("version"))));
}
}
finally {
amqpConnection.close();
}
}
@Test @Test
public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception { public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol