Merge pull request #3160 from gtully/ARTEMIS-2788

ARTEMIS-2788 clear openwire producer state on produce close event
This commit is contained in:
Gary Tully 2020-06-03 14:57:22 +01:00 committed by GitHub
commit 4eb3fe15f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 1 deletions

View File

@ -1204,6 +1204,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public Response processRemoveProducer(ProducerId id) throws Exception {
ConnectionState cs = getState();
if (cs != null) {
SessionState ss = cs.getSessionState(id.getParentId());
if (ss != null) {
ss.removeProducer(id);
}
}
return null;
}

View File

@ -20,12 +20,17 @@ import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.SessionState;
import org.junit.AfterClass;
import static org.junit.Assert.assertNotNull;
import org.junit.BeforeClass;
import org.junit.Test;
@ -81,4 +86,32 @@ public class SessionHandlingOpenWireTest extends BasicOpenWireTest {
assertFalse(AssertionLoggerHandler.findText("Client connection failed, clearing up resources for session"));
assertFalse(AssertionLoggerHandler.findText("Cleared up resources for session"));
}
@Test
public void testProducerState() throws Exception {
try (Connection conn = factory.createConnection()) {
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
for (int i = 0; i < 10; i++) {
MessageProducer messageProducer = session.createProducer(dest);
messageProducer.close();
}
// verify no trace of producer on the broker
for (RemotingConnection remotingConnection : server.getRemotingService().getConnections()) {
if (remotingConnection instanceof OpenWireConnection) {
OpenWireConnection openWireConnection = (OpenWireConnection) remotingConnection;
ConnectionState connectionState = openWireConnection.getState();
if (connectionState != null) {
for (SessionState sessionState : connectionState.getSessionStates()) {
assertTrue("no producer states leaked", Wait.waitFor(() -> sessionState.getProducerIds().isEmpty()));
}
}
}
}
}
}
}