This commit is contained in:
Clebert Suconic 2017-06-22 11:51:11 -04:00
commit c2ad9cab0d
3 changed files with 337 additions and 0 deletions

View File

@ -26,8 +26,12 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -71,6 +75,104 @@ public class AmqpSendReceiveInterceptorTest extends AmqpClientTestSupport {
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(latch2.getCount(), 0);
sender.close();
receiver.close();
connection.close();
}
private static final String ADDRESS = "address";
private static final String MESSAGE_ID = "messageId";
private static final String CORRELATION_ID = "correlationId";
private static final String MESSAGE_TEXT = "messageText";
private static final String DURABLE = "durable";
private static final String PRIORITY = "priority";
private static final String REPLY_TO = "replyTo";
private static final String TIME_TO_LIVE = "timeToLive";
private boolean checkMessageProperties(AMQPMessage message, Map<String, Object> expectedProperties) {
assertNotNull(message);
assertNotNull(server.getNodeID());
assertNotNull(message.getConnectionID());
assertEquals(message.getAddress(), expectedProperties.get(ADDRESS));
assertEquals(message.isDurable(), expectedProperties.get(DURABLE));
Properties props = message.getProperties();
assertEquals(props.getCorrelationId(), expectedProperties.get(CORRELATION_ID));
assertEquals(props.getReplyTo(), expectedProperties.get(REPLY_TO));
assertEquals(props.getMessageId(), expectedProperties.get(MESSAGE_ID));
Header header = message.getHeader();
assertEquals(header.getDurable(), expectedProperties.get(DURABLE));
assertEquals(header.getTtl().toString(), expectedProperties.get(TIME_TO_LIVE).toString());
assertEquals(header.getPriority().toString(), expectedProperties.get(PRIORITY).toString());
return true;
}
@Test(timeout = 60000)
public void testCheckInterceptedMessageProperties() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final String addressQueue = getTestName();
final String messageId = "lala200";
final String correlationId = "lala-corrId";
final String msgText = "Test intercepted message";
final boolean durableMsg = false;
final short priority = 8;
final long timeToLive = 10000;
final String replyTo = "reply-to-myQueue";
Map<String, Object> expectedProperties = new HashMap<>();
expectedProperties.put(ADDRESS, addressQueue);
expectedProperties.put(MESSAGE_ID, messageId);
expectedProperties.put(CORRELATION_ID, correlationId);
expectedProperties.put(MESSAGE_TEXT, msgText);
expectedProperties.put(DURABLE, durableMsg);
expectedProperties.put(PRIORITY, priority);
expectedProperties.put(REPLY_TO, replyTo);
expectedProperties.put(TIME_TO_LIVE, timeToLive);
server.getRemotingService().addIncomingInterceptor(new AmqpInterceptor() {
@Override
public boolean intercept(AMQPMessage message, RemotingConnection connection) throws ActiveMQException {
latch.countDown();
return checkMessageProperties(message, expectedProperties);
}
});
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpMessage message = new AmqpMessage();
message.setMessageId(messageId);
message.setCorrelationId(correlationId);
message.setText(msgText);
message.setDurable(durableMsg);
message.setPriority(priority);
message.setReplyToAddress(replyTo);
message.setTimeToLive(timeToLive);
sender.send(message);
assertTrue(latch.await(2, TimeUnit.SECONDS));
final CountDownLatch latch2 = new CountDownLatch(1);
server.getRemotingService().addOutgoingInterceptor(new AmqpInterceptor() {
@Override
public boolean intercept(AMQPMessage packet, RemotingConnection connection) throws ActiveMQException {
latch2.countDown();
return checkMessageProperties(packet, expectedProperties);
}
});
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(2);
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(latch2.getCount(), 0);
sender.close();
receiver.close();
connection.close();
}

