NIFI-1037 Created a processor for HDFS' inotify events. This processor gets notifications for a selected path.

This closes #493.
JJ 2016-05-23 11:10:55 -04:00 committed by Pierre Villard
parent 4723f8e24c
commit 2ce785766d
9 changed files with 942 additions and 0 deletions

View File

@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.inotify;
final class EventAttributes {
static final String EVENT_PATH = "hdfs.inotify.event.path";
static final String EVENT_TYPE = "hdfs.inotify.event.type";
static final String MIME_TYPE = "mime.type";
}

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.inotify;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import java.util.ArrayList;
import java.util.List;
class EventTypeValidator implements Validator {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
final String explanation = isValidEventType(input);
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.valid(explanation == null)
.explanation(explanation)
.build();
}
private String isValidEventType(String input) {
if (input != null && !"".equals(input.trim())) {
final String[] events = input.split(",");
final List<String> invalid = new ArrayList<>();
for (String event : events) {
try {
Event.EventType.valueOf(event.trim().toUpperCase());
} catch (IllegalArgumentException e) {
invalid.add(event.trim());
}
}
return invalid.isEmpty() ? null : "The following are not valid event types: " + invalid;
}
return "Empty event types are not allowed.";
}
}

View File

@@ -0,0 +1,321 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.inotify;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
import org.apache.nifi.processors.hadoop.FetchHDFS;
import org.apache.nifi.processors.hadoop.GetHDFS;
import org.apache.nifi.processors.hadoop.ListHDFS;
import org.apache.nifi.processors.hadoop.PutHDFS;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@TriggerSerially
@TriggerWhenEmpty
@Tags({"hadoop", "events", "inotify", "notifications", "filesystem"})
@WritesAttributes({
@WritesAttribute(attribute = EventAttributes.MIME_TYPE, description = "This is always application/json."),
@WritesAttribute(attribute = EventAttributes.EVENT_TYPE, description = "The HDFS notification event type. Currently there are six types of events " +
"(append, close, create, metadata, rename, and unlink)."),
@WritesAttribute(attribute = EventAttributes.EVENT_PATH, description = "The specific path that the event is tied to.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("This processor polls the notification events provided by the HdfsAdmin API. Since this uses the HdfsAdmin APIs it is required to run as an HDFS super user. Currently there " +
"are six types of events (append, close, create, metadata, rename, and unlink). Please see org.apache.hadoop.hdfs.inotify.Event documentation for full explanations of each event. " +
"This processor will poll for new events based on a defined duration. For each event received a new flow file will be created with the expected attributes and the event itself serialized " +
"to JSON and written to the flow file's content. For example, if event.type is APPEND then the content of the flow file will contain a JSON file containing the information about the " +
"append event. If successful the flow files are sent to the 'success' relationship. Be careful of where the generated flow files are stored. If the flow files are stored in one of " +
"processor's watch directories there will be a never ending flow of events. It is also important to be aware that this processor must consume all events. The filtering must happen within " +
"the processor. This is because the HDFS admin's event notifications API does not have filtering.")
@Stateful(scopes = Scope.CLUSTER, description = "The last used transaction id is stored. This is used to resume event polling from where the processor left off after a restart or a primary node change.")
@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class, ListHDFS.class})
public class GetHDFSEvents extends AbstractHadoopProcessor {
static final PropertyDescriptor POLL_DURATION = new PropertyDescriptor.Builder()
.name("Poll Duration")
.displayName("Poll Duration")
.description("The time before the polling method returns with the next batch of events if they exist. It may exceed this amount of time by up to the time required for an " +
"RPC to the NameNode.")
.defaultValue("1 second")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor HDFS_PATH_TO_WATCH = new PropertyDescriptor.Builder()
.name("HDFS Path to Watch")
.displayName("HDFS Path to Watch")
.description("The HDFS path to get event notifications for. This property accepts both expression language and regular expressions. This will be evaluated during the " +
"OnScheduled phase.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
.name("Ignore Hidden Files")
.displayName("Ignore Hidden Files")
.description("If true and the final component of the path associated with a given event starts with a '.' then that event will not be processed.")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.build();
static final PropertyDescriptor EVENT_TYPES = new PropertyDescriptor.Builder()
.name("Event Types to Filter On")
.displayName("Event Types to Filter On")
.description("A comma-separated list of event types to process. Valid event types are: append, close, create, metadata, rename, and unlink. Case does not matter.")
.addValidator(new EventTypeValidator())
.required(true)
.defaultValue("append, close, create, metadata, rename, unlink")
.build();
static final PropertyDescriptor NUMBER_OF_RETRIES_FOR_POLL = new PropertyDescriptor.Builder()
.name("IOException Retries During Event Polling")
.displayName("IOException Retries During Event Polling")
.description("According to the HDFS admin API for event polling it is good to retry at least a few times. This number defines how many times the poll will be retried if it " +
"throws an IOException.")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.required(true)
.defaultValue("3")
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A flow file with updated information about a specific event will be sent to this relationship.")
.build();
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Collections.singletonList(REL_SUCCESS)));
private static final String LAST_TX_ID = "last.tx.id";
private volatile long lastTxId = -1L;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private NotificationConfig notificationConfig;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> props = new ArrayList<>(properties);
props.add(POLL_DURATION);
props.add(HDFS_PATH_TO_WATCH);
props.add(IGNORE_HIDDEN_FILES);
props.add(EVENT_TYPES);
props.add(NUMBER_OF_RETRIES_FOR_POLL);
return Collections.unmodifiableList(props);
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@OnScheduled
public void onSchedule(ProcessContext context) {
notificationConfig = new NotificationConfig(context);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final StateManager stateManager = context.getStateManager();
try {
StateMap state = stateManager.getState(Scope.CLUSTER);
String txIdAsString = state.get(LAST_TX_ID);
if (txIdAsString != null && !"".equals(txIdAsString)) {
lastTxId = Long.parseLong(txIdAsString);
}
} catch (IOException e) {
getLogger().error("Unable to retrieve last transaction ID. Must retrieve last processed transaction ID before processing can occur.", e);
context.yield();
return;
}
try {
final int retries = context.getProperty(NUMBER_OF_RETRIES_FOR_POLL).asInteger();
final TimeUnit pollDurationTimeUnit = TimeUnit.MICROSECONDS;
final long pollDuration = context.getProperty(POLL_DURATION).asTimePeriod(pollDurationTimeUnit);
final DFSInotifyEventInputStream eventStream = lastTxId == -1L ? getHdfsAdmin().getInotifyEventStream() : getHdfsAdmin().getInotifyEventStream(lastTxId);
final EventBatch eventBatch = getEventBatch(eventStream, pollDuration, pollDurationTimeUnit, retries);
if (eventBatch != null && eventBatch.getEvents() != null) {
if (eventBatch.getEvents().length > 0) {
List<FlowFile> flowFiles = new ArrayList<>(eventBatch.getEvents().length);
for (Event e : eventBatch.getEvents()) {
if (toProcessEvent(context, e)) {
getLogger().debug("Creating flow file for event: {}.", new Object[]{e});
final String path = getPath(e);
FlowFile flowFile = session.create();
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
flowFile = session.putAttribute(flowFile, EventAttributes.EVENT_TYPE, e.getEventType().name());
flowFile = session.putAttribute(flowFile, EventAttributes.EVENT_PATH, path);
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(OBJECT_MAPPER.writeValueAsBytes(e));
}
});
flowFiles.add(flowFile);
}
}
for (FlowFile flowFile : flowFiles) {
final String path = flowFile.getAttribute(EventAttributes.EVENT_PATH);
final String transitUri = path.startsWith("/") ? "hdfs:/" + path : "hdfs://" + path;
getLogger().debug("Transferring flow file {} and creating provenance event with URI {}.", new Object[]{flowFile, transitUri});
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, transitUri);
}
}
lastTxId = eventBatch.getTxid();
}
} catch (IOException | InterruptedException e) {
getLogger().error("Unable to get notification information: {}", new Object[]{e});
context.yield();
return;
} catch (MissingEventsException e) {
// Set lastTxId to -1 and update state. This may cause some events not to be processed. The reason this exception is thrown is described in the
// org.apache.hadoop.hdfs.client.HdfsAdmin#getInotifyEventStream javadoc, which suggests tuning a couple of parameters if this API is used.
lastTxId = -1L;
getLogger().error("Unable to get notification information. Setting transaction id to -1. This may cause some events to get missed. " +
"Please see javadoc for org.apache.hadoop.hdfs.client.HdfsAdmin#getInotifyEventStream: {}", new Object[]{e});
}
updateClusterStateForTxId(stateManager);
}
private EventBatch getEventBatch(DFSInotifyEventInputStream eventStream, long duration, TimeUnit timeUnit, int retries) throws IOException, InterruptedException, MissingEventsException {
// According to the inotify API we should retry a few times if poll throws an IOException.
// Please see org.apache.hadoop.hdfs.DFSInotifyEventInputStream#poll for documentation.
int i = 0;
while (true) {
try {
i += 1;
return eventStream.poll(duration, timeUnit);
} catch (IOException e) {
if (i > retries) {
getLogger().debug("Failed to poll for event batch. Reached max retry times.", e);
throw e;
} else {
getLogger().debug("Attempt {} failed to poll for event batch. Retrying.", new Object[]{i});
}
}
}
}
private void updateClusterStateForTxId(StateManager stateManager) {
try {
Map<String, String> newState = new HashMap<>(stateManager.getState(Scope.CLUSTER).toMap());
newState.put(LAST_TX_ID, String.valueOf(lastTxId));
stateManager.setState(newState, Scope.CLUSTER);
} catch (IOException e) {
getLogger().warn("Failed to update cluster state for last txId. It is possible data replication may occur.", e);
}
}
protected HdfsAdmin getHdfsAdmin() {
try {
// Currently HdfsAdmin is the only public API that allows access to the inotify API. Because of this we need to have super user rights in HDFS.
return new HdfsAdmin(getFileSystem().getUri(), getFileSystem().getConf());
} catch (IOException e) {
getLogger().error("Unable to get and instance of HDFS admin. You must be an HDFS super user to view HDFS events.");
throw new ProcessException(e);
}
}
private boolean toProcessEvent(ProcessContext context, Event event) {
final String[] eventTypes = context.getProperty(EVENT_TYPES).getValue().split(",");
for (String name : eventTypes) {
if (name.trim().equalsIgnoreCase(event.getEventType().name())) {
return notificationConfig.getPathFilter().accept(new Path(getPath(event)));
}
}
return false;
}
private String getPath(Event event) {
if (event == null || event.getEventType() == null) {
throw new IllegalArgumentException("Event and event type must not be null.");
}
switch (event.getEventType()) {
case CREATE: return ((Event.CreateEvent) event).getPath();
case CLOSE: return ((Event.CloseEvent) event).getPath();
case APPEND: return ((Event.AppendEvent) event).getPath();
case RENAME: return ((Event.RenameEvent) event).getSrcPath();
case METADATA: return ((Event.MetadataUpdateEvent) event).getPath();
case UNLINK: return ((Event.UnlinkEvent) event).getPath();
default: throw new IllegalArgumentException("Unsupported event type.");
}
}
private static class NotificationConfig {
private final PathFilter pathFilter;
NotificationConfig(ProcessContext context) {
final boolean toIgnoreHiddenFiles = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
final Pattern watchDirectory = Pattern.compile(context.getProperty(HDFS_PATH_TO_WATCH).evaluateAttributeExpressions().getValue());
pathFilter = new NotificationEventPathFilter(watchDirectory, toIgnoreHiddenFiles);
}
PathFilter getPathFilter() {
return pathFilter;
}
}
}
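
For orientation, the processor above is essentially a NiFi wrapper around the HdfsAdmin inotify polling loop. The following is a minimal standalone sketch of that underlying loop, not part of this commit, assuming a reachable NameNode URI and HDFS super-user credentials:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import java.net.URI;
import java.util.concurrent.TimeUnit;

public class InotifyPollSketch {
    public static void main(String[] args) throws Exception {
        // Assumed NameNode URI; the calling user must be an HDFS super user.
        HdfsAdmin admin = new HdfsAdmin(URI.create("hdfs://namenode:8020"), new Configuration());
        DFSInotifyEventInputStream stream = admin.getInotifyEventStream();
        while (true) {
            // Wait up to one second for the next batch, mirroring the processor's default Poll Duration.
            EventBatch batch = stream.poll(1, TimeUnit.SECONDS);
            if (batch == null || batch.getEvents() == null) {
                continue;
            }
            for (Event event : batch.getEvents()) {
                System.out.println(event.getEventType() + " at txid " + batch.getTxid());
            }
            // GetHDFSEvents persists batch.getTxid() in cluster state so polling can resume here after a restart.
        }
    }
}

GetHDFSEvents layers IOException retries, event-type and path filtering, and flow file creation on top of this loop.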

View File

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.inotify;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import java.util.regex.Pattern;
class NotificationEventPathFilter implements PathFilter {
private final Pattern watchDirectory;
private final boolean toIgnoreHiddenFiles;
NotificationEventPathFilter(Pattern watchDirectory, boolean toIgnoreHiddenFiles) {
this.watchDirectory = watchDirectory;
this.toIgnoreHiddenFiles = toIgnoreHiddenFiles;
}
@Override
public boolean accept(Path path) {
return !(path == null || (toIgnoreHiddenFiles && path.getName().startsWith(".")))
&& watchDirectory.matcher(path.toString()).matches();
}
}
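
As a quick illustration, using hypothetical paths and a hypothetical pattern, this is how the filter that NotificationConfig builds from the processor's properties behaves (the class is package-private, so the sketch assumes it lives in the same package):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import java.util.regex.Pattern;

class PathFilterSketch {
    public static void main(String[] args) {
        // Hypothetical watch path; in the processor this comes from 'HDFS Path to Watch' after expression language evaluation.
        Pattern watchDirectory = Pattern.compile("/data/landing(/.*)?");
        PathFilter filter = new NotificationEventPathFilter(watchDirectory, true);
        System.out.println(filter.accept(new Path("/data/landing/2016/05/file.txt"))); // true: inside the watched tree
        System.out.println(filter.accept(new Path("/data/landing/.tmp_file")));        // false: last path component is hidden
        System.out.println(filter.accept(new Path("/other/tree/file.txt")));           // false: outside the watch pattern
    }
}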

View File

@@ -16,5 +16,6 @@ org.apache.nifi.processors.hadoop.CreateHadoopSequenceFile
org.apache.nifi.processors.hadoop.FetchHDFS
org.apache.nifi.processors.hadoop.GetHDFS
org.apache.nifi.processors.hadoop.GetHDFSSequenceFile
org.apache.nifi.processors.hadoop.inotify.GetHDFSEvents
org.apache.nifi.processors.hadoop.ListHDFS
org.apache.nifi.processors.hadoop.PutHDFS

View File

@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.inotify;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestEventTypeValidator {
ValidationContext context;
EventTypeValidator eventTypeValidator;
@Before
public void setUp() throws Exception {
context = Mockito.mock(ValidationContext.class);
eventTypeValidator = new EventTypeValidator();
}
@Test
public void nullInputShouldProperlyFail() throws Exception {
String subject = "subject";
String input = null;
ValidationResult result = eventTypeValidator.validate(subject, input, context);
assertEquals("subject", result.getSubject());
assertEquals(null, result.getInput());
assertEquals("Empty event types are not allowed.", result.getExplanation());
assertFalse(result.isValid());
}
@Test
public void emptyInputShouldProperlyFail() throws Exception {
String subject = "subject";
String input = "";
ValidationResult result = eventTypeValidator.validate(subject, input, context);
assertEquals("subject", result.getSubject());
assertEquals("", result.getInput());
assertEquals("Empty event types are not allowed.", result.getExplanation());
assertFalse(result.isValid());
}
@Test
public void validEventTypesShouldProperlyValidate() throws Exception {
String input = " append, Create, CLOSE";
String subject = "subject";
ValidationResult result = eventTypeValidator.validate(subject, input, context);
assertEquals("subject", result.getSubject());
assertEquals(" append, Create, CLOSE", result.getInput());
assertEquals("", result.getExplanation());
assertTrue(result.isValid());
}
@Test
public void inputWithInvalidEventTypeShouldProperlyDisplayEventsInExplanation() throws Exception {
String subject = "subject";
String input = "append, CREATE, cllose, rename, metadata, unlink";
ValidationResult result = eventTypeValidator.validate(subject, input, context);
assertEquals("subject", result.getSubject());
assertEquals("append, CREATE, cllose, rename, metadata, unlink", result.getInput());
assertEquals("The following are not valid event types: [cllose]", result.getExplanation());
assertFalse(result.isValid());
}
@Test
public void inputWithMultipleInvalidEventTypeShouldProperlyDisplayEventsInExplanation() throws Exception {
String subject = "subject";
String input = "append, CREATE, cllose, rename, metadata, unlink, unllink";
ValidationResult result = eventTypeValidator.validate(subject, input, context);
assertEquals("subject", result.getSubject());
assertEquals("append, CREATE, cllose, rename, metadata, unlink, unllink", result.getInput());
assertEquals("The following are not valid event types: [cllose, unllink]", result.getExplanation());
assertFalse(result.isValid());
}
}

View File

@@ -0,0 +1,267 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.inotify;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processors.hadoop.inotify.util.EventTestUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestGetHDFSEvents {
NiFiProperties mockNiFiProperties;
KerberosProperties kerberosProperties;
DFSInotifyEventInputStream inotifyEventInputStream;
HdfsAdmin hdfsAdmin;
@Rule
public ExpectedException exception = ExpectedException.none();
@Before
public void setup() {
mockNiFiProperties = mock(NiFiProperties.class);
when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
kerberosProperties = KerberosProperties.create(mockNiFiProperties);
inotifyEventInputStream = mock(DFSInotifyEventInputStream.class);
hdfsAdmin = mock(HdfsAdmin.class);
}
@Test
public void notSettingHdfsPathToWatchShouldThrowError() throws Exception {
exception.expect(AssertionError.class);
exception.expectMessage("'HDFS Path to Watch' is invalid because HDFS Path to Watch is required");
GetHDFSEvents processor = new TestableGetHDFSEvents(kerberosProperties, hdfsAdmin);
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(GetHDFSEvents.POLL_DURATION, "1 second");
runner.run();
}
@Test
public void onTriggerShouldProperlyHandleAnEmptyEventBatch() throws Exception {
EventBatch eventBatch = mock(EventBatch.class);
when(eventBatch.getEvents()).thenReturn(new Event[]{});
when(inotifyEventInputStream.poll(1000000L, TimeUnit.MICROSECONDS)).thenReturn(eventBatch);
when(hdfsAdmin.getInotifyEventStream()).thenReturn(inotifyEventInputStream);
when(eventBatch.getTxid()).thenReturn(100L);
GetHDFSEvents processor = new TestableGetHDFSEvents(kerberosProperties, hdfsAdmin);
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(GetHDFSEvents.POLL_DURATION, "1 second");
runner.setProperty(GetHDFSEvents.HDFS_PATH_TO_WATCH, "/some/path");
runner.setProperty(GetHDFSEvents.NUMBER_OF_RETRIES_FOR_POLL, "5");
runner.run();
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(GetHDFSEvents.REL_SUCCESS);
assertEquals(0, successfulFlowFiles.size());
verify(eventBatch).getTxid();
assertEquals("100", runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).get("last.tx.id"));
}
@Test
public void onTriggerShouldProperlyHandleANullEventBatch() throws Exception {
when(inotifyEventInputStream.poll(1000000L, TimeUnit.MICROSECONDS)).thenReturn(null);
when(hdfsAdmin.getInotifyEventStream()).thenReturn(inotifyEventInputStream);
GetHDFSEvents processor = new TestableGetHDFSEvents(kerberosProperties, hdfsAdmin);
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(GetHDFSEvents.POLL_DURATION, "1 second");
runner.setProperty(GetHDFSEvents.HDFS_PATH_TO_WATCH, "/some/path${now()}");
runner.run();
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(GetHDFSEvents.REL_SUCCESS);
assertEquals(0, successfulFlowFiles.size());
assertEquals("-1", runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).get("last.tx.id"));
}
@Test
public void makeSureHappyPathForProcessingEventsSendsFlowFilesToCorrectRelationship() throws Exception {
Event[] events = getEvents();
EventBatch eventBatch = mock(EventBatch.class);
when(eventBatch.getEvents()).thenReturn(events);
when(inotifyEventInputStream.poll(1000000L, TimeUnit.MICROSECONDS)).thenReturn(eventBatch);
when(hdfsAdmin.getInotifyEventStream()).thenReturn(inotifyEventInputStream);
when(eventBatch.getTxid()).thenReturn(100L);
GetHDFSEvents processor = new TestableGetHDFSEvents(kerberosProperties, hdfsAdmin);
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(GetHDFSEvents.POLL_DURATION, "1 second");
runner.setProperty(GetHDFSEvents.HDFS_PATH_TO_WATCH, "/some/path(/)?.*");
runner.run();
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(GetHDFSEvents.REL_SUCCESS);
assertEquals(6, successfulFlowFiles.size());
verify(eventBatch).getTxid();
assertEquals("100", runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).get("last.tx.id"));
}
@Test
public void onTriggerShouldOnlyProcessEventsWithSpecificPath() throws Exception {
Event[] events = getEvents();
EventBatch eventBatch = mock(EventBatch.class);
when(eventBatch.getEvents()).thenReturn(events);
when(inotifyEventInputStream.poll(1000000L, TimeUnit.MICROSECONDS)).thenReturn(eventBatch);
when(hdfsAdmin.getInotifyEventStream()).thenReturn(inotifyEventInputStream);
when(eventBatch.getTxid()).thenReturn(100L);
GetHDFSEvents processor = new TestableGetHDFSEvents(kerberosProperties, hdfsAdmin);
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(GetHDFSEvents.HDFS_PATH_TO_WATCH, "/some/path/create(/)?");
runner.run();
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(GetHDFSEvents.REL_SUCCESS);
assertEquals(1, successfulFlowFiles.size());
verify(eventBatch).getTxid();
assertEquals("100", runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).get("last.tx.id"));
}
@Test
public void eventsProcessorShouldProperlyFilterEventTypes() throws Exception {
Event[] events = getEvents();
EventBatch eventBatch = mock(EventBatch.class);
when(eventBatch.getEvents()).thenReturn(events);
when(inotifyEventInputStream.poll(1000000L, TimeUnit.MICROSECONDS)).thenReturn(eventBatch);
when(hdfsAdmin.getInotifyEventStream()).thenReturn(inotifyEventInputStream);
when(eventBatch.getTxid()).thenReturn(100L);
GetHDFSEvents processor = new TestableGetHDFSEvents(kerberosProperties, hdfsAdmin);
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(GetHDFSEvents.HDFS_PATH_TO_WATCH, "/some/path(/.*)?");
runner.setProperty(GetHDFSEvents.EVENT_TYPES, "create, metadata, rename");
runner.run();
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(GetHDFSEvents.REL_SUCCESS);
assertEquals(3, successfulFlowFiles.size());
List<String> expectedEventTypes = Arrays.asList("CREATE", "METADATA", "RENAME");
for (MockFlowFile f : successfulFlowFiles) {
String eventType = f.getAttribute(EventAttributes.EVENT_TYPE);
assertTrue(expectedEventTypes.contains(eventType));
}
verify(eventBatch).getTxid();
assertEquals("100", runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).get("last.tx.id"));
}
@Test
public void makeSureExpressionLanguageIsWorkingProperlyWithinTheHdfsPathToWatch() throws Exception {
Event[] events = new Event[] {
new Event.AppendEvent("/some/path/1/2/3/t.txt"),
new Event.AppendEvent("/some/path/1/2/4/t.txt"),
new Event.AppendEvent("/some/path/1/2/3/.t.txt")
};
EventBatch eventBatch = mock(EventBatch.class);
when(eventBatch.getEvents()).thenReturn(events);
when(inotifyEventInputStream.poll(1000000L, TimeUnit.MICROSECONDS)).thenReturn(eventBatch);
when(hdfsAdmin.getInotifyEventStream()).thenReturn(inotifyEventInputStream);
when(eventBatch.getTxid()).thenReturn(100L);
GetHDFSEvents processor = new TestableGetHDFSEvents(kerberosProperties, hdfsAdmin);
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(GetHDFSEvents.HDFS_PATH_TO_WATCH, "/some/path/${literal(1)}/${literal(2)}/${literal(3)}/.*.txt");
runner.setProperty(GetHDFSEvents.EVENT_TYPES, "append");
runner.setProperty(GetHDFSEvents.IGNORE_HIDDEN_FILES, "true");
runner.run();
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(GetHDFSEvents.REL_SUCCESS);
assertEquals(1, successfulFlowFiles.size());
for (MockFlowFile f : successfulFlowFiles) {
String eventType = f.getAttribute(EventAttributes.EVENT_TYPE);
assertTrue(eventType.equals("APPEND"));
}
verify(eventBatch).getTxid();
assertEquals("100", runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).get("last.tx.id"));
}
private Event[] getEvents() {
return new Event[]{
EventTestUtils.createCreateEvent(),
EventTestUtils.createCloseEvent(),
EventTestUtils.createAppendEvent(),
EventTestUtils.createRenameEvent(),
EventTestUtils.createMetadataUpdateEvent(),
EventTestUtils.createUnlinkEvent()
};
}
private class TestableGetHDFSEvents extends GetHDFSEvents {
private final KerberosProperties testKerberosProperties;
private final FileSystem fileSystem = new DistributedFileSystem();
private final HdfsAdmin hdfsAdmin;
TestableGetHDFSEvents(KerberosProperties testKerberosProperties, HdfsAdmin hdfsAdmin) {
this.testKerberosProperties = testKerberosProperties;
this.hdfsAdmin = hdfsAdmin;
}
@Override
protected FileSystem getFileSystem() {
return fileSystem;
}
@Override
protected KerberosProperties getKerberosProperties() {
return testKerberosProperties;
}
@Override
protected HdfsAdmin getHdfsAdmin() {
return hdfsAdmin;
}
}
}

View File

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.inotify;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.junit.Test;
import java.util.regex.Pattern;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestNotificationEventPathFilter {
@Test
public void acceptShouldProperlyReturnFalseWithNullPath() throws Exception {
assertFalse(new NotificationEventPathFilter(Pattern.compile(""), true).accept(null));
}
@Test
public void acceptPathShouldProperlyIgnorePathsWhereTheLastComponentStartsWithADot() throws Exception {
PathFilter filter = new NotificationEventPathFilter(Pattern.compile(".*"), true);
assertFalse(filter.accept(new Path("/.some_hidden_file")));
assertFalse(filter.accept(new Path("/some/long/path/.some_hidden_file/")));
}
@Test
public void acceptPathShouldProperlyAcceptPathsWhereTheNonLastComponentStartsWithADot() throws Exception {
PathFilter filter = new NotificationEventPathFilter(Pattern.compile(".*"), true);
assertTrue(filter.accept(new Path("/some/long/path/.some_hidden_file/should/work")));
assertTrue(filter.accept(new Path("/.some_hidden_file/should/still/accept")));
}
@Test
public void acceptPathShouldProperlyMatchAllSubdirectoriesThatMatchWatchDirectoryAndFileFilter() throws Exception {
PathFilter filter = new NotificationEventPathFilter(Pattern.compile("/root(/.*)?"), true);
assertTrue(filter.accept(new Path("/root/sometest.txt")));
}
@Test
public void acceptPathShouldProperlyMatchWhenWatchDirectoryMatchesPath() throws Exception {
PathFilter filter = new NotificationEventPathFilter(Pattern.compile("/root(/.*)?"), false);
assertTrue(filter.accept(new Path("/root")));
}
}

View File

@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.inotify.util;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.inotify.Event;
import java.util.Collections;
import java.util.Date;
public class EventTestUtils {
public static Event.CreateEvent createCreateEvent() {
return new Event.CreateEvent.Builder()
.ctime(new Date().getTime())
.groupName("group_name")
.iNodeType(Event.CreateEvent.INodeType.DIRECTORY)
.overwrite(false)
.ownerName("ownerName")
.path("/some/path/create")
.perms(new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE))
.replication(1)
.symlinkTarget("/some/symlink/target")
.build();
}
public static Event.CloseEvent createCloseEvent() {
return new Event.CloseEvent("/some/path/close", 1L, 2L);
}
public static Event.AppendEvent createAppendEvent() {
return new Event.AppendEvent("/some/path/append");
}
public static Event.RenameEvent createRenameEvent() {
return new Event.RenameEvent("/some/path/rename/src", "/some/path/rename/dest", 200L);
}
public static Event.MetadataUpdateEvent createMetadataUpdateEvent() {
return new Event.MetadataUpdateEvent.Builder()
.replication(0)
.perms(new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE))
.path("/some/path/metadata")
.ownerName("owner")
.acls(Collections.singletonList(new AclEntry.Builder().setName("schema").setPermission(FsAction.ALL).setScope(AclEntryScope.ACCESS).setType(AclEntryType.GROUP).build()))
.atime(new Date().getTime())
.groupName("groupName")
.metadataType(Event.MetadataUpdateEvent.MetadataType.ACLS)
.mtime(1L)
.xAttrs(Collections.singletonList(new XAttr.Builder().setName("name").setNameSpace(XAttr.NameSpace.USER).setValue(new byte[0]).build()))
.xAttrsRemoved(false)
.build();
}
public static Event.UnlinkEvent createUnlinkEvent() {
return new Event.UnlinkEvent("/some/path/unlink", 300L);
}
}