diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/ConnectionPeriodicExpiryPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/ConnectionPeriodicExpiryPlugin.java new file mode 100644 index 0000000000..4761ec5680 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/ConnectionPeriodicExpiryPlugin.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.plugin.impl; + +import java.lang.invoke.MethodHandles; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; +import org.apache.activemq.artemis.core.remoting.server.RemotingService; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.Acceptor; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConnectionPeriodicExpiryPlugin implements ActiveMQServerBasePlugin { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private String name; + private long periodSeconds; + private int accuracyWindowSeconds; + private String acceptorMatchRegex; + + private ScheduledExecutorService executor; + private RemotingService remotingService; + private Pattern matchPattern; + private ScheduledFuture task; + + public ConnectionPeriodicExpiryPlugin() { + periodSeconds = TimeUnit.MINUTES.toSeconds(15); + accuracyWindowSeconds = 30; + } + + @Override + public void registered(ActiveMQServer server) { + + sanityCheckConfig(); + + executor = server.getScheduledPool(); + remotingService = server.getRemotingService(); + matchPattern = Pattern.compile(acceptorMatchRegex); + + task = executor.scheduleWithFixedDelay(() -> { + try { + final long currentTime = System.currentTimeMillis(); + for (Acceptor acceptor : remotingService.getAcceptors().values()) { + if (matchPattern.matcher(acceptor.getName()).matches()) { + if (acceptor instanceof NettyAcceptor) { + NettyAcceptor nettyAcceptor = (NettyAcceptor) acceptor; + + for (NettyServerConnection nettyServerConnection : nettyAcceptor.getConnections().values()) { + RemotingConnection remotingConnection = remotingService.getConnection(nettyServerConnection.getID()); + if (remotingConnection != null && currentTime > remotingConnection.getCreationTime() + periodSeconds) { + executor.schedule(() -> { + remotingService.removeConnection(remotingConnection.getID()); + remotingConnection.fail(new ActiveMQDisconnectedException("terminated by session expiry plugin")); + }, RandomUtil.randomMax(accuracyWindowSeconds), TimeUnit.SECONDS); + } + } + } + } + } + } catch (Exception trapToStayScheduled) { + logger.debug("error on connection expiry plugin scheduled task, will retry", trapToStayScheduled); + } + }, accuracyWindowSeconds, accuracyWindowSeconds, TimeUnit.SECONDS); + } + + @Override + public void unregistered(ActiveMQServer server) { + if (task != null) { + task.cancel(true); + } + } + + @Override + public void init(Map properties) { + name = properties.getOrDefault("name", name); + periodSeconds = Long.parseLong(properties.getOrDefault("periodSeconds", Long.toString(periodSeconds))); + accuracyWindowSeconds = Integer.parseInt(properties.getOrDefault("accuracyWindowSeconds", Long.toString(accuracyWindowSeconds))); + acceptorMatchRegex = properties.getOrDefault("acceptorMatchRegex", acceptorMatchRegex); + + sanityCheckConfig(); + } + + private void sanityCheckConfig() { + if (accuracyWindowSeconds <= 0) { + throw new IllegalArgumentException("accuracyWindowSeconds must be > 0"); + } + + if (acceptorMatchRegex == null) { + throw new IllegalArgumentException("acceptorMatchRegex must be configured"); + } + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public long getPeriodSeconds() { + return periodSeconds; + } + + public void setPeriodSeconds(long periodSeconds) { + this.periodSeconds = periodSeconds; + } + + public int getAccuracyWindowSeconds() { + return accuracyWindowSeconds; + } + + public void setAccuracyWindowSeconds(int accuracyWindowSeconds) { + this.accuracyWindowSeconds = accuracyWindowSeconds; + } + + public void setAcceptorMatchRegex(String acceptorMatchRegex) { + this.acceptorMatchRegex = acceptorMatchRegex; + } + + public String getAcceptorMatchRegex() { + return acceptorMatchRegex; + } +} diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java index 2cbf1a516d..9a4bb9b8b0 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java @@ -79,6 +79,7 @@ import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.LegacyLDAPSecuritySettingPlugin; +import org.apache.activemq.artemis.core.server.plugin.impl.ConnectionPeriodicExpiryPlugin; import org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin; import org.apache.activemq.artemis.core.server.routing.KeyType; import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashModuloPolicy; @@ -2547,6 +2548,48 @@ public class ConfigurationImplTest extends ServerTestBase { Assert.assertTrue(configuration.getStatus().contains("Unknown property 'name'")); } + @Test + public void testPropsAndNamePlugin() throws Exception { + + final ConfigurationImpl configuration = new ConfigurationImpl(); + + Properties insertionOrderedProperties = new ConfigurationImpl.InsertionOrderedProperties(); + + insertionOrderedProperties.put("brokerPlugins.\"org.apache.activemq.artemis.core.server.plugin.impl.ConnectionPeriodicExpiryPlugin.class\".periodSeconds", "30"); + insertionOrderedProperties.put("brokerPlugins.\"org.apache.activemq.artemis.core.server.plugin.impl.ConnectionPeriodicExpiryPlugin.class\".acceptorMatchRegex", "netty-.*"); + insertionOrderedProperties.put("brokerPlugins.\"org.apache.activemq.artemis.core.server.plugin.impl.ConnectionPeriodicExpiryPlugin.class\".periodSeconds", "30"); + insertionOrderedProperties.put("brokerPlugins.\"org.apache.activemq.artemis.core.server.plugin.impl.ConnectionPeriodicExpiryPlugin.class\".accuracyWindowSeconds", "10"); + + + configuration.parsePrefixedProperties(insertionOrderedProperties, null); + + Assert.assertTrue(configuration.getStatus(), configuration.getStatus().contains("\"errors\":[]")); + + Assert.assertEquals(1, configuration.getBrokerPlugins().size()); + Assert.assertEquals(30, ((ConnectionPeriodicExpiryPlugin)(configuration.getBrokerPlugins().get(0))).getPeriodSeconds()); + Assert.assertEquals(10, ((ConnectionPeriodicExpiryPlugin)(configuration.getBrokerPlugins().get(0))).getAccuracyWindowSeconds()); + Assert.assertEquals("netty-.*", ((ConnectionPeriodicExpiryPlugin)(configuration.getBrokerPlugins().get(0))).getAcceptorMatchRegex()); + } + + @Test + public void testConnectionExpiryPluginInit() throws Exception { + + final ConfigurationImpl configuration = new ConfigurationImpl(); + + Properties insertionOrderedProperties = new ConfigurationImpl.InsertionOrderedProperties(); + + insertionOrderedProperties.put("brokerPlugins.\"org.apache.activemq.artemis.core.server.plugin.impl.ConnectionPeriodicExpiryPlugin.class\".init", "acceptorMatchRegex=netty-.*,periodSeconds=30,accuracyWindowSeconds=10"); + + configuration.parsePrefixedProperties(insertionOrderedProperties, null); + + Assert.assertTrue(configuration.getStatus(), configuration.getStatus().contains("\"errors\":[]")); + + Assert.assertEquals(1, configuration.getBrokerPlugins().size()); + Assert.assertEquals(30, ((ConnectionPeriodicExpiryPlugin)(configuration.getBrokerPlugins().get(0))).getPeriodSeconds()); + Assert.assertEquals(10, ((ConnectionPeriodicExpiryPlugin)(configuration.getBrokerPlugins().get(0))).getAccuracyWindowSeconds()); + Assert.assertEquals("netty-.*", ((ConnectionPeriodicExpiryPlugin)(configuration.getBrokerPlugins().get(0))).getAcceptorMatchRegex()); + } + @Test public void testSecuritySettingPluginFromBrokerProperties() throws Exception { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/plugin/impl/ConnectionPeriodicExpiryPluginTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/plugin/impl/ConnectionPeriodicExpiryPluginTest.java new file mode 100644 index 0000000000..0351b2d212 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/plugin/impl/ConnectionPeriodicExpiryPluginTest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.plugin.impl; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; +import org.apache.activemq.artemis.core.remoting.server.RemotingService; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.Acceptor; +import org.apache.logging.log4j.core.util.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class ConnectionPeriodicExpiryPluginTest { + + ConnectionPeriodicExpiryPlugin underTest; + + @Before + public void initUnderTest() { + underTest = new ConnectionPeriodicExpiryPlugin(); + } + + @Test + public void init() { + Map props = new HashMap<>(); + props.put("name", "joe"); + props.put("periodSeconds", "4"); + props.put("accuracyWindowSeconds", "2"); + props.put("acceptorMatchRegex", "rex"); + + underTest.init(props); + + assertEquals("joe", underTest.getName()); + assertEquals(4, underTest.getPeriodSeconds()); + assertEquals(2, underTest.getAccuracyWindowSeconds()); + assertEquals("rex", underTest.getAcceptorMatchRegex()); + } + + @Test(expected = IllegalArgumentException.class) + public void initError() { + Map props = new HashMap<>(); + props.put("accuracyWindowSeconds", "-2"); + + underTest.init(props); + } + + + @Test(expected = IllegalArgumentException.class) + public void initErrorAcceptorMatchRegex() { + Map props = new HashMap<>(); + props.put("accuracyWindowSeconds", "2"); + + underTest.init(props); + } + + @Test(expected = IllegalArgumentException.class) + public void testRegisterThrowsOnConfigError() { + underTest.registered(null); + } + + @Test + public void name() { + assertNull(underTest.getName()); + underTest.setName("p"); + assertEquals("p", underTest.getName()); + } + + @Test + public void periodSeconds() { + underTest.setPeriodSeconds(4); + assertEquals(4, underTest.getPeriodSeconds()); + } + + @Test + public void accuracyWindowSeconds() { + underTest.setAccuracyWindowSeconds(2); + assertEquals(2, underTest.getAccuracyWindowSeconds()); + } + + @Test + public void acceptorMatchRegex() { + underTest.setAcceptorMatchRegex("rex"); + assertEquals("rex", underTest.getAcceptorMatchRegex()); + } + + + @Test + public void testRescheduleOnErrorAcceptorNullName() { + + underTest.setAcceptorMatchRegex(".*"); + underTest.setAccuracyWindowSeconds(1); + underTest.setPeriodSeconds(1); + + List exceptions = new LinkedList<>(); + + ActiveMQServer server = Mockito.mock(ActiveMQServer.class); + // hack a scheduler to call twice in the caller thread + ScheduledExecutorService scheduledExecutorService = Mockito.mock(ScheduledExecutorService.class); + Mockito.when(scheduledExecutorService.scheduleWithFixedDelay(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any())).thenAnswer(invocationOnMock -> { + Runnable runnable = invocationOnMock.getArgument(0); + try { + runnable.run(); + runnable.run(); + } catch (Exception oops) { + exceptions.add(oops); + } + return null; + }); + Mockito.when(server.getScheduledPool()).thenReturn(scheduledExecutorService); + + RemotingService remotingService = Mockito.mock(RemotingService.class); + Mockito.when(server.getRemotingService()).thenReturn(remotingService); + + Map acceptors = new HashMap<>(); + // getName returns null + NettyAcceptor acceptor = Mockito.mock(NettyAcceptor.class); + acceptors.put("a", acceptor); + Mockito.when(remotingService.getAcceptors()).thenReturn(acceptors); + + underTest.registered(server); + + Assert.isEmpty(exceptions); + Mockito.verify(scheduledExecutorService, Mockito.times(1)).scheduleWithFixedDelay(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any()); + Mockito.verify(remotingService, Mockito.times(2)).getAcceptors(); + } + + @Test + public void testRescheduleOnErrorAcceptorConnectionNotFound() { + + underTest.setAcceptorMatchRegex(".*"); + underTest.setAccuracyWindowSeconds(1); + underTest.setPeriodSeconds(1); + + List exceptions = new LinkedList<>(); + + ActiveMQServer server = Mockito.mock(ActiveMQServer.class); + // hack a scheduler to call twice in the caller thread + ScheduledExecutorService scheduledExecutorService = Mockito.mock(ScheduledExecutorService.class); + Mockito.when(scheduledExecutorService.scheduleWithFixedDelay(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any())).thenAnswer(invocationOnMock -> { + Runnable runnable = invocationOnMock.getArgument(0); + try { + runnable.run(); + runnable.run(); + } catch (Exception oops) { + exceptions.add(oops); + } + return null; + }); + Mockito.when(server.getScheduledPool()).thenReturn(scheduledExecutorService); + + RemotingService remotingService = Mockito.mock(RemotingService.class); + Mockito.when(server.getRemotingService()).thenReturn(remotingService); + + final String id = "a"; + Map acceptors = new HashMap<>(); + NettyAcceptor acceptor = Mockito.mock(NettyAcceptor.class); + Mockito.when(acceptor.getName()).thenReturn(id); + Map connections = new HashMap<>(); + NettyServerConnection connection = Mockito.mock(NettyServerConnection.class); + Mockito.when(connection.getID()).thenReturn(id); + connections.put(id, connection); + Mockito.when(acceptor.getConnections()).thenReturn(connections); + acceptors.put(id, acceptor); + Mockito.when(remotingService.getAcceptors()).thenReturn(acceptors); + + // connection already closed + Mockito.when(remotingService.getConnection(Mockito.eq(id))).thenReturn(null); + + underTest.registered(server); + + Assert.isEmpty(exceptions); + Mockito.verify(scheduledExecutorService, Mockito.times(1)).scheduleWithFixedDelay(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any()); + Mockito.verify(remotingService, Mockito.times(2)).getAcceptors(); + Mockito.verify(remotingService, Mockito.times(2)).getConnection(Mockito.eq(id)); + } + + @Test + public void testFailExpired() { + + underTest.setAcceptorMatchRegex(".*"); + underTest.setAccuracyWindowSeconds(1); + underTest.setPeriodSeconds(1); + + List exceptions = new LinkedList<>(); + + ActiveMQServer server = Mockito.mock(ActiveMQServer.class); + // hack a scheduler to run in the caller thread + ScheduledExecutorService scheduledExecutorService = Mockito.mock(ScheduledExecutorService.class); + Mockito.when(scheduledExecutorService.scheduleWithFixedDelay(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any())).thenAnswer(invocationOnMock -> { + Runnable runnable = invocationOnMock.getArgument(0); + try { + runnable.run(); + } catch (Exception oops) { + exceptions.add(oops); + } + return null; + }); + + Mockito.when(scheduledExecutorService.schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any())).thenAnswer(invocationOnMock -> { + Runnable runnable = invocationOnMock.getArgument(0); + try { + runnable.run(); + } catch (Exception oops) { + exceptions.add(oops); + } + return null; + }); + + + Mockito.when(server.getScheduledPool()).thenReturn(scheduledExecutorService); + + RemotingService remotingService = Mockito.mock(RemotingService.class); + Mockito.when(server.getRemotingService()).thenReturn(remotingService); + + final String id = "a"; + Map acceptors = new HashMap<>(); + NettyAcceptor acceptor = Mockito.mock(NettyAcceptor.class); + Mockito.when(acceptor.getName()).thenReturn(id); + Map connections = new HashMap<>(); + NettyServerConnection connection = Mockito.mock(NettyServerConnection.class); + Mockito.when(connection.getID()).thenReturn(id); + connections.put(id, connection); + Mockito.when(acceptor.getConnections()).thenReturn(connections); + acceptors.put(id, acceptor); + Mockito.when(remotingService.getAcceptors()).thenReturn(acceptors); + + RemotingConnection remotingConnection = Mockito.mock(RemotingConnection.class); + Mockito.when(remotingConnection.getID()).thenReturn(id); + Mockito.when(remotingConnection.getCreationTime()).thenReturn(System.currentTimeMillis() - 20000); + + Mockito.when(remotingService.getConnection(Mockito.eq(id))).thenReturn(remotingConnection); + + underTest.registered(server); + + Assert.isEmpty(exceptions); + Mockito.verify(scheduledExecutorService, Mockito.times(1)).scheduleWithFixedDelay(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any()); + Mockito.verify(remotingService, Mockito.times(1)).getAcceptors(); + Mockito.verify(remotingService, Mockito.times(1)).getConnection(Mockito.eq(id)); + Mockito.verify(remotingService, Mockito.times(1)).removeConnection(Mockito.eq(id)); + Mockito.verify(remotingConnection, Mockito.times(1)).fail(Mockito.any()); + } +} \ No newline at end of file diff --git a/docs/user-manual/broker-plugins.adoc b/docs/user-manual/broker-plugins.adoc index 8bc2c09cf8..afc0202165 100644 --- a/docs/user-manual/broker-plugins.adoc +++ b/docs/user-manual/broker-plugins.adoc @@ -178,3 +178,30 @@ In the example below `ROLE_PROPERTY` is set to `permissions` when that property ---- + +== Using the ConnectionPeriodicExpiryPlugin + +The `ConnectionPeriodicExpiryPlugin` will implement a global expiry (and disconnect) for connections that live longer than `periodSeconds` on a matching acceptor basis. + +This plugin can be useful when credential rotation or credential validation must be enforced at regular intervals as authentication will be enforced on reconnect. + +The plugin requires the configuration of the `acceptorMatchRegex` to determine the acceptors to monitor. It is typical to separate client acceptors and federation or cluster acceptors such that only client connections will be subject to periodic expiry. The `acceptorMatchRegex` must be configured to match the name of the acceptor(s) whose connections will be subject to periodic expiry. + +|=== +| Property | Property Description | Default Value + +|`acceptorMatchRegex`|the regular expression used to match against the names of acceptors to monitor | +|`periodSeconds`|the max period, in seconds, that a connection can last | 900 seconds (15 minutes) +|`accuracyWindowSeconds`|determines how often we check connections for expiry and also provides an upper bound on the random seconds we use to schedule a disconnect. Using a random second will potentially avoid many reconnects occurring at the exact same time. It must be positive value > 0|30 seconds + +|=== + +The plugin can be configured via xml in the normal broker-plugin way: +[,xml] +---- + + + + + +---- \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/ConnectionPeriodicExpiryPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/ConnectionPeriodicExpiryPluginTest.java new file mode 100644 index 0000000000..3e9cc4ae93 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/ConnectionPeriodicExpiryPluginTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.multiprotocol; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.lang.invoke.MethodHandles; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.plugin.impl.ConnectionPeriodicExpiryPlugin; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConnectionPeriodicExpiryPluginTest extends MultiprotocolJMSClientTestSupport { + + protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Override + protected void addConfiguration(ActiveMQServer server) throws Exception { + + ConnectionPeriodicExpiryPlugin plugin = new ConnectionPeriodicExpiryPlugin(); + plugin.setPeriodSeconds(1); + plugin.setAccuracyWindowSeconds(1); + plugin.setAcceptorMatchRegex("netty-acceptor"); + server.getConfiguration().getBrokerPlugins().add(plugin); + } + + @Test(timeout = 5000) + public void testAMQP() throws Exception { + Connection connection = createConnection(); //AMQP + testExpiry(connection); + } + + @Test(timeout = 5000) + public void testCore() throws Exception { + Connection connection = createCoreConnection(); + testExpiry(connection); + } + + @Test(timeout = 5000) + public void testOpenWire() throws Exception { + Connection connection = createOpenWireConnection(); + testExpiry(connection); + } + + private void testExpiry(Connection connection) throws JMSException { + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(getQueueName()); + final MessageConsumer consumer = session.createConsumer(queue); + consumer.setMessageListener(message -> { + // don't care + }); + + final CountDownLatch gotExpired = new CountDownLatch(1); + connection.setExceptionListener(exception -> { + gotExpired.countDown(); }); + + Wait.assertTrue(() -> gotExpired.await(100, TimeUnit.MILLISECONDS), 2000, 100); + + } finally { + try { + connection.close(); + } catch (Exception expected) { + // potential error on already disconnected + } + } + } +}