mirror of https://github.com/apache/nifi.git
NIFI-533: Initial implementation of FetchHDFS and ListHDFS
This commit is contained in:
parent 6fa596884b
commit 94945a6fd5

OnPrimaryNodeStateChange.java (new file):

@@ -0,0 +1,44 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.annotation.notification;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * <p>
 * Marker annotation that a component can use to indicate that a method should be
 * called whenever the state of the Primary Node in a cluster has changed.
 * </p>
 *
 * <p>
 * Methods with this annotation should take either no arguments or one argument of type
 * {@link PrimaryNodeState}. The {@link PrimaryNodeState} provides context about what changed
 * so that the component can take appropriate action.
 * </p>
 */
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface OnPrimaryNodeStateChange {

}

PrimaryNodeState.java (new file):

@@ -0,0 +1,33 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.annotation.notification;

/**
 * Represents a state change that occurred for the Primary Node of a NiFi cluster.
 */
public enum PrimaryNodeState {
    /**
     * The node receiving this state has been elected the Primary Node of the NiFi cluster.
     */
    ELECTED_PRIMARY_NODE,

    /**
     * The node receiving this state was the Primary Node but has now had its Primary Node
     * role revoked.
     */
    PRIMARY_NODE_REVOKED;
}
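
The annotation and enum above give extension components a callback for cluster role changes. Below is a minimal sketch of how a processor might consume them; the class and field names are illustrative and are not part of this commit.

import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.processor.AbstractProcessor;

// Hypothetical example: a processor that remembers whether it currently holds the Primary Node role.
public abstract class ExamplePrimaryNodeAwareProcessor extends AbstractProcessor {

    private volatile boolean primaryNode = false;

    @OnPrimaryNodeStateChange
    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
        // Invoked reflectively by the framework (see the FlowController change below)
        // whenever this node is elected Primary Node or has that role revoked.
        primaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
    }
}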

FlowController.java:

@@ -53,6 +53,8 @@ import org.apache.nifi.admin.service.UserService;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.cluster.BulletinsPayload;
 import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.protocol.DataFlow;
@@ -74,8 +76,8 @@ import org.apache.nifi.connectable.Position;
 import org.apache.nifi.connectable.Size;
 import org.apache.nifi.connectable.StandardConnection;
 import org.apache.nifi.controller.exception.CommunicationsException;
-import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.label.StandardLabel;
 import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
@@ -3098,6 +3100,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         LOG.info("Setting primary flag from '" + this.primary + "' to '" + primary + "'");
 
+        final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
+        final ProcessGroup rootGroup = getGroup(getRootGroupId());
+        for (final ProcessorNode procNode : rootGroup.findAllProcessors()) {
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
+        }
+        for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
+        }
+        for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) {
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
+        }
+
         // update primary
         this.primary = primary;
         eventDrivenWorkerQueue.setPrimary(primary);

SnippetUtils.java:

@@ -257,7 +257,7 @@ public final class SnippetUtils {
                 final PropertyDescriptor descriptor = entry.getKey();
                 final String propertyValue = entry.getValue();
 
-                if ( descriptor.getControllerServiceDefinition() != null ) {
+                if ( descriptor.getControllerServiceDefinition() != null && propertyValue != null ) {
                     final ControllerServiceNode referencedNode = flowController.getControllerServiceNode(propertyValue);
                     if ( referencedNode == null ) {
                         throw new IllegalStateException("Controller Service with ID " + propertyValue + " is referenced in template but cannot be found");

Maven pom.xml:

@@ -42,8 +42,12 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-hdfs</artifactId>
             <scope>provided</scope>

AbstractHadoopProcessor.java:

@@ -217,4 +217,31 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         };
     }
 
+    /**
+     * Returns the relative path of the child that does not include the filename
+     * or the root path.
+     *
+     * @param root the root path
+     * @param child the child path whose directory is resolved against the root
+     * @return the portion of the child's directory path below the root, without the filename
+     */
+    public static String getPathDifference(final Path root, final Path child) {
+        final int depthDiff = child.depth() - root.depth();
+        if (depthDiff <= 1) {
+            return "".intern();
+        }
+        String lastRoot = root.getName();
+        Path childsParent = child.getParent();
+        final StringBuilder builder = new StringBuilder();
+        builder.append(childsParent.getName());
+        for (int i = (depthDiff - 3); i >= 0; i--) {
+            childsParent = childsParent.getParent();
+            String name = childsParent.getName();
+            if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) {
+                break;
+            }
+            builder.insert(0, Path.SEPARATOR).insert(0, name);
+        }
+        return builder.toString();
+    }
 }
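
The helper above, moved into AbstractHadoopProcessor so that both GetHDFS and ListHDFS can use it, derives the relative directory portion used for the 'path' attribute. A small sketch of the expected behavior, under the assumption that the Hadoop client libraries are on the classpath; the sample paths are made up.

import org.apache.hadoop.fs.Path;
import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;

public class PathDifferenceExample {
    public static void main(String[] args) {
        final Path root = new Path("/tmp");

        // Root and filename are stripped; expected output is "abc/1/2/3",
        // which matches the 'path' attribute example in the GetHDFS documentation.
        System.out.println(AbstractHadoopProcessor.getPathDifference(root, new Path("/tmp/abc/1/2/3/file.txt")));

        // A file directly under the root has no intermediate directories,
        // so the difference is the empty string.
        System.out.println(AbstractHadoopProcessor.getPathDifference(root, new Path("/tmp/file.txt")));
    }
}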

FetchHDFS.java (new file):

@@ -0,0 +1,126 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.hadoop;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AccessControlException;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;

@SupportsBatching
@Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"})
@CapabilityDescription("Retrieves a file from HDFS. The content of the incoming FlowFile is replaced by the content of the file in HDFS. "
    + "The file in HDFS is left intact without any changes being made to it.")
@WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added indicating why the file could "
    + "not be fetched from HDFS")
@SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class})
public class FetchHDFS extends AbstractHadoopProcessor {

    static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
        .name("HDFS Filename")
        .description("The name of the HDFS file to retrieve")
        .required(true)
        .expressionLanguageSupported(true)
        .defaultValue("${path}/${filename}")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("FlowFiles will be routed to this relationship once they have been updated with the content of the HDFS file")
        .build();
    static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieved and trying again will likely not be helpful. "
            + "This would occur, for instance, if the file is not found or if there is a permissions issue")
        .build();
    static final Relationship REL_COMMS_FAILURE = new Relationship.Builder()
        .name("comms.failure")
        .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieved due to a communications failure. "
            + "This generally indicates that the Fetch should be tried again.")
        .build();

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(HADOOP_CONFIGURATION_RESOURCES);
        properties.add(FILENAME);
        return properties;
    }

    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        relationships.add(REL_COMMS_FAILURE);
        return relationships;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }

        final FileSystem hdfs = hdfsResources.get().getValue();
        final Path path = new Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue());
        final URI uri = path.toUri();

        final StopWatch stopWatch = new StopWatch(true);
        try (final FSDataInputStream inStream = hdfs.open(path, 16384)) {
            flowFile = session.importFrom(inStream, flowFile);
            stopWatch.stop();
            getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
            session.getProvenanceReporter().modifyContent(flowFile, "Fetched content from " + uri, stopWatch.getDuration(TimeUnit.MILLISECONDS));
            session.transfer(flowFile, REL_SUCCESS);
        } catch (final FileNotFoundException | AccessControlException e) {
            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e});
            flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
        } catch (final IOException e) {
            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e});
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_COMMS_FAILURE);
        }
    }

}
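
A rough sketch of how FetchHDFS might be exercised with NiFi's mock framework (nifi-mock). This is not part of the commit: the configuration file location, the sample HDFS path, and the assumption of a reachable HDFS instance are all illustrative, and the sketch lives in the processor's package only so it can see the package-private FILENAME and REL_SUCCESS members.

package org.apache.nifi.processors.hadoop;

import java.util.Collections;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class FetchHDFSSketch {
    public static void main(String[] args) {
        final TestRunner runner = TestRunners.newTestRunner(FetchHDFS.class);

        // Hypothetical configuration: a Hadoop config resource and the file to fetch.
        runner.setProperty(AbstractHadoopProcessor.HADOOP_CONFIGURATION_RESOURCES, "/etc/hadoop/conf/core-site.xml");
        runner.setProperty(FetchHDFS.FILENAME, "/data/incoming/example.txt");

        // FetchHDFS replaces the content of an incoming FlowFile, so enqueue one first.
        runner.enqueue(new byte[0], Collections.singletonMap("filename", "example.txt"));
        runner.run();

        // With a reachable HDFS the FlowFile should land on 'success'; a missing file or
        // permission problem routes to 'failure', and connectivity issues to 'comms.failure'.
        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
    }
}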

GetHDFS.java:

@@ -58,16 +58,13 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
-/**
- * This processor reads files from HDFS into NiFi FlowFiles.
- */
 @TriggerWhenEmpty
 @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
-@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles")
+@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles. This Processor will delete the file from HDFS after fetching it.")
 @WritesAttributes({
     @WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."),
     @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".") })
-@SeeAlso(PutHDFS.class)
+@SeeAlso({PutHDFS.class, ListHDFS.class})
 public class GetHDFS extends AbstractHadoopProcessor {
 
     public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
@@ -104,7 +101,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
 
     public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder()
             .name("Keep Source File")
-            .description("Determines whether to delete the file from HDFS after it has been successfully transferred")
+            .description("Determines whether to delete the file from HDFS after it has been successfully transferred. If true, the file will be fetched repeatedly. This is intended for testing only.")
             .required(true)
             .allowableValues("true", "false")
             .defaultValue("false")
@@ -464,32 +461,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
         return files;
     }
 
-    /**
-     * Returns the relative path of the child that does not include the filename
-     * or the root path.
-     * @param root
-     * @param child
-     * @return
-     */
-    public static String getPathDifference(final Path root, final Path child) {
-        final int depthDiff = child.depth() - root.depth();
-        if (depthDiff <= 1) {
-            return "".intern();
-        }
-        String lastRoot = root.getName();
-        Path childsParent = child.getParent();
-        final StringBuilder builder = new StringBuilder();
-        builder.append(childsParent.getName());
-        for (int i = (depthDiff - 3); i >= 0; i--) {
-            childsParent = childsParent.getParent();
-            String name = childsParent.getName();
-            if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) {
-                break;
-            }
-            builder.insert(0, Path.SEPARATOR).insert(0, name);
-        }
-        return builder.toString();
-    }
-
     /**
      * Holder for a snapshot in time of some processor properties that are

ListHDFS.java (new file):

@@ -0,0 +1,466 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.hadoop;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.HDFSListing;
import org.apache.nifi.processors.hadoop.util.StringSerDe;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;


@TriggerSerially
@TriggerWhenEmpty
@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, creates a FlowFile that represents "
    + "the HDFS file so that it can be fetched in conjunction with FetchHDFS. This Processor is designed to run on Primary Node only "
    + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
    + "all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.")
@WritesAttributes({
    @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."),
    @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."),
    @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"),
    @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"),
    @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"),
    @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for the file"),
    @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example rw-rw-r--")
})
@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
public class ListHDFS extends AbstractHadoopProcessor {

    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
        .name("Distributed Cache Service")
        .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node begins pulling data, it won't duplicate all of the work that has been done.")
        .required(true)
        .identifiesControllerService(DistributedMapCacheClient.class)
        .build();

    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
        .name(DIRECTORY_PROP_NAME)
        .description("The HDFS directory from which files should be read")
        .required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build();

    public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
        .name("Recurse Subdirectories")
        .description("Indicates whether to list files from subdirectories of the HDFS directory")
        .required(true)
        .allowableValues("true", "false")
        .defaultValue("true")
        .build();


    public static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("All FlowFiles are transferred to this relationship")
        .build();

    private volatile Long lastListingTime = null;
    private volatile Set<Path> latestPathsListed = new HashSet<>();
    private volatile boolean electedPrimaryNode = false;
    private File persistenceFile = null;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        super.init(context);
        persistenceFile = new File("conf/state/" + getIdentifier());
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(HADOOP_CONFIGURATION_RESOURCES);
        properties.add(DISTRIBUTED_CACHE_SERVICE);
        properties.add(DIRECTORY);
        properties.add(RECURSE_SUBDIRS);
        return properties;
    }

    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        return relationships;
    }

    private String getKey(final String directory) {
        return getIdentifier() + ".lastListingTime." + directory;
    }

    @OnPrimaryNodeStateChange
    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
        if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) {
            electedPrimaryNode = true;
        }
    }

    @Override
    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
        if ( descriptor.equals(DIRECTORY) ) {
            lastListingTime = null; // clear lastListingTime so that we have to fetch new time
            latestPathsListed = new HashSet<>();
        }
    }

    private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
        final ObjectMapper mapper = new ObjectMapper();
        final JsonNode jsonNode = mapper.readTree(serializedState);
        return mapper.readValue(jsonNode, HDFSListing.class);
    }


    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final String directory = context.getProperty(DIRECTORY).getValue();

        // Determine the timestamp for the last file that we've listed.
        Long minTimestamp = lastListingTime;
        if ( minTimestamp == null || electedPrimaryNode ) {
            // We haven't yet restored any state from local or distributed state - or it's been at least a minute since
            // we have performed a listing. In this case,
            // First, attempt to get timestamp from distributed cache service.
            final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);

            try {
                final StringSerDe serde = new StringSerDe();
                final String serializedState = client.get(getKey(directory), serde, serde);
                if ( serializedState == null || serializedState.isEmpty() ) {
                    minTimestamp = null;
                    this.latestPathsListed = Collections.emptySet();
                } else {
                    final HDFSListing listing = deserialize(serializedState);
                    this.lastListingTime = listing.getLatestTimestamp().getTime();
                    minTimestamp = listing.getLatestTimestamp().getTime();
                    this.latestPathsListed = listing.toPaths();
                }

                this.lastListingTime = minTimestamp;
                electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore.
            } catch (final IOException ioe) {
                getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
                context.yield();
                return;
            }

            // Check the persistence file. We want to use the latest timestamp that we have so that
            // we don't duplicate data.
            try {
                if ( persistenceFile.exists() ) {
                    try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
                        final Properties props = new Properties();
                        props.load(fis);

                        // get the local timestamp for this directory, if it exists.
                        final String locallyPersistedValue = props.getProperty(directory);
                        if ( locallyPersistedValue != null ) {
                            final HDFSListing listing = deserialize(locallyPersistedValue);
                            final long localTimestamp = listing.getLatestTimestamp().getTime();

                            // If distributed state doesn't have an entry or the local entry is later than the distributed state,
                            // update the distributed state so that we are in sync.
                            if (minTimestamp == null || localTimestamp > minTimestamp) {
                                minTimestamp = localTimestamp;

                                // Our local persistence file shows a later time than the Distributed service.
                                // Update the distributed service to match our local state.
                                try {
                                    final StringSerDe serde = new StringSerDe();
                                    client.put(getKey(directory), locallyPersistedValue, serde, serde);
                                } catch (final IOException ioe) {
                                    getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed "
                                        + "state due to {}. If a new node performs HDFS Listing, data duplication may occur",
                                        new Object[] {directory, locallyPersistedValue, ioe});
                                }
                            }
                        }
                    }
                }
            } catch (final IOException ioe) {
                getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe);
            }
        }


        // Pull in any file that is newer than the timestamp that we have.
        final FileSystem hdfs = hdfsResources.get().getValue();
        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
        final Path rootPath = new Path(directory);

        int listCount = 0;
        Long latestListingModTime = null;
        final Set<FileStatus> statuses;
        try {
            statuses = getStatuses(rootPath, recursive, hdfs);
            for ( final FileStatus status : statuses ) {
                // don't get anything where the last modified timestamp is equal to our current timestamp.
                // if we do, then we run the risk of multiple files having the same last mod date but us only
                // seeing a portion of them.
                // I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe
                // only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a
                // modified date not before the current time.
                final long fileModTime = status.getModificationTime();

                // we only want the file if its timestamp is later than the minTimestamp or equal to and we didn't pull it last time.
                // Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those.
                boolean fetch = !status.getPath().getName().endsWith("_COPYING_") &&
                        (minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath())));

                // Create the FlowFile for this path.
                if ( fetch ) {
                    final Map<String, String> attributes = createAttributes(status);
                    FlowFile flowFile = session.create();
                    flowFile = session.putAllAttributes(flowFile, attributes);
                    session.transfer(flowFile, REL_SUCCESS);
                    listCount++;

                    if ( latestListingModTime == null || fileModTime > latestListingModTime ) {
                        latestListingModTime = fileModTime;
                    }
                }
            }
        } catch (final IOException ioe) {
            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe});
            return;
        }

        if ( listCount > 0 ) {
            getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
            session.commit();

            // We have performed a listing and pushed the FlowFiles out.
            // Now, we need to persist state about the Last Modified timestamp of the newest file
            // that we pulled in. We do this in order to avoid pulling in the same file twice.
            // However, we want to save the state both locally and remotely.
            // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
            // previously Primary Node left off.
            // We also store the state locally so that if the node is restarted, and the node cannot contact
            // the distributed state cache, the node can continue to run (if it is primary node).
            String serializedState = null;
            try {
                serializedState = serializeState(latestListingModTime, statuses);
            } catch (final Exception e) {
                getLogger().error("Failed to serialize state due to {}", new Object[] {e});
            }

            if ( serializedState != null ) {
                // Save our state locally.
                try {
                    persistLocalState(directory, serializedState);
                } catch (final IOException ioe) {
                    getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe);
                }

                // Attempt to save state to remote server.
                final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
                try {
                    client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe());
                } catch (final IOException ioe) {
                    getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe);
                }
            }

            lastListingTime = latestListingModTime;
        } else {
            getLogger().debug("There is no data to list. Yielding.");
            context.yield();

            // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
            if ( lastListingTime == null ) {
                lastListingTime = 0L;
            }

            return;
        }
    }

    private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException {
        final Set<FileStatus> statusSet = new HashSet<>();

        final FileStatus[] statuses = hdfs.listStatus(path);

        for ( final FileStatus status : statuses ) {
            if ( status.isDirectory() ) {
                if ( recursive ) {
                    try {
                        statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs));
                    } catch (final IOException ioe) {
                        getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
                    }
                }
            } else {
                statusSet.add(status);
            }
        }

        return statusSet;
    }


    private String serializeState(final long latestListingTime, final Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, IOException {
        // we need to keep track of all files that we pulled in that had a modification time equal to
        // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
        // that have a mod time equal to that timestamp because more files may come in with the same timestamp
        // later in the same millisecond.
        if ( statuses.isEmpty() ) {
            return null;
        } else {
            final List<FileStatus> sortedStatuses = new ArrayList<>(statuses);
            Collections.sort(sortedStatuses, new Comparator<FileStatus>() {
                @Override
                public int compare(final FileStatus o1, final FileStatus o2) {
                    return Long.compare(o1.getModificationTime(), o2.getModificationTime());
                }
            });

            final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime();
            final Set<Path> pathsWithModTimeEqualToListingModTime = new HashSet<>();
            for (int i=sortedStatuses.size() - 1; i >= 0; i--) {
                final FileStatus status = sortedStatuses.get(i);
                if (status.getModificationTime() == latestListingModTime) {
                    pathsWithModTimeEqualToListingModTime.add(status.getPath());
                }
            }

            this.latestPathsListed = pathsWithModTimeEqualToListingModTime;

            final HDFSListing listing = new HDFSListing();
            listing.setLatestTimestamp(new Date(latestListingModTime));
            final Set<String> paths = new HashSet<>();
            for ( final Path path : pathsWithModTimeEqualToListingModTime ) {
                paths.add(path.toUri().toString());
            }
            listing.setMatchingPaths(paths);

            final ObjectMapper mapper = new ObjectMapper();
            final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing);
            return serializedState;
        }
    }

    private void persistLocalState(final String directory, final String serializedState) throws IOException {
        // we need to keep track of all files that we pulled in that had a modification time equal to
        // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
        // that have a mod time equal to that timestamp because more files may come in with the same timestamp
        // later in the same millisecond.
        final File dir = persistenceFile.getParentFile();
        if ( !dir.exists() && !dir.mkdirs() ) {
            throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");
        }

        final Properties props = new Properties();
        if ( persistenceFile.exists() ) {
            try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
                props.load(fis);
            }
        }

        props.setProperty(directory, serializedState);

        try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
            props.store(fos, null);
        }
    }

    private String getAbsolutePath(final Path path) {
        final Path parent = path.getParent();
        final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
        return prefix + "/" + path.getName();
    }

    private Map<String, String> createAttributes(final FileStatus status) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
        attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));

        attributes.put("hdfs.owner", status.getOwner());
        attributes.put("hdfs.group", status.getGroup());
        attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime()));
        attributes.put("hdfs.length", String.valueOf(status.getLen()));
        attributes.put("hdfs.replication", String.valueOf(status.getReplication()));

        final FsPermission permission = status.getPermission();
        final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
        attributes.put("hdfs.permissions", perms);
        return attributes;
    }

    private String getPerms(final FsAction action) {
        final StringBuilder sb = new StringBuilder();
        if ( action.implies(FsAction.READ) ) {
            sb.append("r");
        } else {
            sb.append("-");
        }

        if ( action.implies(FsAction.WRITE) ) {
            sb.append("w");
        } else {
            sb.append("-");
        }

        if ( action.implies(FsAction.EXECUTE) ) {
            sb.append("x");
        } else {
            sb.append("-");
        }

        return sb.toString();
    }
}

HDFSListing.java (new file):

@@ -0,0 +1,83 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.hadoop.util;

import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;

import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.XmlType;

import org.apache.hadoop.fs.Path;

/**
 * A simple POJO for maintaining state about the last HDFS Listing that was performed so that
 * we can avoid pulling the same file multiple times
 */
@XmlType(name = "listing")
public class HDFSListing {
    private Date latestTimestamp;
    private Collection<String> matchingPaths;

    /**
     * @return the modification date of the newest file that was contained in the HDFS Listing
     */
    public Date getLatestTimestamp() {
        return latestTimestamp;
    }

    /**
     * Sets the timestamp of the modification date of the newest file that was contained in the HDFS Listing
     *
     * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the HDFS Listing
     */
    public void setLatestTimestamp(Date latestTimestamp) {
        this.latestTimestamp = latestTimestamp;
    }

    /**
     * @return a Collection containing the paths of all files in the HDFS Listing whose Modification date
     * was equal to {@link #getLatestTimestamp()}
     */
    @XmlTransient
    public Collection<String> getMatchingPaths() {
        return matchingPaths;
    }

    /**
     * @return a Collection of {@link Path} objects equivalent to those returned by {@link #getMatchingPaths()}
     */
    public Set<Path> toPaths() {
        final Set<Path> paths = new HashSet<>(matchingPaths.size());
        for ( final String pathname : matchingPaths ) {
            paths.add(new Path(pathname));
        }
        return paths;
    }

    /**
     * Sets the Collection containing the paths of all files in the HDFS Listing whose Modification Date was
     * equal to {@link #getLatestTimestamp()}
     * @param matchingPaths the paths whose modification date equals the latest timestamp
     */
    public void setMatchingPaths(Collection<String> matchingPaths) {
        this.matchingPaths = matchingPaths;
    }

}
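
ListHDFS persists this POJO as a JSON string, both in its local properties file and through the DistributedMapCacheClient. A minimal round-trip sketch using the same org.codehaus.jackson ObjectMapper calls that ListHDFS itself uses; the timestamp and path values are made up, and the nifi-hdfs-processors classpath is assumed.

import java.util.Arrays;
import java.util.Date;

import org.apache.nifi.processors.hadoop.util.HDFSListing;
import org.codehaus.jackson.map.ObjectMapper;

public class HDFSListingSketch {
    public static void main(String[] args) throws Exception {
        final HDFSListing listing = new HDFSListing();
        listing.setLatestTimestamp(new Date(1430000000000L));
        listing.setMatchingPaths(Arrays.asList("hdfs://example/data/a.txt", "hdfs://example/data/b.txt"));

        final ObjectMapper mapper = new ObjectMapper();
        // Serialize the same way ListHDFS#serializeState does.
        final String json = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing);

        // Deserialize the same way ListHDFS#deserialize does.
        final HDFSListing restored = mapper.readValue(mapper.readTree(json), HDFSListing.class);
        System.out.println(restored.getLatestTimestamp() + " -> " + restored.toPaths());
    }
}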

LongSerDe.java (new file):

@@ -0,0 +1,48 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.hadoop.util;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;

public class LongSerDe implements Serializer<Long>, Deserializer<Long> {

    @Override
    public Long deserialize(final byte[] input) throws DeserializationException, IOException {
        if ( input == null || input.length == 0 ) {
            return null;
        }

        final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(input));
        return dis.readLong();
    }

    @Override
    public void serialize(final Long value, final OutputStream out) throws SerializationException, IOException {
        final DataOutputStream dos = new DataOutputStream(out);
        dos.writeLong(value);
    }

}

StringSerDe.java (new file):

@@ -0,0 +1,44 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.hadoop.util;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;

public class StringSerDe implements Serializer<String>, Deserializer<String> {

    @Override
    public String deserialize(final byte[] value) throws DeserializationException, IOException {
        if ( value == null ) {
            return null;
        }

        return new String(value, StandardCharsets.UTF_8);
    }

    @Override
    public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
        out.write(value.getBytes(StandardCharsets.UTF_8));
    }

}

org.apache.nifi.processor.Processor (service descriptor):

@@ -12,7 +12,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.processors.hadoop.GetHDFS
-org.apache.nifi.processors.hadoop.PutHDFS
 org.apache.nifi.processors.hadoop.CreateHadoopSequenceFile
+org.apache.nifi.processors.hadoop.FetchHDFS
+org.apache.nifi.processors.hadoop.GetHDFS
 org.apache.nifi.processors.hadoop.GetHDFSSequenceFile
+org.apache.nifi.processors.hadoop.ListHDFS
+org.apache.nifi.processors.hadoop.PutHDFS

TestDetectDuplicate.java:

@@ -178,6 +178,11 @@ public class TestDetectDuplicate {
             exists = false;
             return true;
         }
+
+        @Override
+        public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+
+        }
     }
 
     private static class StringSerializer implements Serializer<String> {

DistributedMapCacheClient.java:

@@ -82,6 +82,20 @@ public interface DistributedMapCacheClient extends ControllerService {
      */
     <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException;
 
+    /**
+     * Adds the specified key and value to the cache, overwriting any value that is
+     * currently set.
+     *
+     * @param key The key to set
+     * @param value The value to associate with the given Key
+     * @param keySerializer the Serializer that will be used to serialize the key into bytes
+     * @param valueSerializer the Serializer that will be used to serialize the value into bytes
+     *
+     * @throws IOException if unable to communicate with the remote instance
+     * @throws NullPointerException if the key or either serializer is null
+     */
+    <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException;
+
     /**
      * Returns the value in the cache for the given key, if one exists;
      * otherwise returns <code>null</code>
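
The new put() contract above is what ListHDFS relies on to overwrite its stored listing state. A brief sketch of a caller, reusing the StringSerDe introduced by this commit; the key and value shown here are placeholders, not values used anywhere in the commit.

import java.io.IOException;

import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.processors.hadoop.util.StringSerDe;

public class PutExample {
    // Stores (or overwrites) a string value under a string key on the remote map cache,
    // mirroring how ListHDFS pushes its serialized listing state.
    public static void storeState(final DistributedMapCacheClient client, final String key, final String value) throws IOException {
        final StringSerDe serde = new StringSerDe();
        client.put(key, value, serde, serde);
    }
}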

DistributedMapCacheClientService.java:

@@ -116,6 +116,28 @@ public class DistributedMapCacheClientService extends AbstractControllerService
         });
     }
 
+    public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+        withCommsSession(new CommsAction<Object>() {
+            @Override
+            public Object execute(final CommsSession session) throws IOException {
+                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+                dos.writeUTF("put");
+
+                serialize(key, keySerializer, dos);
+                serialize(value, valueSerializer, dos);
+
+                dos.flush();
+                final DataInputStream dis = new DataInputStream(session.getInputStream());
+                final boolean success = dis.readBoolean();
+                if ( !success ) {
+                    throw new IOException("Expected to receive confirmation of 'put' request but received unexpected response");
+                }
+
+                return null;
+            }
+        });
+    }
+
     @Override
     public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
         return withCommsSession(new CommsAction<Boolean>() {

MapCache.java:

@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 public interface MapCache {
 
     MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException;
+    MapPutResult put(ByteBuffer key, ByteBuffer value) throws IOException;
     boolean containsKey(ByteBuffer key) throws IOException;
     ByteBuffer get(ByteBuffer key) throws IOException;
     ByteBuffer remove(ByteBuffer key) throws IOException;

MapCacheServer.java:

@@ -65,6 +65,13 @@ public class MapCacheServer extends AbstractCacheServer {
                 dos.writeBoolean(putResult.isSuccessful());
                 break;
             }
+            case "put": {
+                final byte[] key = readValue(dis);
+                final byte[] value = readValue(dis);
+                cache.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
+                dos.writeBoolean(true);
+                break;
+            }
             case "containsKey": {
                 final byte[] key = readValue(dis);
                 final boolean contains = cache.containsKey(ByteBuffer.wrap(key));

PersistentMapCache.java:

@@ -78,6 +78,30 @@ public class PersistentMapCache implements MapCache {
 
         return putResult;
     }
+
+    @Override
+    public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException {
+        final MapPutResult putResult = wrapped.put(key, value);
+        if ( putResult.isSuccessful() ) {
+            // The put was successful.
+            final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
+            final List<MapWaliRecord> records = new ArrayList<>();
+            records.add(record);
+
+            if ( putResult.getEvictedKey() != null ) {
+                records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
+            }
+
+            wali.update(Collections.singletonList(record), false);
+
+            final long modCount = modifications.getAndIncrement();
+            if ( modCount > 0 && modCount % 100000 == 0 ) {
+                wali.checkpoint();
+            }
+        }
+
+        return putResult;
+    }
 
     @Override
     public boolean containsKey(final ByteBuffer key) throws IOException {

SimpleMapCache.java:

@@ -106,6 +106,28 @@ public class SimpleMapCache implements MapCache {
         }
     }
 
+    @Override
+    public MapPutResult put(final ByteBuffer key, final ByteBuffer value) {
+        writeLock.lock();
+        try {
+            // evict if we need to in order to make room for a new entry.
+            final MapCacheRecord evicted = evict();
+
+            final MapCacheRecord record = new MapCacheRecord(key, value);
+            final MapCacheRecord existing = cache.put(key, record);
+            inverseCacheMap.put(record, key);
+
+            final ByteBuffer existingValue = (existing == null) ? null : existing.getValue();
+            final ByteBuffer evictedKey = (evicted == null) ? null : evicted.getKey();
+            final ByteBuffer evictedValue = (evicted == null) ? null : evicted.getValue();
+
+            return new MapPutResult(true, key, value, existingValue, evictedKey, evictedValue);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
     @Override
     public boolean containsKey(final ByteBuffer key) {
         readLock.lock();