commit
30cac20902
|
@ -23,7 +23,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
import io.netty.channel.ChannelPipeline;
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
@ -36,6 +35,7 @@ import org.apache.activemq.artemis.core.server.management.NotificationListener;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
|
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.sasl.MechanismFinder;
|
import org.apache.activemq.artemis.protocol.amqp.sasl.MechanismFinder;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
|
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
||||||
|
@ -44,6 +44,8 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ Artemis resources
|
* A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ Artemis resources
|
||||||
*/
|
*/
|
||||||
|
@ -77,7 +79,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
|
||||||
// TODO fix this
|
// TODO fix this
|
||||||
private String pubSubPrefix = ActiveMQDestination.TOPIC_QUALIFIED_PREFIX;
|
private String pubSubPrefix = ActiveMQDestination.TOPIC_QUALIFIED_PREFIX;
|
||||||
|
|
||||||
private int maxFrameSize = AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
|
private int maxFrameSize = AmqpSupport.MAX_FRAME_SIZE_DEFAULT;
|
||||||
|
|
||||||
public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server, List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors) {
|
public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server, List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors) {
|
||||||
this.factory = factory;
|
this.factory = factory;
|
||||||
|
@ -220,7 +222,6 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
|
||||||
this.saslLoginConfigScope = saslLoginConfigScope;
|
this.saslLoginConfigScope = saslLoginConfigScope;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setAnycastPrefix(String anycastPrefix) {
|
public void setAnycastPrefix(String anycastPrefix) {
|
||||||
for (String prefix : anycastPrefix.split(",")) {
|
for (String prefix : anycastPrefix.split(",")) {
|
||||||
|
|
|
@ -29,8 +29,6 @@ public class AMQPConstants {
|
||||||
|
|
||||||
public static final int DEFAULT_IDLE_TIMEOUT = -1;
|
public static final int DEFAULT_IDLE_TIMEOUT = -1;
|
||||||
|
|
||||||
public static final int DEFAULT_MAX_FRAME_SIZE = -1;//it should be according to the spec 4294967295l;
|
|
||||||
|
|
||||||
public static final int DEFAULT_CHANNEL_MAX = 65535;
|
public static final int DEFAULT_CHANNEL_MAX = 65535;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,6 +65,7 @@ public class AmqpSupport {
|
||||||
|
|
||||||
static final Symbol FAILOVER_SERVER_LIST = Symbol.valueOf("failover-server-list");
|
static final Symbol FAILOVER_SERVER_LIST = Symbol.valueOf("failover-server-list");
|
||||||
|
|
||||||
|
public static final int MAX_FRAME_SIZE_DEFAULT = 128 * 1024;
|
||||||
|
|
||||||
// Symbols used in configuration of newly opened links.
|
// Symbols used in configuration of newly opened links.
|
||||||
public static final Symbol COPY = Symbol.getSymbol("copy");
|
public static final Symbol COPY = Symbol.getSymbol("copy");
|
||||||
|
|
|
@ -96,6 +96,31 @@ public class AmqpInboundConnectionTest extends AmqpClientTestSupport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testDefaultMaxFrameSize() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
assertNotNull(client);
|
||||||
|
|
||||||
|
client.setValidator(new AmqpValidator() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void inspectOpenedResource(Connection connection) {
|
||||||
|
int brokerMaxFrameSize = connection.getTransport().getRemoteMaxFrameSize();
|
||||||
|
if (brokerMaxFrameSize != AmqpSupport.MAX_FRAME_SIZE_DEFAULT) {
|
||||||
|
markAsInvalid("Broker did not send the expected max Frame Size");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
try {
|
||||||
|
assertNotNull(connection);
|
||||||
|
connection.getStateInspector().assertValid();
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testBrokerConnectionProperties() throws Exception {
|
public void testBrokerConnectionProperties() throws Exception {
|
||||||
AmqpClient client = createAmqpClient();
|
AmqpClient client = createAmqpClient();
|
||||||
|
|
|
@ -27,7 +27,9 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpValidator;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Data;
|
import org.apache.qpid.proton.amqp.messaging.Data;
|
||||||
|
import org.apache.qpid.proton.engine.Connection;
|
||||||
import org.apache.qpid.proton.message.impl.MessageImpl;
|
import org.apache.qpid.proton.message.impl.MessageImpl;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -40,6 +42,31 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
|
||||||
params.put("maxFrameSize", FRAME_SIZE);
|
params.put("maxFrameSize", FRAME_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testBrokerHonorsSetMaxFrameSize() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
assertNotNull(client);
|
||||||
|
|
||||||
|
client.setValidator(new AmqpValidator() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void inspectOpenedResource(Connection connection) {
|
||||||
|
int brokerMaxFrameSize = connection.getTransport().getRemoteMaxFrameSize();
|
||||||
|
if (brokerMaxFrameSize != FRAME_SIZE) {
|
||||||
|
markAsInvalid("Broker did not send the expected max Frame Size");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
try {
|
||||||
|
assertNotNull(connection);
|
||||||
|
connection.getStateInspector().assertValid();
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testMultipleTransfers() throws Exception {
|
public void testMultipleTransfers() throws Exception {
|
||||||
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
|
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
|
||||||
|
|
Loading…
Reference in New Issue