From f471b51c2a3554b84f3d2d22f3730e243ca71ee5 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Thu, 9 Jun 2016 17:32:41 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6319 Improve configuration for disabling non-SASL connections. (cherry picked from commit c49db029ab280dcc1755d855f3641526ad2a5e90) --- .../transport/amqp/AmqpWireFormat.java | 8 ++- .../transport/amqp/AmqpWireFormatFactory.java | 10 ++++ .../amqp/protocol/UnsupportedClientTest.java | 55 +++++++++++++++++++ 3 files changed, 71 insertions(+), 2 deletions(-) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java index b5c8f593ae..149eb7092d 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java @@ -40,6 +40,7 @@ public class AmqpWireFormat implements WireFormat { public static final int DEFAULT_CONNECTION_TIMEOUT = 30000; public static final int DEFAULT_IDLE_TIMEOUT = 30000; public static final int DEFAULT_PRODUCER_CREDIT = 1000; + public static final boolean DEFAULT_ALLOW_NON_SASL_CONNECTIONS = true; private static final int SASL_PROTOCOL = 3; @@ -50,6 +51,7 @@ public class AmqpWireFormat implements WireFormat { private int idelTimeout = DEFAULT_IDLE_TIMEOUT; private int producerCredit = DEFAULT_PRODUCER_CREDIT; private String transformer = InboundTransformer.TRANSFORMER_JMS; + private boolean allowNonSaslConnections = DEFAULT_ALLOW_NON_SASL_CONNECTIONS; private boolean magicRead = false; private ResetListener resetListener; @@ -58,8 +60,6 @@ public class AmqpWireFormat implements WireFormat { void onProtocolReset(); } - private boolean allowNonSaslConnections = true; - @Override public ByteSequence marshal(Object command) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -131,6 +131,10 @@ public class AmqpWireFormat implements WireFormat { return false; } + if (!(header.getProtocolId() == 0 || header.getProtocolId() == 3)) { + return false; + } + if (!isAllowNonSaslConnections() && header.getProtocolId() != SASL_PROTOCOL) { return false; } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java index fb7aea4255..bb428b4d3a 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java @@ -30,6 +30,7 @@ public class AmqpWireFormatFactory implements WireFormatFactory { private int idelTimeout = AmqpWireFormat.DEFAULT_IDLE_TIMEOUT; private int producerCredit = AmqpWireFormat.DEFAULT_PRODUCER_CREDIT; private String transformer = InboundTransformer.TRANSFORMER_NATIVE; + private boolean allowNonSaslConnections = AmqpWireFormat.DEFAULT_ALLOW_NON_SASL_CONNECTIONS; @Override public WireFormat createWireFormat() { @@ -40,6 +41,7 @@ public class AmqpWireFormatFactory implements WireFormatFactory { wireFormat.setIdleTimeout(getIdelTimeout()); wireFormat.setProducerCredit(getProducerCredit()); wireFormat.setTransformer(getTransformer()); + wireFormat.setAllowNonSaslConnections(isAllowNonSaslConnections()); return wireFormat; } @@ -83,4 +85,12 @@ public class AmqpWireFormatFactory implements WireFormatFactory { public void setTransformer(String transformer) { this.transformer = transformer; } + + public boolean isAllowNonSaslConnections() { + return allowNonSaslConnections; + } + + public void setAllowNonSaslConnections(boolean allowNonSaslConnections) { + this.allowNonSaslConnections = allowNonSaslConnections; + } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java index d71aee28cc..ae8f1a3877 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java @@ -60,11 +60,17 @@ public class UnsupportedClientTest extends AmqpTestSupport { super.setUp(); } + @Override + public String getAdditionalConfig() { + return "&wireFormat.allowNonSaslConnections=false"; + } + @Test(timeout = 60000) public void testOlderProtocolIsRejected() throws Exception { AmqpHeader header = new AmqpHeader(); + header.setProtocolId(3); header.setMajor(0); header.setMinor(9); header.setRevision(1); @@ -87,6 +93,7 @@ public class UnsupportedClientTest extends AmqpTestSupport { AmqpHeader header = new AmqpHeader(); + header.setProtocolId(3); header.setMajor(2); header.setMinor(0); header.setRevision(0); @@ -109,6 +116,7 @@ public class UnsupportedClientTest extends AmqpTestSupport { AmqpHeader header = new AmqpHeader(); + header.setProtocolId(3); header.setMajor(1); header.setMinor(1); header.setRevision(0); @@ -131,6 +139,7 @@ public class UnsupportedClientTest extends AmqpTestSupport { AmqpHeader header = new AmqpHeader(); + header.setProtocolId(3); header.setMajor(1); header.setMinor(0); header.setRevision(1); @@ -148,6 +157,52 @@ public class UnsupportedClientTest extends AmqpTestSupport { doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true); } + @Test(timeout = 60000) + public void testNonSaslClientIsRejected() throws Exception { + + AmqpHeader header = new AmqpHeader(); + + header.setProtocolId(0); + header.setMajor(1); + header.setMinor(0); + header.setRevision(0); + + // Test TCP + doTestInvalidHeaderProcessing(amqpPort, header, false); + + // Test SSL + doTestInvalidHeaderProcessing(amqpSslPort, header, true); + + // Test NIO + doTestInvalidHeaderProcessing(amqpNioPort, header, false); + + // Test NIO+SSL + doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true); + } + + @Test(timeout = 60000) + public void testUnkownProtocolIdIsRejected() throws Exception { + + AmqpHeader header = new AmqpHeader(); + + header.setProtocolId(5); + header.setMajor(1); + header.setMinor(0); + header.setRevision(0); + + // Test TCP + doTestInvalidHeaderProcessing(amqpPort, header, false); + + // Test SSL + doTestInvalidHeaderProcessing(amqpSslPort, header, true); + + // Test NIO + doTestInvalidHeaderProcessing(amqpNioPort, header, false); + + // Test NIO+SSL + doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true); + } + @Test(timeout = 60000) public void testInvalidProtocolHeader() throws Exception {