ARTEMIS-2974 audit logger can print wrong user info

Using a ThreadLocal for the audit user information works in most cases,
but it can fail when dispatching messages to consumers because threads
are taken out of a pool to do the dispatching and those threads may not
be associated with the proper credentials. This commit fixes that
problem with the following changes:

 - Passes the Subject explicitly when logging audit info during dispatch
 - Relocates security audit logging from the SecurityManager
implementation(s) to the SecurityStore implementation
 - Associates the Subject with the connection properly with the new
security caching
This commit is contained in:
Justin Bertram 2020-11-02 19:13:38 -06:00 committed by Clebert Suconic
parent c377801150
commit ecead9b130
8 changed files with 278 additions and 25 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.logs;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.regex.Pattern;
import org.jboss.logmanager.ExtHandler;
import org.jboss.logmanager.ExtLogRecord;
@ -120,6 +121,25 @@ public class AssertionLoggerHandler extends ExtHandler {
return false;
}
public static boolean matchText(final String pattern) {
Pattern r = Pattern.compile(pattern);
for (Map.Entry<String, ExtLogRecord> entry : messages.entrySet()) {
if (r.matcher(entry.getKey()).matches()) {
return true;
} else {
Throwable throwable = entry.getValue().getThrown();
if (throwable != null && throwable.getMessage() != null) {
if (r.matcher(throwable.getMessage()).matches()) {
return true;
}
}
}
}
return false;
}
public static final void clear() {
messages.clear();
}

View File

@ -2428,8 +2428,8 @@ public interface AuditLogger extends BasicLogger {
void logCoreSendMessage(String user, String messageToString, Object context);
//hot path log using a different logger
static void coreConsumeMessage(String queue) {
MESSAGE_LOGGER.consumeMessage(getCaller(), queue);
static void coreConsumeMessage(Subject user, String queue) {
MESSAGE_LOGGER.consumeMessage(getCaller(user), queue);
}
@LogMessage(level = Logger.Level.INFO)

View File

@ -156,6 +156,7 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
boolean userIsValid = false;
boolean check = true;
Subject subject = null;
Pair<Boolean, Subject> cacheEntry = authenticationCache.getIfPresent(createAuthenticationCacheKey(user, password, connection));
if (cacheEntry != null) {
if (!cacheEntry.getA()) {
@ -165,13 +166,13 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
// cached authentication succeeded previously so don't check again
check = false;
userIsValid = true;
validatedUser = getUserFromSubject(cacheEntry.getB());
subject = cacheEntry.getB();
validatedUser = getUserFromSubject(subject);
}
}
if (check) {
if (securityManager instanceof ActiveMQSecurityManager5) {
Subject subject = ((ActiveMQSecurityManager5) securityManager).authenticate(user, password, connection, securityDomain);
subject = ((ActiveMQSecurityManager5) securityManager).authenticate(user, password, connection, securityDomain);
authenticationCache.put(createAuthenticationCacheKey(user, password, connection), new Pair<>(subject != null, subject));
validatedUser = getUserFromSubject(subject);
} else if (securityManager instanceof ActiveMQSecurityManager4) {
@ -204,9 +205,20 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
ActiveMQServerLogger.LOGGER.securityProblemWhileAuthenticating(e.getMessage());
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.userFailedLoggedInAudit(subject, e.getMessage());
}
throw e;
}
if (AuditLogger.isAnyLoggingEnabled() && connection != null) {
connection.setAuditSubject(subject);
}
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.userSuccesfullyLoggedInAudit(subject);
}
return validatedUser;
}

View File

@ -487,7 +487,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
Message message = reference.getMessage();
if (AuditLogger.isMessageEnabled()) {
AuditLogger.coreConsumeMessage(getQueueName().toString());
AuditLogger.coreConsumeMessage(session.getRemotingConnection().getAuditSubject(), getQueueName().toString());
}
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference));

View File

@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.security.User;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal;
@ -82,12 +81,6 @@ public class ActiveMQBasicSecurityManager implements ActiveMQSecurityManager5, U
for (String role : getRole(userToAuthenticate).getRoles()) {
subject.getPrincipals().add((Principal) SecurityManagerUtil.createGroupPrincipal(role, rolePrincipalClass));
}
if (AuditLogger.isAnyLoggingEnabled() && remotingConnection != null) {
remotingConnection.setAuditSubject(subject);
}
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.userSuccesfullyLoggedInAudit(subject);
}
return subject;
}
}

View File

@ -24,7 +24,6 @@ import java.util.Set;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.security.jaas.JaasCallbackHandler;
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
@ -137,16 +136,7 @@ public class ActiveMQJAASSecurityManager implements ActiveMQSecurityManager5 {
}
try {
lc.login();
if (AuditLogger.isAnyLoggingEnabled() && remotingConnection != null) {
remotingConnection.setAuditSubject(lc.getSubject());
}
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.userSuccesfullyLoggedInAudit(lc.getSubject());
}
} catch (LoginException e) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.userFailedLoggedInAudit(lc.getSubject(), e.getMessage());
}
throw e;
}
return lc.getSubject();

View File

