NIFI-4976: If unable to retrieve message content, warn an error but acknowledge message.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2545.
This commit is contained in:
Mark Payne 2018-03-14 14:04:03 -04:00 committed by Pierre Villard
parent d3f54994a6
commit 867ffdb52e
3 changed files with 30 additions and 37 deletions

View File

@ -32,6 +32,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.nifi.jms.processors.MessageBodyToBytesConverter.MessageConversionException;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.springframework.jms.connection.CachingConnectionFactory;
@ -96,13 +97,22 @@ final class JMSConsumer extends JMSWorker {
if (message != null) {
byte[] messageBody = null;
if (message instanceof TextMessage) {
messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message, Charset.forName(charset));
} else if (message instanceof BytesMessage) {
messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message);
} else {
throw new IllegalStateException("Message type other then TextMessage and BytesMessage are "
+ "not supported at the moment");
try {
if (message instanceof TextMessage) {
messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message, Charset.forName(charset));
} else if (message instanceof BytesMessage) {
messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message);
} else {
processLog.error("Received a JMS Message that was neither a TextMessage nor a BytesMessage [{}]; will skip this message.", new Object[] {message});
acknowledge(message, session);
return null;
}
} catch (final MessageConversionException mce) {
processLog.error("Received a JMS Message [{}] but failed to obtain the content of the message; will acknowledge this message without creating a FlowFile for it.",
new Object[] {message}, mce);
acknowledge(message, session);
return null;
}
final Map<String, String> messageHeaders = extractMessageHeaders(message);
@ -115,9 +125,7 @@ final class JMSConsumer extends JMSWorker {
// and ACK message *only* after its successful invocation
// and if CLIENT_ACKNOWLEDGE is set.
consumerCallback.accept(response);
if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
message.acknowledge();
}
acknowledge(message, session);
} finally {
JmsUtils.closeMessageConsumer(msgConsumer);
}
@ -127,6 +135,12 @@ final class JMSConsumer extends JMSWorker {
}, true);
}
private void acknowledge(final Message message, final Session session) throws JMSException {
if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
message.acknowledge();
}
}
@SuppressWarnings("unchecked")
private Map<String, String> extractMessageProperties(final Message message) {

View File

@ -72,22 +72,14 @@ abstract class MessageBodyToBytesConverter {
}
}
/**
*
*/
private static class BytesMessageInputStream extends InputStream {
private BytesMessage message;
/**
*
*/
public BytesMessageInputStream(BytesMessage message) {
this.message = message;
}
/**
*
*/
@Override
public int read() throws IOException {
try {
@ -97,24 +89,19 @@ abstract class MessageBodyToBytesConverter {
}
}
/**
*
*/
@Override
public int read(byte[] buffer, int offset, int length) throws IOException {
try {
if (offset == 0)
if (offset == 0) {
return this.message.readBytes(buffer, length);
else
} else {
return super.read(buffer, offset, length);
}
} catch (JMSException e) {
throw new IOException(e.toString());
}
}
/**
*
*/
@Override
public int read(byte[] buffer) throws IOException {
try {
@ -125,22 +112,14 @@ abstract class MessageBodyToBytesConverter {
}
}
/**
*
*/
static class MessageConversionException extends RuntimeException {
private static final long serialVersionUID = -1464448549601643887L;
/**
*
*/
public MessageConversionException(String msg) {
super(msg);
}
/**
*
*/
public MessageConversionException(String msg, Throwable cause) {
super(msg, cause);
}

View File

@ -99,7 +99,7 @@ public class JMSPublisherConsumerIT {
* used. The may change to the point where all message types are supported
* at which point this test will no be longer required.
*/
@Test(expected = IllegalStateException.class)
@Test
public void validateFailOnUnsupportedMessageType() throws Exception {
final String destinationName = "validateFailOnUnsupportedMessageType";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);