ARTEMIS-4709 plugin to enforce connection periodic expiry per acceptor

This commit is contained in:
Gary Tully 2024-04-02 11:21:03 +01:00 committed by Justin Bertram
parent a4d1f7084d
commit 20f345dbe1
5 changed files with 576 additions and 0 deletions

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.plugin.impl;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConnectionPeriodicExpiryPlugin implements ActiveMQServerBasePlugin {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String name;
private long periodSeconds;
private int accuracyWindowSeconds;
private String acceptorMatchRegex;
private ScheduledExecutorService executor;
private RemotingService remotingService;
private Pattern matchPattern;
private ScheduledFuture<?> task;
public ConnectionPeriodicExpiryPlugin() {
periodSeconds = TimeUnit.MINUTES.toSeconds(15);
accuracyWindowSeconds = 30;
}
@Override
public void registered(ActiveMQServer server) {
sanityCheckConfig();
executor = server.getScheduledPool();
remotingService = server.getRemotingService();
matchPattern = Pattern.compile(acceptorMatchRegex);
task = executor.scheduleWithFixedDelay(() -> {
try {
final long currentTime = System.currentTimeMillis();
for (Acceptor acceptor : remotingService.getAcceptors().values()) {
if (matchPattern.matcher(acceptor.getName()).matches()) {
if (acceptor instanceof NettyAcceptor) {
NettyAcceptor nettyAcceptor = (NettyAcceptor) acceptor;
for (NettyServerConnection nettyServerConnection : nettyAcceptor.getConnections().values()) {
RemotingConnection remotingConnection = remotingService.getConnection(nettyServerConnection.getID());
if (remotingConnection != null && currentTime > remotingConnection.getCreationTime() + periodSeconds) {
executor.schedule(() -> {
remotingService.removeConnection(remotingConnection.getID());
remotingConnection.fail(new ActiveMQDisconnectedException("terminated by session expiry plugin"));
}, RandomUtil.randomMax(accuracyWindowSeconds), TimeUnit.SECONDS);
}
}
}
}
}
} catch (Exception trapToStayScheduled) {
logger.debug("error on connection expiry plugin scheduled task, will retry", trapToStayScheduled);
}
}, accuracyWindowSeconds, accuracyWindowSeconds, TimeUnit.SECONDS);
}
@Override
public void unregistered(ActiveMQServer server) {
if (task != null) {
task.cancel(true);
}
}
@Override
public void init(Map<String, String> properties) {
name = properties.getOrDefault("name", name);
periodSeconds = Long.parseLong(properties.getOrDefault("periodSeconds", Long.toString(periodSeconds)));
accuracyWindowSeconds = Integer.parseInt(properties.getOrDefault("accuracyWindowSeconds", Long.toString(accuracyWindowSeconds)));
acceptorMatchRegex = properties.getOrDefault("acceptorMatchRegex", acceptorMatchRegex);
sanityCheckConfig();
}
private void sanityCheckConfig() {
if (accuracyWindowSeconds <= 0) {
throw new IllegalArgumentException("accuracyWindowSeconds must be > 0");
}
if (acceptorMatchRegex == null) {
throw new IllegalArgumentException("acceptorMatchRegex must be configured");
}
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getPeriodSeconds() {
return periodSeconds;
}
public void setPeriodSeconds(long periodSeconds) {
this.periodSeconds = periodSeconds;
}
public int getAccuracyWindowSeconds() {
return accuracyWindowSeconds;
}
public void setAccuracyWindowSeconds(int accuracyWindowSeconds) {
this.accuracyWindowSeconds = accuracyWindowSeconds;
}
public void setAcceptorMatchRegex(String acceptorMatchRegex) {
this.acceptorMatchRegex = acceptorMatchRegex;
}
public String getAcceptorMatchRegex() {
return acceptorMatchRegex;
}
}

View File

@ -79,6 +79,7 @@ import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.LegacyLDAPSecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.plugin.impl.ConnectionPeriodicExpiryPlugin;
import org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin;
import org.apache.activemq.artemis.core.server.routing.KeyType;
import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashModuloPolicy;
@ -2547,6 +2548,48 @@ public class ConfigurationImplTest extends ServerTestBase {
Assert.assertTrue(configuration.getStatus().contains("Unknown property 'name'"));
}
@Test
public void testPropsAndNamePlugin() throws Exception {
final ConfigurationImpl configuration = new ConfigurationImpl();
Properties insertionOrderedProperties = new ConfigurationImpl.InsertionOrderedProperties();
insertionOrderedProperties.put("brokerPlugins.\"org.apache.activemq.artemis.core.server.plugin.impl.ConnectionPeriodicExpiryPlugin.class\".periodSeconds", "30");
insertionOrderedProperties.put("brokerPlugins.\"org.apache.activemq.artemis.core.server.plugin.impl.ConnectionPeriodicExpiryPlugin.class\".acceptorMatchRegex", "netty-.*");
insertionOrderedProperties.put("brokerPlugins.\"org.apache.activemq.artemis.core.server.plugin.impl.ConnectionPeriodicExpiryPlugin.class\".periodSeconds", "30");
insertionOrderedProperties.put("brokerPlugins.\"org.apache.activemq.artemis.core.server.plugin.impl.ConnectionPeriodicExpiryPlugin.class\".accuracyWindowSeconds", "10");
configuration.parsePrefixedProperties(insertionOrderedProperties, null);
Assert.assertTrue(configuration.getStatus(), configuration.getStatus().contains("\"errors\":[]"));
Assert.assertEquals(1, configuration.getBrokerPlugins().size());
Assert.assertEquals(30, ((ConnectionPeriodicExpiryPlugin)(configuration.getBrokerPlugins().get(0))).getPeriodSeconds());
Assert.assertEquals(10, ((ConnectionPeriodicExpiryPlugin)(configuration.getBrokerPlugins().get(0))).getAccuracyWindowSeconds());
Assert.assertEquals("netty-.*", ((ConnectionPeriodicExpiryPlugin)(configuration.getBrokerPlugins().get(0))).getAcceptorMatchRegex());
}
@Test
public void testConnectionExpiryPluginInit() throws Exception {
final ConfigurationImpl configuration = new ConfigurationImpl();
Properties insertionOrderedProperties = new ConfigurationImpl.InsertionOrderedProperties();
insertionOrderedProperties.put("brokerPlugins.\"org.apache.activemq.artemis.core.server.plugin.impl.ConnectionPeriodicExpiryPlugin.class\".init", "acceptorMatchRegex=netty-.*,periodSeconds=30,accuracyWindowSeconds=10");
configuration.parsePrefixedProperties(insertionOrderedProperties, null);
Assert.assertTrue(configuration.getStatus(), configuration.getStatus().contains("\"errors\":[]"));
Assert.assertEquals(1, configuration.getBrokerPlugins().size());
Assert.assertEquals(30, ((ConnectionPeriodicExpiryPlugin)(configuration.getBrokerPlugins().get(0))).getPeriodSeconds());
Assert.assertEquals(10, ((ConnectionPeriodicExpiryPlugin)(configuration.getBrokerPlugins().get(0))).getAccuracyWindowSeconds());
Assert.assertEquals("netty-.*", ((ConnectionPeriodicExpiryPlugin)(configuration.getBrokerPlugins().get(0))).getAcceptorMatchRegex());
}
@Test
public void testSecuritySettingPluginFromBrokerProperties() throws Exception {

View File

@ -0,0 +1,267 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.plugin.impl;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.logging.log4j.core.util.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class ConnectionPeriodicExpiryPluginTest {
ConnectionPeriodicExpiryPlugin underTest;
@Before
public void initUnderTest() {
underTest = new ConnectionPeriodicExpiryPlugin();
}
@Test
public void init() {
Map<String, String> props = new HashMap<>();
props.put("name", "joe");
props.put("periodSeconds", "4");
props.put("accuracyWindowSeconds", "2");
props.put("acceptorMatchRegex", "rex");
underTest.init(props);
assertEquals("joe", underTest.getName());
assertEquals(4, underTest.getPeriodSeconds());
assertEquals(2, underTest.getAccuracyWindowSeconds());
assertEquals("rex", underTest.getAcceptorMatchRegex());
}
@Test(expected = IllegalArgumentException.class)
public void initError() {
Map<String, String> props = new HashMap<>();
props.put("accuracyWindowSeconds", "-2");
underTest.init(props);
}
@Test(expected = IllegalArgumentException.class)
public void initErrorAcceptorMatchRegex() {
Map<String, String> props = new HashMap<>();
props.put("accuracyWindowSeconds", "2");
underTest.init(props);
}
@Test(expected = IllegalArgumentException.class)
public void testRegisterThrowsOnConfigError() {
underTest.registered(null);
}
@Test
public void name() {
assertNull(underTest.getName());
underTest.setName("p");
assertEquals("p", underTest.getName());
}
@Test
public void periodSeconds() {
underTest.setPeriodSeconds(4);
assertEquals(4, underTest.getPeriodSeconds());
}
@Test
public void accuracyWindowSeconds() {
underTest.setAccuracyWindowSeconds(2);
assertEquals(2, underTest.getAccuracyWindowSeconds());
}
@Test
public void acceptorMatchRegex() {
underTest.setAcceptorMatchRegex("rex");
assertEquals("rex", underTest.getAcceptorMatchRegex());
}
@Test
public void testRescheduleOnErrorAcceptorNullName() {
underTest.setAcceptorMatchRegex(".*");
underTest.setAccuracyWindowSeconds(1);
underTest.setPeriodSeconds(1);
List<Exception> exceptions = new LinkedList<>();
ActiveMQServer server = Mockito.mock(ActiveMQServer.class);
// hack a scheduler to call twice in the caller thread
ScheduledExecutorService scheduledExecutorService = Mockito.mock(ScheduledExecutorService.class);
Mockito.when(scheduledExecutorService.scheduleWithFixedDelay(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any())).thenAnswer(invocationOnMock -> {
Runnable runnable = invocationOnMock.getArgument(0);
try {
runnable.run();
runnable.run();
} catch (Exception oops) {
exceptions.add(oops);
}
return null;
});
Mockito.when(server.getScheduledPool()).thenReturn(scheduledExecutorService);
RemotingService remotingService = Mockito.mock(RemotingService.class);
Mockito.when(server.getRemotingService()).thenReturn(remotingService);
Map<String, Acceptor> acceptors = new HashMap<>();
// getName returns null
NettyAcceptor acceptor = Mockito.mock(NettyAcceptor.class);
acceptors.put("a", acceptor);
Mockito.when(remotingService.getAcceptors()).thenReturn(acceptors);
underTest.registered(server);
Assert.isEmpty(exceptions);
Mockito.verify(scheduledExecutorService, Mockito.times(1)).scheduleWithFixedDelay(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any());
Mockito.verify(remotingService, Mockito.times(2)).getAcceptors();
}
@Test
public void testRescheduleOnErrorAcceptorConnectionNotFound() {
underTest.setAcceptorMatchRegex(".*");
underTest.setAccuracyWindowSeconds(1);
underTest.setPeriodSeconds(1);
List<Exception> exceptions = new LinkedList<>();
ActiveMQServer server = Mockito.mock(ActiveMQServer.class);
// hack a scheduler to call twice in the caller thread
ScheduledExecutorService scheduledExecutorService = Mockito.mock(ScheduledExecutorService.class);
Mockito.when(scheduledExecutorService.scheduleWithFixedDelay(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any())).thenAnswer(invocationOnMock -> {
Runnable runnable = invocationOnMock.getArgument(0);
try {
runnable.run();
runnable.run();
} catch (Exception oops) {
exceptions.add(oops);
}
return null;
});
Mockito.when(server.getScheduledPool()).thenReturn(scheduledExecutorService);
RemotingService remotingService = Mockito.mock(RemotingService.class);
Mockito.when(server.getRemotingService()).thenReturn(remotingService);
final String id = "a";
Map<String, Acceptor> acceptors = new HashMap<>();
NettyAcceptor acceptor = Mockito.mock(NettyAcceptor.class);
Mockito.when(acceptor.getName()).thenReturn(id);
Map<Object, NettyServerConnection> connections = new HashMap<>();
NettyServerConnection connection = Mockito.mock(NettyServerConnection.class);
Mockito.when(connection.getID()).thenReturn(id);
connections.put(id, connection);
Mockito.when(acceptor.getConnections()).thenReturn(connections);
acceptors.put(id, acceptor);
Mockito.when(remotingService.getAcceptors()).thenReturn(acceptors);
// connection already closed
Mockito.when(remotingService.getConnection(Mockito.eq(id))).thenReturn(null);
underTest.registered(server);
Assert.isEmpty(exceptions);
Mockito.verify(scheduledExecutorService, Mockito.times(1)).scheduleWithFixedDelay(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any());
Mockito.verify(remotingService, Mockito.times(2)).getAcceptors();
Mockito.verify(remotingService, Mockito.times(2)).getConnection(Mockito.eq(id));
}
@Test
public void testFailExpired() {
underTest.setAcceptorMatchRegex(".*");
underTest.setAccuracyWindowSeconds(1);
underTest.setPeriodSeconds(1);
List<Exception> exceptions = new LinkedList<>();
ActiveMQServer server = Mockito.mock(ActiveMQServer.class);
// hack a scheduler to run in the caller thread
ScheduledExecutorService scheduledExecutorService = Mockito.mock(ScheduledExecutorService.class);
Mockito.when(scheduledExecutorService.scheduleWithFixedDelay(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any())).thenAnswer(invocationOnMock -> {
Runnable runnable = invocationOnMock.getArgument(0);
try {
runnable.run();
} catch (Exception oops) {
exceptions.add(oops);
}
return null;
});
Mockito.when(scheduledExecutorService.schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any())).thenAnswer(invocationOnMock -> {
Runnable runnable = invocationOnMock.getArgument(0);
try {
runnable.run();
} catch (Exception oops) {
exceptions.add(oops);
}
return null;
});
Mockito.when(server.getScheduledPool()).thenReturn(scheduledExecutorService);
RemotingService remotingService = Mockito.mock(RemotingService.class);
Mockito.when(server.getRemotingService()).thenReturn(remotingService);
final String id = "a";
Map<String, Acceptor> acceptors = new HashMap<>();
NettyAcceptor acceptor = Mockito.mock(NettyAcceptor.class);
Mockito.when(acceptor.getName()).thenReturn(id);
Map<Object, NettyServerConnection> connections = new HashMap<>();
NettyServerConnection connection = Mockito.mock(NettyServerConnection.class);
Mockito.when(connection.getID()).thenReturn(id);
connections.put(id, connection);
Mockito.when(acceptor.getConnections()).thenReturn(connections);
acceptors.put(id, acceptor);
Mockito.when(remotingService.getAcceptors()).thenReturn(acceptors);
RemotingConnection remotingConnection = Mockito.mock(RemotingConnection.class);
Mockito.when(remotingConnection.getID()).thenReturn(id);
Mockito.when(remotingConnection.getCreationTime()).thenReturn(System.currentTimeMillis() - 20000);
Mockito.when(remotingService.getConnection(Mockito.eq(id))).thenReturn(remotingConnection);
underTest.registered(server);
Assert.isEmpty(exceptions);
Mockito.verify(scheduledExecutorService, Mockito.times(1)).scheduleWithFixedDelay(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any());
Mockito.verify(remotingService, Mockito.times(1)).getAcceptors();
Mockito.verify(remotingService, Mockito.times(1)).getConnection(Mockito.eq(id));
Mockito.verify(remotingService, Mockito.times(1)).removeConnection(Mockito.eq(id));
Mockito.verify(remotingConnection, Mockito.times(1)).fail(Mockito.any());
}
}

View File

@ -178,3 +178,30 @@ In the example below `ROLE_PROPERTY` is set to `permissions` when that property
</broker-plugin>
</broker-plugins>
----
== Using the ConnectionPeriodicExpiryPlugin
The `ConnectionPeriodicExpiryPlugin` will implement a global expiry (and disconnect) for connections that live longer than `periodSeconds` on a matching acceptor basis.
This plugin can be useful when credential rotation or credential validation must be enforced at regular intervals as authentication will be enforced on reconnect.
The plugin requires the configuration of the `acceptorMatchRegex` to determine the acceptors to monitor. It is typical to separate client acceptors and federation or cluster acceptors such that only client connections will be subject to periodic expiry. The `acceptorMatchRegex` must be configured to match the name of the acceptor(s) whose connections will be subject to periodic expiry.
|===
| Property | Property Description | Default Value
|`acceptorMatchRegex`|the regular expression used to match against the names of acceptors to monitor |
|`periodSeconds`|the max period, in seconds, that a connection can last | 900 seconds (15 minutes)
|`accuracyWindowSeconds`|determines how often we check connections for expiry and also provides an upper bound on the random seconds we use to schedule a disconnect. Using a random second will potentially avoid many reconnects occurring at the exact same time. It must be positive value > 0|30 seconds
|===
The plugin can be configured via xml in the normal broker-plugin way:
[,xml]
----
<broker-plugins>
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.ConnectionPeriodicExpiryPlugin">
<property key="acceptorMatchRegex" value="netty-client-acceptor" />
</broker-plugin>
</broker-plugins>
----

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.plugin.impl.ConnectionPeriodicExpiryPlugin;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConnectionPeriodicExpiryPluginTest extends MultiprotocolJMSClientTestSupport {
protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
protected void addConfiguration(ActiveMQServer server) throws Exception {
ConnectionPeriodicExpiryPlugin plugin = new ConnectionPeriodicExpiryPlugin();
plugin.setPeriodSeconds(1);
plugin.setAccuracyWindowSeconds(1);
plugin.setAcceptorMatchRegex("netty-acceptor");
server.getConfiguration().getBrokerPlugins().add(plugin);
}
@Test(timeout = 5000)
public void testAMQP() throws Exception {
Connection connection = createConnection(); //AMQP
testExpiry(connection);
}
@Test(timeout = 5000)
public void testCore() throws Exception {
Connection connection = createCoreConnection();
testExpiry(connection);
}
@Test(timeout = 5000)
public void testOpenWire() throws Exception {
Connection connection = createOpenWireConnection();
testExpiry(connection);
}
private void testExpiry(Connection connection) throws JMSException {
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName());
final MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(message -> {
// don't care
});
final CountDownLatch gotExpired = new CountDownLatch(1);
connection.setExceptionListener(exception -> {
gotExpired.countDown(); });
Wait.assertTrue(() -> gotExpired.await(100, TimeUnit.MILLISECONDS), 2000, 100);
} finally {
try {
connection.close();
} catch (Exception expected) {
// potential error on already disconnected
}
}
}
}