From 94945a6fd5e88c083f1f3e32d3b3d5d5954686d1 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 24 Apr 2015 20:13:21 -0400 Subject: [PATCH] NIFI-533: Initial implementation of FetchHDFS and ListHDFS --- .../OnPrimaryNodeStateChange.java | 44 ++ .../notification/PrimaryNodeState.java | 33 ++ .../nifi/controller/FlowController.java | 16 +- .../apache/nifi/web/util/SnippetUtils.java | 2 +- .../nifi-hdfs-processors/pom.xml | 8 +- .../hadoop/AbstractHadoopProcessor.java | 27 + .../nifi/processors/hadoop/FetchHDFS.java | 126 +++++ .../nifi/processors/hadoop/GetHDFS.java | 36 +- .../nifi/processors/hadoop/ListHDFS.java | 466 ++++++++++++++++++ .../processors/hadoop/util/HDFSListing.java | 83 ++++ .../processors/hadoop/util/LongSerDe.java | 48 ++ .../processors/hadoop/util/StringSerDe.java | 44 ++ .../org.apache.nifi.processor.Processor | 6 +- .../standard/TestDetectDuplicate.java | 5 + .../client/DistributedMapCacheClient.java | 14 + .../DistributedMapCacheClientService.java | 22 + .../cache/server/map/MapCache.java | 1 + .../cache/server/map/MapCacheServer.java | 7 + .../cache/server/map/PersistentMapCache.java | 24 + .../cache/server/map/SimpleMapCache.java | 22 + 20 files changed, 996 insertions(+), 38 deletions(-) create mode 100644 nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java create mode 100644 nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java create mode 100644 nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java create mode 100644 nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java create mode 100644 nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java create mode 100644 nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java create mode 100644 nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java new file mode 100644 index 0000000000..e0736602d3 --- /dev/null +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.annotation.notification; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + *
<p>
+ * Marker annotation that a component can use to indicate that a method should be + * called whenever the state of the Primary Node in a cluster has changed. + *
</p>
+ * + *
<p>
+ * Methods with this annotation should take either no arguments or one argument of type + * {@link PrimaryNodeState}. The {@link PrimaryNodeState} provides context about what changed + * so that the component can take appropriate action. + *
</p>
+ */ +@Documented +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface OnPrimaryNodeStateChange { + +} diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java new file mode 100644 index 0000000000..3a7245c39a --- /dev/null +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.notification; + +/** + * Represents a state change that occurred for the Primary Node of a NiFi cluster. + */ +public enum PrimaryNodeState { + /** + * The node receiving this state has been elected the Primary Node of the NiFi cluster. + */ + ELECTED_PRIMARY_NODE, + + /** + * The node receiving this state was the Primary Node but has now had its Primary Node + * role revoked. + */ + PRIMARY_NODE_REVOKED; +} diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index ec25ab1c98..ef9fe77878 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -53,6 +53,8 @@ import org.apache.nifi.admin.service.UserService; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.cluster.BulletinsPayload; import org.apache.nifi.cluster.HeartbeatPayload; import org.apache.nifi.cluster.protocol.DataFlow; @@ -74,8 +76,8 @@ import org.apache.nifi.connectable.Position; import org.apache.nifi.connectable.Size; import org.apache.nifi.connectable.StandardConnection; import org.apache.nifi.controller.exception.CommunicationsException; -import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.exception.ComponentLifeCycleException; +import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.label.StandardLabel; import 
org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; @@ -3098,6 +3100,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R LOG.info("Setting primary flag from '" + this.primary + "' to '" + primary + "'"); + final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED; + final ProcessGroup rootGroup = getGroup(getRootGroupId()); + for (final ProcessorNode procNode : rootGroup.findAllProcessors()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState); + } + for (final ControllerServiceNode serviceNode : getAllControllerServices()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState); + } + for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState); + } + // update primary this.primary = primary; eventDrivenWorkerQueue.setPrimary(primary); diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java index 76789c606d..d8cb69a9c4 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java @@ -257,7 +257,7 @@ public final class SnippetUtils { final PropertyDescriptor descriptor = entry.getKey(); final String propertyValue = entry.getValue(); - if ( descriptor.getControllerServiceDefinition() != null ) { + if ( descriptor.getControllerServiceDefinition() != null && propertyValue != null ) { final ControllerServiceNode referencedNode = flowController.getControllerServiceNode(propertyValue); if ( referencedNode == null ) { throw new IllegalStateException("Controller Service with ID " + propertyValue + " is referenced in template but cannot be found"); diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml index 3ff1e88fcf..ede32abd16 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml @@ -42,8 +42,12 @@ org.apache.hadoop hadoop-common provided - - + + + org.apache.nifi + nifi-distributed-cache-client-service-api + + org.apache.hadoop hadoop-hdfs provided diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index 8d5749b039..44ebbf8115 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -217,4 +217,31 @@ public abstract class 
AbstractHadoopProcessor extends AbstractProcessor { }; } + + /** + * Returns the relative path of the child that does not include the filename + * or the root path. + * @param root + * @param child + * @return + */ + public static String getPathDifference(final Path root, final Path child) { + final int depthDiff = child.depth() - root.depth(); + if (depthDiff <= 1) { + return "".intern(); + } + String lastRoot = root.getName(); + Path childsParent = child.getParent(); + final StringBuilder builder = new StringBuilder(); + builder.append(childsParent.getName()); + for (int i = (depthDiff - 3); i >= 0; i--) { + childsParent = childsParent.getParent(); + String name = childsParent.getName(); + if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) { + break; + } + builder.insert(0, Path.SEPARATOR).insert(0, name); + } + return builder.toString(); + } } diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java new file mode 100644 index 0000000000..06bb3c6567 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hadoop; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.AccessControlException; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StopWatch; + +@SupportsBatching +@Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"}) +@CapabilityDescription("Retrieves a file from HDFS. 
The content of the incoming FlowFile is replaced by the content of the file in HDFS. " + + "The file in HDFS is left intact without any changes being made to it.") +@WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added indicating why the file could " + + "not be fetched from HDFS") +@SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class}) +public class FetchHDFS extends AbstractHadoopProcessor { + + static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder() + .name("HDFS Filename") + .description("The name of the HDFS file to retrieve") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("${path}/${filename}") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles will be routed to this relationship once they have been updated with the content of the HDFS file") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieved and trying again will likely not be helpful. " + + "This would occur, for instance, if the file is not found or if there is a permissions issue") + .build(); + static final Relationship REL_COMMS_FAILURE = new Relationship.Builder() + .name("comms.failure") + .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieved due to a communications failure. " + + "This generally indicates that the Fetch should be tried again.") + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(HADOOP_CONFIGURATION_RESOURCES); + properties.add(FILENAME); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships.add(REL_COMMS_FAILURE); + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + final FileSystem hdfs = hdfsResources.get().getValue(); + final Path path = new Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue()); + final URI uri = path.toUri(); + + final StopWatch stopWatch = new StopWatch(true); + try (final FSDataInputStream inStream = hdfs.open(path, 16384)) { + flowFile = session.importFrom(inStream, flowFile); + stopWatch.stop(); + getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()}); + session.getProvenanceReporter().modifyContent(flowFile, "Fetched content from " + uri, stopWatch.getDuration(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + } catch (final FileNotFoundException | AccessControlException e) { + getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e}); + flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage()); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } catch (final IOException e) { + getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to
comms.failure", new Object[] {uri, flowFile, e}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_COMMS_FAILURE); + } + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java index d763c29818..1dd5b91b54 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java @@ -58,16 +58,13 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; -/** - * This processor reads files from HDFS into NiFi FlowFiles. - */ @TriggerWhenEmpty @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"}) -@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles") +@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles. This Processor will delete the file from HDFS after fetching it.") @WritesAttributes({ @WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."), @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".") }) -@SeeAlso(PutHDFS.class) +@SeeAlso({PutHDFS.class, ListHDFS.class}) public class GetHDFS extends AbstractHadoopProcessor { public static final String BUFFER_SIZE_KEY = "io.file.buffer.size"; @@ -104,7 +101,7 @@ public class GetHDFS extends AbstractHadoopProcessor { public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder() .name("Keep Source File") - .description("Determines whether to delete the file from HDFS after it has been successfully transferred") + .description("Determines whether to delete the file from HDFS after it has been successfully transferred. If true, the file will be fetched repeatedly. This is intended for testing only.") .required(true) .allowableValues("true", "false") .defaultValue("false") @@ -464,32 +461,7 @@ public class GetHDFS extends AbstractHadoopProcessor { return files; } - /** - * Returns the relative path of the child that does not include the filename - * or the root path. 
- * @param root - * @param child - * @return - */ - public static String getPathDifference(final Path root, final Path child) { - final int depthDiff = child.depth() - root.depth(); - if (depthDiff <= 1) { - return "".intern(); - } - String lastRoot = root.getName(); - Path childsParent = child.getParent(); - final StringBuilder builder = new StringBuilder(); - builder.append(childsParent.getName()); - for (int i = (depthDiff - 3); i >= 0; i--) { - childsParent = childsParent.getParent(); - String name = childsParent.getName(); - if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) { - break; - } - builder.insert(0, Path.SEPARATOR).insert(0, name); - } - return builder.toString(); - } + /** * Holder for a snapshot in time of some processor properties that are diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java new file mode 100644 index 0000000000..707b50d8b5 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.hadoop.util.HDFSListing; +import org.apache.nifi.processors.hadoop.util.StringSerDe; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + + +@TriggerSerially +@TriggerWhenEmpty +@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"}) +@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, creates a FlowFile that represents " + + "the HDFS file so that it can be fetched in conjunction with FetchHDFS. This Processor is designed to run on Primary Node only " + + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating " + + "all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.") +@WritesAttributes({ + @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."), + @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"/tmp\". 
If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."), + @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"), + @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"), + @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"), + @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"), + @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for the file"), + @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example, rw-rw-r--") +}) +@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class}) +public class ListHDFS extends AbstractHadoopProcessor { + + public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() + .name("Distributed Cache Service") + .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node begins pulling data, it won't duplicate all of the work that has been done.") + .required(true) + .identifiesControllerService(DistributedMapCacheClient.class) + .build(); + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .name(DIRECTORY_PROP_NAME) + .description("The HDFS directory from which files should be read") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() + .name("Recurse Subdirectories") + .description("Indicates whether to list files from subdirectories of the HDFS directory") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles are transferred to this relationship") + .build(); + + private volatile Long lastListingTime = null; + private volatile Set<Path> latestPathsListed = new HashSet<>(); + private volatile boolean electedPrimaryNode = false; + private File persistenceFile = null; + + @Override + protected void init(final ProcessorInitializationContext context) { + super.init(context); + persistenceFile = new File("conf/state/" + getIdentifier()); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(HADOOP_CONFIGURATION_RESOURCES); + properties.add(DISTRIBUTED_CACHE_SERVICE); + properties.add(DIRECTORY); + properties.add(RECURSE_SUBDIRS); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + return relationships; + } + + private String getKey(final String directory) { + return getIdentifier() + ".lastListingTime."
+ directory; + } + + @OnPrimaryNodeStateChange + public void onPrimaryNodeChange(final PrimaryNodeState newState) { + if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) { + electedPrimaryNode = true; + } + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if ( descriptor.equals(DIRECTORY) ) { + lastListingTime = null; // clear lastListingTime so that we have to fetch new time + latestPathsListed = new HashSet<>(); + } + } + + private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException { + final ObjectMapper mapper = new ObjectMapper(); + final JsonNode jsonNode = mapper.readTree(serializedState); + return mapper.readValue(jsonNode, HDFSListing.class); + } + + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final String directory = context.getProperty(DIRECTORY).getValue(); + + // Determine the timestamp for the last file that we've listed. + Long minTimestamp = lastListingTime; + if ( minTimestamp == null || electedPrimaryNode ) { + // We haven't yet restored any state from local or distributed state - or it's been at least a minute since + // we have performed a listing. In this case, + // First, attempt to get timestamp from distributed cache service. + final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + + try { + final StringSerDe serde = new StringSerDe(); + final String serializedState = client.get(getKey(directory), serde, serde); + if ( serializedState == null || serializedState.isEmpty() ) { + minTimestamp = null; + this.latestPathsListed = Collections.emptySet(); + } else { + final HDFSListing listing = deserialize(serializedState); + this.lastListingTime = listing.getLatestTimestamp().getTime(); + minTimestamp = listing.getLatestTimestamp().getTime(); + this.latestPathsListed = listing.toPaths(); + } + + this.lastListingTime = minTimestamp; + electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore. + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished."); + context.yield(); + return; + } + + // Check the persistence file. We want to use the latest timestamp that we have so that + // we don't duplicate data. + try { + if ( persistenceFile.exists() ) { + try (final FileInputStream fis = new FileInputStream(persistenceFile)) { + final Properties props = new Properties(); + props.load(fis); + + // get the local timestamp for this directory, if it exists. + final String locallyPersistedValue = props.getProperty(directory); + if ( locallyPersistedValue != null ) { + final HDFSListing listing = deserialize(locallyPersistedValue); + final long localTimestamp = listing.getLatestTimestamp().getTime(); + + // If distributed state doesn't have an entry or the local entry is later than the distributed state, + // update the distributed state so that we are in sync. + if (minTimestamp == null || localTimestamp > minTimestamp) { + minTimestamp = localTimestamp; + + // Our local persistence file shows a later time than the Distributed service. + // Update the distributed service to match our local state. 
+ try { + final StringSerDe serde = new StringSerDe(); + client.put(getKey(directory), locallyPersistedValue, serde, serde); + } catch (final IOException ioe) { + getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed " + + "state due to {}. If a new node performs HDFS Listing, data duplication may occur", + new Object[] {directory, locallyPersistedValue, ioe}); + } + } + } + } + } + } catch (final IOException ioe) { + getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe); + } + } + + + // Pull in any file that is newer than the timestamp that we have. + final FileSystem hdfs = hdfsResources.get().getValue(); + final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean(); + final Path rootPath = new Path(directory); + + int listCount = 0; + Long latestListingModTime = null; + final Set<FileStatus> statuses; + try { + statuses = getStatuses(rootPath, recursive, hdfs); + for ( final FileStatus status : statuses ) { + // don't get anything where the last modified timestamp is equal to our current timestamp. + // if we do, then we run the risk of multiple files having the same last mod date but us only + // seeing a portion of them. + // I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe + // only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a + // modified date not before the current time. + final long fileModTime = status.getModificationTime(); + + // we only want the file if its timestamp is later than the minTimestamp or equal to and we didn't pull it last time. + // Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those. + boolean fetch = !status.getPath().getName().endsWith("_COPYING_") && + (minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath()))); + + // Create the FlowFile for this path. + if ( fetch ) { + final Map<String, String> attributes = createAttributes(status); + FlowFile flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, attributes); + session.transfer(flowFile, REL_SUCCESS); + listCount++; + + if ( latestListingModTime == null || fileModTime > latestListingModTime ) { + latestListingModTime = fileModTime; + } + } + } + } catch (final IOException ioe) { + getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe}); + return; + } + + if ( listCount > 0 ) { + getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount}); + session.commit(); + + // We have performed a listing and pushed the FlowFiles out. + // Now, we need to persist state about the Last Modified timestamp of the newest file + // that we pulled in. We do this in order to avoid pulling in the same file twice. + // However, we want to save the state both locally and remotely. + // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the + // previous Primary Node left off. + // We also store the state locally so that if the node is restarted, and the node cannot contact + // the distributed state cache, the node can continue to run (if it is the Primary Node).
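
The client.put(...) call above, and the one made after each successful listing below, rely on the put operation this patch adds to DistributedMapCacheClient. A self-contained sketch of the calling pattern, factored into a hypothetical helper (CacheStateWriter and storeState are illustrative names, not part of the patch):

    import java.io.IOException;
    import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    import org.apache.nifi.processors.hadoop.util.StringSerDe;

    public final class CacheStateWriter {
        private CacheStateWriter() {
        }

        /**
         * Stores serialized listing state under the given key, overwriting any value
         * already present (unlike putIfAbsent). Returns false if the cache server could
         * not be reached, in which case the caller may simply retry on the next trigger.
         */
        public static boolean storeState(final DistributedMapCacheClient client, final String key, final String state) {
            final StringSerDe serde = new StringSerDe();
            try {
                client.put(key, state, serde, serde);
                return true;
            } catch (final IOException ioe) {
                return false;
            }
        }
    }
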
+ String serializedState = null; + try { + serializedState = serializeState(latestListingModTime, statuses); + } catch (final Exception e) { + getLogger().error("Failed to serialize state due to {}", new Object[] {e}); + } + + if ( serializedState != null ) { + // Save our state locally. + try { + persistLocalState(directory, serializedState); + } catch (final IOException ioe) { + getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe); + } + + // Attempt to save state to remote server. + final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + try { + client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe()); + } catch (final IOException ioe) { + getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe); + } + } + + lastListingTime = latestListingModTime; + } else { + getLogger().debug("There is no data to list. Yielding."); + context.yield(); + + // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system + if ( lastListingTime == null ) { + lastListingTime = 0L; + } + + return; + } + } + + private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException { + final Set<FileStatus> statusSet = new HashSet<>(); + + final FileStatus[] statuses = hdfs.listStatus(path); + + for ( final FileStatus status : statuses ) { + if ( status.isDirectory() ) { + if ( recursive ) { + try { + statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs)); + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe}); + } + } + } else { + statusSet.add(status); + } + } + + return statusSet; + } + + + private String serializeState(final long latestListingTime, final Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, IOException { + // we need to keep track of all files that we pulled in that had a modification time equal to + // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files + // that have a mod time equal to that timestamp because more files may come in with the same timestamp + // later in the same millisecond.
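
The tie-breaking rule described above - emit a file if it is strictly newer than the saved timestamp, or exactly as new but not yet emitted, and never emit HDFS's in-flight _COPYING_ files - can be exercised in isolation. A minimal sketch with plain strings and longs standing in for FileStatus objects (the class name and file paths are illustrative):

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.TreeMap;

    public class TimestampTieBreakDemo {
        public static void main(final String[] args) {
            // State restored from the previous listing cycle.
            final Long minTimestamp = 1000L;
            final Set<String> latestPathsListed = new HashSet<>();
            latestPathsListed.add("/tmp/a.txt");          // emitted last cycle at t=1000

            // Candidate files mapped to their modification times.
            final Map<String, Long> files = new TreeMap<>();
            files.put("/tmp/a.txt", 1000L);               // same millisecond, already emitted: skip
            files.put("/tmp/b.txt", 1000L);               // same millisecond, not yet emitted: emit
            files.put("/tmp/c.txt", 1500L);               // strictly newer: emit
            files.put("/tmp/d.txt._COPYING_", 2000L);     // still being written by HDFS: skip

            for (final Map.Entry<String, Long> entry : files.entrySet()) {
                final String path = entry.getKey();
                final long modTime = entry.getValue();
                // Same condition as the fetch flag computed in onTrigger() above.
                final boolean fetch = !path.endsWith("_COPYING_")
                        && (minTimestamp == null || modTime > minTimestamp
                            || (modTime == minTimestamp && !latestPathsListed.contains(path)));
                System.out.println(path + " -> " + (fetch ? "emit" : "skip"));
            }
        }
    }
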
+ if ( statuses.isEmpty() ) { + return null; + } else { + final List<FileStatus> sortedStatuses = new ArrayList<>(statuses); + Collections.sort(sortedStatuses, new Comparator<FileStatus>() { + @Override + public int compare(final FileStatus o1, final FileStatus o2) { + return Long.compare(o1.getModificationTime(), o2.getModificationTime()); + } + }); + + final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime(); + final Set<Path> pathsWithModTimeEqualToListingModTime = new HashSet<>(); + for (int i=sortedStatuses.size() - 1; i >= 0; i--) { + final FileStatus status = sortedStatuses.get(i); + if (status.getModificationTime() == latestListingModTime) { + pathsWithModTimeEqualToListingModTime.add(status.getPath()); + } + } + + this.latestPathsListed = pathsWithModTimeEqualToListingModTime; + + final HDFSListing listing = new HDFSListing(); + listing.setLatestTimestamp(new Date(latestListingModTime)); + final Set<String> paths = new HashSet<>(); + for ( final Path path : pathsWithModTimeEqualToListingModTime ) { + paths.add(path.toUri().toString()); + } + listing.setMatchingPaths(paths); + + final ObjectMapper mapper = new ObjectMapper(); + final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing); + return serializedState; + } + } + + private void persistLocalState(final String directory, final String serializedState) throws IOException { + // Write the state for this directory to the local persistence file so that, after a restart, + // we can resume even if the distributed cache is unreachable. + final File dir = persistenceFile.getParentFile(); + if ( !dir.exists() && !dir.mkdirs() ) { + throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state"); + } + + final Properties props = new Properties(); + if ( persistenceFile.exists() ) { + try (final FileInputStream fis = new FileInputStream(persistenceFile)) { + props.load(fis); + } + } + + props.setProperty(directory, serializedState); + + try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) { + props.store(fos, null); + } + } + + private String getAbsolutePath(final Path path) { + final Path parent = path.getParent(); + final String prefix = (parent == null || parent.getName().equals("")) ?
"" : getAbsolutePath(parent); + return prefix + "/" + path.getName(); + } + + private Map<String, String> createAttributes(final FileStatus status) { + final Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName()); + attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent())); + + attributes.put("hdfs.owner", status.getOwner()); + attributes.put("hdfs.group", status.getGroup()); + attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime())); + attributes.put("hdfs.length", String.valueOf(status.getLen())); + attributes.put("hdfs.replication", String.valueOf(status.getReplication())); + + final FsPermission permission = status.getPermission(); + final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction()); + attributes.put("hdfs.permissions", perms); + return attributes; + } + + private String getPerms(final FsAction action) { + final StringBuilder sb = new StringBuilder(); + if ( action.implies(FsAction.READ) ) { + sb.append("r"); + } else { + sb.append("-"); + } + + if ( action.implies(FsAction.WRITE) ) { + sb.append("w"); + } else { + sb.append("-"); + } + + if ( action.implies(FsAction.EXECUTE) ) { + sb.append("x"); + } else { + sb.append("-"); + } + + return sb.toString(); + } +} diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java new file mode 100644 index 0000000000..9f4d68b241 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.processors.hadoop.util; + +import java.util.Collection; +import java.util.Date; +import java.util.HashSet; +import java.util.Set; + +import javax.xml.bind.annotation.XmlTransient; +import javax.xml.bind.annotation.XmlType; + +import org.apache.hadoop.fs.Path; + +/** + * A simple POJO for maintaining state about the last HDFS Listing that was performed so that + * we can avoid pulling the same file multiple times + */ +@XmlType(name = "listing") +public class HDFSListing { + private Date latestTimestamp; + private Collection matchingPaths; + + /** + * @return the modification date of the newest file that was contained in the HDFS Listing + */ + public Date getLatestTimestamp() { + return latestTimestamp; + } + + /** + * Sets the timestamp of the modification date of the newest file that was contained in the HDFS Listing + * + * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the HDFS Listing + */ + public void setLatestTimestamp(Date latestTimestamp) { + this.latestTimestamp = latestTimestamp; + } + + /** + * @return a Collection containing the paths of all files in the HDFS Listing whose Modification date + * was equal to {@link #getLatestTimestamp()} + */ + @XmlTransient + public Collection getMatchingPaths() { + return matchingPaths; + } + + /** + * @return a Collection of {@link Path} objects equivalent to those returned by {@link #getMatchingPaths()} + */ + public Set toPaths() { + final Set paths = new HashSet<>(matchingPaths.size()); + for ( final String pathname : matchingPaths ) { + paths.add(new Path(pathname)); + } + return paths; + } + + /** + * Sets the Collection containing the paths of all files in the HDFS Listing whose Modification Date was + * equal to {@link #getLatestTimestamp()} + * @param matchingPaths + */ + public void setMatchingPaths(Collection matchingPaths) { + this.matchingPaths = matchingPaths; + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java new file mode 100644 index 0000000000..ef0e590a7f --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop.util; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +public class LongSerDe implements Serializer, Deserializer { + + @Override + public Long deserialize(final byte[] input) throws DeserializationException, IOException { + if ( input == null || input.length == 0 ) { + return null; + } + + final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(input)); + return dis.readLong(); + } + + @Override + public void serialize(final Long value, final OutputStream out) throws SerializationException, IOException { + final DataOutputStream dos = new DataOutputStream(out); + dos.writeLong(value); + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java new file mode 100644 index 0000000000..848831f475 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; + +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +public class StringSerDe implements Serializer, Deserializer { + + @Override + public String deserialize(final byte[] value) throws DeserializationException, IOException { + if ( value == null ) { + return null; + } + + return new String(value, StandardCharsets.UTF_8); + } + + @Override + public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { + out.write(value.getBytes(StandardCharsets.UTF_8)); + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index da16ef77fb..4b359e8b7e 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -12,7 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.processors.hadoop.GetHDFS -org.apache.nifi.processors.hadoop.PutHDFS org.apache.nifi.processors.hadoop.CreateHadoopSequenceFile +org.apache.nifi.processors.hadoop.FetchHDFS +org.apache.nifi.processors.hadoop.GetHDFS org.apache.nifi.processors.hadoop.GetHDFSSequenceFile +org.apache.nifi.processors.hadoop.ListHDFS +org.apache.nifi.processors.hadoop.PutHDFS diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java index eed0d36997..df7297a44a 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java @@ -178,6 +178,11 @@ public class TestDetectDuplicate { exists = false; return true; } + + @Override + public void put(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException { + + } } private static class StringSerializer implements Serializer { diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java index 82831375d8..975dc63b64 100644 --- 
a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java @@ -82,6 +82,20 @@ public interface DistributedMapCacheClient extends ControllerService { */ boolean containsKey(K key, Serializer keySerializer) throws IOException; + /** + * Adds the specified key and value to the cache, overwriting any value that is + * currently set. + * + * @param key The key to set + * @param value The value to associate with the given Key + * @param keySerializer the Serializer that will be used to serialize the key into bytes + * @param valueSerializer the Serializer that will be used to serialize the value into bytes + * + * @throws IOException if unable to communicate with the remote instance + * @throws NullPointerException if the key or either serializer is null + */ + void put(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException; + /** * Returns the value in the cache for the given key, if one exists; * otherwise returns null diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index 92bda8f72c..06ff42bbf0 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -116,6 +116,28 @@ public class DistributedMapCacheClientService extends AbstractControllerService }); } + public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + withCommsSession(new CommsAction() { + @Override + public Object execute(final CommsSession session) throws IOException { + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF("put"); + + serialize(key, keySerializer, dos); + serialize(value, valueSerializer, dos); + + dos.flush(); + final DataInputStream dis = new DataInputStream(session.getInputStream()); + final boolean success = dis.readBoolean(); + if ( !success ) { + throw new IOException("Expected to receive confirmation of 'put' request but received unexpected response"); + } + + return null; + } + }); + } + @Override public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { return withCommsSession(new CommsAction() { diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java 
b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java index 534cb0b204..89030461bf 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; public interface MapCache { MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException; + MapPutResult put(ByteBuffer key, ByteBuffer value) throws IOException; boolean containsKey(ByteBuffer key) throws IOException; ByteBuffer get(ByteBuffer key) throws IOException; ByteBuffer remove(ByteBuffer key) throws IOException; diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java index e4a600e3fe..cf8996c512 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java @@ -65,6 +65,13 @@ public class MapCacheServer extends AbstractCacheServer { dos.writeBoolean(putResult.isSuccessful()); break; } + case "put": { + final byte[] key = readValue(dis); + final byte[] value = readValue(dis); + cache.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); + dos.writeBoolean(true); + break; + } case "containsKey": { final byte[] key = readValue(dis); final boolean contains = cache.containsKey(ByteBuffer.wrap(key)); diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java index 77fb77db0c..82b17878f5 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java @@ -78,6 +78,30 @@ public class PersistentMapCache implements MapCache { return putResult; } + + @Override + public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException { + final MapPutResult putResult = wrapped.put(key, value); + if ( putResult.isSuccessful() ) { + // The put was successful. 
+ final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value); + final List<MapWaliRecord> records = new ArrayList<>(); + records.add(record); + + if ( putResult.getEvictedKey() != null ) { + records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue())); + } + + wali.update(records, false); // include the eviction record, if any, so it is persisted as well + + final long modCount = modifications.getAndIncrement(); + if ( modCount > 0 && modCount % 100000 == 0 ) { + wali.checkpoint(); + } + } + + return putResult; + } @Override public boolean containsKey(final ByteBuffer key) throws IOException { diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java index 10139f149b..d8f9c452db 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java @@ -106,6 +106,28 @@ public class SimpleMapCache implements MapCache { } } + + @Override + public MapPutResult put(final ByteBuffer key, final ByteBuffer value) { + writeLock.lock(); + try { + // evict if we need to in order to make room for a new entry. + final MapCacheRecord evicted = evict(); + + final MapCacheRecord record = new MapCacheRecord(key, value); + final MapCacheRecord existing = cache.put(key, record); + inverseCacheMap.put(record, key); + + final ByteBuffer existingValue = (existing == null) ? null : existing.getValue(); + final ByteBuffer evictedKey = (evicted == null) ? null : evicted.getKey(); + final ByteBuffer evictedValue = (evicted == null) ? null : evicted.getValue(); + + return new MapPutResult(true, key, value, existingValue, evictedKey, evictedValue); + } finally { + writeLock.unlock(); + } + } + @Override public boolean containsKey(final ByteBuffer key) { readLock.lock();
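
Taken together, the client half (DistributedMapCacheClientService.put) and the server half (the new "put" case in MapCacheServer) form a simple request/response exchange: the client writes the verb "put" with writeUTF, then the serialized key and value, and the server answers with a single boolean acknowledgment. A stand-alone sketch of that exchange over in-memory streams; the int-length-prefix framing mirrors what the serialize()/readValue() helpers appear to do, but those helpers are outside this patch, so the framing shown is an assumption:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class PutExchangeDemo {
        // Writes a value the way the cache protocol is assumed to frame it:
        // a 4-byte length followed by the raw serialized bytes.
        static void writeLengthPrefixed(final DataOutputStream dos, final byte[] value) throws IOException {
            dos.writeInt(value.length);
            dos.write(value);
        }

        public static void main(final String[] args) throws IOException {
            // Client side: verb, then key bytes, then value bytes.
            final ByteArrayOutputStream request = new ByteArrayOutputStream();
            final DataOutputStream dos = new DataOutputStream(request);
            dos.writeUTF("put");
            writeLengthPrefixed(dos, "myKey".getBytes(StandardCharsets.UTF_8));
            writeLengthPrefixed(dos, "myValue".getBytes(StandardCharsets.UTF_8));
            dos.flush();

            // Server side: read the verb, then each length-prefixed value,
            // then call cache.put(...) and acknowledge with writeBoolean(true).
            final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(request.toByteArray()));
            final String verb = dis.readUTF();
            final byte[] key = new byte[dis.readInt()];
            dis.readFully(key);
            final byte[] value = new byte[dis.readInt()];
            dis.readFully(value);
            System.out.println(verb + " " + new String(key, StandardCharsets.UTF_8)
                    + " = " + new String(value, StandardCharsets.UTF_8));
        }
    }
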