View File

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.felix.resolver.util.ArrayMap;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ErrorCollector;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
@Override
@Before
public void setUp() throws Exception {
Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
sessions.setAccessible(true);
sessions.set(null, new ConcurrentHashMap<>());
Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
connectedClients.setAccessible(true);
connectedClients.set(null, new ConcurrentHashSet<>());
super.setUp();
}
private static final String ADDRESS = "address";
private static final String MESSAGE_TEXT = "messageText";
private static final String RETAINED = "retained";
private boolean checkMessageProperties(MqttMessage message, Map<String, Object> expectedProperties) {
System.out.println("Checking properties in interceptor");
try {
assertNotNull(message);
assertNotNull(server.getNodeID());
MqttFixedHeader header = message.fixedHeader();
assertNotNull(header.messageType());
assertEquals(header.qosLevel().value(), AT_MOST_ONCE);
// TODO resolve the following line based on result of ARTEMIS-1244, currently fails (2.1.0)
assertEquals(header.isRetain(), expectedProperties.get(RETAINED));
} catch (Throwable t) {
collector.addError(t);
}
return true;
}
@Rule
public ErrorCollector collector = new ErrorCollector();
@Test(timeout = 60000)
public void testCheckInterceptedMQTTMessageProperties() throws Exception {
final String addressQueue = name.getMethodName();
final String msgText = "Test intercepted message";
final boolean retained = true;
Map<String, Object> expectedProperties = new ArrayMap<>();
expectedProperties.put(ADDRESS, addressQueue);
expectedProperties.put(MESSAGE_TEXT, msgText);
expectedProperties.put(RETAINED, retained);
final MQTTClientProvider subscribeProvider = getMQTTClientProvider();
initializeConnection(subscribeProvider);
subscribeProvider.subscribe(addressQueue, AT_MOST_ONCE);
final CountDownLatch latch = new CountDownLatch(1);
MQTTInterceptor incomingInterceptor = new MQTTInterceptor() {
@Override
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
System.out.println("incoming");
return checkMessageProperties(packet, expectedProperties);
}
};
MQTTInterceptor outgoingInterceptor = new MQTTInterceptor() {
@Override
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
System.out.println("outgoing");
return checkMessageProperties(packet, expectedProperties);
}
};
server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
byte[] payload = subscribeProvider.receive(10000);
assertNotNull("Should get a message", payload);
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
});
thread.start();
final MQTTClientProvider publishProvider = getMQTTClientProvider();
initializeConnection(publishProvider);
publishProvider.publish(addressQueue, msgText.getBytes(), AT_MOST_ONCE, retained);
latch.await(10, TimeUnit.SECONDS);
subscribeProvider.disconnect();
publishProvider.disconnect();
}
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.apache.felix.resolver.util.ArrayMap;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class StompTestPropertiesInterceptor extends StompTestBase {
@Override
public List<String> getIncomingInterceptors() {
List<String> stompIncomingInterceptor = new ArrayList<>();
stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompTestPropertiesInterceptor$StompFramePropertiesInterceptor");
return stompIncomingInterceptor;
}
@Override
public List<String> getOutgoingInterceptors() {
List<String> stompOutgoingInterceptor = new ArrayList<>();
stompOutgoingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompTestPropertiesInterceptor$StompFramePropertiesInterceptor");
return stompOutgoingInterceptor;
}
public static class StompFramePropertiesInterceptor implements StompFrameInterceptor {
@Override
public boolean intercept(StompFrame stompFrame, RemotingConnection connection) {
if (stompFrame.getCommand().equals("CONNECT") || stompFrame.getCommand().equals("CONNECTED")) {
return true;
}
System.out.println("Checking properties in interceptor");
assertNotNull(stompFrame);
assertEquals(stompFrame.getHeader(MY_HEADER), expectedProperties.get(MY_HEADER));
assertEquals(stompFrame.getBody(), expectedProperties.get(MESSAGE_TEXT));
return true;
}
}
private static final String MESSAGE_TEXT = "messageText";
private static final String MY_HEADER = "my-header";
private static Map<String, Object> expectedProperties = new ArrayMap<>();
@Test(timeout = 60000)
public void testCheckInterceptedStompMessageProperties() throws Exception {
final String msgText = "Test intercepted message";
final String myHeader = "TestInterceptedHeader";
expectedProperties.put(MESSAGE_TEXT, msgText);
expectedProperties.put(MY_HEADER, myHeader);
StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
conn.connect(defUser, defPass);
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
subFrame.addHeader("subscription-type", "ANYCAST");
subFrame.addHeader("destination", name.getMethodName());
subFrame.addHeader("ack", "auto");
subFrame.addHeader(MY_HEADER, myHeader);
subFrame.setBody(msgText);
conn.sendFrame(subFrame);
ClientStompFrame frame = conn.createFrame("SEND");
frame.addHeader("destination", name.getMethodName());
frame.addHeader("ack", "auto");
frame.addHeader(MY_HEADER, myHeader);
conn.sendFrame(frame);
conn.disconnect();
}
}