@ -17,7 +17,7 @@
# Additional logger names to configure (root logger is always configured)
# Root logger option
loggers=org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms,org.apache.activemq.artemis.ra,org.apache.activemq.artemis.tests.unit,org.apache.activemq.artemis.tests.integration,org.apache.activemq.artemis.jms.tests,org.apache.activemq.cli.test,org.apache.activemq.audit
loggers=org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms,org.apache.activemq.artemis.ra,org.apache.activemq.artemis.tests.unit,org.apache.activemq.artemis.tests.integration,org.apache.activemq.artemis.jms.tests,org.apache.activemq.cli.test,org.apache.activemq.audit,org.apache.activemq.audit.message
# Root logger level
logger.level=INFO
@ -41,7 +41,7 @@ logger.handlers=CONSOLE,TEST
# to enable audit change the level to INFO
logger.org.apache.activemq.audit.level=ERROR
logger.org.apache.activemq.audit.handlers=CONSOLE,FILE
logger.org.apache.activemq.audit.handlers=CONSOLE,FILE,TEST
logger.org.apache.activemq.audit.useParentHandlers=false
# Console handler configuration

View File

@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.logging.Level;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.spi.core.security.ActiveMQBasicSecurityManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.jboss.logmanager.Logger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class MultiThreadedAuditLoggingTest extends ActiveMQTestBase {
protected ActiveMQServer server;
private static final int MESSAGE_COUNT = 10;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server = createServer(true, createDefaultInVMConfig().setSecurityEnabled(true));
server.setSecurityManager(new ActiveMQBasicSecurityManager());
server.start();
Set<Role> roles = new HashSet<>();
roles.add(new Role("queue1", true, true, true, true, true, true, true, true, true, true));
server.getSecurityRepository().addMatch("queue1", roles);
roles = new HashSet<>();
roles.add(new Role("queue2", true, true, true, true, true, true, true, true, true, true));
server.getSecurityRepository().addMatch("queue2", roles);
server.getActiveMQServerControl().addUser("queue1", "queue1", "queue1", true);
server.getActiveMQServerControl().addUser("queue2", "queue2", "queue2", true);
}
private static final Logger logManager = org.jboss.logmanager.Logger.getLogger("org.apache.activemq.audit.message");
private static java.util.logging.Level previousLevel = logManager.getLevel();
@BeforeClass
public static void prepareLogger() {
logManager.setLevel(Level.INFO);
AssertionLoggerHandler.startCapture();
}
@AfterClass
public static void clearLogger() {
AssertionLoggerHandler.stopCapture();
logManager.setLevel(previousLevel);
}
class SomeConsumer extends Thread {
boolean failed = false;
protected ClientSession session;
protected ClientSessionFactory sf;
protected ServerLocator locator;
String queue;
SomeConsumer(String queue) throws Exception {
this.queue = queue;
locator = createInVMNonHALocator();
sf = createSessionFactory(locator);
session = addClientSession(sf.createSession(queue, queue, false, true, true, false, 0));
}
@Override
public void run() {
ClientConsumer consumer = null;
try {
try {
session.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST));
} catch (Exception e) {
// ignore
}
consumer = session.createConsumer(queue);
session.start();
for (int i = 0; i < MESSAGE_COUNT; i++) {
consumer.receive();
}
} catch (Throwable e) {
failed = true;
} finally {
try {
if (consumer != null) {
consumer.close();
}
if (session != null) {
session.close();
}
if (sf != null) {
sf.close();
}
if (locator != null) {
locator.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
class SomeProducer extends Thread {
boolean failed = false;
protected ClientSession session;
protected ClientSessionFactory sf;
protected ServerLocator locator;
String queue;
SomeProducer(String queue) throws Exception {
this.queue = queue;
locator = createInVMNonHALocator();
sf = createSessionFactory(locator);
session = addClientSession(sf.createSession(queue, queue, false, true, true, false, 0));
}
@Override
public void run() {
final String data = "Simple Text " + UUID.randomUUID().toString();
ClientProducer producer = null;
try {
try {
session.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST));
} catch (Exception e) {
// ignore
}
producer = session.createProducer(queue);
ClientMessage message = session.createMessage(false);
message.getBodyBuffer().writeString(data);
for (int i = 0; i < MESSAGE_COUNT; i++) {
producer.send(message);
}
} catch (Throwable e) {
failed = true;
} finally {
try {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (sf != null) {
sf.close();
}
if (locator != null) {
locator.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
@Test
public void testConcurrentLogging() throws Exception {
int nThreads = 6;
SomeConsumer[] consumers = new SomeConsumer[nThreads];
SomeProducer[] producers = new SomeProducer[nThreads];
for (int j = 0; j < 8; j++) {
for (int i = 0; i < nThreads / 2; i++) {
consumers[i] = new SomeConsumer("queue1");
}
for (int i = nThreads / 2; i < nThreads; i++) {
consumers[i] = new SomeConsumer("queue2");
}
for (int i = 0; i < nThreads; i++) {
consumers[i].start();
}
for (int i = 0; i < nThreads / 2; i++) {
producers[i] = new SomeProducer("queue1");
}
for (int i = nThreads / 2; i < nThreads; i++) {
producers[i] = new SomeProducer("queue2");
}
for (int i = 0; i < nThreads; i++) {
producers[i].start();
}
for (SomeConsumer consumer : consumers) {
consumer.join();
Assert.assertFalse(consumer.failed);
}
for (SomeProducer producer : producers) {
producer.join();
Assert.assertFalse(producer.failed);
}
assertFalse(AssertionLoggerHandler.matchText(".*User queue1\\(queue1\\).* is consuming a message from queue2"));
assertFalse(AssertionLoggerHandler.matchText(".*User queue2\\(queue2\\).* is consuming a message from queue1"));
assertTrue(AssertionLoggerHandler.matchText(".*User queue2\\(queue2\\).* is consuming a message from queue2"));
assertTrue(AssertionLoggerHandler.matchText(".*User queue1\\(queue1\\).* is consuming a message from queue1"));
}
}
}