mirror of https://github.com/apache/nifi.git
NIFI-4007 - Update azure EventHubs client to latest version
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #1880.
This commit is contained in:
parent
bdfd710692
commit
82ef671953
|
@ -31,13 +31,8 @@
|
|||
<dependency>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
<artifactId>azure-eventhubs</artifactId>
|
||||
<version>0.9.0</version>
|
||||
<version>0.14.2</version>
|
||||
</dependency>
|
||||
<!--<dependency>
|
||||
<groupId>com.microsoft.eventhubs.client</groupId>
|
||||
<artifactId>eventhubs-client</artifactId>
|
||||
<version>0.9.1</version>
|
||||
</dependency>-->
|
||||
<dependency>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
<artifactId>azure-storage</artifactId>
|
||||
|
|
|
@ -338,12 +338,12 @@ public class GetAzureEventHub extends AbstractProcessor {
|
|||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
FlowFile flowFile = session.create();
|
||||
EventData.SystemProperties systemProperties = eventData.getSystemProperties();
|
||||
final EventData.SystemProperties systemProperties = eventData.getSystemProperties();
|
||||
|
||||
if (null != systemProperties) {
|
||||
attributes.put("eventhub.enqueued.timestamp", String.valueOf(eventData.getSystemProperties().getEnqueuedTime()));
|
||||
attributes.put("eventhub.offset", eventData.getSystemProperties().getOffset());
|
||||
attributes.put("eventhub.sequence", String.valueOf(eventData.getSystemProperties().getSequenceNumber()));
|
||||
attributes.put("eventhub.enqueued.timestamp", String.valueOf(systemProperties.getEnqueuedTime()));
|
||||
attributes.put("eventhub.offset", systemProperties.getOffset());
|
||||
attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber()));
|
||||
}
|
||||
|
||||
attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue());
|
||||
|
@ -352,10 +352,9 @@ public class GetAzureEventHub extends AbstractProcessor {
|
|||
|
||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||
flowFile = session.write(flowFile, out -> {
|
||||
out.write(eventData.getBody());
|
||||
out.write(eventData.getBytes());
|
||||
});
|
||||
|
||||
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
|
||||
final String namespace = context.getProperty(NAMESPACE).getValue();
|
||||
|
|
Loading…
Reference in New Issue