diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java index 09478d7f6e..a7607af71c 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp; import java.io.IOException; import java.util.ArrayList; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.Command; /** @@ -29,14 +30,16 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { private static final int DEFAULT_PREFETCH = 100; - final private AmqpTransport transport; + private final AmqpTransport transport; + private final BrokerService brokerService; + private int prefetch = DEFAULT_PREFETCH; private int producerCredit = DEFAULT_PREFETCH; interface Discriminator { boolean matches(AmqpHeader header); - IAmqpProtocolConverter create(AmqpTransport transport); + IAmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService); } static final private ArrayList DISCRIMINATORS = new ArrayList(); @@ -44,8 +47,8 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { DISCRIMINATORS.add(new Discriminator() { @Override - public IAmqpProtocolConverter create(AmqpTransport transport) { - return new AmqpProtocolConverter(transport); + public IAmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService) { + return new AmqpProtocolConverter(transport, brokerService); } @Override @@ -60,13 +63,13 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { return false; } }); - } final private ArrayList pendingCommands = new ArrayList(); - public AMQPProtocolDiscriminator(AmqpTransport transport) { + public AMQPProtocolDiscriminator(AmqpTransport transport, BrokerService brokerService) { this.transport = transport; + this.brokerService = brokerService; } @Override @@ -80,11 +83,13 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { match = discriminator; } } + // Lets use first in the list if none are a good match. if (match == null) { match = DISCRIMINATORS.get(0); } - IAmqpProtocolConverter next = match.create(transport); + + IAmqpProtocolConverter next = match.create(transport, brokerService); next.setPrefetch(prefetch); next.setProducerCredit(producerCredit); transport.setProtocolConverter(next); diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java index 4c036a99b9..4d7af7e419 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java @@ -19,7 +19,6 @@ package org.apache.activemq.transport.amqp; import java.util.HashMap; import java.util.Map; -import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.transport.MutexTransport; @@ -33,7 +32,7 @@ import org.apache.activemq.wireformat.WireFormat; */ public class AMQPSslTransportFactory extends SslTransportFactory implements BrokerServiceAware { - private BrokerContext brokerContext = null; + private BrokerService brokerService = null; @Override protected String getDefaultWireFormatType() { @@ -43,7 +42,7 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok @Override @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - transport = new AmqpTransportFilter(transport, format, brokerContext); + transport = new AmqpTransportFilter(transport, format, brokerService); IntrospectionSupport.setProperties(transport, options); return super.compositeConfigure(transport, format, options); } @@ -63,7 +62,7 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok @Override public void setBrokerService(BrokerService brokerService) { - this.brokerContext = brokerService.getBrokerContext(); + this.brokerService = brokerService; } @Override diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java index c67d3b6c42..b017937f1f 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java @@ -27,7 +27,6 @@ import java.util.Map; import javax.net.ServerSocketFactory; import javax.net.SocketFactory; -import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.transport.MutexTransport; @@ -43,7 +42,7 @@ import org.apache.activemq.wireformat.WireFormat; */ public class AmqpNioTransportFactory extends NIOTransportFactory implements BrokerServiceAware { - private BrokerContext brokerContext = null; + private BrokerService brokerService = null; @Override protected String getDefaultWireFormatType() { @@ -81,14 +80,14 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok @Override @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - transport = new AmqpTransportFilter(transport, format, brokerContext); + transport = new AmqpTransportFilter(transport, format, brokerService); IntrospectionSupport.setProperties(transport, options); return super.compositeConfigure(transport, format, options); } @Override public void setBrokerService(BrokerService brokerService) { - this.brokerContext = brokerService.getBrokerContext(); + this.brokerService = brokerService; } @Override diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 62966bcd2f..7b3b825f60 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -19,10 +19,12 @@ package org.apache.activemq.transport.amqp; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -30,6 +32,7 @@ import javax.jms.Destination; import javax.jms.InvalidClientIDException; import javax.jms.InvalidSelectorException; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTempQueue; @@ -55,8 +58,10 @@ import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.selector.SelectorParser; +import org.apache.activemq.store.PersistenceAdapterSupport; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.LongSequenceGenerator; @@ -114,7 +119,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class); private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; private static final int CHANNEL_MAX = 32767; - private final AmqpTransport amqpTransport; private static final Symbol COPY = Symbol.getSymbol("copy"); private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector"); private static final Symbol NO_LOCAL = Symbol.valueOf("no-local"); @@ -122,6 +126,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private static final Symbol JMS_MAPPING_VERSION = Symbol.valueOf("x-opt-jms-mapping-version"); private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED"); + private final AmqpTransport amqpTransport; + private final BrokerService brokerService; + protected int prefetch; protected int producerCredit; protected Transport protonTransport = Proton.transport(); @@ -129,8 +136,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { protected Collector eventCollector = new CollectorImpl(); protected boolean useByteDestinationTypeAnnotation; - public AmqpProtocolConverter(AmqpTransport transport) { + public AmqpProtocolConverter(AmqpTransport transport, BrokerService brokerService) { this.amqpTransport = transport; + this.brokerService = brokerService; // the configured maxFrameSize on the URI. int maxFrameSize = transport.getWireFormat().getMaxAmqpFrameSize(); @@ -1468,4 +1476,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { public void setProducerCredit(int producerCredit) { this.producerCredit = producerCredit; } + + @SuppressWarnings("unused") + private List lookupSubscriptions() throws AmqpProtocolException { + List subscriptions = Collections.emptyList(); + try { + subscriptions = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), connectionInfo.getClientId()); + } catch (IOException e) { + throw new AmqpProtocolException("Error loading store subscriptions", true, e); + } + + return subscriptions; + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java index e3680c59c9..9a01f7bdc7 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import org.fusesource.hawtbuf.Buffer; /** + * */ public class AmqpSupport { diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java index e394c85a03..3ca8ea1424 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java @@ -19,7 +19,6 @@ package org.apache.activemq.transport.amqp; import java.util.HashMap; import java.util.Map; -import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.transport.MutexTransport; @@ -33,7 +32,7 @@ import org.apache.activemq.wireformat.WireFormat; */ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerServiceAware { - private BrokerContext brokerContext = null; + private BrokerService brokerService = null; @Override protected String getDefaultWireFormatType() { @@ -43,14 +42,14 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS @Override @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - transport = new AmqpTransportFilter(transport, format, brokerContext); + transport = new AmqpTransportFilter(transport, format, brokerService); IntrospectionSupport.setProperties(transport, options); return super.compositeConfigure(transport, format, options); } @Override public void setBrokerService(BrokerService brokerService) { - this.brokerContext = brokerService.getBrokerContext(); + this.brokerService = brokerService; } @SuppressWarnings("rawtypes") diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java index ec63ae7e5b..5fb7a04348 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.security.cert.X509Certificate; import java.util.concurrent.locks.ReentrantLock; -import org.apache.activemq.broker.BrokerContext; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.Command; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFilter; @@ -49,9 +49,9 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor private String transformer = InboundTransformer.TRANSFORMER_NATIVE; private final ReentrantLock lock = new ReentrantLock(); - public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) { + public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) { super(next); - this.protocolConverter = new AMQPProtocolDiscriminator(this); + this.protocolConverter = new AMQPProtocolDiscriminator(this, brokerService); if (wireFormat instanceof AmqpWireFormat) { this.wireFormat = (AmqpWireFormat) wireFormat; }