mirror of https://github.com/apache/nifi.git
NIFI-1833 - Addressed issues from PR review.
NIFI-1833 Moved AbstractListProcessor.java, EntityListing.java, and ListableEntity.java from nifi-standard-processors into nifi-processor-utils Moved TestAbstractListProcessor.java into nifi-processor-utils Set nifi-azure-nar's nar dependency back to nifi-standard-services-api-nar Fixed failing integration tests (ITFetchAzureBlobStorage.java, ITListAzureBlobStorage.java, and ITPutAzureStorageBlob.java) and refactored them to be able to run in parallel NIFI-1833 Moved security notice info in the additional details documentation into the descriptions of the specific attributes for which those notices are intended Added displayName usage to properties Updated exception handling in FetchAzureBlobStorage.java and PutAzureBlobStorage.java to cause flowfiles with Output/InputStreamCallback failures to be routed to the processor's failure relationship Cleaned up dependencies in pom NIFI-1833 Removed unnecessary calls to map on Optional in the onTrigger exception handling of FetchAzureBlobStorage.java and PutAzureBlobStorage.java NIFI-1833 Updates due to nifi-processor-utils being moved under nifi-nar-bundles This closes #1719. Signed-off-by: Bryan Rosander <brosander@apache.org>
This commit is contained in:
parent
f30c8169ab
commit
26d90fbccf
|
@ -38,7 +38,7 @@
|
|||
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-nar</artifactId>
|
||||
<artifactId>nifi-standard-services-api-nar</artifactId>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
|
|
@ -19,9 +19,6 @@
|
|||
</parent>
|
||||
<artifactId>nifi-azure-processors</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<properties>
|
||||
<powermock.version>1.6.5</powermock.version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
|
@ -31,25 +28,11 @@
|
|||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ssl-context-service-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
<artifactId>azure-eventhubs</artifactId>
|
||||
<version>0.9.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
<version>2.8.6</version>
|
||||
</dependency>
|
||||
<!--<dependency>
|
||||
<groupId>com.microsoft.eventhubs.client</groupId>
|
||||
<artifactId>eventhubs-client</artifactId>
|
||||
|
@ -65,27 +48,19 @@
|
|||
<artifactId>nifi-mock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-simple</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.powermock</groupId>
|
||||
<artifactId>powermock-module-junit4</artifactId>
|
||||
<version>${powermock.version}</version>
|
||||
<scope>test</scope>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-processor-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-processors</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -25,8 +25,8 @@ import java.util.List;
|
|||
|
||||
public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor {
|
||||
|
||||
public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build();
|
||||
public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("blob").displayName("Blob").description("The filename of the blob")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build();
|
||||
|
||||
private static final List<PropertyDescriptor> PROPERTIES = Collections
|
||||
.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
|
||||
|
|
|
@ -34,7 +34,7 @@ public abstract class AbstractAzureProcessor extends AbstractProcessor {
|
|||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All successfully processed FlowFiles are routed to this relationship").build();
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Unsuccessful operations will be transferred to the failure relationship.").build();
|
||||
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
|
||||
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
|
||||
|
||||
protected CloudStorageAccount createStorageConnection(ProcessContext context) {
|
||||
final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
|
||||
|
@ -67,11 +67,7 @@ public abstract class AbstractAzureProcessor extends AbstractProcessor {
|
|||
*/
|
||||
private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
|
||||
CloudStorageAccount storageAccount;
|
||||
try {
|
||||
storageAccount = CloudStorageAccount.parse(storageConnectionString);
|
||||
} catch (IllegalArgumentException | URISyntaxException | InvalidKeyException e) {
|
||||
throw e;
|
||||
}
|
||||
storageAccount = CloudStorageAccount.parse(storageConnectionString);
|
||||
return storageAccount;
|
||||
}
|
||||
|
||||
|
|
|
@ -23,14 +23,24 @@ public final class AzureConstants {
|
|||
public static final String BLOCK = "Block";
|
||||
public static final String PAGE = "Page";
|
||||
|
||||
public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("Storage Account Key").description("The storage account key")
|
||||
public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("storage-account-key").displayName("Storage Account Key")
|
||||
.description("The storage account key. There are certain risks in allowing the account key to be stored as a flowfile" +
|
||||
"attribute. While it does provide for a more flexible flow by allowing the account key to " +
|
||||
"be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
|
||||
"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
|
||||
"In addition, the provenance repositories may be put on encrypted disk partitions.")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
|
||||
|
||||
public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("Storage Account Name").description("The storage account name")
|
||||
public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("storage-account-name").displayName("Storage Account Name")
|
||||
.description("The storage account name. There are certain risks in allowing the account name to be stored as a flowfile" +
|
||||
"attribute. While it does provide for a more flexible flow by allowing the account name to " +
|
||||
"be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
|
||||
"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
|
||||
"In addition, the provenance repositories may be put on encrypted disk partitions.")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
|
||||
|
||||
public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("Container name").description("Name of the azure storage container")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
|
||||
public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("container-name").displayName("Container name")
|
||||
.description("Name of the azure storage container").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
|
||||
|
||||
// use HTTPS by default as per MSFT recommendation
|
||||
public static final String FORMAT_DEFAULT_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
|
||||
|
|
|
@ -23,11 +23,13 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
|
@ -75,6 +77,7 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
|
|||
String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
|
||||
String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
|
||||
|
||||
AtomicReference<Exception> storedException = new AtomicReference<>();
|
||||
try {
|
||||
CloudStorageAccount storageAccount = createStorageConnection(context, flowFile);
|
||||
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
|
||||
|
@ -89,6 +92,7 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
|
|||
try {
|
||||
blob.download(os);
|
||||
} catch (StorageException e) {
|
||||
storedException.set(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
});
|
||||
|
@ -103,10 +107,15 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
|
|||
session.transfer(flowFile, REL_SUCCESS);
|
||||
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||
session.getProvenanceReporter().fetch(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
|
||||
|
||||
} catch (IllegalArgumentException | URISyntaxException | StorageException e1) {
|
||||
flowFile = session.penalize(flowFile);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
} catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) {
|
||||
if (e instanceof ProcessException && storedException.get() == null) {
|
||||
throw (ProcessException) e;
|
||||
} else {
|
||||
Exception failureException = Optional.ofNullable(storedException.get()).orElse(e);
|
||||
getLogger().error("Failure to fetch Azure blob {}", new Object[]{blobPath}, failureException);
|
||||
flowFile = session.penalize(flowFile);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,10 +40,10 @@ import org.apache.nifi.components.PropertyDescriptor;
|
|||
import org.apache.nifi.components.state.Scope;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processor.util.list.AbstractListProcessor;
|
||||
import org.apache.nifi.processors.azure.AzureConstants;
|
||||
import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
|
||||
import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
|
||||
import org.apache.nifi.processors.standard.AbstractListProcessor;
|
||||
|
||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
|
@ -59,7 +59,9 @@ import com.microsoft.azure.storage.blob.ListBlobItem;
|
|||
@TriggerSerially
|
||||
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
|
||||
@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class })
|
||||
@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
|
||||
@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage. " +
|
||||
"This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the " +
|
||||
"previous node left off without duplicating all of the data.")
|
||||
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
||||
@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
|
||||
@WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
|
||||
|
@ -71,12 +73,14 @@ import com.microsoft.azure.storage.blob.ListBlobItem;
|
|||
@WritesAttribute(attribute = "mime.type", description = "MimeType of the content"),
|
||||
@WritesAttribute(attribute = "lang", description = "Language code for the content"),
|
||||
@WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
|
||||
@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. "
|
||||
+ "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.")
|
||||
@Stateful(scopes = { Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. " +
|
||||
"This allows the Processor to list only blobs that have been added or modified after this date the next time that the Processor is run. State is " +
|
||||
"stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up " +
|
||||
"where the previous node left off, without duplicating the data.")
|
||||
public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
|
||||
|
||||
private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(true).required(false).build();
|
||||
private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("prefix").displayName("Prefix").description("Search prefix for listing")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(false).build();
|
||||
|
||||
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
|
||||
|
||||
|
@ -155,7 +159,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
|
|||
return listing;
|
||||
}
|
||||
|
||||
protected CloudStorageAccount createStorageConnection(ProcessContext context) {
|
||||
private CloudStorageAccount createStorageConnection(ProcessContext context) {
|
||||
final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
|
||||
final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
|
||||
final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey);
|
||||
|
|
|
@ -22,7 +22,9 @@ import java.io.InputStream;
|
|||
import java.net.URISyntaxException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
|
@ -69,6 +71,7 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
|
|||
|
||||
String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
|
||||
|
||||
AtomicReference<Exception> storedException = new AtomicReference<>();
|
||||
try {
|
||||
CloudStorageAccount storageAccount = createStorageConnection(context, flowFile);
|
||||
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
|
||||
|
@ -94,6 +97,7 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
|
|||
attributes.put("azure.length", String.valueOf(length));
|
||||
attributes.put("azure.timestamp", String.valueOf(properties.getLastModified()));
|
||||
} catch (StorageException | URISyntaxException e) {
|
||||
storedException.set(e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
});
|
||||
|
@ -106,10 +110,15 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
|
|||
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||
session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
|
||||
|
||||
} catch (IllegalArgumentException | URISyntaxException | StorageException e) {
|
||||
getLogger().error("Failed to put Azure blob {}", new Object[]{blobPath}, e);
|
||||
flowFile = session.penalize(flowFile);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
} catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) {
|
||||
if (e instanceof ProcessException && storedException.get() == null) {
|
||||
throw (ProcessException) e;
|
||||
} else {
|
||||
Exception failureException = Optional.ofNullable(storedException.get()).orElse(e);
|
||||
getLogger().error("Failed to put Azure blob {}", new Object[]{blobPath}, failureException);
|
||||
flowFile = session.penalize(flowFile);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.nifi.processors.azure.storage.utils;
|
|||
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.apache.nifi.processors.standard.util.ListableEntity;
|
||||
import org.apache.nifi.processor.util.list.ListableEntity;
|
||||
|
||||
public class BlobInfo implements Comparable<BlobInfo>, Serializable, ListableEntity {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
|
|
@ -1,39 +0,0 @@
|
|||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<head>
|
||||
<meta charset="utf-8" />
|
||||
<title>FetchAzureBlobStorage Processor</title>
|
||||
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
|
||||
</head>
|
||||
|
||||
<body>
|
||||
|
||||
<h2>Apache NiFi Azure Processors</h2>
|
||||
|
||||
<h3>Important Security Note</h3>
|
||||
<p>
|
||||
There are certain risks in allowing the account name and key to be stored as flowfile
|
||||
attributes. While it does provide for a more flexible flow by allowing the account name and key
|
||||
be fetched dynamically from the flow file attributes, care must be taken to restrict access to
|
||||
the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
|
||||
In addition, the provenance repositories may be put on encrypted disk partitions.
|
||||
</p>
|
||||
<p>
|
||||
<a href="#" onclick="history.back()">Return to a previous page</a>
|
||||
</p>
|
||||
</body>
|
||||
</html>
|
|
@ -1,39 +0,0 @@
|
|||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<head>
|
||||
<meta charset="utf-8" />
|
||||
<title>ListAzureBlobStorage Processor</title>
|
||||
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
|
||||
</head>
|
||||
|
||||
<body>
|
||||
|
||||
<h2>Apache NiFi Azure Processors</h2>
|
||||
|
||||
<h3>Important Security Note</h3>
|
||||
<p>
|
||||
There are certain risks in allowing the account name and key to be stored as flowfile
|
||||
attributes. While it does provide for a more flexible flow by allowing the account name and key
|
||||
be fetched dynamically from the flow file attributes, care must be taken to restrict access to
|
||||
the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
|
||||
In addition, the provenance repositories may be put on encrypted disk partitions.
|
||||
</p>
|
||||
<p>
|
||||
<a href="#" onclick="history.back()">Return to a previous page</a>
|
||||
</p>
|
||||
</body>
|
||||
</html>
|
|
@ -1,39 +0,0 @@
|
|||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<head>
|
||||
<meta charset="utf-8" />
|
||||
<title>PutAzureBlobStorage Processor</title>
|
||||
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
|
||||
</head>
|
||||
|
||||
<body>
|
||||
|
||||
<h2>Apache NiFi Azure Processors</h2>
|
||||
|
||||
<h3>Important Security Note</h3>
|
||||
<p>
|
||||
There are certain risks in allowing the account name and key to be stored as flowfile
|
||||
attributes. While it does provide for a more flexible flow by allowing the account name and key
|
||||
be fetched dynamically from the flow file attributes, care must be taken to restrict access to
|
||||
the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
|
||||
In addition, the provenance repositories may be put on encrypted disk partitions.
|
||||
</p>
|
||||
<p>
|
||||
<a href="#" onclick="history.back()">Return to a previous page</a>
|
||||
</p>
|
||||
</body>
|
||||
</html>
|
|
@ -16,17 +16,7 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.azure.storage;
|
||||
|
||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
|
||||
import com.microsoft.azure.storage.blob.ListBlobItem;
|
||||
import org.apache.nifi.processors.azure.AzureConstants;
|
||||
import org.apache.nifi.util.file.FileUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
|
@ -35,15 +25,20 @@ import java.net.URISyntaxException;
|
|||
import java.security.InvalidKeyException;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
import org.apache.nifi.processors.azure.AzureConstants;
|
||||
import org.apache.nifi.util.file.FileUtils;
|
||||
|
||||
public abstract class AbstractAzureIT {
|
||||
protected static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
|
||||
public static final String TEST_CONTAINER_NAME = "nifitest";
|
||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
|
||||
class AzureTestUtil {
|
||||
private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
|
||||
static final String TEST_CONTAINER_NAME_PREFIX = "nifitest";
|
||||
|
||||
private static final Properties CONFIG;
|
||||
protected static final String TEST_BLOB_NAME = "testing";
|
||||
protected static final String TEST_TABLE_NAME = "testing";
|
||||
static final String TEST_BLOB_NAME = "testing";
|
||||
|
||||
static {
|
||||
final FileInputStream fis;
|
||||
|
@ -63,35 +58,19 @@ public abstract class AbstractAzureIT {
|
|||
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void oneTimeSetup() throws StorageException, InvalidKeyException, URISyntaxException {
|
||||
CloudBlobContainer container = getContainer();
|
||||
container.createIfNotExists();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws InvalidKeyException, URISyntaxException, StorageException {
|
||||
CloudBlobContainer container = getContainer();
|
||||
for (ListBlobItem blob : container.listBlobs()) {
|
||||
if (blob instanceof CloudBlob) {
|
||||
((CloudBlob) blob).delete(DeleteSnapshotsOption.INCLUDE_SNAPSHOTS, null, null, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static String getAccountName() {
|
||||
static String getAccountName() {
|
||||
return CONFIG.getProperty("accountName");
|
||||
}
|
||||
|
||||
public static String getAccountKey() {
|
||||
static String getAccountKey() {
|
||||
return CONFIG.getProperty("accountKey");
|
||||
}
|
||||
|
||||
protected static CloudBlobContainer getContainer() throws InvalidKeyException, URISyntaxException, StorageException {
|
||||
static CloudBlobContainer getContainer(String containerName) throws InvalidKeyException, URISyntaxException, StorageException {
|
||||
String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, getAccountName(), getAccountKey());
|
||||
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
|
||||
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
|
||||
return blobClient.getContainerReference(TEST_CONTAINER_NAME);
|
||||
return blobClient.getContainerReference(containerName);
|
||||
}
|
||||
|
||||
}
|
|
@ -16,12 +16,15 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.azure.storage;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.nifi.processors.azure.AbstractAzureProcessor;
|
||||
import org.apache.nifi.processors.azure.AzureConstants;
|
||||
|
@ -31,32 +34,47 @@ import org.apache.nifi.util.TestRunners;
|
|||
import org.junit.Test;
|
||||
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
|
||||
public class ITFetchAzureBlobStorage extends AbstractAzureIT {
|
||||
public class ITFetchAzureBlobStorage {
|
||||
|
||||
@Test
|
||||
public void testFetchingBlob() throws InvalidKeyException, URISyntaxException, StorageException, IOException {
|
||||
String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
|
||||
CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
|
||||
container.createIfNotExists();
|
||||
|
||||
CloudBlob blob = container.getBlockBlobReference(AzureTestUtil.TEST_BLOB_NAME);
|
||||
byte[] buf = "0123456789".getBytes();
|
||||
InputStream in = new ByteArrayInputStream(buf);
|
||||
blob.upload(in, 10);
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new FetchAzureBlobStorage());
|
||||
|
||||
runner.setValidateExpressionUsage(true);
|
||||
try {
|
||||
runner.setValidateExpressionUsage(true);
|
||||
|
||||
runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
|
||||
runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
|
||||
runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
|
||||
runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}");
|
||||
runner.setProperty(AzureConstants.ACCOUNT_NAME, AzureTestUtil.getAccountName());
|
||||
runner.setProperty(AzureConstants.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
|
||||
runner.setProperty(AzureConstants.CONTAINER, containerName);
|
||||
runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}");
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("azure.primaryUri", "http://" + getAccountName() + ".blob.core.windows.net/" + TEST_CONTAINER_NAME + "/" + TEST_BLOB_NAME);
|
||||
attributes.put("azure.blobname", TEST_BLOB_NAME);
|
||||
attributes.put("azure.blobtype", AzureConstants.BLOCK);
|
||||
runner.enqueue(new byte[0], attributes);
|
||||
runner.run();
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("azure.primaryUri", "https://" + AzureTestUtil.getAccountName() + ".blob.core.windows.net/" + containerName + "/" + AzureTestUtil.TEST_BLOB_NAME);
|
||||
attributes.put("azure.blobname", AzureTestUtil.TEST_BLOB_NAME);
|
||||
attributes.put("azure.blobtype", AzureConstants.BLOCK);
|
||||
runner.enqueue(new byte[0], attributes);
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(AbstractAzureProcessor.REL_SUCCESS, 1);
|
||||
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
|
||||
for (MockFlowFile flowFile : flowFilesForRelationship) {
|
||||
flowFile.assertContentEquals("0123456789".getBytes());
|
||||
flowFile.assertAttributeEquals("azure.length", "10");
|
||||
runner.assertAllFlowFilesTransferred(AbstractAzureProcessor.REL_SUCCESS, 1);
|
||||
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
|
||||
for (MockFlowFile flowFile : flowFilesForRelationship) {
|
||||
flowFile.assertContentEquals("0123456789".getBytes());
|
||||
flowFile.assertAttributeEquals("azure.length", "10");
|
||||
}
|
||||
} finally {
|
||||
container.deleteIfExists();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,55 +21,50 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.nifi.processors.azure.AzureConstants;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
|
||||
public class ITListAzureBlobStorage extends AbstractAzureIT {
|
||||
public class ITListAzureBlobStorage {
|
||||
|
||||
@BeforeClass
|
||||
public static void setupSomeFiles() throws InvalidKeyException, URISyntaxException, StorageException, IOException {
|
||||
CloudBlobContainer container = getContainer();
|
||||
@Test
|
||||
public void testListsAzureBlobStorageContent() throws InvalidKeyException, StorageException, URISyntaxException, IOException {
|
||||
String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
|
||||
CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
|
||||
container.createIfNotExists();
|
||||
|
||||
CloudBlob blob = container.getBlockBlobReference(TEST_BLOB_NAME);
|
||||
CloudBlob blob = container.getBlockBlobReference(AzureTestUtil.TEST_BLOB_NAME);
|
||||
byte[] buf = "0123456789".getBytes();
|
||||
InputStream in = new ByteArrayInputStream(buf);
|
||||
blob.upload(in, 10);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws InvalidKeyException, URISyntaxException, StorageException {
|
||||
CloudBlobContainer container = getContainer();
|
||||
container.deleteIfExists();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListsAzureBlobStorageContent() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new ListAzureBlobStorage());
|
||||
|
||||
runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
|
||||
runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
|
||||
runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
|
||||
try {
|
||||
runner.setProperty(AzureConstants.ACCOUNT_NAME, AzureTestUtil.getAccountName());
|
||||
runner.setProperty(AzureConstants.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
|
||||
runner.setProperty(AzureConstants.CONTAINER, containerName);
|
||||
|
||||
// requires multiple runs to deal with List processor checking
|
||||
runner.run(3);
|
||||
// requires multiple runs to deal with List processor checking
|
||||
runner.run(3);
|
||||
|
||||
runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1);
|
||||
runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1);
|
||||
runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1);
|
||||
runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1);
|
||||
|
||||
for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) {
|
||||
entry.assertAttributeEquals("azure.length", "10");
|
||||
entry.assertAttributeEquals("mime.type", "application/octet-stream");
|
||||
for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) {
|
||||
entry.assertAttributeEquals("azure.length", "10");
|
||||
entry.assertAttributeEquals("mime.type", "application/octet-stream");
|
||||
}
|
||||
} finally {
|
||||
container.deleteIfExists();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,10 @@
|
|||
package org.apache.nifi.processors.azure.storage;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.nifi.processors.azure.AzureConstants;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
|
@ -25,27 +28,38 @@ import org.apache.nifi.util.TestRunner;
|
|||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ITPutAzureStorageBlob extends AbstractAzureIT {
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
|
||||
public class ITPutAzureStorageBlob {
|
||||
|
||||
@Test
|
||||
public void testPuttingBlob() throws IOException {
|
||||
public void testPuttingBlob() throws IOException, InvalidKeyException, StorageException, URISyntaxException {
|
||||
String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
|
||||
CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
|
||||
container.createIfNotExists();
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutAzureBlobStorage());
|
||||
|
||||
runner.setValidateExpressionUsage(true);
|
||||
try {
|
||||
runner.setValidateExpressionUsage(true);
|
||||
|
||||
runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
|
||||
runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
|
||||
runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
|
||||
runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload");
|
||||
runner.setProperty(AzureConstants.ACCOUNT_NAME, AzureTestUtil.getAccountName());
|
||||
runner.setProperty(AzureConstants.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
|
||||
runner.setProperty(AzureConstants.CONTAINER, containerName);
|
||||
runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload");
|
||||
|
||||
runner.enqueue("0123456789".getBytes());
|
||||
runner.run();
|
||||
runner.enqueue("0123456789".getBytes());
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
|
||||
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
|
||||
for (MockFlowFile flowFile : flowFilesForRelationship) {
|
||||
flowFile.assertContentEquals("0123456789".getBytes());
|
||||
flowFile.assertAttributeEquals("azure.length", "10");
|
||||
runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
|
||||
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
|
||||
for (MockFlowFile flowFile : flowFilesForRelationship) {
|
||||
flowFile.assertContentEquals("0123456789".getBytes());
|
||||
flowFile.assertAttributeEquals("azure.length", "10");
|
||||
}
|
||||
} finally {
|
||||
container.deleteIfExists();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,6 +53,15 @@
|
|||
<artifactId>nifi-ssl-context-service-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-mapper-asl</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.processors.standard;
|
||||
package org.apache.nifi.processor.util.list;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
|
@ -52,8 +52,6 @@ import org.apache.nifi.processor.ProcessContext;
|
|||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processors.standard.util.EntityListing;
|
||||
import org.apache.nifi.processors.standard.util.ListableEntity;
|
||||
import org.codehaus.jackson.JsonNode;
|
||||
import org.codehaus.jackson.JsonParseException;
|
||||
import org.codehaus.jackson.map.JsonMappingException;
|
||||
|
@ -156,11 +154,11 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
|||
* files according to timestamp, it is ensured that at least the specified millis has been eclipsed to avoid getting scheduled
|
||||
* near instantaneously after the prior iteration effectively voiding the built in buffer
|
||||
*/
|
||||
static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
|
||||
public static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
|
||||
static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
|
||||
static final String PROCESSED_TIMESTAMP_KEY = "processed.timestamp";
|
||||
|
||||
protected File getPersistenceFile() {
|
||||
public File getPersistenceFile() {
|
||||
return new File("conf/state/" + getIdentifier());
|
||||
}
|
||||
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
package org.apache.nifi.processor.util.list;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
package org.apache.nifi.processor.util.list;
|
||||
|
||||
public interface ListableEntity {
|
||||
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.processors.standard;
|
||||
package org.apache.nifi.processor.util.list;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
|
@ -41,7 +41,8 @@ import org.apache.nifi.distributed.cache.client.Deserializer;
|
|||
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
|
||||
import org.apache.nifi.distributed.cache.client.Serializer;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processors.standard.util.ListableEntity;
|
||||
import org.apache.nifi.processor.util.list.AbstractListProcessor;
|
||||
import org.apache.nifi.processor.util.list.ListableEntity;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.state.MockStateManager;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
|
@ -474,7 +475,7 @@ public class TestAbstractListProcessor {
|
|||
public File persistenceFile = new File(persistenceFolder + persistenceFilename);
|
||||
|
||||
@Override
|
||||
protected File getPersistenceFile() {
|
||||
public File getPersistenceFile() {
|
||||
return persistenceFile;
|
||||
}
|
||||
|
|
@ -36,6 +36,7 @@ import org.apache.nifi.processor.ProcessContext;
|
|||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processor.util.list.AbstractListProcessor;
|
||||
import org.apache.nifi.processors.standard.util.FileInfo;
|
||||
|
||||
import java.io.File;
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.nifi.components.PropertyDescriptor;
|
|||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processor.util.list.AbstractListProcessor;
|
||||
import org.apache.nifi.processors.standard.util.FileInfo;
|
||||
import org.apache.nifi.processors.standard.util.FileTransfer;
|
||||
import java.util.Map;
|
||||
|
|
|
@ -18,6 +18,8 @@ package org.apache.nifi.processors.standard.util;
|
|||
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.apache.nifi.processor.util.list.ListableEntity;
|
||||
|
||||
public class FileInfo implements Comparable<FileInfo>, Serializable, ListableEntity {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.nifi.components.PropertyDescriptor;
|
|||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.util.list.AbstractListProcessor;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
|
5
pom.xml
5
pom.xml
|
@ -1007,6 +1007,11 @@
|
|||
<version>1.2.0-SNAPSHOT</version>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
|
||||
<version>1.2.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-nar</artifactId>
|
||||
|
|
Loading…
Reference in New Issue