Make BrokerService visible to the protocol converter so that in the

future we can use it to better manage durable subscriptions and link
reattach behavior.
This commit is contained in:
Timothy Bish 2014-10-28 14:58:17 -04:00
parent 135226533f
commit adafdfe97d
7 changed files with 47 additions and 24 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.Command;
/**
@ -29,14 +30,16 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
private static final int DEFAULT_PREFETCH = 100;
final private AmqpTransport transport;
private final AmqpTransport transport;
private final BrokerService brokerService;
private int prefetch = DEFAULT_PREFETCH;
private int producerCredit = DEFAULT_PREFETCH;
interface Discriminator {
boolean matches(AmqpHeader header);
IAmqpProtocolConverter create(AmqpTransport transport);
IAmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService);
}
static final private ArrayList<Discriminator> DISCRIMINATORS = new ArrayList<Discriminator>();
@ -44,8 +47,8 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
DISCRIMINATORS.add(new Discriminator() {
@Override
public IAmqpProtocolConverter create(AmqpTransport transport) {
return new AmqpProtocolConverter(transport);
public IAmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService) {
return new AmqpProtocolConverter(transport, brokerService);
}
@Override
@ -60,13 +63,13 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
return false;
}
});
}
final private ArrayList<Command> pendingCommands = new ArrayList<Command>();
public AMQPProtocolDiscriminator(AmqpTransport transport) {
public AMQPProtocolDiscriminator(AmqpTransport transport, BrokerService brokerService) {
this.transport = transport;
this.brokerService = brokerService;
}
@Override
@ -80,11 +83,13 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
match = discriminator;
}
}
// Lets use first in the list if none are a good match.
if (match == null) {
match = DISCRIMINATORS.get(0);
}
IAmqpProtocolConverter next = match.create(transport);
IAmqpProtocolConverter next = match.create(transport, brokerService);
next.setPrefetch(prefetch);
next.setProducerCredit(producerCredit);
transport.setProtocolConverter(next);

View File

@ -19,7 +19,6 @@ package org.apache.activemq.transport.amqp;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
@ -33,7 +32,7 @@ import org.apache.activemq.wireformat.WireFormat;
*/
public class AMQPSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
private BrokerContext brokerContext = null;
private BrokerService brokerService = null;
@Override
protected String getDefaultWireFormatType() {
@ -43,7 +42,7 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok
@Override
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new AmqpTransportFilter(transport, format, brokerContext);
transport = new AmqpTransportFilter(transport, format, brokerService);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
@ -63,7 +62,7 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
this.brokerService = brokerService;
}
@Override

View File

@ -27,7 +27,6 @@ import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
@ -43,7 +42,7 @@ import org.apache.activemq.wireformat.WireFormat;
*/
public class AmqpNioTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
private BrokerContext brokerContext = null;
private BrokerService brokerService = null;
@Override
protected String getDefaultWireFormatType() {
@ -81,14 +80,14 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok
@Override
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new AmqpTransportFilter(transport, format, brokerContext);
transport = new AmqpTransportFilter(transport, format, brokerService);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
this.brokerService = brokerService;
}
@Override

View File

@ -19,10 +19,12 @@ package org.apache.activemq.transport.amqp;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -30,6 +32,7 @@ import javax.jms.Destination;
import javax.jms.InvalidClientIDException;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempQueue;
@ -55,8 +58,10 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.PersistenceAdapterSupport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
@ -114,7 +119,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
private static final int CHANNEL_MAX = 32767;
private final AmqpTransport amqpTransport;
private static final Symbol COPY = Symbol.getSymbol("copy");
private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
@ -122,6 +126,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private static final Symbol JMS_MAPPING_VERSION = Symbol.valueOf("x-opt-jms-mapping-version");
private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
private final AmqpTransport amqpTransport;
private final BrokerService brokerService;
protected int prefetch;
protected int producerCredit;
protected Transport protonTransport = Proton.transport();
@ -129,8 +136,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
protected Collector eventCollector = new CollectorImpl();
protected boolean useByteDestinationTypeAnnotation;
public AmqpProtocolConverter(AmqpTransport transport) {
public AmqpProtocolConverter(AmqpTransport transport, BrokerService brokerService) {
this.amqpTransport = transport;
this.brokerService = brokerService;
// the configured maxFrameSize on the URI.
int maxFrameSize = transport.getWireFormat().getMaxAmqpFrameSize();
@ -1468,4 +1476,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
public void setProducerCredit(int producerCredit) {
this.producerCredit = producerCredit;
}
@SuppressWarnings("unused")
private List<SubscriptionInfo> lookupSubscriptions() throws AmqpProtocolException {
List<SubscriptionInfo> subscriptions = Collections.emptyList();
try {
subscriptions = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), connectionInfo.getClientId());
} catch (IOException e) {
throw new AmqpProtocolException("Error loading store subscriptions", true, e);
}
return subscriptions;
}
}

View File

@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import org.fusesource.hawtbuf.Buffer;
/**
*
*/
public class AmqpSupport {

View File

@ -19,7 +19,6 @@ package org.apache.activemq.transport.amqp;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
@ -33,7 +32,7 @@ import org.apache.activemq.wireformat.WireFormat;
*/
public class AmqpTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
private BrokerContext brokerContext = null;
private BrokerService brokerService = null;
@Override
protected String getDefaultWireFormatType() {
@ -43,14 +42,14 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS
@Override
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new AmqpTransportFilter(transport, format, brokerContext);
transport = new AmqpTransportFilter(transport, format, brokerService);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
this.brokerService = brokerService;
}
@SuppressWarnings("rawtypes")

View File

@ -20,7 +20,7 @@ import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
@ -49,9 +49,9 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
private final ReentrantLock lock = new ReentrantLock();
public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) {
super(next);
this.protocolConverter = new AMQPProtocolDiscriminator(this);
this.protocolConverter = new AMQPProtocolDiscriminator(this, brokerService);
if (wireFormat instanceof AmqpWireFormat) {
this.wireFormat = (AmqpWireFormat) wireFormat;
}