Clean up SASL authentication code to make it easier to add new
mechanisms.
This commit is contained in:
Timothy Bish 2015-03-30 16:41:34 -04:00
parent 0fd174b928
commit e333fd957b
7 changed files with 418 additions and 150 deletions

View File

@ -26,11 +26,8 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.Principal;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@ -57,8 +54,6 @@ import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.security.AuthenticationBroker;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.transport.amqp.AmqpHeader;
import org.apache.activemq.transport.amqp.AmqpInactivityMonitor;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
@ -67,6 +62,7 @@ import org.apache.activemq.transport.amqp.AmqpTransport;
import org.apache.activemq.transport.amqp.AmqpTransportFilter;
import org.apache.activemq.transport.amqp.AmqpWireFormat;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.activemq.transport.amqp.sasl.AmqpAuthenticator;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.qpid.proton.Proton;
@ -80,7 +76,6 @@ import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
@ -108,27 +103,28 @@ public class AmqpConnection implements AmqpProtocolConverter {
private final AmqpTransport amqpTransport;
private final AmqpWireFormat amqpWireFormat;
private final BrokerService brokerService;
private AuthenticationBroker authenticator;
private Sasl sasl;
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private final AtomicInteger lastCommandId = new AtomicInteger();
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private final ConnectionInfo connectionInfo = new ConnectionInfo();
private long nextSessionId = 0;
private long nextTempDestinationId = 0;
private boolean closing = false;
private boolean closedSocket = false;
private long nextSessionId;
private long nextTempDestinationId;
private boolean closing;
private boolean closedSocket;
private AmqpAuthenticator authenticator;
private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentMap<ConsumerId, AmqpSender> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, AmqpSender>();
public AmqpConnection(AmqpTransport transport, BrokerService brokerService) {
this.amqpTransport = transport;
AmqpInactivityMonitor monitor = transport.getInactivityMonitor();
if (monitor != null) {
monitor.setProtocolConverter(this);
}
this.amqpWireFormat = transport.getWireFormat();
this.brokerService = brokerService;
@ -272,11 +268,10 @@ public class AmqpConnection implements AmqpProtocolConverter {
switch (header.getProtocolId()) {
case 0:
authenticator = null;
break; // nothing to do..
case 3: // Client will be using SASL for auth..
sasl = protonTransport.sasl();
sasl.setMechanisms(new String[] { "ANONYMOUS", "PLAIN" });
sasl.server();
authenticator = new AmqpAuthenticator(amqpTransport, protonTransport.sasl(), brokerService);
break;
default:
}
@ -285,10 +280,6 @@ public class AmqpConnection implements AmqpProtocolConverter {
frame = (Buffer) command;
}
onFrame(frame);
}
public void onFrame(Buffer frame) throws Exception {
while (frame.length > 0) {
try {
int count = protonTransport.input(frame.data, frame.offset, frame.length);
@ -298,89 +289,69 @@ public class AmqpConnection implements AmqpProtocolConverter {
return;
}
try {
if (sasl != null) {
// Lets try to complete the sasl handshake.
if (sasl.getRemoteMechanisms().length > 0) {
if ("PLAIN".equals(sasl.getRemoteMechanisms()[0])) {
byte[] data = new byte[sasl.pending()];
sasl.recv(data, 0, data.length);
Buffer[] parts = new Buffer(data).split((byte) 0);
if (parts.length > 0) {
connectionInfo.setUserName(parts[0].utf8().toString());
}
if (parts.length > 1) {
connectionInfo.setPassword(parts[1].utf8().toString());
}
if (authenticator != null) {
processSaslExchange();
} else {
processProtonEvents();
}
}
}
if (tryAuthenticate(connectionInfo, amqpTransport.getPeerCertificates())) {
sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
} else {
sasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
}
private void processSaslExchange() throws Exception {
authenticator.processSaslExchange(connectionInfo);
if (authenticator.isDone()) {
amqpTransport.getWireFormat().resetMagicRead();
}
pumpProtonToSocket();
}
amqpTransport.getWireFormat().resetMagicRead();
sasl = null;
LOG.debug("SASL [PLAIN] Handshake complete.");
} else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0])) {
if (tryAuthenticate(connectionInfo, amqpTransport.getPeerCertificates())) {
sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
} else {
sasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
}
amqpTransport.getWireFormat().resetMagicRead();
sasl = null;
LOG.debug("SASL [ANONYMOUS] Handshake complete.");
}
}
private void processProtonEvents() throws Exception {
try {
Event event = null;
while ((event = eventCollector.peek()) != null) {
if (amqpTransport.isTrace()) {
LOG.trace("Processing event: {}", event.getType());
}
switch (event.getType()) {
case CONNECTION_REMOTE_OPEN:
processConnectionOpen(event.getConnection());
break;
case CONNECTION_REMOTE_CLOSE:
processConnectionClose(event.getConnection());
break;
case SESSION_REMOTE_OPEN:
processSessionOpen(event.getSession());
break;
case SESSION_REMOTE_CLOSE:
processSessionClose(event.getSession());
break;
case LINK_REMOTE_OPEN:
processLinkOpen(event.getLink());
break;
case LINK_REMOTE_DETACH:
processLinkDetach(event.getLink());
break;
case LINK_REMOTE_CLOSE:
processLinkClose(event.getLink());
break;
case LINK_FLOW:
processLinkFlow(event.getLink());
break;
case DELIVERY:
processDelivery(event.getDelivery());
break;
default:
break;
}
Event event = null;
while ((event = eventCollector.peek()) != null) {
if (amqpTransport.isTrace()) {
LOG.trace("Processing event: {}", event.getType());
}
switch (event.getType()) {
case CONNECTION_REMOTE_OPEN:
processConnectionOpen(event.getConnection());
break;
case CONNECTION_REMOTE_CLOSE:
processConnectionClose(event.getConnection());
break;
case SESSION_REMOTE_OPEN:
processSessionOpen(event.getSession());
break;
case SESSION_REMOTE_CLOSE:
processSessionClose(event.getSession());
break;
case LINK_REMOTE_OPEN:
processLinkOpen(event.getLink());
break;
case LINK_REMOTE_DETACH:
processLinkDetach(event.getLink());
break;
case LINK_REMOTE_CLOSE:
processLinkClose(event.getLink());
break;
case LINK_FLOW:
processLinkFlow(event.getLink());
break;
case DELIVERY:
processDelivery(event.getDelivery());
break;
default:
break;
}
eventCollector.pop();
}
} catch (Throwable e) {
handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
eventCollector.pop();
}
pumpProtonToSocket();
} catch (Throwable e) {
handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
}
pumpProtonToSocket();
}
protected void processConnectionOpen(Connection connection) throws Exception {
@ -697,46 +668,4 @@ public class AmqpConnection implements AmqpProtocolConverter {
monitor.stopConnectChecker();
}
private boolean tryAuthenticate(ConnectionInfo info, X509Certificate[] peerCertificates) {
try {
if (getAuthenticator().authenticate(info.getUserName(), info.getPassword(), peerCertificates) != null) {
return true;
}
return false;
} catch (Throwable error) {
return false;
}
}
private AuthenticationBroker getAuthenticator() {
if (authenticator == null) {
try {
authenticator = (AuthenticationBroker) brokerService.getBroker().getAdaptor(AuthenticationBroker.class);
} catch (Exception e) {
LOG.debug("Failed to lookup AuthenticationBroker from Broker, will use a default Noop version.");
}
if (authenticator == null) {
authenticator = new DefaultAuthenticationBroker();
}
}
return authenticator;
}
private class DefaultAuthenticationBroker implements AuthenticationBroker {
@Override
public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException {
return new SecurityContext(username) {
@Override
public Set<Principal> getPrincipals() {
return null;
}
};
}
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.sasl;
/**
* Base class for SASL Mechanisms that provides common functionality.
*/
public abstract class AbstractSaslMechanism implements SaslMechanism {
protected String username;
protected String password;
@Override
public String getUsername() {
return username;
}
@Override
public String getPassword() {
return password;
}
}

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.sasl;
import java.security.Principal;
import java.security.cert.X509Certificate;
import java.util.Set;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.security.AuthenticationBroker;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.transport.amqp.AmqpTransport;
import org.apache.qpid.proton.engine.Sasl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* SASL Authenitcation engine.
*/
public class AmqpAuthenticator {
private static final Logger LOG = LoggerFactory.getLogger(AmqpAuthenticator.class);
private static final String[] mechanisms = new String[] { "ANONYMOUS", "PLAIN" };
private final BrokerService brokerService;
private final AmqpTransport transport;
private final Sasl sasl;
private AuthenticationBroker authenticator;
public AmqpAuthenticator(AmqpTransport transport, Sasl sasl, BrokerService brokerService) {
this.brokerService = brokerService;
this.transport = transport;
this.sasl = sasl;
sasl.setMechanisms(mechanisms);
sasl.server();
}
/**
* @return true if the SASL exchange has conpleted, regardless of success.
*/
public boolean isDone() {
return sasl.getOutcome() != Sasl.SaslOutcome.PN_SASL_NONE;
}
/**
* @return the list of all SASL mechanisms that are supported curretnly.
*/
public String[] getSupportedMechanisms() {
return mechanisms;
}
public void processSaslExchange(ConnectionInfo connectionInfo) {
if (sasl.getRemoteMechanisms().length > 0) {
SaslMechanism mechanism = getSaslMechanism(sasl.getRemoteMechanisms());
if (mechanism != null) {
LOG.debug("SASL [{}} Handshake started.", mechanism.getMechanismName());
mechanism.processSaslStep(sasl);
connectionInfo.setUserName(mechanism.getUsername());
connectionInfo.setPassword(mechanism.getPassword());
if (tryAuthenticate(connectionInfo, transport.getPeerCertificates())) {
sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
} else {
sasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
}
LOG.debug("SASL [{}} Handshake complete.", mechanism.getMechanismName());
} else {
LOG.info("SASL: could not find supported mechanism");
sasl.done(Sasl.SaslOutcome.PN_SASL_PERM);
}
}
}
//----- Internal implementation ------------------------------------------//
private SaslMechanism getSaslMechanism(String[] remoteMechanisms) {
String primary = remoteMechanisms[0];
if (primary.equalsIgnoreCase("PLAIN")) {
return new PlainMechanism();
} else if (primary.equalsIgnoreCase("ANONYMOUS")) {
return new AnonymousMechanism();
}
return null;
}
private boolean tryAuthenticate(ConnectionInfo info, X509Certificate[] peerCertificates) {
try {
return getAuthenticator().authenticate(info.getUserName(), info.getPassword(), peerCertificates) != null;
} catch (Throwable error) {
return false;
}
}
private AuthenticationBroker getAuthenticator() {
if (authenticator == null) {
try {
authenticator = (AuthenticationBroker) brokerService.getBroker().getAdaptor(AuthenticationBroker.class);
} catch (Exception e) {
LOG.debug("Failed to lookup AuthenticationBroker from Broker, will use a default Noop version.");
}
if (authenticator == null) {
authenticator = new DefaultAuthenticationBroker();
}
}
return authenticator;
}
private class DefaultAuthenticationBroker implements AuthenticationBroker {
@Override
public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException {
return new SecurityContext(username) {
@Override
public Set<Principal> getPrincipals() {
return null;
}
};
}
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.sasl;
import org.apache.qpid.proton.engine.Sasl;
/**
* SASL Anonymous mechanism implementation.
*/
public class AnonymousMechanism implements SaslMechanism {
@Override
public void processSaslStep(Sasl sasl) {
}
@Override
public String getMechanismName() {
return "ANONYMOUS";
}
@Override
public String getUsername() {
return null;
}
@Override
public String getPassword() {
return null;
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.sasl;
import org.apache.qpid.proton.engine.Sasl;
import org.fusesource.hawtbuf.Buffer;
/**
* Implements the SASL Plain mechanism.
*/
public class PlainMechanism extends AbstractSaslMechanism {
@Override
public void processSaslStep(Sasl sasl) {
byte[] data = new byte[sasl.pending()];
sasl.recv(data, 0, data.length);
Buffer[] parts = new Buffer(data).split((byte) 0);
if (parts.length > 0) {
username = parts[0].utf8().toString();
}
if (parts.length > 1) {
password = parts[1].utf8().toString();
}
}
@Override
public String getMechanismName() {
return "PLAIN";
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.sasl;
import org.apache.qpid.proton.engine.Sasl;
/**
* A SASL Mechanism implements this interface in order to provide the
* AmqpAuthenticator with the means of providing authentication services
* in the SASL handshake step.
*/
public interface SaslMechanism {
/**
* Perform the SASL processing for this mechanism type.
*
* @param sasl
* the SASL server that has read the incoming SASL exchange.
*/
void processSaslStep(Sasl sasl);
/**
* @return the User Name extracted from the SASL echange or null if none.
*/
String getUsername();
/**
* @return the Password extracted from the SASL echange or null if none.
*/
String getPassword();
/**
* @return the name of the implemented SASL mechanism.
*/
String getMechanismName();
}

View File

@ -37,36 +37,55 @@ import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JMSClientSimpleAuthTest {
@Rule public TestName name = new TestName();
private static final Logger LOG = LoggerFactory.getLogger(JMSClientSimpleAuthTest.class);
private final String SIMPLE_AUTH_AMQP_BROKER_XML =
"org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml";
private BrokerService brokerService;
private Connection connection;
private URI amqpURI;
@Before
public void setUp() throws Exception {
LOG.info("========== starting: " + getTestName() + " ==========");
startBroker();
}
@After
public void stopBroker() throws Exception {
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {}
connection = null;
}
if (brokerService != null) {
brokerService.stop();
brokerService = null;
}
LOG.info("========== finished: " + getTestName() + " ==========");
}
public String getTestName() {
return name.getMethodName();
}
@Test(timeout = 10000)
public void testNoUserOrPassword() throws Exception {
try {
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "", "");
connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "", "");
connection.start();
fail("Expected JMSException");
} catch (JMSSecurityException ex) {
@ -77,22 +96,22 @@ public class JMSClientSimpleAuthTest {
@Test(timeout = 10000)
public void testUnknownUser() throws Exception {
try {
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "nosuchuser", "blah");
connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "nosuchuser", "blah");
connection.start();
fail("Expected JMSException");
} catch (JMSSecurityException ex) {
LOG.debug("Failed to authenticate connection with no user / password.");
LOG.debug("Failed to authenticate connection with unknown user ID");
}
}
@Test(timeout = 10000)
public void testKnownUserWrongPassword() throws Exception {
try {
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "wrongPassword");
connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "wrongPassword");
connection.start();
fail("Expected JMSException");
} catch (JMSSecurityException ex) {
LOG.debug("Failed to authenticate connection with no user / password.");
LOG.debug("Failed to authenticate connection with incorrect password.");
}
}
@ -105,7 +124,7 @@ public class JMSClientSimpleAuthTest {
connection.start();
fail("Expected JMSException");
} catch (JMSSecurityException ex) {
LOG.debug("Failed to authenticate connection with no user / password.");
LOG.debug("Failed to authenticate connection with incorrect password.");
} finally {
if (connection != null) {
connection.close();
@ -116,7 +135,7 @@ public class JMSClientSimpleAuthTest {
@Test(timeout = 30000)
public void testSendReceive() throws Exception {
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("USERS.txQueue");
MessageProducer p = session.createProducer(queue);
@ -139,7 +158,7 @@ public class JMSClientSimpleAuthTest {
@Test(timeout = 30000)
public void testCreateTemporaryQueueNotAuthorized() throws JMSException {
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
@ -151,13 +170,11 @@ public class JMSClientSimpleAuthTest {
// Should not be fatal
assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
session.close();
}
@Test(timeout = 30000)
public void testCreateTemporaryTopicNotAuthorized() throws JMSException {
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
@ -169,8 +186,6 @@ public class JMSClientSimpleAuthTest {
// Should not be fatal
assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
session.close();
}
protected BrokerService createBroker() throws Exception {