Mark Payne 2015-11-13 10:47:05 -05:00
commit 90f6830003
27 changed files with 1182 additions and 259 deletions

View File

@ -31,7 +31,12 @@ public interface FlowFile extends Comparable<FlowFile> {
    /**
     * @return the unique identifier for this flow file
     * @deprecated This method has been deprecated in favor of using the attribute
     * {@link org.apache.nifi.flowfile.attributes.CoreAttributes.UUID CoreAttributes.UUID}.
     * If an identifier is needed use {@link #getAttribute(String)} to retrieve the value for this attribute.
     * For example, by calling getAttribute(CoreAttributes.UUID.key()).
     */
    @Deprecated
    long getId();

    /**
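
A minimal sketch of the replacement pattern the new Javadoc describes, assuming a FlowFile variable is in scope (e.g. inside a processor's onTrigger):

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;

// Instead of the deprecated numeric identifier:
//   long id = flowFile.getId();
// read the framework-assigned UUID attribute:
final String uuid = flowFile.getAttribute(CoreAttributes.UUID.key());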

View File

@ -765,6 +765,13 @@ The following binary components are provided under the Apache Software License v
is a distributed tracing system that is Apache 2.0 Licensed.
Copyright 2012 Twitter, Inc.
(ASLv2) Apache Qpid AMQP 1.0 Client
The following NOTICE information applies:
Copyright 2006-2015 The Apache Software Foundation
(ASLv2) EventHubs Client (com.microsoft.eventhubs.client:eventhubs-client:0.9.1 - https://github.com/hdinsight/eventhubs-client/)
************************
Common Development and Distribution License 1.1
************************

View File

@ -242,6 +242,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
<properties>

View File

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-bundle</artifactId>
<version>0.4.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-azure-nar</artifactId>
<version>0.4.0-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-processors</artifactId>
<version>0.4.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,34 @@
nifi-azure-nar
Copyright 2015 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2014 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Apache Qpid AMQP 1.0 Client
The following NOTICE information applies:
Copyright 2006-2015 The Apache Software Foundation
(ASLv2) EventHubs Client (com.microsoft.eventhubs.client:eventhubs-client:0.9.1 - https://github.com/hdinsight/eventhubs-client/)
************************
Common Development and Distribution License 1.1
************************
The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
(CDDL 1.1) (GPL2 w/ CPE) Javax JMS Api (javax.jms:javax.jms-api:jar:2.0.1 - http://java.net/projects/jms-spec/pages/Home)

View File

@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-bundle</artifactId>
<version>0.4.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-azure-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.eventhubs.client</groupId>
<artifactId>eventhubs-client</artifactId>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,254 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import com.microsoft.eventhubs.client.ConnectionStringBuilder;
import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter;
import com.microsoft.eventhubs.client.EventHubException;
import com.microsoft.eventhubs.client.EventHubMessage;
import com.microsoft.eventhubs.client.IEventHubFilter;
import com.microsoft.eventhubs.client.ResilientEventHubReceiver;
@Tags({ "azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams" })
@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile")
@WritesAttributes({
@WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"),
@WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"),
@WritesAttribute(attribute = "eventhub.sequence", description = "The Azure Sequence number associated with the message"),
@WritesAttribute(attribute = "eventhub.name", description = "The name of the Event Hub from which the message was pulled"),
@WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled")
})
public class GetAzureEventHub extends AbstractProcessor {
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
.name("Event Hub Name")
.description("The name of the Azure Event Hub to pull messages from")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
.name("Event Hub Namespace")
.description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.required(true)
.build();
static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.required(true)
.build();
static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Primary Key")
.description("The primary key of the Event Hub Shared Access Policy")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.sensitive(true)
.required(true)
.build();
static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder()
.name("Number of Event Hub Partitions")
.description("The number of partitions that the Event Hub has. Only this number of partitions will be used, "
+ "so it is important to ensure that if the number of partitions changes that this value be updated. Otherwise, some messages may not be consumed.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(false)
.required(true)
.build();
static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder()
.name("Event Hub Consumer Group")
.description("The name of the Event Hub Consumer Group to use when pulling events")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("$Default")
.required(true)
.build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The number of FlowFiles to pull in a single JMS session")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("10")
.required(true)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Any FlowFile that is successfully received from the Azure Event Hub will be transferred to this Relationship.")
.build();
private final ConcurrentMap<String, ResilientEventHubReceiver> partitionToReceiverMap = new ConcurrentHashMap<>();
private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(EVENT_HUB_NAME);
properties.add(NAMESPACE);
properties.add(ACCESS_POLICY);
properties.add(POLICY_PRIMARY_KEY);
properties.add(NUM_PARTITIONS);
properties.add(CONSUMER_GROUP);
return properties;
}
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
private ResilientEventHubReceiver getReceiver(final ProcessContext context, final String partitionId) throws EventHubException {
ResilientEventHubReceiver existingReceiver = partitionToReceiverMap.get(partitionId);
if (existingReceiver != null) {
return existingReceiver;
}
// we want to avoid allowing multiple threads to create Receivers simultaneously because that could result in
// having multiple Receivers for the same partition. So if the map does not contain a receiver for this partition,
// we will enter a synchronized block and check again (because once we enter the synchronized block, we know that no
// other thread is creating a client). If within the synchronized block, we still do not have an entry in the map,
// it is up to us to create the receiver, initialize it, and then put it into the map.
// We do not use the putIfAbsent method in order to do a CAS operation here because we want to also initialize the
// receiver if and only if it is not present in the map. As a result, we need to initialize the receiver and add it
// to the map atomically. Hence, the synchronized block.
synchronized (this) {
existingReceiver = partitionToReceiverMap.get(partitionId);
if (existingReceiver != null) {
return existingReceiver;
}
final String policyName = context.getProperty(ACCESS_POLICY).getValue();
final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final String consumerGroupName = context.getProperty(CONSUMER_GROUP).getValue();
final String connectionString = new ConnectionStringBuilder(policyName, policyKey, namespace).getConnectionString();
final IEventHubFilter filter = new EventHubEnqueueTimeFilter(System.currentTimeMillis());
final ResilientEventHubReceiver receiver = new ResilientEventHubReceiver(connectionString, eventHubName, partitionId, consumerGroupName, -1, filter);
receiver.initialize();
partitionToReceiverMap.put(partitionId, receiver);
return receiver;
}
}
@OnStopped
public void tearDown() {
for (final ResilientEventHubReceiver receiver : partitionToReceiverMap.values()) {
receiver.close();
}
partitionToReceiverMap.clear();
}
@OnScheduled
public void setupPartitions(final ProcessContext context) {
final BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); i++) {
partitionNames.add(String.valueOf(i));
}
this.partitionNames = partitionNames;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final BlockingQueue<String> partitionIds = this.partitionNames;
final String partitionId = partitionIds.poll();
if (partitionId == null) {
getLogger().debug("No partitions available");
return;
}
final StopWatch stopWatch = new StopWatch(true);
try {
final ResilientEventHubReceiver receiver;
try {
receiver = getReceiver(context, partitionId);
} catch (final EventHubException e) {
throw new ProcessException(e);
}
final EventHubMessage message = EventHubMessage.parseAmqpMessage(receiver.receive(100L));
if (message == null) {
return;
}
final Map<String, String> attributes = new HashMap<>();
attributes.put("eventhub.enqueued.timestamp", String.valueOf(message.getEnqueuedTimestamp()));
attributes.put("eventhub.offset", message.getOffset());
attributes.put("eventhub.sequence", String.valueOf(message.getSequence()));
attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue());
attributes.put("eventhub.partition", partitionId);
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(message.getData());
}
});
session.transfer(flowFile, REL_SUCCESS);
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue();
final String transitUri = "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId;
session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
} finally {
partitionIds.offer(partitionId);
}
}
}
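
As a quick, hypothetical illustration of how these properties fit together (nifi-mock is already a test dependency of this bundle), the sketch below wires up placeholder values; an actual run would need a reachable Event Hub, since getReceiver() opens a live AMQP connection:

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

final TestRunner runner = TestRunners.newTestRunner(GetAzureEventHub.class);
runner.setProperty("Event Hub Name", "my-hub");                               // placeholder
runner.setProperty("Event Hub Namespace", "my-hub-ns");                       // the "<Event Hub Name>-ns" convention
runner.setProperty("Shared Access Policy Name", "listen-policy");             // must have Listen permissions
runner.setProperty("Shared Access Policy Primary Key", "primary-key-value");  // sensitive; placeholder
runner.setProperty("Number of Event Hub Partitions", "4");
runner.run(); // each trigger polls one partition, round-robin via the partition queue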

View File

@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.jms.DeliveryMode;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import com.microsoft.eventhubs.client.EventHubClient;
import com.microsoft.eventhubs.client.EventHubException;
import com.microsoft.eventhubs.client.EventHubSender;
@SupportsBatching
@Tags({ "microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming" })
@CapabilityDescription("Sends the contents of a FlowFile to a Windows Azure Event Hub. Note: the content of the FlowFile will be buffered into memory before being sent, "
+ "so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available.")
public class PutAzureEventHub extends AbstractProcessor {
static final AllowableValue DELIVERY_MODE_PERSISTENT = new AllowableValue(String.valueOf(DeliveryMode.PERSISTENT), "Persistent", "This mode indicates that the Event Hub "
+ "server must persist the message to a reliable storage mechanism before the FlowFile is routed to 'success', in order to ensure that the data is not lost.");
static final AllowableValue DELIVERY_MODE_NON_PERSISTENT = new AllowableValue(String.valueOf(DeliveryMode.NON_PERSISTENT), "Non-Persistent",
"This mode indicates that the Event Hub server does not have to persist the message to a reliable storage mechanism before the FlowFile is routed to 'success'. "
+ "This delivery mode may offer higher throughput but may result in message loss if the server crashes or is restarted.");
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
.name("Event Hub Name")
.description("The name of the Azure Event Hub to send to")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
.name("Event Hub Namespace")
.description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.required(true)
.build();
static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the Event Hub Shared Access Policy. This Policy must have Send permissions.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.required(true)
.build();
static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Primary Key")
.description("The primary key of the Event Hub Shared Access Policy")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.sensitive(true)
.required(true)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Any FlowFile that is successfully sent to the Azure Event Hub will be transferred to this Relationship.")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Any FlowFile that could not be sent to the Azure Event Hub will be transferred to this Relationship.")
.build();
private volatile BlockingQueue<EventHubSender> senderQueue = new LinkedBlockingQueue<>();
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(EVENT_HUB_NAME);
properties.add(NAMESPACE);
properties.add(ACCESS_POLICY);
properties.add(POLICY_PRIMARY_KEY);
return properties;
}
@OnScheduled
public final void setupClient(final ProcessContext context) throws EventHubException {
final String policyName = context.getProperty(ACCESS_POLICY).getValue();
final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final EventHubClient client = EventHubClient.create(policyName, policyKey, namespace, eventHubName);
final int numThreads = context.getMaxConcurrentTasks();
senderQueue = new LinkedBlockingQueue<>(numThreads);
for (int i = 0; i < numThreads; i++) {
final EventHubSender sender = client.createPartitionSender(null);
senderQueue.offer(sender);
}
}
@OnStopped
public void tearDown() {
EventHubSender sender;
while ((sender = senderQueue.poll()) != null) {
sender.close();
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final StopWatch stopWatch = new StopWatch(true);
final EventHubSender sender = senderQueue.poll();
try {
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer);
}
});
try {
sender.send(buffer);
} catch (final EventHubException ehe) {
getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[] { flowFile, ehe }, ehe);
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
} finally {
senderQueue.offer(sender);
}
}
}
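
A corresponding hypothetical sketch for the send side; note that setupClient() opens a real AMQP connection when the processor is scheduled, so these placeholder values would fail against a live run without real credentials:

import java.nio.charset.StandardCharsets;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

final TestRunner runner = TestRunners.newTestRunner(PutAzureEventHub.class);
runner.setProperty("Event Hub Name", "my-hub");                               // placeholder
runner.setProperty("Event Hub Namespace", "my-hub-ns");
runner.setProperty("Shared Access Policy Name", "send-policy");               // must have Send permissions
runner.setProperty("Shared Access Policy Primary Key", "primary-key-value");  // sensitive; placeholder
runner.enqueue("hello event hub".getBytes(StandardCharsets.UTF_8));
runner.run();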

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.azure.eventhub.PutAzureEventHub
org.apache.nifi.processors.azure.eventhub.GetAzureEventHub

View File

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.4.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-bundle</artifactId>
<version>0.4.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-azure-processors</module>
<module>nifi-azure-nar</module>
</modules>
</project>

View File

@ -43,21 +43,18 @@ import com.couchbase.client.java.Bucket;
 */
public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {

    public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor.Builder().name("Document Type")
        .description("The type of contents.")
        .required(true)
        .allowableValues(DocumentType.values())
        .defaultValue(DocumentType.Json.toString())
        .build();

    public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder().name("Document Id")
        .description("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id.")
        .expressionLanguageSupported(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
@ -77,15 +74,13 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
.description("All FlowFiles that cannot written to Couchbase Server and can't be retried are routed to this relationship.") .description("All FlowFiles that cannot written to Couchbase Server and can't be retried are routed to this relationship.")
.build(); .build();
public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor.Builder().name("Couchbase Cluster Controller Service")
.Builder().name("Couchbase Cluster Controller Service")
.description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.") .description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.")
.required(true) .required(true)
.identifiesControllerService(CouchbaseClusterControllerService.class) .identifiesControllerService(CouchbaseClusterControllerService.class)
.build(); .build();
public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder().name("Bucket Name")
.Builder().name("Bucket Name")
.description("The name of bucket to access.") .description("The name of bucket to access.")
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -115,6 +110,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
    /**
     * Add processor specific properties.
     *
     * @param descriptors add properties to this list
     */
    protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
@ -123,6 +119,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
    /**
     * Add processor specific relationships.
     *
     * @param relationships add relationships to this list
     */
    protected void addSupportedRelationships(Set<Relationship> relationships) {
@ -140,11 +137,11 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
    }

    private CouchbaseClusterControllerService getClusterService(final ProcessContext context) {
        if (clusterService == null) {
            synchronized (AbstractCouchbaseProcessor.class) {
                if (clusterService == null) {
                    clusterService = context.getProperty(COUCHBASE_CLUSTER_SERVICE)
                        .asControllerService(CouchbaseClusterControllerService.class);
                }
            }
        }
@ -154,6 +151,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
    /**
     * Open a bucket connection using a CouchbaseClusterControllerService.
     *
     * @param context a process context
     * @return a bucket instance
     */
@ -163,18 +161,17 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
    /**
     * Generate a transit url.
     *
     * @param context a process context
     * @return a transit url based on the bucket name and the CouchbaseClusterControllerService name
     */
    protected String getTransitUrl(final ProcessContext context, final String docId) {
        return "couchbase://" + context.getProperty(BUCKET_NAME).getValue() + "/" + docId;
    }

    /**
     * Handles the thrown CouchbaseException accordingly.
     *
     * @param context a process context
     * @param session a process session
     * @param logger a logger
@ -183,35 +180,39 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
     * @param errMsg a message to be logged
     */
    protected void handleCouchbaseException(final ProcessContext context, final ProcessSession session,
        final ProcessorLog logger, FlowFile inFile, CouchbaseException e,
        String errMsg) {
        logger.error(errMsg, e);
        if (inFile != null) {
            ErrorHandlingStrategy strategy = CouchbaseExceptionMappings.getStrategy(e);
            switch (strategy.penalty()) {
                case Penalize:
                    if (logger.isDebugEnabled()) {
                        logger.debug("Penalized: {}", new Object[] {inFile});
                    }
                    inFile = session.penalize(inFile);
                    break;
                case Yield:
                    if (logger.isDebugEnabled()) {
                        logger.debug("Yielded context: {}", new Object[] {inFile});
                    }
                    context.yield();
                    break;
                case None:
                    break;
            }

            switch (strategy.result()) {
                case ProcessException:
                    throw new ProcessException(errMsg, e);
                case Failure:
                    inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
                    session.transfer(inFile, REL_FAILURE);
                    break;
                case Retry:
                    inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
                    session.transfer(inFile, REL_RETRY);
                    break;
            }
        }
    }
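
For example, with bucket "default" and document id "user-1234", getTransitUrl(context, "user-1234") now yields couchbase://default/user-1234, whereas the old format was "<bucket name>@<cluster service name>" and carried no per-document information.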

View File

@ -78,7 +78,7 @@ public class CouchbaseExceptionMappings {
        mapping.put(ReplicaNotConfiguredException.class, ConfigurationError);
        // when a particular Service(KV, View, Query, DCP) isn't running in a cluster
        mapping.put(ServiceNotAvailableException.class, ConfigurationError);
        // SSL configuration error, such as key store misconfiguration.
        mapping.put(SSLException.class, ConfigurationError);
        /*

View File

@ -16,18 +16,19 @@
 */
package org.apache.nifi.processors.couchbase;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -43,6 +44,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;

import com.couchbase.client.core.CouchbaseException;
@ -52,31 +54,30 @@ import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.RawJsonDocument;
import com.couchbase.client.java.error.DocumentDoesNotExistException;

@Tags({"nosql", "couchbase", "database", "get"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. The ID of the document to fetch may be supplied by setting the <Document Id> property. "
    + "NOTE: if the Document Id property is not set, the contents of the FlowFile will be read to determine the Document Id, which means that the contents of the entire "
    + "FlowFile will be buffered in memory.")
@SeeAlso({CouchbaseClusterControllerService.class})
@WritesAttributes({
    @WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was retrieved from."),
    @WritesAttribute(attribute = "couchbase.bucket", description = "Bucket where the document was retrieved from."),
    @WritesAttribute(attribute = "couchbase.doc.id", description = "Id of the document."),
    @WritesAttribute(attribute = "couchbase.doc.cas", description = "CAS of the document."),
    @WritesAttribute(attribute = "couchbase.doc.expiry", description = "Expiration of the document."),
    @WritesAttribute(attribute = "couchbase.exception", description = "If Couchbase related error occurs the CouchbaseException class name will be captured here.")
})
public class GetCouchbaseKey extends AbstractCouchbaseProcessor {

    @Override
    protected void addSupportedProperties(final List<PropertyDescriptor> descriptors) {
        descriptors.add(DOCUMENT_TYPE);
        descriptors.add(DOC_ID);
    }

    @Override
    protected void addSupportedRelationships(final Set<Relationship> relationships) {
        relationships.add(REL_SUCCESS);
        relationships.add(REL_ORIGINAL);
        relationships.add(REL_RETRY);
@ -85,11 +86,15 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile inFile = session.get();
        if (inFile == null) {
            return;
        }

        final long startNanos = System.nanoTime();
        final ProcessorLog logger = getLogger();
        String docId = null;
        if (!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())) {
            docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue();
        } else {
            final byte[] content = new byte[(int) inFile.getSize()];
@ -102,63 +107,74 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
            docId = new String(content, StandardCharsets.UTF_8);
        }

        if (StringUtils.isEmpty(docId)) {
            throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile);
        }

        try {
            final Document<?> doc;
            final byte[] content;
            final Bucket bucket = openBucket(context);
            final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());

            switch (documentType) {
                case Json: {
                    RawJsonDocument document = bucket.get(docId, RawJsonDocument.class);
                    if (document == null) {
                        doc = null;
                        content = null;
                    } else {
                        content = document.content().getBytes(StandardCharsets.UTF_8);
                        doc = document;
                    }
                    break;
                }
                case Binary: {
                    BinaryDocument document = bucket.get(docId, BinaryDocument.class);
                    if (document == null) {
                        doc = null;
                        content = null;
                    } else {
                        content = document.content().array();
                        doc = document;
                    }
                    break;
                }
                default: {
                    doc = null;
                    content = null;
                }
            }

            if (doc == null) {
                logger.error("Document {} was not found in {}; routing {} to failure", new Object[] {docId, getTransitUrl(context, docId), inFile});
                inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName());
                session.transfer(inFile, REL_FAILURE);
                return;
            }

            FlowFile outFile = session.create(inFile);
            outFile = session.write(outFile, new OutputStreamCallback() {
                @Override
                public void process(final OutputStream out) throws IOException {
                    out.write(content);
                }
            });

            final Map<String, String> updatedAttrs = new HashMap<>();
            updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
            updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
            updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
            updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
            updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
            outFile = session.putAllAttributes(outFile, updatedAttrs);

            final long fetchMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            session.getProvenanceReporter().fetch(outFile, getTransitUrl(context, docId), fetchMillis);
            session.transfer(outFile, REL_SUCCESS);
            session.transfer(inFile, REL_ORIGINAL);

        } catch (final CouchbaseException e) {
            String errMsg = String.format("Getting document %s from Couchbase Server using %s failed due to %s", docId, inFile, e);
            handleCouchbaseException(context, session, logger, inFile, e, errMsg);
        }
    }
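
Because <Document Id> supports expression language, the document key can be derived from incoming FlowFile attributes. A hypothetical sketch (the attribute name is assumed, and the Couchbase Cluster Controller Service setup the processor also requires is omitted):

import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

final TestRunner runner = TestRunners.newTestRunner(GetCouchbaseKey.class);
runner.setProperty("Document Id", "user-${user.id}");  // EL evaluated against the FlowFile
final Map<String, String> attrs = new HashMap<>();
attrs.put("user.id", "1234");                          // assumed attribute
runner.enqueue(new byte[0], attrs);                    // INPUT_REQUIRED: an incoming FlowFile is mandatory
runner.run();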

View File

@ -54,39 +54,36 @@ import com.couchbase.client.java.document.BinaryDocument;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.RawJsonDocument;

@Tags({"nosql", "couchbase", "database", "put"})
@CapabilityDescription("Put a document to Couchbase Server via Key/Value access.")
@SeeAlso({CouchbaseClusterControllerService.class})
@ReadsAttributes({
    @ReadsAttribute(attribute = "uuid", description = "Used as a document id if 'Document Id' is not specified")
})
@WritesAttributes({
    @WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was stored."),
    @WritesAttribute(attribute = "couchbase.bucket", description = "Bucket where the document was stored."),
    @WritesAttribute(attribute = "couchbase.doc.id", description = "Id of the document."),
    @WritesAttribute(attribute = "couchbase.doc.cas", description = "CAS of the document."),
    @WritesAttribute(attribute = "couchbase.doc.expiry", description = "Expiration of the document."),
    @WritesAttribute(attribute = "couchbase.exception", description = "If Couchbase related error occurs the CouchbaseException class name will be captured here.")
})
public class PutCouchbaseKey extends AbstractCouchbaseProcessor {

    public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor.Builder().name("Persist To")
        .description("Durability constraint about disk persistence.")
        .required(true)
        .allowableValues(PersistTo.values())
        .defaultValue(PersistTo.NONE.toString())
        .build();

    public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor.Builder().name("Replicate To")
        .description("Durability constraint about replication.")
        .required(true)
        .allowableValues(ReplicateTo.values())
        .defaultValue(ReplicateTo.NONE.toString())
        .build();

    @Override
    protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
@ -107,7 +104,7 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final ProcessorLog logger = getLogger();
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
@ -119,41 +116,42 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
            }
        });

        String docId = flowFile.getAttribute(CoreAttributes.UUID.key());
        if (!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())) {
            docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
        }

        try {
            Document<?> doc = null;
            final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
            switch (documentType) {
                case Json: {
                    doc = RawJsonDocument.create(docId, new String(content, StandardCharsets.UTF_8));
                    break;
                }
                case Binary: {
                    final ByteBuf buf = Unpooled.copiedBuffer(content);
                    doc = BinaryDocument.create(docId, buf);
                    break;
                }
            }

            final PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
            final ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
            doc = openBucket(context).upsert(doc, persistTo, replicateTo);

            final Map<String, String> updatedAttrs = new HashMap<>();
            updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
            updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
            updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
            updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
            updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
            flowFile = session.putAllAttributes(flowFile, updatedAttrs);

            session.getProvenanceReporter().send(flowFile, getTransitUrl(context, docId));
            session.transfer(flowFile, REL_SUCCESS);

        } catch (final CouchbaseException e) {
            String errMsg = String.format("Writing document %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);
            handleCouchbaseException(context, session, logger, flowFile, e, errMsg);
        }
    }

View File

@ -100,10 +100,11 @@ public class TestGetCouchbaseKey {
        testRunner.setProperty(BUCKET_NAME, bucketName);
        testRunner.setProperty(DOC_ID, docId);
        testRunner.enqueue(new byte[0]);
        testRunner.run();

        testRunner.assertTransferCount(REL_SUCCESS, 1);
        testRunner.assertTransferCount(REL_ORIGINAL, 1);
        testRunner.assertTransferCount(REL_RETRY, 0);
        testRunner.assertTransferCount(REL_FAILURE, 0);
        MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
@ -145,7 +146,7 @@ public class TestGetCouchbaseKey {
} }
@Test @Test
public void testDocIdExpWithNullFlowFile() throws Exception { public void testDocIdExpWithEmptyFlowFile() throws Exception {
String docIdExp = "doc-s"; String docIdExp = "doc-s";
String docId = "doc-s"; String docId = "doc-s";
@ -157,10 +158,11 @@ public class TestGetCouchbaseKey {
testRunner.setProperty(DOC_ID, docIdExp); testRunner.setProperty(DOC_ID, docIdExp);
testRunner.enqueue(new byte[0]);
testRunner.run(); testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1); testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_ORIGINAL, 0); testRunner.assertTransferCount(REL_ORIGINAL, 1);
testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0); testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
@ -179,11 +181,12 @@ public class TestGetCouchbaseKey {
setupMockBucket(bucket); setupMockBucket(bucket);
testRunner.setProperty(DOC_ID, docIdExp); testRunner.setProperty(DOC_ID, docIdExp);
testRunner.enqueue(new byte[0]);
try { try {
testRunner.run(); testRunner.run();
fail("Exception should be thrown."); fail("Exception should be thrown.");
} catch (AssertionError e){ } catch (AssertionError e) {
Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class)); Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
} }
@ -210,7 +213,7 @@ public class TestGetCouchbaseKey {
try { try {
testRunner.run(); testRunner.run();
fail("Exception should be thrown."); fail("Exception should be thrown.");
} catch (AssertionError e){ } catch (AssertionError e) {
Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class)); Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
} }
@ -288,7 +291,7 @@ public class TestGetCouchbaseKey {
try { try {
testRunner.run(); testRunner.run();
fail("ProcessException should be thrown."); fail("ProcessException should be thrown.");
} catch (AssertionError e){ } catch (AssertionError e) {
Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
} }
@ -315,7 +318,7 @@ public class TestGetCouchbaseKey {
try { try {
testRunner.run(); testRunner.run();
fail("ProcessException should be thrown."); fail("ProcessException should be thrown.");
} catch (AssertionError e){ } catch (AssertionError e) {
Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
Assert.assertTrue(e.getCause().getCause().getClass().equals(AuthenticationException.class)); Assert.assertTrue(e.getCause().getCause().getClass().equals(AuthenticationException.class));
} }
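The test fixes above do two things: they enqueue an empty FlowFile so that onTrigger actually fires, and they assert the new REL_ORIGINAL routing. A self-contained sketch of the TestRunner pattern these tests rely on, using an illustrative pass-through processor in place of GetCouchbaseKey:

import java.util.Collections;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class RunnerSketch {

    // Do-nothing processor, defined inline so the sketch is self-contained.
    static class PassThrough extends AbstractProcessor {
        static final Relationship SUCCESS = new Relationship.Builder().name("success").build();

        @Override
        public Set<Relationship> getRelationships() {
            return Collections.singleton(SUCCESS);
        }

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
            final FlowFile flowFile = session.get();
            if (flowFile == null) {
                return;
            }
            session.transfer(flowFile, SUCCESS);
        }
    }

    public static void main(String[] args) {
        final TestRunner runner = TestRunners.newTestRunner(new PassThrough());
        runner.enqueue(new byte[0]); // without an input FlowFile, onTrigger would have nothing to poll
        runner.run();
        runner.assertTransferCount(PassThrough.SUCCESS, 1);
    }
}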

View File

@ -32,8 +32,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -80,19 +78,19 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class); private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class);
private PriorityQueue<FlowFileRecord> activeQueue = null; private PriorityQueue<FlowFileRecord> activeQueue = null;
// guarded by lock
private ArrayList<FlowFileRecord> swapQueue = null; private ArrayList<FlowFileRecord> swapQueue = null;
private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L)); private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L));
private boolean swapMode = false; private boolean swapMode = false;
private volatile String maximumQueueDataSize;
private volatile long maximumQueueByteCount; private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>(new MaxQueueSize("0 MB", 0L, 0L));
private volatile long maximumQueueObjectCount; private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L));
private final EventReporter eventReporter; private final EventReporter eventReporter;
private final AtomicLong flowFileExpirationMillis;
private final Connection connection; private final Connection connection;
private final AtomicReference<String> flowFileExpirationPeriod;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private final List<FlowFilePrioritizer> priorities; private final List<FlowFilePrioritizer> priorities;
private final int swapThreshold; private final int swapThreshold;
@ -106,8 +104,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private final ProvenanceEventRepository provRepository; private final ProvenanceEventRepository provRepository;
private final ResourceClaimManager resourceClaimManager; private final ResourceClaimManager resourceClaimManager;
private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
// SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK! // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
private final ProcessScheduler scheduler; private final ProcessScheduler scheduler;
@ -115,11 +111,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) { final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) {
activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>())); activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
priorities = new ArrayList<>(); priorities = new ArrayList<>();
maximumQueueObjectCount = 0L;
maximumQueueDataSize = "0 MB";
maximumQueueByteCount = 0L;
flowFileExpirationMillis = new AtomicLong(0);
flowFileExpirationPeriod = new AtomicReference<>("0 mins");
swapQueue = new ArrayList<>(); swapQueue = new ArrayList<>();
this.eventReporter = eventReporter; this.eventReporter = eventReporter;
this.swapManager = swapManager; this.swapManager = swapManager;
@ -161,36 +152,35 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} }
@Override @Override
public void setBackPressureObjectThreshold(final long maxQueueSize) { public void setBackPressureObjectThreshold(final long threshold) {
writeLock.lock(); boolean updated = false;
try { while (!updated) {
maximumQueueObjectCount = maxQueueSize; MaxQueueSize maxSize = maxQueueSize.get();
this.queueFullRef.set(determineIfFull()); final MaxQueueSize updatedSize = new MaxQueueSize(maxSize.getMaxSize(), maxSize.getMaxBytes(), threshold);
} finally { updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
writeLock.unlock("setBackPressureObjectThreshold");
} }
} }
@Override @Override
public long getBackPressureObjectThreshold() { public long getBackPressureObjectThreshold() {
return maximumQueueObjectCount; return maxQueueSize.get().getMaxCount();
} }
@Override @Override
public void setBackPressureDataSizeThreshold(final String maxDataSize) { public void setBackPressureDataSizeThreshold(final String maxDataSize) {
writeLock.lock(); final long maxBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
try {
maximumQueueByteCount = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue(); boolean updated = false;
maximumQueueDataSize = maxDataSize; while (!updated) {
this.queueFullRef.set(determineIfFull()); MaxQueueSize maxSize = maxQueueSize.get();
} finally { final MaxQueueSize updatedSize = new MaxQueueSize(maxDataSize, maxBytes, maxSize.getMaxCount());
writeLock.unlock("setBackPressureDataSizeThreshold"); updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
} }
} }
@Override @Override
public String getBackPressureDataSizeThreshold() { public String getBackPressureDataSizeThreshold() {
return maximumQueueDataSize; return maxQueueSize.get().getMaxSize();
} }
@Override @Override
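Both setters now follow the same lock-free recipe: read the current immutable MaxQueueSize, derive a replacement, and compareAndSet it in, retrying if a concurrent writer got there first. A condensed sketch of the pattern, with illustrative names:

import java.util.concurrent.atomic.AtomicReference;

final class Thresholds {

    // Immutable snapshot, so a torn read of count vs. bytes is impossible.
    static final class Snapshot {
        final long maxCount;
        final long maxBytes;

        Snapshot(final long maxCount, final long maxBytes) {
            this.maxCount = maxCount;
            this.maxBytes = maxBytes;
        }
    }

    private final AtomicReference<Snapshot> ref = new AtomicReference<>(new Snapshot(0L, 0L));

    void setMaxCount(final long maxCount) {
        boolean updated = false;
        while (!updated) {
            final Snapshot current = ref.get();
            // Keep the other field, swap in the new one; retry on contention.
            updated = ref.compareAndSet(current, new Snapshot(maxCount, current.maxBytes));
        }
    }
}

Because each snapshot is immutable, a reader always observes a mutually consistent pair of thresholds without ever taking the queue's write lock.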
@ -220,17 +210,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override @Override
public void acknowledge(final FlowFileRecord flowFile) { public void acknowledge(final FlowFileRecord flowFile) {
if (queueFullRef.get()) { incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
writeLock.lock();
try {
incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
queueFullRef.set(determineIfFull());
} finally {
writeLock.unlock("acknowledge(FlowFileRecord)");
}
} else {
incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
}
if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
// queue was full but no longer is. Notify that the source may now be available to run, // queue was full but no longer is. Notify that the source may now be available to run,
@ -246,17 +226,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
totalSize += flowFile.getSize(); totalSize += flowFile.getSize();
} }
if (queueFullRef.get()) { incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
writeLock.lock();
try {
incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
queueFullRef.set(determineIfFull());
} finally {
writeLock.unlock("acknowledge(FlowFileRecord)");
}
} else {
incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
}
if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
// it's possible that queue was full but no longer is. Notify that the source may now be available to run, // it's possible that queue was full but no longer is. Notify that the source may now be available to run,
@ -267,33 +237,26 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override @Override
public boolean isFull() { public boolean isFull() {
return queueFullRef.get(); final MaxQueueSize maxSize = maxQueueSize.get();
}
/** // Check if max size is set
* MUST be called with either the read or write lock held if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
*
* @return true if full
*/
private boolean determineIfFull() {
final long maxSize = maximumQueueObjectCount;
final long maxBytes = maximumQueueByteCount;
if (maxSize <= 0 && maxBytes <= 0) {
return false; return false;
} }
final QueueSize queueSize = getQueueSize(); final QueueSize queueSize = getQueueSize();
if (maxSize > 0 && queueSize.getObjectCount() >= maxSize) { if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
return true; return true;
} }
if (maxBytes > 0 && queueSize.getByteCount() >= maxBytes) { if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) {
return true; return true;
} }
return false; return false;
} }
@Override @Override
public void put(final FlowFileRecord file) { public void put(final FlowFileRecord file) {
writeLock.lock(); writeLock.lock();
@ -307,8 +270,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(1, file.getSize()); incrementActiveQueueSize(1, file.getSize());
activeQueue.add(file); activeQueue.add(file);
} }
queueFullRef.set(determineIfFull());
} finally { } finally {
writeLock.unlock("put(FlowFileRecord)"); writeLock.unlock("put(FlowFileRecord)");
} }
@ -337,8 +298,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(numFiles, bytes); incrementActiveQueueSize(numFiles, bytes);
activeQueue.addAll(files); activeQueue.addAll(files);
} }
queueFullRef.set(determineIfFull());
} finally { } finally {
writeLock.unlock("putAll"); writeLock.unlock("putAll");
} }
@ -374,7 +333,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
FlowFileRecord flowFile = null; FlowFileRecord flowFile = null;
// First check if we have any records Pre-Fetched. // First check if we have any records Pre-Fetched.
final long expirationMillis = flowFileExpirationMillis.get(); final long expirationMillis = expirationPeriod.get().getMillis();
writeLock.lock(); writeLock.lock();
try { try {
flowFile = doPoll(expiredRecords, expirationMillis); flowFile = doPoll(expiredRecords, expirationMillis);
@ -393,10 +352,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
boolean isExpired; boolean isExpired;
migrateSwapToActive(); migrateSwapToActive();
final boolean queueFullAtStart = queueFullRef.get();
long expiredBytes = 0L; long expiredBytes = 0L;
do { do {
flowFile = this.activeQueue.poll(); flowFile = this.activeQueue.poll();
@ -424,13 +381,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes); incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
} }
// if at least 1 FlowFile was expired & the queue was full before we started, then
// we need to determine whether or not the queue is full again. If no FlowFile was expired,
// then the queue will still be full until the appropriate #acknowledge method is called.
if (queueFullAtStart && !expiredRecords.isEmpty()) {
queueFullRef.set(determineIfFull());
}
return flowFile; return flowFile;
} }
@ -451,8 +401,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords) { private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords) {
migrateSwapToActive(); migrateSwapToActive();
final boolean queueFullAtStart = queueFullRef.get();
final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords); final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords);
long expiredBytes = 0L; long expiredBytes = 0L;
@ -462,13 +410,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained); incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained);
incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes); incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes);
// if at least 1 FlowFile was expired & the queue was full before we started, then
// we need to determine whether or not the queue is full again. If no FlowFile was expired,
// then the queue will still be full until the appropriate #acknowledge method is called.
if (queueFullAtStart && !expiredRecords.isEmpty()) {
queueFullRef.set(determineIfFull());
}
} }
/** /**
@ -660,7 +601,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
long drainedSize = 0L; long drainedSize = 0L;
FlowFileRecord pulled = null; FlowFileRecord pulled = null;
final long expirationMillis = this.flowFileExpirationMillis.get(); final long expirationMillis = expirationPeriod.get().getMillis();
while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) { while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) {
if (isLaterThan(getExpirationDate(pulled, expirationMillis))) { if (isLaterThan(getExpirationDate(pulled, expirationMillis))) {
expiredRecords.add(pulled); expiredRecords.add(pulled);
@ -688,8 +629,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
try { try {
migrateSwapToActive(); migrateSwapToActive();
final long expirationMillis = this.flowFileExpirationMillis.get(); final long expirationMillis = expirationPeriod.get().getMillis();
final boolean queueFullAtStart = queueFullRef.get();
final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>(); final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
final List<FlowFileRecord> unselected = new ArrayList<>(); final List<FlowFileRecord> unselected = new ArrayList<>();
@ -734,17 +674,10 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} }
this.activeQueue.addAll(unselected); this.activeQueue.addAll(unselected);
incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
// if at least 1 FlowFile was expired & the queue was full before we started, then
// we need to determine whether or not the queue is full again. If no FlowFile was expired,
// then the queue will still be full until the appropriate #acknowledge method is called.
if (queueFullAtStart && !expiredRecords.isEmpty()) {
queueFullRef.set(determineIfFull());
}
return selectedFlowFiles; return selectedFlowFiles;
} finally { } finally {
incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
writeLock.unlock("poll(Filter, Set)"); writeLock.unlock("poll(Filter, Set)");
} }
} }
@ -817,12 +750,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override @Override
public String getFlowFileExpiration() { public String getFlowFileExpiration() {
return flowFileExpirationPeriod.get(); return expirationPeriod.get().getPeriod();
} }
@Override @Override
public int getFlowFileExpiration(final TimeUnit timeUnit) { public int getFlowFileExpiration(final TimeUnit timeUnit) {
return (int) timeUnit.convert(flowFileExpirationMillis.get(), TimeUnit.MILLISECONDS); return (int) timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS);
} }
@Override @Override
@ -831,8 +764,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
if (millis < 0) { if (millis < 0) {
throw new IllegalArgumentException("FlowFile Expiration Period must be positive"); throw new IllegalArgumentException("FlowFile Expiration Period must be positive");
} }
this.flowFileExpirationPeriod.set(flowExpirationPeriod);
this.flowFileExpirationMillis.set(millis); expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis));
} }
@ -1287,4 +1220,57 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
" Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]"; " Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]";
} }
} }
private static class MaxQueueSize {
private final String maxSize;
private final long maxBytes;
private final long maxCount;
public MaxQueueSize(final String maxSize, final long maxBytes, final long maxCount) {
this.maxSize = maxSize;
this.maxBytes = maxBytes;
this.maxCount = maxCount;
}
public String getMaxSize() {
return maxSize;
}
public long getMaxBytes() {
return maxBytes;
}
public long getMaxCount() {
return maxCount;
}
@Override
public String toString() {
return maxCount + " Objects/" + maxSize;
}
}
private static class TimePeriod {
private final String period;
private final long millis;
public TimePeriod(final String period, final long millis) {
this.period = period;
this.millis = millis;
}
public String getPeriod() {
return period;
}
public long getMillis() {
return millis;
}
@Override
public String toString() {
return period;
}
}
} }

View File

@ -18,6 +18,7 @@
package org.apache.nifi.controller; package org.apache.nifi.controller;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -48,18 +49,28 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestStandardFlowFileQueue { public class TestStandardFlowFileQueue {
private TestSwapManager swapManager = null; private TestSwapManager swapManager = null;
private StandardFlowFileQueue queue = null; private StandardFlowFileQueue queue = null;
private List<ProvenanceEventRecord> provRecords = new ArrayList<>();
@Before @Before
@SuppressWarnings("unchecked")
public void setup() { public void setup() {
provRecords.clear();
final Connection connection = Mockito.mock(Connection.class); final Connection connection = Mockito.mock(Connection.class);
Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class)); Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class));
Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class)); Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
@ -72,6 +83,16 @@ public class TestStandardFlowFileQueue {
final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class); final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder()); Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder());
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
final Iterable<ProvenanceEventRecord> iterable = (Iterable<ProvenanceEventRecord>) invocation.getArguments()[0];
for (final ProvenanceEventRecord record : iterable) {
provRecords.add(record);
}
return null;
}
}).when(provRepo).registerEvents(Mockito.any(Iterable.class));
queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000); queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
TestFlowFile.idGenerator.set(0L); TestFlowFile.idGenerator.set(0L);
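The doAnswer stub above drains every ProvenanceEventRecord the queue registers into provRecords, so testBackPressureAfterDrop can assert on the DROP events later. A condensed, runnable sketch of that capture pattern, with an illustrative Sink interface standing in for the provenance repository:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class CaptureSketch {

    // Illustrative stand-in for the mocked repository interface.
    interface Sink {
        void registerEvents(Iterable<String> events);
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        final List<String> captured = new ArrayList<>();
        final Sink sink = Mockito.mock(Sink.class);

        // Copy whatever Iterable the code under test hands to the mock.
        Mockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(final InvocationOnMock invocation) throws Throwable {
                final Iterable<String> events = (Iterable<String>) invocation.getArguments()[0];
                for (final String event : events) {
                    captured.add(event);
                }
                return null;
            }
        }).when(sink).registerEvents(Mockito.any(Iterable.class));

        sink.registerEvents(Arrays.asList("a", "b"));
        System.out.println(captured); // prints [a, b]
    }
}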
@ -106,6 +127,160 @@ public class TestStandardFlowFileQueue {
assertEquals(0L, unackSize.getByteCount()); assertEquals(0L, unackSize.getByteCount());
} }
@Test
public void testBackPressure() {
queue.setBackPressureObjectThreshold(10);
assertTrue(queue.isEmpty());
assertTrue(queue.isActiveQueueEmpty());
assertFalse(queue.isFull());
for (int i = 0; i < 9; i++) {
queue.put(new TestFlowFile());
assertFalse(queue.isFull());
assertFalse(queue.isEmpty());
assertFalse(queue.isActiveQueueEmpty());
}
queue.put(new TestFlowFile());
assertTrue(queue.isFull());
assertFalse(queue.isEmpty());
assertFalse(queue.isActiveQueueEmpty());
final Set<FlowFileRecord> expiredRecords = new HashSet<>();
final FlowFileRecord polled = queue.poll(expiredRecords);
assertNotNull(polled);
assertTrue(expiredRecords.isEmpty());
assertFalse(queue.isEmpty());
assertFalse(queue.isActiveQueueEmpty());
// queue is still full because FlowFile has not yet been acknowledged.
assertTrue(queue.isFull());
queue.acknowledge(polled);
// FlowFile has been acknowledged; queue should no longer be full.
assertFalse(queue.isFull());
assertFalse(queue.isEmpty());
assertFalse(queue.isActiveQueueEmpty());
}
@Test
public void testBackPressureAfterPollFilter() throws InterruptedException {
queue.setBackPressureObjectThreshold(10);
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
queue.put(new TestFlowFile());
assertFalse(queue.isFull());
}
queue.put(new TestFlowFile());
assertTrue(queue.isFull());
Thread.sleep(100L);
final FlowFileFilter filter = new FlowFileFilter() {
@Override
public FlowFileFilterResult filter(final FlowFile flowFile) {
return FlowFileFilterResult.REJECT_AND_CONTINUE;
}
};
final Set<FlowFileRecord> expiredRecords = new HashSet<>();
final List<FlowFileRecord> polled = queue.poll(filter, expiredRecords);
assertTrue(polled.isEmpty());
assertEquals(10, expiredRecords.size());
assertFalse(queue.isFull());
assertTrue(queue.isEmpty());
assertTrue(queue.isActiveQueueEmpty());
}
@Test(timeout = 10000)
public void testBackPressureAfterDrop() throws InterruptedException {
queue.setBackPressureObjectThreshold(10);
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
queue.put(new TestFlowFile());
assertFalse(queue.isFull());
}
queue.put(new TestFlowFile());
assertTrue(queue.isFull());
Thread.sleep(100L);
final String requestId = UUID.randomUUID().toString();
final DropFlowFileStatus status = queue.dropFlowFiles(requestId, "Unit Test");
while (status.getState() != DropFlowFileState.COMPLETE) {
Thread.sleep(10L);
}
assertFalse(queue.isFull());
assertTrue(queue.isEmpty());
assertTrue(queue.isActiveQueueEmpty());
assertEquals(10, provRecords.size());
for (final ProvenanceEventRecord event : provRecords) {
assertNotNull(event);
assertEquals(ProvenanceEventType.DROP, event.getEventType());
}
}
@Test
public void testBackPressureAfterPollSingle() throws InterruptedException {
queue.setBackPressureObjectThreshold(10);
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
queue.put(new TestFlowFile());
assertFalse(queue.isFull());
}
queue.put(new TestFlowFile());
assertTrue(queue.isFull());
Thread.sleep(100L);
final Set<FlowFileRecord> expiredRecords = new HashSet<>();
final FlowFileRecord polled = queue.poll(expiredRecords);
assertNull(polled);
assertEquals(10, expiredRecords.size());
assertFalse(queue.isFull());
assertTrue(queue.isEmpty());
assertTrue(queue.isActiveQueueEmpty());
}
@Test
public void testBackPressureAfterPollMultiple() throws InterruptedException {
queue.setBackPressureObjectThreshold(10);
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
queue.put(new TestFlowFile());
assertFalse(queue.isFull());
}
queue.put(new TestFlowFile());
assertTrue(queue.isFull());
Thread.sleep(100L);
final Set<FlowFileRecord> expiredRecords = new HashSet<>();
final List<FlowFileRecord> polled = queue.poll(10, expiredRecords);
assertTrue(polled.isEmpty());
assertEquals(10, expiredRecords.size());
assertFalse(queue.isFull());
assertTrue(queue.isEmpty());
assertTrue(queue.isActiveQueueEmpty());
}
@Test @Test
public void testSwapOutOccurs() { public void testSwapOutOccurs() {
for (int i = 0; i < 10000; i++) { for (int i = 0; i < 10000; i++) {

View File

@ -133,7 +133,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
.description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID) .description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build(); .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build();
private static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false) public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false)
.description("Period of time which should pass before attempting a kerberos relogin").defaultValue("4 hours") .description("Period of time which should pass before attempting a kerberos relogin").defaultValue("4 hours")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();

View File

@ -83,6 +83,9 @@ public class FetchHDFS extends AbstractHadoopProcessor {
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(HADOOP_CONFIGURATION_RESOURCES); properties.add(HADOOP_CONFIGURATION_RESOURCES);
properties.add(FILENAME); properties.add(FILENAME);
properties.add(KERBEROS_PRINCIPAL);
properties.add(KERBEROS_KEYTAB);
properties.add(KERBEROS_RELOGIN_PERIOD);
return properties; return properties;
} }

View File

@ -138,6 +138,9 @@ public class ListHDFS extends AbstractHadoopProcessor {
properties.add(DISTRIBUTED_CACHE_SERVICE); properties.add(DISTRIBUTED_CACHE_SERVICE);
properties.add(DIRECTORY); properties.add(DIRECTORY);
properties.add(RECURSE_SUBDIRS); properties.add(RECURSE_SUBDIRS);
properties.add(KERBEROS_PRINCIPAL);
properties.add(KERBEROS_KEYTAB);
properties.add(KERBEROS_RELOGIN_PERIOD);
return properties; return properties;
} }

View File

@ -168,8 +168,12 @@ public class ExecuteSQL extends AbstractProcessor {
session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(outgoing, REL_SUCCESS); session.transfer(outgoing, REL_SUCCESS);
} catch (final ProcessException | SQLException e) { } catch (final ProcessException | SQLException e) {
logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] { selectQuery, incoming, e }); if (incoming == null) {
session.transfer(incoming, REL_FAILURE); logger.error("Unable to execute SQL select query {} due to {}. No incoming flow file to route to failure", new Object[] {selectQuery, e});
} else {
logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] {selectQuery, incoming, e});
session.transfer(incoming, REL_FAILURE);
}
} }
} }
} }

View File

@ -114,6 +114,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
"\"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", \" or it can be an RFC3164 timestamp " + "\"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", \" or it can be an RFC3164 timestamp " +
"with a format of \"MMM d HH:mm:ss\".") "with a format of \"MMM d HH:mm:ss\".")
.required(true) .required(true)
.defaultValue("${now():format('MMM d HH:mm:ss')}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
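The added default wires the timestamp property to NiFi's expression language: ${now():format('MMM d HH:mm:ss')} renders the current time in the RFC3164 layout using java.text.SimpleDateFormat pattern semantics. A sketch of what that default evaluates to:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

public class Rfc3164Timestamp {
    public static void main(String[] args) {
        // Same pattern as the new default value, e.g. "Nov 13 10:47:05".
        final SimpleDateFormat fmt = new SimpleDateFormat("MMM d HH:mm:ss", Locale.US);
        System.out.println(fmt.format(new Date()));
    }
}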

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Map;
public class TestDynamicEnvironment {
public static void main(String[] args) {
// iterate through the current environment and print all variables whose names start with NIFI
for (Map.Entry<String, String> env: System.getenv().entrySet()) {
if (env.getKey().startsWith("NIFI")) {
System.out.println(env.getKey() + "=" + env.getValue());
}
}
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
public class TestIngestAndUpdate {
public static void main(String[] args) throws IOException {
byte[] bytes = new byte[1024];
System.out.write(System.getProperty("user.dir").getBytes());
System.out.println(":ModifiedResult");
int numRead = 0;
while ((numRead = System.in.read(bytes)) != -1) {
System.out.write(bytes, 0, numRead);
}
System.out.flush();
System.out.close();
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class TestSuccess {
public static void main(String[] args) {
System.out.println("Test was a success");
}
}

View File

@ -42,12 +42,13 @@
<module>nifi-language-translation-bundle</module> <module>nifi-language-translation-bundle</module>
<module>nifi-mongodb-bundle</module> <module>nifi-mongodb-bundle</module>
<module>nifi-flume-bundle</module> <module>nifi-flume-bundle</module>
<module>nifi-hbase-bundle</module> <module>nifi-hbase-bundle</module>
<module>nifi-ambari-bundle</module> <module>nifi-ambari-bundle</module>
<module>nifi-image-bundle</module> <module>nifi-image-bundle</module>
<module>nifi-avro-bundle</module> <module>nifi-avro-bundle</module>
<module>nifi-couchbase-bundle</module> <module>nifi-couchbase-bundle</module>
<module>nifi-azure-bundle</module>
</modules> </modules>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>
<dependency> <dependency>

View File

@ -931,6 +931,12 @@
<version>0.4.0-SNAPSHOT</version> <version>0.4.0-SNAPSHOT</version>
<type>nar</type> <type>nar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-nar</artifactId>
<version>0.4.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId> <artifactId>nifi-properties</artifactId>