Ensure that wireFormat transport options get applied
This commit is contained in:
Timothy Bish 2015-03-31 12:35:50 -04:00
parent 8bb58036a0
commit b9ed01fa56
4 changed files with 35 additions and 11 deletions

View File

@ -80,9 +80,14 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok
@Override
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new AmqpTransportFilter(transport, format, brokerService);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
AmqpTransportFilter amqpTransport = new AmqpTransportFilter(transport, format, brokerService);
Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat.");
IntrospectionSupport.setProperties(amqpTransport, options);
IntrospectionSupport.setProperties(amqpTransport.getWireFormat(), wireFormatOptions);
return super.compositeConfigure(amqpTransport, format, options);
}
@Override

View File

@ -42,9 +42,14 @@ public class AmqpSslTransportFactory extends SslTransportFactory implements Brok
@Override
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new AmqpTransportFilter(transport, format, brokerService);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
AmqpTransportFilter amqpTransport = new AmqpTransportFilter(transport, format, brokerService);
Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat.");
IntrospectionSupport.setProperties(amqpTransport, options);
IntrospectionSupport.setProperties(amqpTransport.getWireFormat(), wireFormatOptions);
return super.compositeConfigure(amqpTransport, format, options);
}
@SuppressWarnings("rawtypes")

View File

@ -42,9 +42,14 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS
@Override
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new AmqpTransportFilter(transport, format, brokerService);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
AmqpTransportFilter amqpTransport = new AmqpTransportFilter(transport, format, brokerService);
Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat.");
IntrospectionSupport.setProperties(amqpTransport, options);
IntrospectionSupport.setProperties(amqpTransport.getWireFormat(), wireFormatOptions);
return super.compositeConfigure(amqpTransport, format, options);
}
@Override

View File

@ -18,7 +18,10 @@ package org.apache.activemq.transport.amqp;
import java.io.File;
import javax.jms.Connection;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.store.kahadb.KahaDBStore;
public class IDERunner {
@ -29,8 +32,10 @@ public class IDERunner {
public static void main(String[]args) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.addConnector(
"amqp://0.0.0.0:5672?trace=" + TRANSPORT_TRACE + "&transport.transformer=" + AMQP_TRANSFORMER);
TransportConnector connector = brokerService.addConnector(
"amqp://0.0.0.0:5672?trace=" + TRANSPORT_TRACE +
"&transport.transformer=" + AMQP_TRANSFORMER +
"&transport.wireFormat.maxAmqpFrameSize=104857600");
KahaDBStore store = new KahaDBStore();
store.setDirectory(new File("target/activemq-data/kahadb"));
@ -41,6 +46,10 @@ public class IDERunner {
brokerService.deleteAllMessages();
brokerService.start();
Connection connection = JMSClientContext.INSTANCE.createConnection(connector.getPublishableConnectURI());
connection.start();
brokerService.waitUntilStopped();
}
}