NO-JIRA fix intermittently failing STOMP test

This commit is contained in:
Justin Bertram 2018-03-01 11:21:52 -06:00
parent 2123f85ea9
commit 5773ad1ea7
1 changed files with 39 additions and 49 deletions

View File

@ -18,6 +18,7 @@
package org.apache.activemq.artemis.tests.integration.stomp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.activemq.artemis.api.core.Interceptor;
@ -25,7 +26,6 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
@ -38,8 +38,8 @@ public class StompWithInterceptorsTest extends StompTestBase {
@Override
public List<String> getIncomingInterceptors() {
List<String> stompIncomingInterceptor = new ArrayList<>();
stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompWithInterceptorsTest$MyIncomingStompFrameInterceptor");
stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompWithInterceptorsTest$MyCoreInterceptor");
stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompWithInterceptorsTest$IncomingStompInterceptor");
stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompWithInterceptorsTest$CoreInterceptor");
return stompIncomingInterceptor;
}
@ -47,20 +47,25 @@ public class StompWithInterceptorsTest extends StompTestBase {
@Override
public List<String> getOutgoingInterceptors() {
List<String> stompOutgoingInterceptor = new ArrayList<>();
stompOutgoingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompWithInterceptorsTest$MyOutgoingStompFrameInterceptor");
stompOutgoingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompWithInterceptorsTest$OutgoingStompInterceptor");
return stompOutgoingInterceptor;
}
@Test
public void stompFrameInterceptor() throws Exception {
MyIncomingStompFrameInterceptor.incomingInterceptedFrames.clear();
MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.clear();
Thread.sleep(200);
// So we clear them here
MyCoreInterceptor.incomingInterceptedFrames.clear();
IncomingStompInterceptor.interceptedFrames.clear();
OutgoingStompInterceptor.interceptedFrames.clear();
// wait for the SESS_START which is the last packet for the test's JMS connection
assertTrue(Wait.waitFor(() -> {
for (Packet packet : new ArrayList<>(CoreInterceptor.incomingInterceptedFrames)) {
if (packet.getType() == (byte) 67) {
return true;
}
}
return false;
}, 2000, 50));
CoreInterceptor.incomingInterceptedFrames.clear();
StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass);
@ -71,11 +76,11 @@ public class StompWithInterceptorsTest extends StompTestBase {
subFrame.addHeader("ack", "auto");
conn.sendFrame(subFrame);
assertEquals(0, MyCoreInterceptor.incomingInterceptedFrames.size());
assertEquals(0, CoreInterceptor.incomingInterceptedFrames.size());
sendJmsMessage(getName());
// Something was supposed to be called on sendMessages
assertTrue("core interceptor is not working", MyCoreInterceptor.incomingInterceptedFrames.size() > 0);
assertTrue("core interceptor is not working", CoreInterceptor.incomingInterceptedFrames.size() > 0);
conn.receiveFrame(10000);
@ -84,83 +89,68 @@ public class StompWithInterceptorsTest extends StompTestBase {
frame.setBody("Hello World");
conn.sendFrame(frame);
assertTrue(Wait.waitFor(() -> OutgoingStompInterceptor.interceptedFrames.size() == 3, 2000, 50));
conn.disconnect();
assertTrue(Wait.waitFor(() -> IncomingStompInterceptor.interceptedFrames.size() == 4, 2000, 50));
List<String> incomingCommands = new ArrayList<>(4);
incomingCommands.add("CONNECT");
incomingCommands.add("SUBSCRIBE");
incomingCommands.add("SEND");
incomingCommands.add("DISCONNECT");
for (int i = 0; i < IncomingStompInterceptor.interceptedFrames.size(); i++) {
Assert.assertEquals(incomingCommands.get(i), IncomingStompInterceptor.interceptedFrames.get(i).getCommand());
Assert.assertEquals("incomingInterceptedVal", IncomingStompInterceptor.interceptedFrames.get(i).getHeader("incomingInterceptedProp"));
}
List<String> outgoingCommands = new ArrayList<>(3);
outgoingCommands.add("CONNECTED");
outgoingCommands.add("MESSAGE");
outgoingCommands.add("MESSAGE");
long timeout = System.currentTimeMillis() + 1000;
// Things are async, giving some time to things arrive before we actually assert
while (MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size() < 4 &&
MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size() < 3 &&
timeout > System.currentTimeMillis()) {
Thread.sleep(10);
for (int i = 0; i < OutgoingStompInterceptor.interceptedFrames.size(); i++) {
Assert.assertEquals(outgoingCommands.get(i), OutgoingStompInterceptor.interceptedFrames.get(i).getCommand());
}
Wait.waitFor(() -> {
return MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size() == 4;
});
Assert.assertEquals(4, MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size());
Wait.waitFor(() -> {
return MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size() == 3;
});
Assert.assertEquals(3, MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size());
for (int i = 0; i < MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size(); i++) {
Assert.assertEquals(incomingCommands.get(i), MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getCommand());
Assert.assertEquals("incomingInterceptedVal", MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp"));
}
for (int i = 0; i < MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size(); i++) {
Assert.assertEquals(outgoingCommands.get(i), MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(i).getCommand());
}
Assert.assertEquals("incomingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp"));
Assert.assertEquals("outgoingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp"));
Assert.assertEquals("incomingInterceptedVal", OutgoingStompInterceptor.interceptedFrames.get(2).getHeader("incomingInterceptedProp"));
Assert.assertEquals("outgoingInterceptedVal", OutgoingStompInterceptor.interceptedFrames.get(2).getHeader("outgoingInterceptedProp"));
}
public static class MyCoreInterceptor implements Interceptor {
public static class CoreInterceptor implements Interceptor {
static List<Packet> incomingInterceptedFrames = new ArrayList<>();
@Override
public boolean intercept(Packet packet, RemotingConnection connection) {
IntegrationTestLogger.LOGGER.info("Core intercepted: " + packet);
incomingInterceptedFrames.add(packet);
return true;
}
}
public static class MyIncomingStompFrameInterceptor implements StompFrameInterceptor {
public static class IncomingStompInterceptor implements StompFrameInterceptor {
static List<StompFrame> incomingInterceptedFrames = new ArrayList<>();
static List<StompFrame> interceptedFrames = Collections.synchronizedList(new ArrayList<>());
@Override
public boolean intercept(StompFrame stompFrame, RemotingConnection connection) {
incomingInterceptedFrames.add(stompFrame);
interceptedFrames.add(stompFrame);
stompFrame.addHeader("incomingInterceptedProp", "incomingInterceptedVal");
return true;
}
}
public static class MyOutgoingStompFrameInterceptor implements StompFrameInterceptor {
public static class OutgoingStompInterceptor implements StompFrameInterceptor {
static List<StompFrame> outgoingInterceptedFrames = new ArrayList<>();
static List<StompFrame> interceptedFrames = Collections.synchronizedList(new ArrayList<>());
@Override
public boolean intercept(StompFrame stompFrame, RemotingConnection connection) {
outgoingInterceptedFrames.add(stompFrame);
interceptedFrames.add(stompFrame);
stompFrame.addHeader("outgoingInterceptedProp", "outgoingInterceptedVal");
return true;
}
}
}
}