This closes #2289
This commit is contained in:
Clebert Suconic 2018-09-04 12:54:21 -04:00
commit 2f63260680
2 changed files with 47 additions and 0 deletions

View File

@ -50,6 +50,7 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
super(transportConnection, executor);
this.manager = manager;
this.amqpConnection = amqpConnection;
transportConnection.setProtocolConnection(this);
}
public Executor getExecutor() {

View File

@ -176,4 +176,50 @@ public class AmqpSendReceiveInterceptorTest extends AmqpClientTestSupport {
receiver.close();
connection.close();
}
@Test(timeout = 60000)
public void testCheckRemotingConnection() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final boolean[] passed = {false};
server.getRemotingService().addIncomingInterceptor(new AmqpInterceptor() {
@Override
public boolean intercept(AMQPMessage message, RemotingConnection connection) throws ActiveMQException {
passed[0] = connection != null;
latch.countDown();
return true;
}
});
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + 1);
message.setText("Test-Message");
sender.send(message);
assertTrue(latch.await(2, TimeUnit.SECONDS));
assertTrue("connection not set", passed[0]);
final CountDownLatch latch2 = new CountDownLatch(1);
server.getRemotingService().addOutgoingInterceptor(new AmqpInterceptor() {
@Override
public boolean intercept(AMQPMessage packet, RemotingConnection connection) throws ActiveMQException {
passed[0] = connection != null;
latch2.countDown();
return true;
}
});
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(2);
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(latch2.getCount(), 0);
assertTrue("connection not set", passed[0]);
sender.close();
receiver.close();
connection.close();
}
}