This closes #2118
This commit is contained in:
commit
655f1dc46a
|
@ -26,7 +26,7 @@ import org.jboss.logging.annotations.MessageLogger;
|
||||||
/**
|
/**
|
||||||
* Logger Code 22
|
* Logger Code 22
|
||||||
*
|
*
|
||||||
* each message id must be 6 digits long starting with 10, the 3rd digit donates the level so
|
* each message id must be 6 digits long starting with 22, the 3rd digit donates the level so
|
||||||
*
|
*
|
||||||
* INF0 1
|
* INF0 1
|
||||||
* WARN 2
|
* WARN 2
|
||||||
|
@ -35,7 +35,7 @@ import org.jboss.logging.annotations.MessageLogger;
|
||||||
* TRACE 5
|
* TRACE 5
|
||||||
* FATAL 6
|
* FATAL 6
|
||||||
*
|
*
|
||||||
* so an INFO message would be 101000 to 101999
|
* so an INFO message would be 241000 to 246999
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@MessageLogger(projectCode = "AMQ")
|
@MessageLogger(projectCode = "AMQ")
|
||||||
|
@ -50,6 +50,10 @@ public interface ActiveMQStompProtocolLogger extends BasicLogger {
|
||||||
@Message(id = 222068, value = "connection closed {0}", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 222068, value = "connection closed {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void connectionClosed(StompConnection connection);
|
void connectionClosed(StompConnection connection);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.WARN)
|
||||||
|
@Message(id = 222069, value = "Sent ERROR frame to STOMP client {0}: {1}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void sentErrorToClient(String address, String message);
|
||||||
|
|
||||||
@LogMessage(level = Logger.Level.ERROR)
|
@LogMessage(level = Logger.Level.ERROR)
|
||||||
@Message(id = 224023, value = "Unable to send frame {0}", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 224023, value = "Unable to send frame {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void errorSendingFrame(@Cause Exception e, StompFrame frame);
|
void errorSendingFrame(@Cause Exception e, StompFrame frame);
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.protocol.stomp;
|
package org.apache.activemq.artemis.core.protocol.stomp;
|
||||||
|
|
||||||
|
import javax.security.auth.Subject;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
@ -31,6 +32,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
@ -53,8 +55,6 @@ import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.VersionLoader;
|
import org.apache.activemq.artemis.utils.VersionLoader;
|
||||||
|
|
||||||
import javax.security.auth.Subject;
|
|
||||||
|
|
||||||
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
|
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
|
||||||
|
|
||||||
public final class StompConnection implements RemotingConnection {
|
public final class StompConnection implements RemotingConnection {
|
||||||
|
@ -269,9 +269,8 @@ public final class StompConnection implements RemotingConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void autoCreateDestinationIfPossible(String queue, RoutingType routingType) throws ActiveMQStompException {
|
public void autoCreateDestinationIfPossible(String queue, RoutingType routingType) throws ActiveMQStompException {
|
||||||
ServerSession session = getSession().getCoreSession();
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
ServerSession session = getSession().getCoreSession();
|
||||||
SimpleString simpleQueue = SimpleString.toSimpleString(queue);
|
SimpleString simpleQueue = SimpleString.toSimpleString(queue);
|
||||||
AddressInfo addressInfo = manager.getServer().getAddressInfo(simpleQueue);
|
AddressInfo addressInfo = manager.getServer().getAddressInfo(simpleQueue);
|
||||||
AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue);
|
AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue);
|
||||||
|
@ -437,10 +436,18 @@ public final class StompConnection implements RemotingConnection {
|
||||||
return login;
|
return login;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setLogin(String login) {
|
||||||
|
this.login = login;
|
||||||
|
}
|
||||||
|
|
||||||
public String getPasscode() {
|
public String getPasscode() {
|
||||||
return passcode;
|
return passcode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setPasscode(String passcode) {
|
||||||
|
this.passcode = passcode;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setClientID(String clientID) {
|
public void setClientID(String clientID) {
|
||||||
this.clientID = clientID;
|
this.clientID = clientID;
|
||||||
|
@ -584,24 +591,15 @@ public final class StompConnection implements RemotingConnection {
|
||||||
manager.sendReply(this, frame, function);
|
manager.sendReply(this, frame, function);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean validateUser(final String login, final String pass, final RemotingConnection connection) {
|
|
||||||
this.valid = manager.validateUser(login, pass, connection);
|
|
||||||
if (valid) {
|
|
||||||
this.login = login;
|
|
||||||
this.passcode = pass;
|
|
||||||
}
|
|
||||||
return valid;
|
|
||||||
}
|
|
||||||
|
|
||||||
public CoreMessage createServerMessage() {
|
public CoreMessage createServerMessage() {
|
||||||
return manager.createServerMessage();
|
return manager.createServerMessage();
|
||||||
}
|
}
|
||||||
|
|
||||||
public StompSession getSession() throws ActiveMQStompException {
|
public StompSession getSession() throws ActiveMQStompException, ActiveMQSecurityException {
|
||||||
return getSession(null);
|
return getSession(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public StompSession getSession(String txID) throws ActiveMQStompException {
|
public StompSession getSession(String txID) throws ActiveMQStompException, ActiveMQSecurityException {
|
||||||
StompSession session = null;
|
StompSession session = null;
|
||||||
try {
|
try {
|
||||||
if (txID == null) {
|
if (txID == null) {
|
||||||
|
@ -609,6 +607,8 @@ public final class StompConnection implements RemotingConnection {
|
||||||
} else {
|
} else {
|
||||||
session = manager.getTransactedSession(this, txID);
|
session = manager.getTransactedSession(this, txID);
|
||||||
}
|
}
|
||||||
|
} catch (ActiveMQSecurityException e) {
|
||||||
|
throw e;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw BUNDLE.errorGetSession(e).setHandler(frameHandler);
|
throw BUNDLE.errorGetSession(e).setHandler(frameHandler);
|
||||||
}
|
}
|
||||||
|
@ -623,15 +623,15 @@ public final class StompConnection implements RemotingConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void sendServerMessage(ICoreMessage message, String txID) throws ActiveMQStompException {
|
protected void sendServerMessage(ICoreMessage message, String txID) throws ActiveMQStompException {
|
||||||
StompSession stompSession = getSession(txID);
|
|
||||||
|
|
||||||
if (stompSession.isNoLocal()) {
|
|
||||||
message.putStringProperty(CONNECTION_ID_PROP, getID().toString());
|
|
||||||
}
|
|
||||||
if (isEnableMessageID()) {
|
|
||||||
message.putStringProperty("amqMessageId", "STOMP" + message.getMessageID());
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
|
StompSession stompSession = getSession(txID);
|
||||||
|
|
||||||
|
if (stompSession.isNoLocal()) {
|
||||||
|
message.putStringProperty(CONNECTION_ID_PROP, getID().toString());
|
||||||
|
}
|
||||||
|
if (isEnableMessageID()) {
|
||||||
|
message.putStringProperty("amqMessageId", "STOMP" + message.getMessageID());
|
||||||
|
}
|
||||||
if (minLargeMessageSize == -1 || (message.getBodyBuffer().writerIndex() < minLargeMessageSize)) {
|
if (minLargeMessageSize == -1 || (message.getBodyBuffer().writerIndex() < minLargeMessageSize)) {
|
||||||
stompSession.sendInternal(message, false);
|
stompSession.sendInternal(message, false);
|
||||||
} else {
|
} else {
|
||||||
|
@ -778,6 +778,14 @@ public final class StompConnection implements RemotingConnection {
|
||||||
stompListener.replySent(frame);
|
stompListener.replySent(frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (frame.getCommand().equals(Stomp.Responses.ERROR)) {
|
||||||
|
String message = "no message header";
|
||||||
|
if (frame.hasHeader(Stomp.Headers.Error.MESSAGE)) {
|
||||||
|
message = frame.getHeader(Stomp.Headers.Error.MESSAGE);
|
||||||
|
}
|
||||||
|
ActiveMQStompProtocolLogger.LOGGER.sentErrorToClient(getTransportConnection().getRemoteAddress(), message);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public VersionedStompFrameHandler getFrameHandler() {
|
public VersionedStompFrameHandler getFrameHandler() {
|
||||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||||
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
|
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
@ -45,9 +44,6 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
|
|
||||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager2;
|
|
||||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3;
|
|
||||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
|
|
||||||
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
|
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
|
||||||
|
@ -65,7 +61,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
|
||||||
|
|
||||||
private final Executor executor;
|
private final Executor executor;
|
||||||
|
|
||||||
private final Map<String, StompSession> transactedSessions = new HashMap<>();
|
private final Map<Object, StompSession> transactedSessions = new HashMap<>();
|
||||||
|
|
||||||
// key => connection ID, value => Stomp session
|
// key => connection ID, value => Stomp session
|
||||||
private final Map<Object, StompSession> sessions = new HashMap<>();
|
private final Map<Object, StompSession> sessions = new HashMap<>();
|
||||||
|
@ -212,33 +208,22 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Package protected ---------------------------------------------
|
|
||||||
|
|
||||||
// Protected -----------------------------------------------------
|
|
||||||
|
|
||||||
// Private -------------------------------------------------------
|
|
||||||
|
|
||||||
public StompSession getSession(StompConnection connection) throws Exception {
|
public StompSession getSession(StompConnection connection) throws Exception {
|
||||||
StompSession stompSession = sessions.get(connection.getID());
|
return internalGetSession(connection, sessions, connection.getID(), false);
|
||||||
if (stompSession == null) {
|
|
||||||
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
|
|
||||||
String name = UUIDGenerator.getInstance().generateStringUUID();
|
|
||||||
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes());
|
|
||||||
stompSession.setServerSession(session);
|
|
||||||
sessions.put(connection.getID(), stompSession);
|
|
||||||
}
|
|
||||||
server.getStorageManager().setContext(stompSession.getContext());
|
|
||||||
return stompSession;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public StompSession getTransactedSession(StompConnection connection, String txID) throws Exception {
|
public StompSession getTransactedSession(StompConnection connection, String txID) throws Exception {
|
||||||
StompSession stompSession = transactedSessions.get(txID);
|
return internalGetSession(connection, transactedSessions, txID, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private StompSession internalGetSession(StompConnection connection, Map<Object, StompSession> sessions, Object id, boolean transacted) throws Exception {
|
||||||
|
StompSession stompSession = sessions.get(id);
|
||||||
if (stompSession == null) {
|
if (stompSession == null) {
|
||||||
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor));
|
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
|
||||||
String name = UUIDGenerator.getInstance().generateStringUUID();
|
String name = UUIDGenerator.getInstance().generateStringUUID();
|
||||||
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes());
|
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, !transacted, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes());
|
||||||
stompSession.setServerSession(session);
|
stompSession.setServerSession(session);
|
||||||
transactedSessions.put(txID, stompSession);
|
sessions.put(id, stompSession);
|
||||||
}
|
}
|
||||||
server.getStorageManager().setContext(stompSession.getContext());
|
server.getStorageManager().setContext(stompSession.getContext());
|
||||||
return stompSession;
|
return stompSession;
|
||||||
|
@ -263,9 +248,9 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
|
||||||
}
|
}
|
||||||
|
|
||||||
// removed the transacted session belonging to the connection
|
// removed the transacted session belonging to the connection
|
||||||
Iterator<Entry<String, StompSession>> iterator = transactedSessions.entrySet().iterator();
|
Iterator<Entry<Object, StompSession>> iterator = transactedSessions.entrySet().iterator();
|
||||||
while (iterator.hasNext()) {
|
while (iterator.hasNext()) {
|
||||||
Map.Entry<String, StompSession> entry = iterator.next();
|
Map.Entry<Object, StompSession> entry = iterator.next();
|
||||||
if (entry.getValue().getConnection() == connection) {
|
if (entry.getValue().getConnection() == connection) {
|
||||||
ServerSession serverSession = entry.getValue().getCoreSession();
|
ServerSession serverSession = entry.getValue().getCoreSession();
|
||||||
try {
|
try {
|
||||||
|
@ -326,24 +311,6 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
|
||||||
return "activemq";
|
return "activemq";
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean validateUser(String login, String passcode, RemotingConnection remotingConnection) {
|
|
||||||
boolean validated = true;
|
|
||||||
|
|
||||||
ActiveMQSecurityManager sm = server.getSecurityManager();
|
|
||||||
|
|
||||||
if (sm != null && server.getConfiguration().isSecurityEnabled()) {
|
|
||||||
if (sm instanceof ActiveMQSecurityManager3) {
|
|
||||||
validated = ((ActiveMQSecurityManager3) sm).validateUser(login, passcode, remotingConnection) != null;
|
|
||||||
} else if (sm instanceof ActiveMQSecurityManager2) {
|
|
||||||
validated = ((ActiveMQSecurityManager2) sm).validateUser(login, passcode, CertificateUtil.getCertsFromConnection(remotingConnection));
|
|
||||||
} else {
|
|
||||||
validated = sm.validateUser(login, passcode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return validated;
|
|
||||||
}
|
|
||||||
|
|
||||||
public CoreMessage createServerMessage() {
|
public CoreMessage createServerMessage() {
|
||||||
return new CoreMessage(server.getStorageManager().generateID(), 512);
|
return new CoreMessage(server.getStorageManager().generateID(), 512);
|
||||||
}
|
}
|
||||||
|
|
|
@ -133,6 +133,8 @@ public abstract class VersionedStompFrameHandler {
|
||||||
return null;
|
return null;
|
||||||
} catch (ActiveMQStompException e) {
|
} catch (ActiveMQStompException e) {
|
||||||
return e.getFrame();
|
return e.getFrame();
|
||||||
|
} catch (Exception e) {
|
||||||
|
return new ActiveMQStompException(e.getMessage(), e).setHandler(this).getFrame();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -256,7 +258,7 @@ public abstract class VersionedStompFrameHandler {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
public StompPostReceiptFunction onSubscribe(StompFrame frame) throws ActiveMQStompException {
|
public StompPostReceiptFunction onSubscribe(StompFrame frame) throws Exception {
|
||||||
String destination = getDestination(frame);
|
String destination = getDestination(frame);
|
||||||
|
|
||||||
String selector = frame.getHeader(Stomp.Headers.Subscribe.SELECTOR);
|
String selector = frame.getHeader(Stomp.Headers.Subscribe.SELECTOR);
|
||||||
|
@ -279,11 +281,11 @@ public abstract class VersionedStompFrameHandler {
|
||||||
return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType);
|
return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getDestination(StompFrame request) throws ActiveMQStompException {
|
public String getDestination(StompFrame request) throws Exception {
|
||||||
return getDestination(request, Headers.Subscribe.DESTINATION);
|
return getDestination(request, Headers.Subscribe.DESTINATION);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getDestination(StompFrame request, String header) throws ActiveMQStompException {
|
public String getDestination(StompFrame request, String header) throws Exception {
|
||||||
String destination = request.getHeader(header);
|
String destination = request.getHeader(header);
|
||||||
if (destination == null) {
|
if (destination == null) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -291,7 +293,7 @@ public abstract class VersionedStompFrameHandler {
|
||||||
return connection.getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString();
|
return connection.getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getPrefix(StompFrame request) throws ActiveMQStompException {
|
public String getPrefix(StompFrame request) throws Exception {
|
||||||
String destination = request.getHeader(Headers.Send.DESTINATION);
|
String destination = request.getHeader(Headers.Send.DESTINATION);
|
||||||
if (destination == null) {
|
if (destination == null) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -367,7 +369,7 @@ public abstract class VersionedStompFrameHandler {
|
||||||
connection.destroy();
|
connection.destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
private RoutingType getRoutingType(String typeHeader, String destination) throws ActiveMQStompException {
|
private RoutingType getRoutingType(String typeHeader, String destination) throws Exception {
|
||||||
// null is valid to return here so we know when the user didn't provide any routing info
|
// null is valid to return here so we know when the user didn't provide any routing info
|
||||||
RoutingType routingType;
|
RoutingType routingType;
|
||||||
if (typeHeader != null) {
|
if (typeHeader != null) {
|
||||||
|
@ -378,4 +380,14 @@ public abstract class VersionedStompFrameHandler {
|
||||||
return routingType;
|
return routingType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected StompFrame getFailedAuthenticationResponse(String login) {
|
||||||
|
StompFrame response;
|
||||||
|
response = createStompFrame(Stomp.Responses.ERROR);
|
||||||
|
response.setNeedsDisconnect(true);
|
||||||
|
String responseText = "Security Error occurred: User name [" + login + "] or password is invalid";
|
||||||
|
response.setBody(responseText);
|
||||||
|
response.addHeader(Stomp.Headers.Error.MESSAGE, responseText);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||||
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
|
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
|
||||||
import org.apache.activemq.artemis.core.protocol.stomp.FrameEventListener;
|
import org.apache.activemq.artemis.core.protocol.stomp.FrameEventListener;
|
||||||
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
|
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
|
||||||
|
@ -54,32 +55,25 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
|
||||||
|
|
||||||
try {
|
try {
|
||||||
connection.setClientID(clientID);
|
connection.setClientID(clientID);
|
||||||
if (connection.validateUser(login, passcode, connection)) {
|
connection.setLogin(login);
|
||||||
connection.setValid(true);
|
connection.setPasscode(passcode);
|
||||||
|
// Create session which will validate user - this will cache the session in the protocol manager
|
||||||
|
connection.getSession();
|
||||||
|
connection.setValid(true);
|
||||||
|
|
||||||
// Create session after validating user - this will cache the session in the
|
response = new StompFrameV10(Stomp.Responses.CONNECTED);
|
||||||
// protocol manager
|
|
||||||
connection.getSession();
|
|
||||||
|
|
||||||
response = new StompFrameV10(Stomp.Responses.CONNECTED);
|
if (frame.hasHeader(Stomp.Headers.ACCEPT_VERSION)) {
|
||||||
|
response.addHeader(Stomp.Headers.Connected.VERSION, StompVersions.V1_0.toString());
|
||||||
if (frame.hasHeader(Stomp.Headers.ACCEPT_VERSION)) {
|
|
||||||
response.addHeader(Stomp.Headers.Connected.VERSION, StompVersions.V1_0.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
|
|
||||||
|
|
||||||
if (requestID != null) {
|
|
||||||
response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// not valid
|
|
||||||
response = new StompFrameV10(Stomp.Responses.ERROR);
|
|
||||||
String responseText = "Security Error occurred: User name [" + login + "] or password is invalid";
|
|
||||||
response.setBody(responseText);
|
|
||||||
response.setNeedsDisconnect(true);
|
|
||||||
response.addHeader(Stomp.Headers.Error.MESSAGE, responseText);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
|
||||||
|
|
||||||
|
if (requestID != null) {
|
||||||
|
response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
|
||||||
|
}
|
||||||
|
} catch (ActiveMQSecurityException e) {
|
||||||
|
response = getFailedAuthenticationResponse(login);
|
||||||
} catch (ActiveMQStompException e) {
|
} catch (ActiveMQStompException e) {
|
||||||
response = e.getFrame();
|
response = e.getFrame();
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||||
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
|
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
|
||||||
import org.apache.activemq.artemis.core.protocol.stomp.FrameEventListener;
|
import org.apache.activemq.artemis.core.protocol.stomp.FrameEventListener;
|
||||||
import org.apache.activemq.artemis.core.protocol.stomp.SimpleBytes;
|
import org.apache.activemq.artemis.core.protocol.stomp.SimpleBytes;
|
||||||
|
@ -69,50 +70,42 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
|
||||||
|
|
||||||
try {
|
try {
|
||||||
connection.setClientID(clientID);
|
connection.setClientID(clientID);
|
||||||
if (connection.validateUser(login, passcode, connection)) {
|
connection.setLogin(login);
|
||||||
connection.setValid(true);
|
connection.setPasscode(passcode);
|
||||||
|
// Create session which will validate user - this will cache the session in the protocol manager
|
||||||
|
connection.getSession();
|
||||||
|
connection.setValid(true);
|
||||||
|
|
||||||
// Create session after validating user - this will cache the session in the
|
response = this.createStompFrame(Stomp.Responses.CONNECTED);
|
||||||
// protocol manager
|
|
||||||
connection.getSession();
|
|
||||||
|
|
||||||
response = this.createStompFrame(Stomp.Responses.CONNECTED);
|
// version
|
||||||
|
response.addHeader(Stomp.Headers.Connected.VERSION, connection.getVersion());
|
||||||
|
|
||||||
// version
|
// session
|
||||||
response.addHeader(Stomp.Headers.Connected.VERSION, connection.getVersion());
|
response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
|
||||||
|
|
||||||
// session
|
// server
|
||||||
response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
|
response.addHeader(Stomp.Headers.Connected.SERVER, connection.getActiveMQServerName());
|
||||||
|
|
||||||
// server
|
if (requestID != null) {
|
||||||
response.addHeader(Stomp.Headers.Connected.SERVER, connection.getActiveMQServerName());
|
response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
|
||||||
|
|
||||||
if (requestID != null) {
|
|
||||||
response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
|
|
||||||
}
|
|
||||||
|
|
||||||
// heart-beat. We need to start after connected frame has been sent.
|
|
||||||
// otherwise the client may receive heart-beat before it receives
|
|
||||||
// connected frame.
|
|
||||||
String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
|
|
||||||
|
|
||||||
if (heartBeat != null) {
|
|
||||||
handleHeartBeat(heartBeat);
|
|
||||||
if (heartBeater == null) {
|
|
||||||
response.addHeader(Stomp.Headers.Connected.HEART_BEAT, "0,0");
|
|
||||||
} else {
|
|
||||||
response.addHeader(Stomp.Headers.Connected.HEART_BEAT, heartBeater.serverPingPeriod + "," + heartBeater.clientPingResponse);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// not valid
|
|
||||||
response = createStompFrame(Stomp.Responses.ERROR);
|
|
||||||
response.setNeedsDisconnect(true);
|
|
||||||
response.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
|
|
||||||
String responseText = "Security Error occurred: User name [" + login + "] or password is invalid";
|
|
||||||
response.setBody(responseText);
|
|
||||||
response.addHeader(Stomp.Headers.Error.MESSAGE, responseText);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// heart-beat. We need to start after connected frame has been sent.
|
||||||
|
// otherwise the client may receive heart-beat before it receives
|
||||||
|
// connected frame.
|
||||||
|
String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
|
||||||
|
|
||||||
|
if (heartBeat != null) {
|
||||||
|
handleHeartBeat(heartBeat);
|
||||||
|
if (heartBeater == null) {
|
||||||
|
response.addHeader(Stomp.Headers.Connected.HEART_BEAT, "0,0");
|
||||||
|
} else {
|
||||||
|
response.addHeader(Stomp.Headers.Connected.HEART_BEAT, heartBeater.serverPingPeriod + "," + heartBeater.clientPingResponse);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (ActiveMQSecurityException e) {
|
||||||
|
response = getFailedAuthenticationResponse(login);
|
||||||
} catch (ActiveMQStompException e) {
|
} catch (ActiveMQStompException e) {
|
||||||
response = e.getFrame();
|
response = e.getFrame();
|
||||||
}
|
}
|
||||||
|
@ -120,6 +113,13 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected StompFrame getFailedAuthenticationResponse(String login) {
|
||||||
|
StompFrame response = super.getFailedAuthenticationResponse(login);
|
||||||
|
response.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
private void handleHeartBeat(String heartBeatHeader) throws ActiveMQStompException {
|
private void handleHeartBeat(String heartBeatHeader) throws ActiveMQStompException {
|
||||||
String[] params = heartBeatHeader.split(",");
|
String[] params = heartBeatHeader.split(",");
|
||||||
if (params.length != 2) {
|
if (params.length != 2) {
|
||||||
|
|
|
@ -175,7 +175,6 @@ public class ActiveMQPacketHandler implements ChannelHandler {
|
||||||
|
|
||||||
response = new CreateSessionResponseMessage(server.getVersion().getIncrementingVersion());
|
response = new CreateSessionResponseMessage(server.getVersion().getIncrementingVersion());
|
||||||
} catch (ActiveMQClusterSecurityException | ActiveMQSecurityException e) {
|
} catch (ActiveMQClusterSecurityException | ActiveMQSecurityException e) {
|
||||||
ActiveMQServerLogger.LOGGER.securityProblemWhileCreatingSession(e.getMessage());
|
|
||||||
response = new ActiveMQExceptionMessage(e);
|
response = new ActiveMQExceptionMessage(e);
|
||||||
} catch (ActiveMQException e) {
|
} catch (ActiveMQException e) {
|
||||||
if (e.getType() == ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS) {
|
if (e.getType() == ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS) {
|
||||||
|
|
|
@ -29,5 +29,7 @@ public interface SecurityStore {
|
||||||
|
|
||||||
boolean isSecurityEnabled();
|
boolean isSecurityEnabled();
|
||||||
|
|
||||||
|
void setSecurityEnabled(boolean securityEnabled);
|
||||||
|
|
||||||
void stop();
|
void stop();
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.activemq.artemis.core.security.Role;
|
||||||
import org.apache.activemq.artemis.core.security.SecurityAuth;
|
import org.apache.activemq.artemis.core.security.SecurityAuth;
|
||||||
import org.apache.activemq.artemis.core.security.SecurityStore;
|
import org.apache.activemq.artemis.core.security.SecurityStore;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||||
import org.apache.activemq.artemis.core.server.management.NotificationService;
|
import org.apache.activemq.artemis.core.server.management.NotificationService;
|
||||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||||
|
@ -59,7 +60,7 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
|
||||||
|
|
||||||
private volatile long lastCheck;
|
private volatile long lastCheck;
|
||||||
|
|
||||||
private final boolean securityEnabled;
|
private boolean securityEnabled;
|
||||||
|
|
||||||
private final String managementClusterUser;
|
private final String managementClusterUser;
|
||||||
|
|
||||||
|
@ -96,6 +97,11 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
|
||||||
return securityEnabled;
|
return securityEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setSecurityEnabled(boolean securityEnabled) {
|
||||||
|
this.securityEnabled = securityEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() {
|
public void stop() {
|
||||||
securityRepository.unRegisterListener(this);
|
securityRepository.unRegisterListener(this);
|
||||||
|
@ -149,7 +155,11 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
|
||||||
certSubjectDN = certs[0].getSubjectDN().getName();
|
certSubjectDN = certs[0].getSubjectDN().getName();
|
||||||
}
|
}
|
||||||
|
|
||||||
throw ActiveMQMessageBundle.BUNDLE.unableToValidateUser(connection.getRemoteAddress(), user, certSubjectDN);
|
Exception e = ActiveMQMessageBundle.BUNDLE.unableToValidateUser(connection.getRemoteAddress(), user, certSubjectDN);
|
||||||
|
|
||||||
|
ActiveMQServerLogger.LOGGER.securityProblemWhileAuthenticating(e.getMessage());
|
||||||
|
|
||||||
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
return validatedUser;
|
return validatedUser;
|
||||||
|
|
|
@ -1361,8 +1361,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
||||||
void negativeGlobalAddressSize(long size);
|
void negativeGlobalAddressSize(long size);
|
||||||
|
|
||||||
@LogMessage(level = Logger.Level.WARN)
|
@LogMessage(level = Logger.Level.WARN)
|
||||||
@Message(id = 222216, value = "Security problem while creating session: {0}", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 222216, value = "Security problem while authenticating: {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void securityProblemWhileCreatingSession(String message);
|
void securityProblemWhileAuthenticating(String message);
|
||||||
|
|
||||||
@LogMessage(level = Logger.Level.WARN)
|
@LogMessage(level = Logger.Level.WARN)
|
||||||
@Message(id = 222217, value = "Cannot find connector-ref {0}. The cluster-connection {1} will not be deployed.", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 222217, value = "Cannot find connector-ref {0}. The cluster-connection {1} will not be deployed.", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
|
|
@ -96,7 +96,7 @@ public class StompV11Test extends StompTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConnection() throws Exception {
|
public void testConnection() throws Exception {
|
||||||
server.getActiveMQServer().getConfiguration().setSecurityEnabled(true);
|
server.getActiveMQServer().getSecurityStore().setSecurityEnabled(true);
|
||||||
StompClientConnection connection = StompClientConnectionFactory.createClientConnection(v10Uri);
|
StompClientConnection connection = StompClientConnectionFactory.createClientConnection(v10Uri);
|
||||||
|
|
||||||
connection.connect(defUser, defPass);
|
connection.connect(defUser, defPass);
|
||||||
|
|
|
@ -97,7 +97,7 @@ public class StompV12Test extends StompTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConnection() throws Exception {
|
public void testConnection() throws Exception {
|
||||||
server.getActiveMQServer().getConfiguration().setSecurityEnabled(true);
|
server.getActiveMQServer().getSecurityStore().setSecurityEnabled(true);
|
||||||
StompClientConnection connection = StompClientConnectionFactory.createClientConnection(v10Uri);
|
StompClientConnection connection = StompClientConnectionFactory.createClientConnection(v10Uri);
|
||||||
|
|
||||||
connection.connect(defUser, defPass);
|
connection.connect(defUser, defPass);
|
||||||
|
|
Loading…
Reference in New Issue