NIFI-3672 Add support for strongly typed message properties in PublishJMS

This commit is contained in:
Mike Moser 2018-08-13 17:40:54 +00:00
parent 5106dc0af9
commit 66eeb48802
2 changed files with 139 additions and 4 deletions

View File

@ -16,8 +16,11 @@
*/
package org.apache.nifi.jms.processors;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import javax.jms.BytesMessage;
import javax.jms.Destination;
@ -78,11 +81,14 @@ final class JMSPublisher extends JMSWorker {
void setMessageHeaderAndProperties(final Message message, final Map<String, String> flowFileAttributes) throws JMSException {
if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) {
for (Entry<String, String> entry : flowFileAttributes.entrySet()) {
Map<String, String> flowFileAttributesToSend = flowFileAttributes.entrySet().stream()
.filter(entry -> !entry.getKey().contains("-") && !entry.getKey().contains(".")) // '-' and '.' are illegal chars in JMS property names
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
for (Entry<String, String> entry : flowFileAttributesToSend.entrySet()) {
try {
if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && !entry.getKey().contains("-") && !entry.getKey().contains(".")) {// '-' and '.' are illegal char in JMS prop names
message.setStringProperty(entry.getKey(), entry.getValue());
} else if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
message.setJMSDeliveryMode(Integer.parseInt(entry.getValue()));
} else if (entry.getKey().equals(JmsHeaders.EXPIRATION)) {
message.setJMSExpiration(Integer.parseInt(entry.getValue()));
@ -110,6 +116,11 @@ final class JMSPublisher extends JMSWorker {
} else {
logUnbuildableDestination(entry.getKey(), JmsHeaders.DESTINATION);
}
} else {
// not a special attribute handled above, so send it as a property using the specified property type
String type = flowFileAttributes.getOrDefault(entry.getKey().concat(".type"), "unknown").toLowerCase();
propertySetterMap.getOrDefault(type, JmsPropertySetterEnum.STRING)
.setProperty(message, entry.getKey(), entry.getValue());
}
} catch (NumberFormatException ne) {
this.processLog.warn("Incompatible value for attribute " + entry.getKey()
@ -146,4 +157,55 @@ final class JMSPublisher extends JMSWorker {
return destination;
}
/**
* Implementations of this interface use {@link javax.jms.Message} methods to set strongly typed properties.
*/
public interface JmsPropertySetter {
void setProperty(final Message message, final String name, final String value) throws JMSException, NumberFormatException;
}
public enum JmsPropertySetterEnum implements JmsPropertySetter {
BOOLEAN( (message, name, value) -> {
message.setBooleanProperty(name, Boolean.parseBoolean(value));
} ),
BYTE( (message, name, value) -> {
message.setByteProperty(name, Byte.parseByte(value));
} ),
SHORT( (message, name, value) -> {
message.setShortProperty(name, Short.parseShort(value));
} ),
INTEGER( (message, name, value) -> {
message.setIntProperty(name, Integer.parseInt(value));
} ),
LONG( (message, name, value) -> {
message.setLongProperty(name, Long.parseLong(value));
} ),
FLOAT( (message, name, value) -> {
message.setFloatProperty(name, Float.parseFloat(value));
} ),
DOUBLE( (message, name, value) -> {
message.setDoubleProperty(name, Double.parseDouble(value));
} ),
STRING( (message, name, value) -> {
message.setStringProperty(name, value);
} );
private final JmsPropertySetter setter;
JmsPropertySetterEnum(JmsPropertySetter setter) {
this.setter = setter;
}
public void setProperty(Message message, String name, String value) throws JMSException, NumberFormatException {
setter.setProperty(message, name, value);
}
}
/**
* This map helps us avoid using JmsPropertySetterEnum.valueOf and dealing with IllegalArgumentException on failed lookup.
*/
public static Map<String, JmsPropertySetterEnum> propertySetterMap = new HashMap<>();
static {
Arrays.stream(JmsPropertySetterEnum.values()).forEach(e -> propertySetterMap.put(e.name().toLowerCase(), e));
}
}

View File

@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@ -180,4 +181,76 @@ public class PublishJMSIT {
runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory
}
@Test(timeout = 10000)
public void validatePublishPropertyTypes() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
final String destinationName = "validatePublishPropertyTypes";
PublishJMS pubProc = new PublishJMS();
TestRunner runner = TestRunners.newTestRunner(pubProc);
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
when(cs.getConnectionFactory()).thenReturn(cf);
runner.addControllerService("cfProvider", cs);
runner.enableControllerService(cs);
runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(PublishJMS.DESTINATION, destinationName);
Map<String, String> attributes = new HashMap<>();
attributes.put("foo", "foo");
attributes.put("myboolean", "true");
attributes.put("myboolean.type", "boolean");
attributes.put("mybyte", "127");
attributes.put("mybyte.type", "byte");
attributes.put("myshort", "16384");
attributes.put("myshort.type", "short");
attributes.put("myinteger", "1544000");
attributes.put("myinteger.type", "INTEGER"); // test upper case
attributes.put("mylong", "9876543210");
attributes.put("mylong.type", "long");
attributes.put("myfloat", "3.14");
attributes.put("myfloat.type", "float");
attributes.put("mydouble", "3.14159265359");
attributes.put("mydouble.type", "double");
attributes.put("badtype", "3.14");
attributes.put("badtype.type", "pi"); // pi not recognized as a type, so send as String
attributes.put("badint", "3.14"); // value is not an integer
attributes.put("badint.type", "integer");
runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left intact so that we can use it.
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
assertNotNull(successFF);
JmsTemplate jmst = new JmsTemplate(cf);
BytesMessage message = (BytesMessage) jmst.receive(destinationName);
byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
assertEquals("Hey dude!", new String(messageBytes));
assertEquals(true, message.getObjectProperty("foo") instanceof String);
assertEquals("foo", message.getStringProperty("foo"));
assertEquals(true, message.getObjectProperty("myboolean") instanceof Boolean);
assertEquals(true, message.getBooleanProperty("myboolean"));
assertEquals(true, message.getObjectProperty("mybyte") instanceof Byte);
assertEquals(127, message.getByteProperty("mybyte"));
assertEquals(true, message.getObjectProperty("myshort") instanceof Short);
assertEquals(16384, message.getShortProperty("myshort"));
assertEquals(true, message.getObjectProperty("myinteger") instanceof Integer);
assertEquals(1544000, message.getIntProperty("myinteger"));
assertEquals(true, message.getObjectProperty("mylong") instanceof Long);
assertEquals(9876543210L, message.getLongProperty("mylong"));
assertEquals(true, message.getObjectProperty("myfloat") instanceof Float);
assertEquals(3.14F, message.getFloatProperty("myfloat"), 0.001F);
assertEquals(true, message.getObjectProperty("mydouble") instanceof Double);
assertEquals(3.14159265359D, message.getDoubleProperty("mydouble"), 0.00000000001D);
assertEquals(true, message.getObjectProperty("badtype") instanceof String);
assertEquals("3.14", message.getStringProperty("badtype"));
assertFalse(message.propertyExists("badint"));
runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory
}
}