From 56ad22aea6db7d401a72857b1356dfba85f274b6 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 11 Nov 2015 23:06:04 -0500 Subject: [PATCH 01/10] NIFI-900: Created Processors for interacting with Microsoft Azure EventHubs Reviewed (with amendments needed for clean merge, whitespace and NOTICEs) by Tony Kurc (tkurc@apache.org) --- nifi-assembly/NOTICE | 7 + nifi-assembly/pom.xml | 5 + .../nifi-azure-bundle/nifi-azure-nar/pom.xml | 37 +++ .../src/main/resources/META-INF/NOTICE | 34 +++ .../nifi-azure-processors/pom.xml | 60 +++++ .../azure/eventhub/GetAzureEventHub.java | 254 ++++++++++++++++++ .../azure/eventhub/PutAzureEventHub.java | 186 +++++++++++++ .../org.apache.nifi.processor.Processor | 16 ++ nifi-nar-bundles/nifi-azure-bundle/pom.xml | 35 +++ nifi-nar-bundles/pom.xml | 7 +- pom.xml | 6 + 11 files changed, 644 insertions(+), 3 deletions(-) create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-azure-bundle/pom.xml diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index 02c9d7f661..5e180351cf 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -765,6 +765,13 @@ The following binary components are provided under the Apache Software License v is a distributed tracing system that is Apache 2.0 Licensed. Copyright 2012 Twitter, Inc. + (ASLv2) Apache Qpid AMQP 1.0 Client + The following NOTICE information applies: + Copyright 2006-2015 The Apache Software Foundation + + (ASLv2) EventHubs Client (com.microsoft.eventhubs.client:eventhubs-client:0.9.1 - https://github.com/hdinsight/eventhubs-client/) + + ************************ Common Development and Distribution License 1.1 ************************ diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 8e4a175c93..961349f1ff 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -242,6 +242,11 @@ language governing permissions and limitations under the License. 
--> nifi-hbase_1_1_2-client-service-nar nar + + org.apache.nifi + nifi-azure-nar + nar + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml new file mode 100644 index 0000000000..26d79d20c2 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml @@ -0,0 +1,37 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-azure-bundle + 0.4.0-SNAPSHOT + + + nifi-azure-nar + 0.4.0-SNAPSHOT + nar + + + + org.apache.nifi + nifi-azure-processors + 0.4.0-SNAPSHOT + + + + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..1d5a375977 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,34 @@ +nifi-azure-nar +Copyright 2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2014 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache Qpid AMQP 1.0 Client + The following NOTICE information applies: + Copyright 2006-2015 The Apache Software Foundation + + (ASLv2) EventHubs Client (com.microsoft.eventhubs.client:eventhubs-client:0.9.1 - https://github.com/hdinsight/eventhubs-client/) + + +************************ +Common Development and Distribution License 1.1 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. + + (CDDL 1.1) (GPL2 w/ CPE) Javax JMS Api (javax.jms:javax.jms-api:jar:2.0.1 - http://java.net/projects/jms-spec/pages/Home) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml new file mode 100644 index 0000000000..c34161c820 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -0,0 +1,60 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-azure-bundle + 0.4.0-SNAPSHOT + + + nifi-azure-processors + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + javax.jms + javax.jms-api + + + com.microsoft.eventhubs.client + eventhubs-client + 0.9.1 + + + + org.apache.nifi + nifi-mock + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + test + + + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java new file mode 100644 index 0000000000..2c62df61bb --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StopWatch; + +import com.microsoft.eventhubs.client.ConnectionStringBuilder; +import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter; +import com.microsoft.eventhubs.client.EventHubException; +import com.microsoft.eventhubs.client.EventHubMessage; +import com.microsoft.eventhubs.client.IEventHubFilter; +import com.microsoft.eventhubs.client.ResilientEventHubReceiver; + +@Tags({ "azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams" }) +@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile") +@WritesAttributes({ + @WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"), + @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"), + @WritesAttribute(attribute = "eventhub.sequence", description = "The Azure Sequence number associated with the message"), + @WritesAttribute(attribute = "eventhub.name", description = "The name of the Event Hub from which the message was pulled"), + @WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled") +}) +public class GetAzureEventHub extends AbstractProcessor { + + static final PropertyDescriptor EVENT_HUB_NAME = new 
PropertyDescriptor.Builder() + .name("Event Hub Name") + .description("The name of the Azure Event Hub to pull messages from") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder() + .name("Event Hub Namespace") + .description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .build(); + static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() + .name("Shared Access Policy Name") + .description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .build(); + static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder() + .name("Shared Access Policy Primary Key") + .description("The primary key of the Event Hub Shared Access Policy") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .sensitive(true) + .required(true) + .build(); + + static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder() + .name("Number of Event Hub Partitions") + .description("The number of partitions that the Event Hub has. Only this number of partitions will be used, " + + "so it is important to ensure that if the number of partitions changes that this value be updated. Otherwise, some messages may not be consumed.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .build(); + static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder() + .name("Event Hub Consumer Group") + .description("The name of the Event Hub Consumer Group to use when pulling events") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("$Default") + .required(true) + .build(); + static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The number of FlowFiles to pull in a single batch") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("10") + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Any FlowFile that is successfully received from the Azure Event Hub will be transferred to this Relationship.") + .build(); + + private final ConcurrentMap<String, ResilientEventHubReceiver> partitionToReceiverMap = new ConcurrentHashMap<>(); + private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(EVENT_HUB_NAME); + properties.add(NAMESPACE); + properties.add(ACCESS_POLICY); + properties.add(POLICY_PRIMARY_KEY); + properties.add(NUM_PARTITIONS); + properties.add(CONSUMER_GROUP); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + private ResilientEventHubReceiver getReceiver(final ProcessContext context, final String partitionId) throws EventHubException { + ResilientEventHubReceiver existingReceiver = partitionToReceiverMap.get(partitionId); + if
(existingReceiver != null) { + return existingReceiver; + } + + // we want to avoid allowing multiple threads to create Receivers simultaneously because that could result in + // having multiple Receivers for the same partition. So if the map does not contain a receiver for this partition, + // we will enter a synchronized block and check again (because once we enter the synchronized block, we know that no + // other thread is creating a receiver). If within the synchronized block, we still do not have an entry in the map, + // it is up to us to create the receiver, initialize it, and then put it into the map. + // We do not use the putIfAbsent method to perform a CAS operation here because we want to initialize the + // receiver if and only if it is not already present in the map. As a result, we need to initialize the receiver and add it + // to the map atomically. Hence, the synchronized block. + synchronized (this) { + existingReceiver = partitionToReceiverMap.get(partitionId); + if (existingReceiver != null) { + return existingReceiver; + } + + final String policyName = context.getProperty(ACCESS_POLICY).getValue(); + final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); + final String namespace = context.getProperty(NAMESPACE).getValue(); + final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); + final String consumerGroupName = context.getProperty(CONSUMER_GROUP).getValue(); + + final String connectionString = new ConnectionStringBuilder(policyName, policyKey, namespace).getConnectionString(); + final IEventHubFilter filter = new EventHubEnqueueTimeFilter(System.currentTimeMillis()); + final ResilientEventHubReceiver receiver = new ResilientEventHubReceiver(connectionString, eventHubName, partitionId, consumerGroupName, -1, filter); + receiver.initialize(); + + partitionToReceiverMap.put(partitionId, receiver); + return receiver; + } + } + + + @OnStopped + public void tearDown() { + for (final ResilientEventHubReceiver receiver : partitionToReceiverMap.values()) { + receiver.close(); + } + + partitionToReceiverMap.clear(); + } + + @OnScheduled + public void setupPartitions(final ProcessContext context) { + final BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>(); + for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); i++) { + partitionNames.add(String.valueOf(i)); + } + this.partitionNames = partitionNames; + } + + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final BlockingQueue<String> partitionIds = this.partitionNames; + final String partitionId = partitionIds.poll(); + if (partitionId == null) { + getLogger().debug("No partitions available"); + return; + } + + final StopWatch stopWatch = new StopWatch(true); + try { + final ResilientEventHubReceiver receiver; + try { + receiver = getReceiver(context, partitionId); + } catch (final EventHubException e) { + throw new ProcessException(e); + } + + final EventHubMessage message = EventHubMessage.parseAmqpMessage(receiver.receive(100L)); + if (message == null) { + return; + } + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("eventhub.enqueued.timestamp", String.valueOf(message.getEnqueuedTimestamp())); + attributes.put("eventhub.offset", message.getOffset()); + attributes.put("eventhub.sequence", String.valueOf(message.getSequence())); + attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue()); + attributes.put("eventhub.partition", partitionId); + + FlowFile
flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, attributes); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(message.getData()); + } + }); + + session.transfer(flowFile, REL_SUCCESS); + + final String namespace = context.getProperty(NAMESPACE).getValue(); + final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); + final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue(); + final String transitUri = "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId; + session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + } finally { + partitionIds.offer(partitionId); + } + } + +}
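GetAzureEventHub's onTrigger, above, round-robins across partitions by polling a partition id from a BlockingQueue and handing it back in a finally block, so a partition is never serviced by two threads at once and an empty queue simply ends the trigger. A minimal standalone sketch of that poll/offer pattern (the class and method names below are illustrative only, not part of this patch):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class PartitionRoundRobinSketch {

        // one entry per partition; a missing id means "this partition is currently being serviced"
        private final BlockingQueue<String> partitionIds = new LinkedBlockingQueue<>();

        public PartitionRoundRobinSketch(final int numPartitions) {
            for (int i = 0; i < numPartitions; i++) {
                partitionIds.add(String.valueOf(i));
            }
        }

        public void trigger() {
            final String partitionId = partitionIds.poll(); // claim a partition; null means all are claimed
            if (partitionId == null) {
                return;
            }
            try {
                System.out.println("servicing partition " + partitionId);
            } finally {
                partitionIds.offer(partitionId); // always hand the partition back for the next trigger
            }
        }
    }
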
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java new file mode 100644 index 0000000000..82fff2752f --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import javax.jms.DeliveryMode; + +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StopWatch; + +import com.microsoft.eventhubs.client.EventHubClient; +import com.microsoft.eventhubs.client.EventHubException; +import com.microsoft.eventhubs.client.EventHubSender; + + +@SupportsBatching +@Tags({ "microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming" }) +@CapabilityDescription("Sends the contents of a FlowFile to a Windows Azure Event Hub. Note: the content of the FlowFile will be buffered into memory before being sent, " + + "so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available.") +public class PutAzureEventHub extends AbstractProcessor { + + static final AllowableValue DELIVERY_MODE_PERSISTENT = new AllowableValue(String.valueOf(DeliveryMode.PERSISTENT), "Persistent", "This mode indicates that the Event Hub " + + "server must persist the message to a reliable storage mechanism before the FlowFile is routed to 'success', in order to ensure that the data is not lost."); + + static final AllowableValue DELIVERY_MODE_NON_PERSISTENT = new AllowableValue(String.valueOf(DeliveryMode.NON_PERSISTENT), "Non-Persistent", + "This mode indicates that the Event Hub server does not have to persist the message to a reliable storage mechanism before the FlowFile is routed to 'success'. " + + "This delivery mode may offer higher throughput but may result in message loss if the server crashes or is restarted."); + + static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder() + .name("Event Hub Name") + .description("The name of the Azure Event Hub to send to") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder() + .name("Event Hub Namespace") + .description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .build(); + static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() + .name("Shared Access Policy Name") + .description("The name of the Event Hub Shared Access Policy.
This Policy must have Send permissions.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .build(); + static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder() + .name("Shared Access Policy Primary Key") + .description("The primary key of the Event Hub Shared Access Policy") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .sensitive(true) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Any FlowFile that is successfully sent to the Azure Event Hub will be transferred to this Relationship.") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that could not be sent to the Azure Event Hub will be transferred to this Relationship.") + .build(); + + private volatile BlockingQueue<EventHubSender> senderQueue = new LinkedBlockingQueue<>(); + + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(EVENT_HUB_NAME); + properties.add(NAMESPACE); + properties.add(ACCESS_POLICY); + properties.add(POLICY_PRIMARY_KEY); + return properties; + } + + + @OnScheduled + public final void setupClient(final ProcessContext context) throws EventHubException { + final String policyName = context.getProperty(ACCESS_POLICY).getValue(); + final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); + final String namespace = context.getProperty(NAMESPACE).getValue(); + final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); + + final EventHubClient client = EventHubClient.create(policyName, policyKey, namespace, eventHubName); + + final int numThreads = context.getMaxConcurrentTasks(); + senderQueue = new LinkedBlockingQueue<>(numThreads); + for (int i = 0; i < numThreads; i++) { + final EventHubSender sender = client.createPartitionSender(null); + senderQueue.offer(sender); + } + } + + @OnStopped + public void tearDown() { + EventHubSender sender; + while ((sender = senderQueue.poll()) != null) { + sender.close(); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final StopWatch stopWatch = new StopWatch(true); + final EventHubSender sender = senderQueue.poll(); + try { + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer); + } + }); + + try { + sender.send(buffer); + } catch (final EventHubException ehe) { + getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[] { flowFile, ehe }, ehe); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final String namespace = context.getProperty(NAMESPACE).getValue(); + final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); + session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName,
stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + } finally { + senderQueue.offer(sender); + } + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..178e52c984 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.azure.eventhub.PutAzureEventHub +org.apache.nifi.processors.azure.eventhub.GetAzureEventHub \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml new file mode 100644 index 0000000000..3978748773 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml @@ -0,0 +1,35 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 0.4.0-SNAPSHOT + + + org.apache.nifi + nifi-azure-bundle + 0.4.0-SNAPSHOT + pom + + + nifi-azure-processors + nifi-azure-nar + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 510935dca8..4c0925f957 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -42,12 +42,13 @@ nifi-language-translation-bundle nifi-mongodb-bundle nifi-flume-bundle - nifi-hbase-bundle + nifi-hbase-bundle nifi-ambari-bundle nifi-image-bundle nifi-avro-bundle nifi-couchbase-bundle - + nifi-azure-bundle + @@ -132,4 +133,4 @@ - \ No newline at end of file + diff --git a/pom.xml b/pom.xml index cf6f018cda..8a8cdb09a7 100644 --- a/pom.xml +++ b/pom.xml @@ -931,6 +931,12 @@ 0.4.0-SNAPSHOT nar + + org.apache.nifi + nifi-azure-nar + 0.4.0-SNAPSHOT + nar + org.apache.nifi nifi-properties From 02102ea1c2add14df874744633f2f6dcd83d3525 Mon Sep 17 00:00:00 2001 From: joewitt Date: Thu, 12 Nov 2015 18:00:40 -0500 Subject: [PATCH 02/10] Revert "NIFI-1134". This was removed because they looked like floating classes. But in reality they are used to make jars which we use to test the build. However, we really need the test to work differently but until that is fixed these must stay. This reverts commit 41f3875347a6ff7c87e35665019c35e99e78eecb. 
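The restored classes below are plain main() helpers that a test can launch as a child process and drive over stdin/stdout. As a rough illustration of how a harness might exercise one of them (launching via ProcessBuilder against the current classpath is an assumption made for this sketch, not something the commit prescribes):

    import java.io.InputStream;
    import java.io.OutputStream;

    public class HelperHarnessSketch {
        public static void main(String[] args) throws Exception {
            // Assumes TestIngestAndUpdate is on the current classpath; the real build packages it into a jar.
            final ProcessBuilder builder = new ProcessBuilder("java", "-cp", System.getProperty("java.class.path"), "TestIngestAndUpdate");
            final Process process = builder.start();

            try (OutputStream stdin = process.getOutputStream()) {
                stdin.write("hello".getBytes("UTF-8")); // becomes the helper's stdin, echoed back after the header line
            }

            try (InputStream stdout = process.getInputStream()) {
                final byte[] buffer = new byte[1024];
                int numRead;
                while ((numRead = stdout.read(buffer)) != -1) {
                    System.out.write(buffer, 0, numRead); // prints: <working dir>:ModifiedResult, then "hello"
                }
            }
            process.waitFor();
        }
    }
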
--- .../src/test/java/TestDynamicEnvironment.java | 29 ++++++++++++++++ .../src/test/java/TestIngestAndUpdate.java | 34 +++++++++++++++++++ .../src/test/java/TestSuccess.java | 24 +++++++++++++ 3 files changed, 87 insertions(+) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestDynamicEnvironment.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestIngestAndUpdate.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestSuccess.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestDynamicEnvironment.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestDynamicEnvironment.java new file mode 100644 index 0000000000..3e6cad2fc9 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestDynamicEnvironment.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Map; + +public class TestDynamicEnvironment { + public static void main(String[] args) { + // iterate through current environment and print out all properties starting with NIFI + for (Map.Entry env: System.getenv().entrySet()) { + if (env.getKey().startsWith("NIFI")) { + System.out.println(env.getKey() + "=" + env.getValue()); + } + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestIngestAndUpdate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestIngestAndUpdate.java new file mode 100644 index 0000000000..c9ed9f9631 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestIngestAndUpdate.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.IOException; + +public class TestIngestAndUpdate { + + public static void main(String[] args) throws IOException { + byte[] bytes = new byte[1024]; + System.out.write(System.getProperty("user.dir").getBytes()); + System.out.println(":ModifiedResult"); + int numRead = 0; + while ((numRead = System.in.read(bytes)) != -1) { + System.out.write(bytes, 0, numRead); + } + System.out.flush(); + System.out.close(); + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestSuccess.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestSuccess.java new file mode 100644 index 0000000000..3c74d54896 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestSuccess.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class TestSuccess { + + public static void main(String[] args) { + System.out.println("Test was a success"); + } + +} From 65bd8c0b1f0a32a43e4f3276bc8e606c12913bd2 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 12 Nov 2015 21:01:50 -0500 Subject: [PATCH 03/10] NIFI-1109 Updated documentation and cleaned up code Reviewed by Bryan Bende (bbende@apache.org). 
Committed with amendments (for whitespace, a couple misspellings and to change a test method name based on review) by Tony Kurc (tkurc@apache.org) --- .../couchbase/AbstractCouchbaseProcessor.java | 107 ++++++++-------- .../couchbase/CouchbaseExceptionMappings.java | 2 +- .../processors/couchbase/GetCouchbaseKey.java | 114 ++++++++++-------- .../processors/couchbase/PutCouchbaseKey.java | 78 ++++++------ .../couchbase/TestGetCouchbaseKey.java | 17 +-- 5 files changed, 168 insertions(+), 150 deletions(-) diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java index b8790417dc..158caa1d67 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java @@ -43,21 +43,18 @@ import com.couchbase.client.java.Bucket; */ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { - public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor - .Builder().name("Document Type") - .description("The type of contents.") - .required(true) - .allowableValues(DocumentType.values()) - .defaultValue(DocumentType.Json.toString()) - .build(); + public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor.Builder().name("Document Type") + .description("The type of contents.") + .required(true) + .allowableValues(DocumentType.values()) + .defaultValue(DocumentType.Json.toString()) + .build(); - public static final PropertyDescriptor DOC_ID = new PropertyDescriptor - .Builder().name("Document Id") - .description("A static, fixed Couchbase document id." 
- + "Or an expression to construct the Couchbase document id.") - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder().name("Document Id") + .description("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id.") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() @@ -77,15 +74,13 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { .description("All FlowFiles that cannot written to Couchbase Server and can't be retried are routed to this relationship.") .build(); - public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor - .Builder().name("Couchbase Cluster Controller Service") + public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor.Builder().name("Couchbase Cluster Controller Service") .description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.") .required(true) .identifiesControllerService(CouchbaseClusterControllerService.class) .build(); - public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor - .Builder().name("Bucket Name") + public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder().name("Bucket Name") .description("The name of bucket to access.") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -115,6 +110,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { /** * Add processor specific properties. + * * @param descriptors add properties to this list */ protected void addSupportedProperties(List descriptors) { @@ -123,6 +119,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { /** * Add processor specific relationships. + * * @param relationships add relationships to this list */ protected void addSupportedRelationships(Set relationships) { @@ -140,11 +137,11 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { } private CouchbaseClusterControllerService getClusterService(final ProcessContext context) { - if(clusterService == null){ - synchronized(AbstractCouchbaseProcessor.class){ - if(clusterService == null){ + if (clusterService == null) { + synchronized (AbstractCouchbaseProcessor.class) { + if (clusterService == null) { clusterService = context.getProperty(COUCHBASE_CLUSTER_SERVICE) - .asControllerService(CouchbaseClusterControllerService.class); + .asControllerService(CouchbaseClusterControllerService.class); } } } @@ -154,6 +151,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { /** * Open a bucket connection using a CouchbaseClusterControllerService. + * * @param context a process context * @return a bucket instance */ @@ -163,18 +161,17 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { /** * Generate a transit url. 
+ * * @param context a process context * @return a transit url based on the bucket name and the CouchbaseClusterControllerService name */ - protected String getTransitUrl(final ProcessContext context) { - return new StringBuilder(context.getProperty(BUCKET_NAME).getValue()) - .append('@') - .append(context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue()) - .toString(); + protected String getTransitUrl(final ProcessContext context, final String docId) { + return "couchbase://" + context.getProperty(BUCKET_NAME).getValue() + "/" + docId; } /** - * Handles the thrown CocuhbaseException accordingly. + * Handles the thrown CouchbaseException accordingly. + * * @param context a process context * @param session a process session * @param logger a logger @@ -183,35 +180,39 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { * @param errMsg a message to be logged */ protected void handleCouchbaseException(final ProcessContext context, final ProcessSession session, - final ProcessorLog logger, FlowFile inFile, CouchbaseException e, - String errMsg) { + final ProcessorLog logger, FlowFile inFile, CouchbaseException e, + String errMsg) { logger.error(errMsg, e); - if(inFile != null){ + if (inFile != null) { ErrorHandlingStrategy strategy = CouchbaseExceptionMappings.getStrategy(e); - switch(strategy.penalty()) { - case Penalize: - if(logger.isDebugEnabled()) logger.debug("Penalized: {}", new Object[]{inFile}); - inFile = session.penalize(inFile); - break; - case Yield: - if(logger.isDebugEnabled()) logger.debug("Yielded context: {}", new Object[]{inFile}); - context.yield(); - break; - case None: - break; + switch (strategy.penalty()) { + case Penalize: + if (logger.isDebugEnabled()) { + logger.debug("Penalized: {}", new Object[] {inFile}); + } + inFile = session.penalize(inFile); + break; + case Yield: + if (logger.isDebugEnabled()) { + logger.debug("Yielded context: {}", new Object[] {inFile}); + } + context.yield(); + break; + case None: + break; } - switch(strategy.result()) { - case ProcessException: - throw new ProcessException(errMsg, e); - case Failure: - inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName()); - session.transfer(inFile, REL_FAILURE); - break; - case Retry: - inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName()); - session.transfer(inFile, REL_RETRY); - break; + switch (strategy.result()) { + case ProcessException: + throw new ProcessException(errMsg, e); + case Failure: + inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName()); + session.transfer(inFile, REL_FAILURE); + break; + case Retry: + inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName()); + session.transfer(inFile, REL_RETRY); + break; } } } diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java index 87ffabb953..e4faba3a2c 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java @@ -78,7 +78,7 @@ public class 
CouchbaseExceptionMappings { mapping.put(ReplicaNotConfiguredException.class, ConfigurationError); // when a particular Service(KV, View, Query, DCP) isn't running in a cluster mapping.put(ServiceNotAvailableException.class, ConfigurationError); - // SSL configuration error, such as key store mis configuration. + // SSL configuration error, such as key store misconfiguration. mapping.put(SSLException.class, ConfigurationError); /* diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java index 4aa96777cc..b4ff467bdd 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java @@ -16,18 +16,19 @@ */ package org.apache.nifi.processors.couchbase; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.annotation.behavior.ReadsAttribute; -import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -43,6 +44,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.stream.io.StreamUtils; import com.couchbase.client.core.CouchbaseException; @@ -52,31 +54,30 @@ import com.couchbase.client.java.document.Document; import com.couchbase.client.java.document.RawJsonDocument; import com.couchbase.client.java.error.DocumentDoesNotExistException; -@Tags({ "nosql", "couchbase", "database", "get" }) -@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. This processor can be triggered by an incoming FlowFile, or it can be scheduled on a timer") +@Tags({"nosql", "couchbase", "database", "get"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. The ID of the document to fetch may be supplied by setting the property. 
" + + "NOTE: if the Document Id property is not set, the contents of the FlowFile will be read to determine the Document Id, which means that the contents of the entire " + + "FlowFile will be buffered in memory.") @SeeAlso({CouchbaseClusterControllerService.class}) -@ReadsAttributes({ - @ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if 'Document Id' is not specified"), - @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.") - }) @WritesAttributes({ - @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was retrieved from."), - @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was retrieved from."), - @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."), - @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."), - @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."), - @WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.") - }) + @WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was retrieved from."), + @WritesAttribute(attribute = "couchbase.bucket", description = "Bucket where the document was retrieved from."), + @WritesAttribute(attribute = "couchbase.doc.id", description = "Id of the document."), + @WritesAttribute(attribute = "couchbase.doc.cas", description = "CAS of the document."), + @WritesAttribute(attribute = "couchbase.doc.expiry", description = "Expiration of the document."), + @WritesAttribute(attribute = "couchbase.exception", description = "If Couchbase related error occurs the CouchbaseException class name will be captured here.") +}) public class GetCouchbaseKey extends AbstractCouchbaseProcessor { @Override - protected void addSupportedProperties(List descriptors) { + protected void addSupportedProperties(final List descriptors) { descriptors.add(DOCUMENT_TYPE); descriptors.add(DOC_ID); } @Override - protected void addSupportedRelationships(Set relationships) { + protected void addSupportedRelationships(final Set relationships) { relationships.add(REL_SUCCESS); relationships.add(REL_ORIGINAL); relationships.add(REL_RETRY); @@ -85,11 +86,15 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final ProcessorLog logger = getLogger(); FlowFile inFile = session.get(); + if (inFile == null) { + return; + } + final long startNanos = System.nanoTime(); + final ProcessorLog logger = getLogger(); String docId = null; - if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ + if (!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())) { docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue(); } else { final byte[] content = new byte[(int) inFile.getSize()]; @@ -102,63 +107,74 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor { docId = new String(content, StandardCharsets.UTF_8); } - if(StringUtils.isEmpty(docId)){ - if(inFile != null){ - throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile); - } + if (StringUtils.isEmpty(docId)) { + throw new ProcessException("Please check 'Document Id' setting. 
Couldn't get document id from " + inFile); } try { - Document doc = null; - byte[] content = null; - Bucket bucket = openBucket(context); - DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); - switch (documentType){ - case Json : { + final Document doc; + final byte[] content; + final Bucket bucket = openBucket(context); + final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); + + switch (documentType) { + case Json: { RawJsonDocument document = bucket.get(docId, RawJsonDocument.class); - if(document != null){ + if (document == null) { + doc = null; + content = null; + } else { content = document.content().getBytes(StandardCharsets.UTF_8); doc = document; } break; } - case Binary : { + case Binary: { BinaryDocument document = bucket.get(docId, BinaryDocument.class); - if(document != null){ + if (document == null) { + doc = null; + content = null; + } else { content = document.content().array(); doc = document; } break; } + default: { + doc = null; + content = null; + } } - if(doc == null) { - logger.warn("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)}); - if(inFile != null){ - inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName()); - session.transfer(inFile, REL_FAILURE); - } + if (doc == null) { + logger.error("Document {} was not found in {}; routing {} to failure", new Object[] {docId, getTransitUrl(context, docId), inFile}); + inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName()); + session.transfer(inFile, REL_FAILURE); return; } - if(inFile != null){ - session.transfer(inFile, REL_ORIGINAL); - } + FlowFile outFile = session.create(inFile); + outFile = session.write(outFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(content); + } + }); - FlowFile outFile = session.create(); - outFile = session.importFrom(new ByteArrayInputStream(content), outFile); - Map updatedAttrs = new HashMap<>(); + final Map updatedAttrs = new HashMap<>(); updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue()); updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue()); updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId); updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas())); updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry())); outFile = session.putAllAttributes(outFile, updatedAttrs); - session.getProvenanceReporter().receive(outFile, getTransitUrl(context)); - session.transfer(outFile, REL_SUCCESS); - } catch (CouchbaseException e){ - String errMsg = String.format("Getting docuement %s from Couchbase Server using %s failed due to %s", docId, inFile, e); + final long fetchMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().fetch(outFile, getTransitUrl(context, docId), fetchMillis); + session.transfer(outFile, REL_SUCCESS); + session.transfer(inFile, REL_ORIGINAL); + } catch (final CouchbaseException e) { + String errMsg = String.format("Getting document %s from Couchbase Server using %s failed due to %s", docId, inFile, e); handleCouchbaseException(context, session, logger, inFile, e, errMsg); } } diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java index 2aa803c0eb..291c02cb7e 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java @@ -54,39 +54,36 @@ import com.couchbase.client.java.document.BinaryDocument; import com.couchbase.client.java.document.Document; import com.couchbase.client.java.document.RawJsonDocument; -@Tags({ "nosql", "couchbase", "database", "put" }) +@Tags({"nosql", "couchbase", "database", "put"}) @CapabilityDescription("Put a document to Couchbase Server via Key/Value access.") @SeeAlso({CouchbaseClusterControllerService.class}) @ReadsAttributes({ @ReadsAttribute(attribute = "uuid", description = "Used as a document id if 'Document Id' is not specified"), - @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.") - }) +}) @WritesAttributes({ - @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was stored."), - @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was stored."), - @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."), - @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."), - @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."), - @WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.") - }) + @WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was stored."), + @WritesAttribute(attribute = "couchbase.bucket", description = "Bucket where the document was stored."), + @WritesAttribute(attribute = "couchbase.doc.id", description = "Id of the document."), + @WritesAttribute(attribute = "couchbase.doc.cas", description = "CAS of the document."), + @WritesAttribute(attribute = "couchbase.doc.expiry", description = "Expiration of the document."), + @WritesAttribute(attribute = "couchbase.exception", description = "If Couchbase related error occurs the CouchbaseException class name will be captured here.") +}) public class PutCouchbaseKey extends AbstractCouchbaseProcessor { - public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor - .Builder().name("Persist To") - .description("Durability constraint about disk persistence.") - .required(true) - .allowableValues(PersistTo.values()) - .defaultValue(PersistTo.NONE.toString()) - .build(); + public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor.Builder().name("Persist To") + .description("Durability constraint about disk persistence.") + .required(true) + .allowableValues(PersistTo.values()) + .defaultValue(PersistTo.NONE.toString()) + .build(); - public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor - .Builder().name("Replicate To") - .description("Durability constraint about replication.") - .required(true) - .allowableValues(ReplicateTo.values()) - .defaultValue(ReplicateTo.NONE.toString()) - .build(); + public 
static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor.Builder().name("Replicate To") + .description("Durability constraint about replication.") + .required(true) + .allowableValues(ReplicateTo.values()) + .defaultValue(ReplicateTo.NONE.toString()) + .build(); @Override protected void addSupportedProperties(List descriptors) { @@ -107,7 +104,7 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final ProcessorLog logger = getLogger(); FlowFile flowFile = session.get(); - if ( flowFile == null ) { + if (flowFile == null) { return; } @@ -119,41 +116,42 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor { } }); - String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key())); - if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ + String docId = flowFile.getAttribute(CoreAttributes.UUID.key()); + if (!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())) { docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue(); } try { Document doc = null; - DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); - switch (documentType){ - case Json : { + final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); + switch (documentType) { + case Json: { doc = RawJsonDocument.create(docId, new String(content, StandardCharsets.UTF_8)); break; } - case Binary : { - ByteBuf buf = Unpooled.copiedBuffer(content); + case Binary: { + final ByteBuf buf = Unpooled.copiedBuffer(content); doc = BinaryDocument.create(docId, buf); break; } } - PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue()); - ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue()); + final PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue()); + final ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue()); doc = openBucket(context).upsert(doc, persistTo, replicateTo); - Map updatedAttrs = new HashMap<>(); + + final Map updatedAttrs = new HashMap<>(); updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue()); updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue()); updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId); updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas())); updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry())); - flowFile = session.putAllAttributes(flowFile, updatedAttrs); - session.getProvenanceReporter().send(flowFile, getTransitUrl(context)); - session.transfer(flowFile, REL_SUCCESS); - } catch (CouchbaseException e) { - String errMsg = String.format("Writing docuement %s to Couchbase Server using %s failed due to %s", docId, flowFile, e); + flowFile = session.putAllAttributes(flowFile, updatedAttrs); + session.getProvenanceReporter().send(flowFile, getTransitUrl(context, docId)); + session.transfer(flowFile, REL_SUCCESS); + } catch (final CouchbaseException e) { + String errMsg = String.format("Writing document %s to Couchbase Server using %s failed due to %s", docId, flowFile, e); handleCouchbaseException(context, session, logger, flowFile, e, errMsg); } } diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java index 108980c35a..37768261a7 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java @@ -100,10 +100,11 @@ public class TestGetCouchbaseKey { testRunner.setProperty(BUCKET_NAME, bucketName); testRunner.setProperty(DOC_ID, docId); + testRunner.enqueue(new byte[0]); testRunner.run(); - testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 1); testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_FAILURE, 0); MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); @@ -145,7 +146,7 @@ public class TestGetCouchbaseKey { } @Test - public void testDocIdExpWithNullFlowFile() throws Exception { + public void testDocIdExpWithEmptyFlowFile() throws Exception { String docIdExp = "doc-s"; String docId = "doc-s"; @@ -157,10 +158,11 @@ public class TestGetCouchbaseKey { testRunner.setProperty(DOC_ID, docIdExp); + testRunner.enqueue(new byte[0]); testRunner.run(); testRunner.assertTransferCount(REL_SUCCESS, 1); - testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 1); testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_FAILURE, 0); MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); @@ -179,11 +181,12 @@ public class TestGetCouchbaseKey { setupMockBucket(bucket); testRunner.setProperty(DOC_ID, docIdExp); + testRunner.enqueue(new byte[0]); try { testRunner.run(); fail("Exception should be thrown."); - } catch (AssertionError e){ + } catch (AssertionError e) { Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class)); } @@ -210,7 +213,7 @@ public class TestGetCouchbaseKey { try { testRunner.run(); fail("Exception should be thrown."); - } catch (AssertionError e){ + } catch (AssertionError e) { Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class)); } @@ -288,7 +291,7 @@ public class TestGetCouchbaseKey { try { testRunner.run(); fail("ProcessException should be thrown."); - } catch (AssertionError e){ + } catch (AssertionError e) { Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); } @@ -315,7 +318,7 @@ public class TestGetCouchbaseKey { try { testRunner.run(); fail("ProcessException should be thrown."); - } catch (AssertionError e){ + } catch (AssertionError e) { Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); Assert.assertTrue(e.getCause().getCause().getClass().equals(AuthenticationException.class)); } From db7b94b8046898517597e570af4530d7e1aa6a28 Mon Sep 17 00:00:00 2001 From: Tony Kurc Date: Thu, 12 Nov 2015 20:12:54 -0500 Subject: [PATCH 04/10] NIFI-696 Deprecated org.apache.nifi.flowfile.FlowFile.getId() --- .../src/main/java/org/apache/nifi/flowfile/FlowFile.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java 
b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java index 973ea40a07..1288d212fb 100644 --- a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java +++ b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java @@ -31,7 +31,12 @@ public interface FlowFile extends Comparable { /** * @return the unique identifier for this flow file + * @deprecated This method has been deprecated in favor of using the attribute + * {@link org.apache.nifi.flowfile.attributes.CoreAttributes#UUID CoreAttributes.UUID}. + * If an identifier is needed use {@link #getAttribute(String)} to retrieve the value for this attribute. + * For example, by calling getAttribute(CoreAttributes.UUID.key()). */ + @Deprecated long getId(); /** From 33ef59c5bad3715d2a4625d88f018e7adc09a7b4 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Thu, 12 Nov 2015 22:14:05 -0500 Subject: [PATCH 05/10] NIFI-1127 Adding Kerberos properties to FetchHDFS and ListHDFS. Reviewed by Tony Kurc (tkurc@apache.org) --- .../apache/nifi/processors/hadoop/AbstractHadoopProcessor.java | 2 +- .../main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java | 3 +++ .../main/java/org/apache/nifi/processors/hadoop/ListHDFS.java | 3 +++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index a67a9fd0fe..9e89c3a1e6 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -133,7 +133,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { .description("Kerberos keytab associated with the principal.
Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build(); - private static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false) + public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false) .description("Period of time which should pass before attempting a kerberos relogin").defaultValue("4 hours") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java index c27ade9a7a..6434e5eb02 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java @@ -83,6 +83,9 @@ public class FetchHDFS extends AbstractHadoopProcessor { final List properties = new ArrayList<>(); properties.add(HADOOP_CONFIGURATION_RESOURCES); properties.add(FILENAME); + properties.add(KERBEROS_PRINCIPAL); + properties.add(KERBEROS_KEYTAB); + properties.add(KERBEROS_RELOGIN_PERIOD); return properties; } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index 563bda8b6d..0fae4ca1e9 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -138,6 +138,9 @@ public class ListHDFS extends AbstractHadoopProcessor { properties.add(DISTRIBUTED_CACHE_SERVICE); properties.add(DIRECTORY); properties.add(RECURSE_SUBDIRS); + properties.add(KERBEROS_PRINCIPAL); + properties.add(KERBEROS_KEYTAB); + properties.add(KERBEROS_RELOGIN_PERIOD); return properties; } From 0900fb80c9deaf96d17c72683b9b4886b44d6180 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 12 Nov 2015 22:41:57 -0500 Subject: [PATCH 06/10] NIFI-1153: If no incoming FlowFile, don't try to transfer null Reviewed by Bryan Bende (bbende@apache.org) Amended based on review (change to a log message) by Tony Kurc (tkurc@apache.org) --- .../org/apache/nifi/processors/standard/ExecuteSQL.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index eed6eb2f11..452df4298b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -168,8 +168,12 @@ public class ExecuteSQL extends 
AbstractProcessor { session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(outgoing, REL_SUCCESS); } catch (final ProcessException | SQLException e) { - logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] { selectQuery, incoming, e }); - session.transfer(incoming, REL_FAILURE); + if (incoming == null) { + logger.error("Unable to execute SQL select query {} due to {}. No incoming flow file to route to failure", new Object[] {selectQuery, e}); + } else { + logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] {selectQuery, incoming, e}); + session.transfer(incoming, REL_FAILURE); + } } } } From e6086420aa8af51f4fbba83810b67fc88f0f938d Mon Sep 17 00:00:00 2001 From: Tony Kurc Date: Thu, 12 Nov 2015 23:11:43 -0500 Subject: [PATCH 07/10] NIFI-1158 Default timestamp based on now --- .../main/java/org/apache/nifi/processors/standard/PutSyslog.java | 1 + 1 file changed, 1 insertion(+) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java index b7e7a9cd4d..27f293859e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java @@ -114,6 +114,7 @@ public class PutSyslog extends AbstractSyslogProcessor { "\"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", or it can be an RFC3164 timestamp " + "with a format of \"MMM d HH:mm:ss\".") .required(true) + .defaultValue("${now():format('MMM d HH:mm:ss')}") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) .build(); From 3ed0949c5578f379cab7b90ff32778bf6296404a Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 12 Nov 2015 07:48:57 -0500 Subject: [PATCH 08/10] NIFI-1155: Ensure that when poll(FlowFileFilter, Set) is called, we properly update the indicator for whether or not the queue is full Signed-off-by: joewitt --- .../controller/StandardFlowFileQueue.java | 41 ++++-- .../controller/TestStandardFlowFileQueue.java | 123 ++++++++++++++++++ 2 files changed, 150 insertions(+), 14 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index ae991c806e..dd74250274 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -80,19 +80,30 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class); private PriorityQueue activeQueue = null; + + // guarded by lock private ArrayList swapQueue = null; private final AtomicReference size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0,
0L, 0, 0L)); private boolean swapMode = false; + + // TODO: Need to create a single object that houses these 3 and then create an AtomicReference for it and use a CAS operation to set it. private volatile String maximumQueueDataSize; private volatile long maximumQueueByteCount; private volatile long maximumQueueObjectCount; - private final EventReporter eventReporter; + // TODO: Need to create a single object that houses these 2 and then create an AtomicReference for it and use CAS operation to set it. private final AtomicLong flowFileExpirationMillis; - private final Connection connection; private final AtomicReference flowFileExpirationPeriod; + + // TODO: Need to eliminate this all together. Since we are not locking on the size, can just get the size and compare to max + private final AtomicBoolean queueFullRef = new AtomicBoolean(false); + + // TODO: Unit test better! + + private final EventReporter eventReporter; + private final Connection connection; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); private final List priorities; private final int swapThreshold; @@ -106,8 +117,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final ProvenanceEventRepository provRepository; private final ResourceClaimManager resourceClaimManager; - private final AtomicBoolean queueFullRef = new AtomicBoolean(false); - // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK! private final ProcessScheduler scheduler; @@ -683,13 +692,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue { public List poll(final FlowFileFilter filter, final Set expiredRecords) { long bytesPulled = 0L; int flowFilesPulled = 0; + boolean queueFullAtStart = false; writeLock.lock(); try { migrateSwapToActive(); final long expirationMillis = this.flowFileExpirationMillis.get(); - final boolean queueFullAtStart = queueFullRef.get(); + queueFullAtStart = queueFullRef.get(); final List selectedFlowFiles = new ArrayList<>(); final List unselected = new ArrayList<>(); @@ -735,17 +745,20 @@ public final class StandardFlowFileQueue implements FlowFileQueue { this.activeQueue.addAll(unselected); - // if at least 1 FlowFile was expired & the queue was full before we started, then - // we need to determine whether or not the queue is full again. If no FlowFile was expired, - // then the queue will still be full until the appropriate #acknowledge method is called. - if (queueFullAtStart && !expiredRecords.isEmpty()) { - queueFullRef.set(determineIfFull()); - } - return selectedFlowFiles; } finally { - incrementActiveQueueSize(-flowFilesPulled, -bytesPulled); - writeLock.unlock("poll(Filter, Set)"); + try { + incrementActiveQueueSize(-flowFilesPulled, -bytesPulled); + + // if at least 1 FlowFile was expired & the queue was full before we started, then + // we need to determine whether or not the queue is full again. If no FlowFile was expired, + // then the queue will still be full until the appropriate #acknowledge method is called. 
+ if (queueFullAtStart && !expiredRecords.isEmpty()) { + queueFullRef.set(determineIfFull()); + } + } finally { + writeLock.unlock("poll(Filter, Set)"); + } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 8b8c678672..61f96fd7a3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -48,6 +49,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.junit.Before; @@ -106,6 +108,127 @@ public class TestStandardFlowFileQueue { assertEquals(0L, unackSize.getByteCount()); } + @Test + public void testBackPressure() { + queue.setBackPressureObjectThreshold(10); + + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + assertFalse(queue.isFull()); + + for (int i = 0; i < 9; i++) { + queue.put(new TestFlowFile()); + assertFalse(queue.isFull()); + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + } + + queue.put(new TestFlowFile()); + assertTrue(queue.isFull()); + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + + final Set expiredRecords = new HashSet<>(); + final FlowFileRecord polled = queue.poll(expiredRecords); + assertNotNull(polled); + assertTrue(expiredRecords.isEmpty()); + + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + + // queue is still full because FlowFile has not yet been acknowledged. + assertTrue(queue.isFull()); + queue.acknowledge(polled); + + // FlowFile has been acknowledged; queue should no longer be full. 
+ assertFalse(queue.isFull()); + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + } + + @Test + public void testBackPressureAfterPollFilter() throws InterruptedException { + queue.setBackPressureObjectThreshold(10); + queue.setFlowFileExpiration("10 millis"); + + for (int i = 0; i < 9; i++) { + queue.put(new TestFlowFile()); + assertFalse(queue.isFull()); + } + + queue.put(new TestFlowFile()); + assertTrue(queue.isFull()); + + Thread.sleep(100L); + + + final FlowFileFilter filter = new FlowFileFilter() { + @Override + public FlowFileFilterResult filter(final FlowFile flowFile) { + return FlowFileFilterResult.REJECT_AND_CONTINUE; + } + }; + + final Set expiredRecords = new HashSet<>(); + final List polled = queue.poll(filter, expiredRecords); + assertTrue(polled.isEmpty()); + assertEquals(10, expiredRecords.size()); + + assertFalse(queue.isFull()); + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + } + + @Test + public void testBackPressureAfterPollSingle() throws InterruptedException { + queue.setBackPressureObjectThreshold(10); + queue.setFlowFileExpiration("10 millis"); + + for (int i = 0; i < 9; i++) { + queue.put(new TestFlowFile()); + assertFalse(queue.isFull()); + } + + queue.put(new TestFlowFile()); + assertTrue(queue.isFull()); + + Thread.sleep(100L); + + final Set expiredRecords = new HashSet<>(); + final FlowFileRecord polled = queue.poll(expiredRecords); + assertNull(polled); + assertEquals(10, expiredRecords.size()); + + assertFalse(queue.isFull()); + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + } + + @Test + public void testBackPressureAfterPollMultiple() throws InterruptedException { + queue.setBackPressureObjectThreshold(10); + queue.setFlowFileExpiration("10 millis"); + + for (int i = 0; i < 9; i++) { + queue.put(new TestFlowFile()); + assertFalse(queue.isFull()); + } + + queue.put(new TestFlowFile()); + assertTrue(queue.isFull()); + + Thread.sleep(100L); + + final Set expiredRecords = new HashSet<>(); + final List polled = queue.poll(10, expiredRecords); + assertTrue(polled.isEmpty()); + assertEquals(10, expiredRecords.size()); + + assertFalse(queue.isFull()); + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + } + @Test public void testSwapOutOccurs() { for (int i = 0; i < 10000; i++) { From 37d6b7350e964e850685498d2232f2688f1a5afc Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 12 Nov 2015 08:36:38 -0500 Subject: [PATCH 09/10] NIFI-1155: Refactored StandardFlowFileQueue to update member variables more intelligently, using CAS operations instead of locks. 
This reduces code complexity because other optimizations that previously existed are no longer needed. Signed-off-by: joewitt --- .../controller/StandardFlowFileQueue.java | 201 ++++++++---------- .../controller/TestStandardFlowFileQueue.java | 52 +++++ 2 files changed, 139 insertions(+), 114 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index dd74250274..5dce801970 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -32,8 +32,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -88,19 +86,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private boolean swapMode = false; - // TODO: Need to create a single object that houses these 3 and then create an AtomicReference for it and use a CAS operation to set it. - private volatile String maximumQueueDataSize; - private volatile long maximumQueueByteCount; - private volatile long maximumQueueObjectCount; - - // TODO: Need to create a single object that houses these 2 and then create an AtomicReference for it and use CAS operation to set it. - private final AtomicLong flowFileExpirationMillis; - private final AtomicReference flowFileExpirationPeriod; - - // TODO: Need to eliminate this all together. Since we are not locking on the size, can just get the size and compare to max - private final AtomicBoolean queueFullRef = new AtomicBoolean(false); - - // TODO: Unit test better!
+ private final AtomicReference maxQueueSize = new AtomicReference<>(new MaxQueueSize("0 MB", 0L, 0L)); + private final AtomicReference expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L)); private final EventReporter eventReporter; private final Connection connection; @@ -124,11 +111,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) { activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList())); priorities = new ArrayList<>(); - maximumQueueObjectCount = 0L; - maximumQueueDataSize = "0 MB"; - maximumQueueByteCount = 0L; - flowFileExpirationMillis = new AtomicLong(0); - flowFileExpirationPeriod = new AtomicReference<>("0 mins"); swapQueue = new ArrayList<>(); this.eventReporter = eventReporter; this.swapManager = swapManager; @@ -170,36 +152,35 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } @Override - public void setBackPressureObjectThreshold(final long maxQueueSize) { - writeLock.lock(); - try { - maximumQueueObjectCount = maxQueueSize; - this.queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("setBackPressureObjectThreshold"); + public void setBackPressureObjectThreshold(final long threshold) { + boolean updated = false; + while (!updated) { + MaxQueueSize maxSize = maxQueueSize.get(); + final MaxQueueSize updatedSize = new MaxQueueSize(maxSize.getMaxSize(), maxSize.getMaxBytes(), threshold); + updated = maxQueueSize.compareAndSet(maxSize, updatedSize); } } @Override public long getBackPressureObjectThreshold() { - return maximumQueueObjectCount; + return maxQueueSize.get().getMaxCount(); } @Override public void setBackPressureDataSizeThreshold(final String maxDataSize) { - writeLock.lock(); - try { - maximumQueueByteCount = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue(); - maximumQueueDataSize = maxDataSize; - this.queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("setBackPressureDataSizeThreshold"); + final long maxBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue(); + + boolean updated = false; + while (!updated) { + MaxQueueSize maxSize = maxQueueSize.get(); + final MaxQueueSize updatedSize = new MaxQueueSize(maxDataSize, maxBytes, maxSize.getMaxCount()); + updated = maxQueueSize.compareAndSet(maxSize, updatedSize); } } @Override public String getBackPressureDataSizeThreshold() { - return maximumQueueDataSize; + return maxQueueSize.get().getMaxSize(); } @Override @@ -229,17 +210,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public void acknowledge(final FlowFileRecord flowFile) { - if (queueFullRef.get()) { - writeLock.lock(); - try { - incrementUnacknowledgedQueueSize(-1, -flowFile.getSize()); - queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("acknowledge(FlowFileRecord)"); - } - } else { - incrementUnacknowledgedQueueSize(-1, -flowFile.getSize()); - } + incrementUnacknowledgedQueueSize(-1, -flowFile.getSize()); if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { // queue was full but no longer is. 
Notify that the source may now be available to run, @@ -255,17 +226,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { totalSize += flowFile.getSize(); } - if (queueFullRef.get()) { - writeLock.lock(); - try { - incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize); - queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("acknowledge(FlowFileRecord)"); - } - } else { - incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize); - } + incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize); if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { // it's possible that queue was full but no longer is. Notify that the source may now be available to run, @@ -276,33 +237,26 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public boolean isFull() { - return queueFullRef.get(); - } + final MaxQueueSize maxSize = maxQueueSize.get(); - /** - * MUST be called with either the read or write lock held - * - * @return true if full - */ - private boolean determineIfFull() { - final long maxSize = maximumQueueObjectCount; - final long maxBytes = maximumQueueByteCount; - if (maxSize <= 0 && maxBytes <= 0) { + // Check if max size is set + if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) { return false; } final QueueSize queueSize = getQueueSize(); - if (maxSize > 0 && queueSize.getObjectCount() >= maxSize) { + if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) { return true; } - if (maxBytes > 0 && queueSize.getByteCount() >= maxBytes) { + if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) { return true; } return false; } + @Override public void put(final FlowFileRecord file) { writeLock.lock(); @@ -316,8 +270,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { incrementActiveQueueSize(1, file.getSize()); activeQueue.add(file); } - - queueFullRef.set(determineIfFull()); } finally { writeLock.unlock("put(FlowFileRecord)"); } @@ -346,8 +298,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { incrementActiveQueueSize(numFiles, bytes); activeQueue.addAll(files); } - - queueFullRef.set(determineIfFull()); } finally { writeLock.unlock("putAll"); } @@ -383,7 +333,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { FlowFileRecord flowFile = null; // First check if we have any records Pre-Fetched. - final long expirationMillis = flowFileExpirationMillis.get(); + final long expirationMillis = expirationPeriod.get().getMillis(); writeLock.lock(); try { flowFile = doPoll(expiredRecords, expirationMillis); @@ -402,10 +352,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { boolean isExpired; migrateSwapToActive(); - final boolean queueFullAtStart = queueFullRef.get(); long expiredBytes = 0L; - do { flowFile = this.activeQueue.poll(); @@ -433,13 +381,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes); } - // if at least 1 FlowFile was expired & the queue was full before we started, then - // we need to determine whether or not the queue is full again. If no FlowFile was expired, - // then the queue will still be full until the appropriate #acknowledge method is called. 
- if (queueFullAtStart && !expiredRecords.isEmpty()) { - queueFullRef.set(determineIfFull()); - } - return flowFile; } @@ -460,8 +401,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private void doPoll(final List records, int maxResults, final Set expiredRecords) { migrateSwapToActive(); - final boolean queueFullAtStart = queueFullRef.get(); - final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords); long expiredBytes = 0L; @@ -471,13 +410,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained); incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes); - - // if at least 1 FlowFile was expired & the queue was full before we started, then - // we need to determine whether or not the queue is full again. If no FlowFile was expired, - // then the queue will still be full until the appropriate #acknowledge method is called. - if (queueFullAtStart && !expiredRecords.isEmpty()) { - queueFullRef.set(determineIfFull()); - } } /** @@ -669,7 +601,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { long drainedSize = 0L; FlowFileRecord pulled = null; - final long expirationMillis = this.flowFileExpirationMillis.get(); + final long expirationMillis = expirationPeriod.get().getMillis(); while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) { if (isLaterThan(getExpirationDate(pulled, expirationMillis))) { expiredRecords.add(pulled); @@ -692,14 +624,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue { public List poll(final FlowFileFilter filter, final Set expiredRecords) { long bytesPulled = 0L; int flowFilesPulled = 0; - boolean queueFullAtStart = false; writeLock.lock(); try { migrateSwapToActive(); - final long expirationMillis = this.flowFileExpirationMillis.get(); - queueFullAtStart = queueFullRef.get(); + final long expirationMillis = expirationPeriod.get().getMillis(); final List selectedFlowFiles = new ArrayList<>(); final List unselected = new ArrayList<>(); @@ -744,21 +674,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } this.activeQueue.addAll(unselected); + incrementActiveQueueSize(-flowFilesPulled, -bytesPulled); return selectedFlowFiles; } finally { - try { - incrementActiveQueueSize(-flowFilesPulled, -bytesPulled); - - // if at least 1 FlowFile was expired & the queue was full before we started, then - // we need to determine whether or not the queue is full again. If no FlowFile was expired, - // then the queue will still be full until the appropriate #acknowledge method is called. 
- if (queueFullAtStart && !expiredRecords.isEmpty()) { - queueFullRef.set(determineIfFull()); - } - } finally { - writeLock.unlock("poll(Filter, Set)"); - } + writeLock.unlock("poll(Filter, Set)"); } } @@ -830,12 +750,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public String getFlowFileExpiration() { - return flowFileExpirationPeriod.get(); + return expirationPeriod.get().getPeriod(); } @Override public int getFlowFileExpiration(final TimeUnit timeUnit) { - return (int) timeUnit.convert(flowFileExpirationMillis.get(), TimeUnit.MILLISECONDS); + return (int) timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS); } @Override @@ -844,8 +764,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { if (millis < 0) { throw new IllegalArgumentException("FlowFile Expiration Period must be positive"); } - this.flowFileExpirationPeriod.set(flowExpirationPeriod); - this.flowFileExpirationMillis.set(millis); + + expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis)); } @@ -1300,4 +1220,57 @@ public final class StandardFlowFileQueue implements FlowFileQueue { " Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]"; } } + + + private static class MaxQueueSize { + private final String maxSize; + private final long maxBytes; + private final long maxCount; + + public MaxQueueSize(final String maxSize, final long maxBytes, final long maxCount) { + this.maxSize = maxSize; + this.maxBytes = maxBytes; + this.maxCount = maxCount; + } + + public String getMaxSize() { + return maxSize; + } + + public long getMaxBytes() { + return maxBytes; + } + + public long getMaxCount() { + return maxCount; + } + + @Override + public String toString() { + return maxCount + " Objects/" + maxSize; + } + } + + private static class TimePeriod { + private final String period; + private final long millis; + + public TimePeriod(final String period, final long millis) { + this.period = period; + this.millis = millis; + } + + public String getPeriod() { + return period; + } + + public long getMillis() { + return millis; + } + + @Override + public String toString() { + return period; + } + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 61f96fd7a3..09ac7f2a27 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -50,18 +50,27 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.FlowFileFilter; +import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestStandardFlowFileQueue { private TestSwapManager 
swapManager = null; private StandardFlowFileQueue queue = null; + private List provRecords = new ArrayList<>(); + @Before + @SuppressWarnings("unchecked") public void setup() { + provRecords.clear(); + final Connection connection = Mockito.mock(Connection.class); Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class)); Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class)); @@ -74,6 +83,16 @@ public class TestStandardFlowFileQueue { final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class); Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder()); + Mockito.doAnswer(new Answer() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + final Iterable iterable = (Iterable) invocation.getArguments()[0]; + for (final ProvenanceEventRecord record : iterable) { + provRecords.add(record); + } + return null; + } + }).when(provRepo).registerEvents(Mockito.any(Iterable.class)); queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000); TestFlowFile.idGenerator.set(0L); @@ -179,6 +198,39 @@ public class TestStandardFlowFileQueue { assertTrue(queue.isActiveQueueEmpty()); } + @Test(timeout = 10000) + public void testBackPressureAfterDrop() throws InterruptedException { + queue.setBackPressureObjectThreshold(10); + queue.setFlowFileExpiration("10 millis"); + + for (int i = 0; i < 9; i++) { + queue.put(new TestFlowFile()); + assertFalse(queue.isFull()); + } + + queue.put(new TestFlowFile()); + assertTrue(queue.isFull()); + + Thread.sleep(100L); + + final String requestId = UUID.randomUUID().toString(); + final DropFlowFileStatus status = queue.dropFlowFiles(requestId, "Unit Test"); + + while (status.getState() != DropFlowFileState.COMPLETE) { + Thread.sleep(10L); + } + + assertFalse(queue.isFull()); + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + + assertEquals(10, provRecords.size()); + for (final ProvenanceEventRecord event : provRecords) { + assertNotNull(event); + assertEquals(ProvenanceEventType.DROP, event.getEventType()); + } + } + @Test public void testBackPressureAfterPollSingle() throws InterruptedException { queue.setBackPressureObjectThreshold(10); From 36d00a60f51d123d1109afedc7c49875a940f7d2 Mon Sep 17 00:00:00 2001 From: joewitt Date: Thu, 12 Nov 2015 23:01:56 -0500 Subject: [PATCH 10/10] NIFI-1155 fixed contrib-check violation --- .../java/org/apache/nifi/controller/StandardFlowFileQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 5dce801970..9a439c9829 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -764,7 +764,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { if (millis < 0) { throw new IllegalArgumentException("FlowFile Expiration Period must be positive"); } - + expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis)); }
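
A note on PATCH 04, which deprecates FlowFile.getId() in favor of the uuid attribute: the replacement is an ordinary attribute lookup, exactly the pattern PutCouchbaseKey uses in PATCH 03. Below is a minimal sketch under the nifi-api shown in these patches; the class and method names (FlowFileIdentifierExample, identifierOf) are illustrative only and not part of NiFi.

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.flowfile.attributes.CoreAttributes;

    public class FlowFileIdentifierExample {
        // Looks up the stable "uuid" core attribute rather than calling the deprecated getId().
        public static String identifierOf(final FlowFile flowFile) {
            // CoreAttributes.UUID.key() resolves to the attribute name the framework assigns.
            return flowFile.getAttribute(CoreAttributes.UUID.key());
        }
    }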
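
A note on PATCH 09: it replaces lock-guarded volatile fields with immutable snapshot objects (MaxQueueSize, TimePeriod) swapped through AtomicReference.compareAndSet(). Below is a minimal sketch of that retry pattern; the Limits class and setMaxCount() are hypothetical stand-ins for the NiFi types, reduced to the two back-pressure thresholds.

    import java.util.concurrent.atomic.AtomicReference;

    public class CasThresholdExample {
        // Immutable snapshot of both limits; an update swaps in a whole new instance.
        private static final class Limits {
            private final long maxBytes;
            private final long maxCount;

            private Limits(final long maxBytes, final long maxCount) {
                this.maxBytes = maxBytes;
                this.maxCount = maxCount;
            }
        }

        private final AtomicReference<Limits> limits = new AtomicReference<>(new Limits(0L, 0L));

        // Mirrors the loop in setBackPressureObjectThreshold(): read the current snapshot,
        // build a modified copy, and compareAndSet() until no concurrent writer has
        // replaced the reference in between. Readers always see a consistent pair.
        public void setMaxCount(final long newCount) {
            boolean updated = false;
            while (!updated) {
                final Limits current = limits.get();
                updated = limits.compareAndSet(current, new Limits(current.maxBytes, newCount));
            }
        }
    }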