NIFI-5922: First Commit for NiFi-Stateless

Sam Hjelmfelt 2019-01-02 16:42:36 -08:00 committed by Mark Payne
parent 4d18eaa481
commit 417b3955d7
28 changed files with 5629 additions and 0 deletions

nifi-fn/.dockerignore (new file, 15 lines)

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
.git/
.idea/

nifi-fn/Dockerfile (new file, 47 lines)

@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
FROM openjdk:8-jre-alpine
LABEL maintainer="Apache NiFi <dev@nifi.apache.org>"
EXPOSE 8080
ENTRYPOINT ["/usr/bin/java", "-cp", "/usr/share/nifi-1.8.0/lib/*:/usr/share/nififn/lib/*:/usr/share/nififn/nififn.jar", "org.apache.nifi.fn.runtimes.Program"]
CMD ["RunOpenwhiskActionServer", "8080"]
# Add NiFi libraries
RUN wget -qO- https://www-us.apache.org/dist/nifi/1.8.0/nifi-1.8.0-bin.tar.gz | tar xvz -C /usr/share/
# Add Maven dependencies (not shaded into the artifact; Docker-cached)
ADD target/lib /usr/share/nififn/lib
# Add the service itself
ARG JAR_FILE
ADD target/${JAR_FILE} /usr/share/nififn/nififn.jar
# NiFi's HDFS processors require core-site.xml or hdfs-site.xml to exist on disk before they can be started.
RUN echo '<configuration> \n\
<property> \n\
<name>fs.defaultFS</name> \n\
<value>hdfs://localhost:8020</value> \n\
</property> \n\
<property> \n\
<name>fs.hdfs.impl</name> \n\
<value>org.apache.hadoop.hdfs.DistributedFileSystem</value> \n\
</property> \n\
</configuration>' > /tmp/core-site.xml
RUN chmod 666 /tmp/core-site.xml

nifi-fn/README.md (new file, 96 lines)

@ -0,0 +1,96 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# NiFi-Fn
### Build:
```mvn package```
The Docker image will be tagged `nifi-fn:1.9.0-SNAPSHOT`.
### Usage:
```
1) RunFromRegistry [Once|Continuous] <NiFi registry URL> <Bucket ID> <Flow ID> <Input Variables> [<Failure Output Ports>] [<Input FlowFile>]
RunFromRegistry [Once|Continuous] --json <JSON>
RunFromRegistry [Once|Continuous] --file <File Name>
2) RunYARNServiceFromRegistry <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> \
<NiFi registry URL> <Bucket ID> <Flow ID> <Input Variables> [<Failure Output Ports>] [<Input FlowFile>]
RunYARNServiceFromRegistry <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> --json <JSON>
RunYARNServiceFromRegistry <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> --file <File Name>
3) RunOpenwhiskActionServer <Port>
```
### Examples:
```
1) RunFromRegistry Once http://172.0.0.1:61080 e53b8a0d-5c85-4fcd-912a-1c549a586c83 6cf8277a-c402-4957-8623-0fa9890dd45d \
"DestinationDirectory-/tmp/nififn/output2/" "" "absolute.path-/tmp/nififn/input/;filename-test.txt" "absolute.path-/tmp/nififn/input/;filename-test2.txt"
2) RunFromRegistry Once http://172.0.0.1:61080 e53b8a0d-5c85-4fcd-912a-1c549a586c83 6cf8277a-c402-4957-8623-0fa9890dd45d \
"DestinationDirectory-/tmp/nififn/output2/" "f25c9204-6c95-3aa9-b0a8-c556f5f61849" "absolute.path-/tmp/nififn/input/;filename-test.txt"
3) RunYARNServiceFromRegistry http://127.0.0.1:8088 nifi-fn:latest kafka-to-solr 3 --file kafka-to-solr.json
4) RunOpenwhiskActionServer 8080
```
### Notes:
```
1) <Input Variables> will be split on ';' and '-' and then injected into the flow through the variable registry interface (see the parsing sketch below).
2) <Failure Output Ports> will be split on ';'. FlowFiles routed to matching output ports will immediately fail the flow.
3) <Input FlowFile> will be split on ';' and '-' and then injected into the flow, using the "nifi_content" field as the FlowFile content.
4) Multiple <Input FlowFile> arguments can be provided.
5) The configuration file must be in JSON format.
6) When providing configurations via JSON, the following attributes must be provided: nifi_registry, nifi_bucket, nifi_flow.
   All other attributes will be passed to the flow through the variable registry interface.
```
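For illustration, a minimal parsing sketch (not part of this commit) that mirrors the splitting rules described in notes 1 and 3. The class and helper names are placeholders, and splitting on only the first '-' is an assumption so that values may contain dashes; it is not the actual runtime code.

```
import java.util.HashMap;
import java.util.Map;

public class ArgumentParsingSketch {
    // Splits "key1-value1;key2-value2" into a map, mirroring the ';' and '-' rules above.
    static Map<String, String> parsePairs(final String argument) {
        final Map<String, String> pairs = new HashMap<>();
        for (final String pair : argument.split(";")) {
            final String[] keyValue = pair.split("-", 2); // assumption: split on the first '-' only
            pairs.put(keyValue[0], keyValue.length > 1 ? keyValue[1] : "");
        }
        return pairs;
    }

    public static void main(String[] args) {
        // One input FlowFile definition from the examples above;
        // prints both key/value pairs, e.g. {absolute.path=/tmp/nififn/input/, filename=test.txt}
        System.out.println(parsePairs("absolute.path-/tmp/nififn/input/;filename-test.txt"));
    }
}
```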
### JSON Sample:
```
{
  "nifi_registry": "http://localhost:61080",
  "nifi_bucket": "3aa885db-30c8-4c87-989c-d32b8ea1d3d8",
  "nifi_flow": "0d219eb8-419b-42ba-a5ee-ce07445c6fc5",
  "nifi_flowversion": -1,
  "nifi_materializecontent": true,
  "nifi_failureports": ["f25c9204-6c95-3aa9-b0a8-c556f5f61849"],
  "nifi_flowfiles": [
    {
      "absolute.path": "/tmp/nififn/input/",
      "filename": "test.txt",
      "nifi_content": "hello"
    },
    {
      "absolute.path": "/tmp/nififn/input/",
      "filename": "test2.txt",
      "nifi_content": "hi"
    }
  ],
  "DestinationDirectory": "/tmp/nififn/output2/"
}
```
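For illustration, a minimal sketch (not part of this commit) of driving a flow from a configuration file like the sample above via FnFlow.createAndEnqueueFromJSON, which is introduced later in this commit. The file name and the surrounding main method are placeholders.

```
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.nifi.fn.core.FnFlow;
import org.apache.nifi.fn.core.FnFlowFile;

import java.io.FileReader;
import java.util.LinkedList;
import java.util.Queue;

public class RunFromJsonSketch {
    public static void main(String[] args) throws Exception {
        // Parse the JSON configuration (file name is a placeholder)
        final JsonObject config;
        try (FileReader reader = new FileReader("flow-config.json")) {
            config = new JsonParser().parse(reader).getAsJsonObject();
        }

        // Builds the flow from the registry coordinates in the JSON and
        // enqueues the nifi_flowfiles entries as input FlowFiles.
        final FnFlow flow = FnFlow.createAndEnqueueFromJSON(config);

        // Run a single pass and collect FlowFiles routed to output ports.
        final Queue<FnFlowFile> output = new LinkedList<>();
        final boolean success = flow.runOnce(output);
        System.out.println("success=" + success + ", output FlowFiles=" + output.size());
    }
}
```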
### TODO:
* Provenance is always recorded instead of waiting for commit. Rollback could result in duplicates:
  * FnProvenanceReporter.send force option is not honored
  * NiFi-FnProcessSession.adjustCounter immediate is not honored
* NAR directory is hardcoded:
  * ReflectionUtils uses /usr/share/nifi-1.8.0/lib/ (the location inside the Dockerfile)
* Classloader does not work
* Add support for:
  * process groups
  * funnels
* Send logs, metrics, and provenance to Kafka/Solr (configure a flow ID for each?)
* Counters
* Tests
* Processor and port IDs from the UI do not match IDs in templates or the registry

nifi-fn/pom.xml (new file, 155 lines)

@ -0,0 +1,155 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-fn</artifactId>
<version>1.9.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>true</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>org.apache.nifi.fn.runtimes.Program</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>com.spotify</groupId>
<artifactId>dockerfile-maven-plugin</artifactId>
<version>1.4.8</version>
<executions>
<execution>
<id>default</id>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
<configuration>
<repository>nifi-fn</repository>
<tag>${project.version}</tag>
<buildArgs>
<JAR_FILE>${project.build.finalName}.jar</JAR_FILE>
</buildArgs>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-core</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-nar-loading-utils</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-api</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-core-api</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-api</artifactId>
<version>1.9.0-SNAPSHOT</version>
<type>war</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-utils</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-processors</artifactId>
<version>1.9.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-expression-language</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-data-provenance-utils</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.7</version>
</dependency>
</dependencies>
</project>

nifi-fn/src/main/java/org/apache/nifi/fn/core/FnConfigurationContext.java (new file, 94 lines)

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.registry.VariableRegistry;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class FnConfigurationContext implements ConfigurationContext {
private final Map<PropertyDescriptor, String> properties;
private final ControllerServiceLookup serviceLookup;
private final ControllerService service;
private final VariableRegistry variableRegistry;
public FnConfigurationContext(final ControllerService service,
final Map<PropertyDescriptor, String> properties,
final ControllerServiceLookup serviceLookup,
final VariableRegistry variableRegistry) {
this.service = service;
this.properties = properties;
this.serviceLookup = serviceLookup;
this.variableRegistry = variableRegistry;
}
@Override
public PropertyValue getProperty(final PropertyDescriptor property) {
String value = properties.get(property);
if (value == null) {
value = getActualDescriptor(property).getDefaultValue();
}
return new FnPropertyValue(value, serviceLookup, variableRegistry);
}
@Override
public Map<PropertyDescriptor, String> getProperties() {
return new HashMap<>(this.properties);
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
private PropertyDescriptor getActualDescriptor(final PropertyDescriptor property) {
if (service == null) {
return property;
}
final PropertyDescriptor resolved = service.getPropertyDescriptor(property.getName());
return resolved == null ? property : resolved;
}
@Override
public String getSchedulingPeriod() {
return "0 secs";
}
@Override
public Long getSchedulingPeriod(final TimeUnit timeUnit) {
return 0L;
}
@Override
public String getName() {
return null;
}
}

nifi-fn/src/main/java/org/apache/nifi/fn/core/FnControllerServiceConfiguration.java (new file, 75 lines)

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
public class FnControllerServiceConfiguration {
private final ControllerService service;
private final AtomicBoolean enabled = new AtomicBoolean(false);
private String annotationData;
private Map<PropertyDescriptor, String> properties = new HashMap<>();
public FnControllerServiceConfiguration(final ControllerService service) {
this.service = service;
}
public ControllerService getService() {
return service;
}
public void setEnabled(final boolean enabled) {
this.enabled.set(enabled);
}
public boolean isEnabled() {
return this.enabled.get();
}
public void setProperties(final Map<PropertyDescriptor, String> props) {
this.properties = new HashMap<>(props);
}
public void setProperty(final PropertyDescriptor key, final String value) { this.properties.put(key,value); }
public String getProperty(final PropertyDescriptor descriptor) {
final String value = properties.get(descriptor);
if (value == null) {
return descriptor.getDefaultValue();
} else {
return value;
}
}
public void setAnnotationData(final String annotationData) {
this.annotationData = annotationData;
}
public String getAnnotationData() {
return annotationData;
}
public Map<PropertyDescriptor, String> getProperties() {
return Collections.unmodifiableMap(properties);
}
}

nifi-fn/src/main/java/org/apache/nifi/fn/core/FnControllerServiceLookup.java (new file, 232 lines)

@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.VersionedControllerService;
import org.apache.nifi.reporting.InitializationException;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static java.util.Objects.requireNonNull;
public class FnControllerServiceLookup implements ControllerServiceLookup {
private final Map<String, FnControllerServiceConfiguration> controllerServiceMap = new ConcurrentHashMap<>();
private final Map<String, SLF4JComponentLog> controllerServiceLoggers = new HashMap<>();
private final Map<String, FnStateManager> controllerServiceStateManagers = new HashMap<>();
public Map<String, FnControllerServiceConfiguration> getControllerServices() {
return controllerServiceMap;
}
public void addControllerService(final VersionedControllerService versionedControllerService) throws InitializationException {
String id = versionedControllerService.getIdentifier();
ControllerService service = ReflectionUtils.createControllerService(versionedControllerService);
Map<String, String> properties = versionedControllerService.getProperties();
addControllerService(id,service,properties);
}
public void addControllerService(final String identifier, final ControllerService service, final Map<String, String> properties) throws InitializationException {
final SLF4JComponentLog logger = new SLF4JComponentLog(service);
controllerServiceLoggers.put(identifier, logger);
FnStateManager serviceStateManager = new FnStateManager();
controllerServiceStateManagers.put(identifier, serviceStateManager);
final FnProcessContext initContext = new FnProcessContext(requireNonNull(service), this, requireNonNull(identifier), logger, serviceStateManager);
service.initialize(initContext);
final Map<PropertyDescriptor, String> resolvedProps = new HashMap<>();
for (final Map.Entry<String, String> entry : properties.entrySet()) {
resolvedProps.put(service.getPropertyDescriptor(entry.getKey()), entry.getValue());
}
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, service);
} catch (final InvocationTargetException | IllegalAccessException | IllegalArgumentException e) {
throw new InitializationException(e);
}
final FnControllerServiceConfiguration config = new FnControllerServiceConfiguration(service);
controllerServiceMap.put(identifier, config);
}
public void removeControllerService(final ControllerService service) throws InvocationTargetException, IllegalAccessException {
final ControllerService canonical = getControllerService(service.getIdentifier());
if (canonical == null || canonical != service) {
throw new IllegalArgumentException("Controller Service " + service + " is not known");
}
disableControllerService(canonical);
ReflectionUtils.invokeMethodsWithAnnotation(OnRemoved.class, canonical);
controllerServiceMap.remove(service.getIdentifier());
}
protected FnControllerServiceConfiguration getConfiguration(final String identifier) {
return controllerServiceMap.get(identifier);
}
@Override
public ControllerService getControllerService(final String identifier) {
final FnControllerServiceConfiguration status = controllerServiceMap.get(identifier);
return (status == null) ? null : status.getService();
}
@Override
public boolean isControllerServiceEnabled(final String serviceIdentifier) {
final FnControllerServiceConfiguration status = controllerServiceMap.get(serviceIdentifier);
if (status == null) {
throw new IllegalArgumentException("No ControllerService exists with identifier " + serviceIdentifier);
}
return status.isEnabled();
}
@Override
public boolean isControllerServiceEnabled(final ControllerService service) {
return isControllerServiceEnabled(service.getIdentifier());
}
@Override
public boolean isControllerServiceEnabling(final String serviceIdentifier) {
return false;
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
final Set<String> ids = new HashSet<>();
for (final Map.Entry<String, FnControllerServiceConfiguration> entry : controllerServiceMap.entrySet()) {
if (serviceType.isAssignableFrom(entry.getValue().getService().getClass())) {
ids.add(entry.getKey());
}
}
return ids;
}
@Override
public String getControllerServiceName(final String serviceIdentifier) {
final FnControllerServiceConfiguration status = controllerServiceMap.get(serviceIdentifier);
return status == null ? null : serviceIdentifier;
}
public void disableControllerService(final ControllerService service) throws InvocationTargetException, IllegalAccessException {
final FnControllerServiceConfiguration configuration = getConfiguration(service.getIdentifier());
if (configuration == null) {
throw new IllegalArgumentException("Controller Service " + service + " is not known");
}
if (!configuration.isEnabled()) {
throw new IllegalStateException("Controller service " + service + " cannot be disabled because it is not enabled");
}
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service);
configuration.setEnabled(false);
}
public void enableControllerService(final ControllerService service, VariableRegistry registry) throws InvocationTargetException, IllegalAccessException {
final FnControllerServiceConfiguration configuration = getConfiguration(service.getIdentifier());
if (configuration == null) {
throw new IllegalArgumentException("Controller Service " + service + " is not known");
}
if (configuration.isEnabled()) {
throw new IllegalStateException("Cannot enable Controller Service " + service + " because it is already enabled");
}
final ConfigurationContext configContext = new FnConfigurationContext(service, configuration.getProperties(), this, registry);
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service, configContext);
configuration.setEnabled(true);
}
public SLF4JComponentLog getControllerServiceLogger(final String identifier) {
return controllerServiceLoggers.get(identifier);
}
Map<PropertyDescriptor, String> getControllerServiceProperties(final ControllerService controllerService) {
return this.getConfiguration(controllerService.getIdentifier()).getProperties();
}
String getControllerServiceAnnotationData(final ControllerService controllerService) {
return this.getConfiguration(controllerService.getIdentifier()).getAnnotationData();
}
public FnStateManager getStateManager(final ControllerService controllerService) {
return controllerServiceStateManagers.get(controllerService.getIdentifier());
}
public void setControllerServiceAnnotationData(final ControllerService service, final String annotationData) {
final FnControllerServiceConfiguration configuration = getControllerServiceConfigToUpdate(service);
configuration.setAnnotationData(annotationData);
}
private FnControllerServiceConfiguration getControllerServiceConfigToUpdate(final ControllerService service) {
final FnControllerServiceConfiguration configuration = getConfiguration(service.getIdentifier());
if (configuration == null) {
throw new IllegalArgumentException("Controller Service " + service + " is not known");
}
if (configuration.isEnabled()) {
throw new IllegalStateException("Controller service " + service + " cannot be modified because it is currently enabled");
}
return configuration;
}
public ValidationResult setControllerServiceProperty(final ControllerService service, final PropertyDescriptor property, final FnProcessContext context, final VariableRegistry registry, final String value) {
final FnStateManager serviceStateManager = controllerServiceStateManagers.get(service.getIdentifier());
if (serviceStateManager == null) {
throw new IllegalStateException("Controller service " + service + " has not been added to this ControllerServiceLookup via the addControllerService method");
}
final ValidationContext validationContext = new FnValidationContext(context, this, serviceStateManager, registry).getControllerServiceValidationContext(service);
final ValidationResult validationResult = property.validate(value, validationContext);
final FnControllerServiceConfiguration configuration = getControllerServiceConfigToUpdate(service);
final String oldValue = configuration.getProperties().get(property);
configuration.setProperty(property,value);
if ((value == null && oldValue != null) || (value != null && !value.equals(oldValue))) {
service.onPropertyModified(property, oldValue, value);
}
return validationResult;
}
}
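For illustration, a minimal sketch (not part of this commit) of registering a service with this lookup. NoOpService is a placeholder that extends nifi-api's AbstractControllerService purely to exercise the API.

```
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.fn.core.FnControllerServiceLookup;

import java.util.Collections;

public class ControllerServiceLookupSketch {

    // Placeholder service used only to exercise the lookup API.
    public static class NoOpService extends AbstractControllerService {
    }

    public static void main(String[] args) throws Exception {
        final FnControllerServiceLookup lookup = new FnControllerServiceLookup();
        final ControllerService service = new NoOpService();

        // Registers the service, initializes it, and invokes any @OnAdded methods.
        lookup.addControllerService("no-op-service", service, Collections.emptyMap());

        System.out.println(lookup.getControllerService("no-op-service") == service); // true
        System.out.println(lookup.isControllerServiceEnabled("no-op-service"));      // false until enabled

        // enableControllerService(service, registry) would invoke @OnEnabled methods;
        // note that it looks the configuration up by service.getIdentifier(), so the
        // service must report the same identifier it was registered under.
    }
}
```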

nifi-fn/src/main/java/org/apache/nifi/fn/core/FnFlow.java (new file, 285 lines)

@ -0,0 +1,285 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.nar.*;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.*;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NiFiProperties;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.stream.Collectors;
public class FnFlow {
public static final String REGISTRY = "nifi_registry";
public static final String BUCKETID = "nifi_bucket";
public static final String FLOWID = "nifi_flow";
public static final String FLOWVERSION = "nifi_flowversion";
public static final String MATERIALIZECONTENT = "nifi_materializecontent";
public static final String FAILUREPORTS = "nifi_failureports";
public static final String FLOWFILES = "nifi_flowfiles";
public static final String CONTENT = "nifi_content";
public static final List<String> reservedFields = Arrays.asList(REGISTRY,BUCKETID,FLOWID,FLOWVERSION,FAILUREPORTS,MATERIALIZECONTENT,FLOWFILES);
private List<FnProcessorWrapper> roots;
private volatile boolean stopRequested = false;
private FnProcessorWrapper inputProcessor = null;
public FnFlow(String registryUrl, String bucketID, String flowID, int versionID, VariableRegistry variableRegistry, List<String> failureOutputPorts, boolean materializeContent) throws IOException, NiFiRegistryException, IllegalAccessException, ProcessorInstantiationException, InvocationTargetException, InitializationException {
this(
new RegistryUtil(registryUrl).getFlowByID(bucketID, flowID, versionID),
variableRegistry,
failureOutputPorts,
materializeContent
);
}
public FnFlow(String registryUrl, String bucketID, String flowID, VariableRegistry variableRegistry, List<String> failureOutputPorts, boolean materializeContent) throws IOException, NiFiRegistryException, IllegalAccessException, ProcessorInstantiationException, InvocationTargetException, InitializationException {
this(
new RegistryUtil(registryUrl).getFlowByID(bucketID, flowID),
variableRegistry,
failureOutputPorts,
materializeContent
);
}
public FnFlow(VersionedFlowSnapshot flowSnapshot, VariableRegistry variableRegistry, List<String> failureOutputPorts, boolean materializeContent) throws IllegalAccessException, ProcessorInstantiationException, InvocationTargetException, InitializationException {
VersionedProcessGroup contents = flowSnapshot.getFlowContents();
Set<VersionedProcessor> processors = contents.getProcessors();
Set<VersionedConnection> connections = contents.getConnections();
Set<VersionedPort> inputPorts = contents.getInputPorts();
Set<VersionedFunnel> funnels = contents.getFunnels();
if(inputPorts.size() > 1)
throw new IllegalArgumentException("Only one input port per flow is allowed");
FnControllerServiceLookup serviceLookup = new FnControllerServiceLookup();
Set<VersionedControllerService> controllerServices = contents.getControllerServices();
for(VersionedControllerService service : controllerServices){
serviceLookup.addControllerService(service);
}
Map<String, FnProcessorWrapper> pwMap = new HashMap<>();
for(VersionedConnection connection : connections) {
boolean isInputPortConnection = false;
FnProcessorWrapper sourcePw = null;
ConnectableComponent source = connection.getSource();
switch (source.getType()){
case PROCESSOR:
if(pwMap.containsKey(source.getId())) {
sourcePw = pwMap.get(source.getId());
} else {
Optional<VersionedProcessor> processor = processors.stream().filter(p -> source.getId().equals(p.getIdentifier())).findFirst();
if (processor.isPresent()) {
sourcePw = new FnProcessorWrapper(processor.get(), null, serviceLookup, variableRegistry, materializeContent);
pwMap.put(source.getId(), sourcePw);
} else {
throw new IllegalArgumentException("Unknown source processor: " + source.getId());
}
}
break;
case REMOTE_INPUT_PORT:
isInputPortConnection = true;
break;
case REMOTE_OUTPUT_PORT:
throw new IllegalArgumentException("Unsupported source type: "+source.getType());
case INPUT_PORT:
isInputPortConnection = true;
break;
case OUTPUT_PORT:
throw new IllegalArgumentException("Unsupported source type: "+source.getType());
case FUNNEL:
throw new IllegalArgumentException("Unsupported source type: "+source.getType());
}
ConnectableComponent destination = connection.getDestination();
FnProcessorWrapper destinationPw;
switch (destination.getType()) {
case PROCESSOR:
if (pwMap.containsKey(destination.getId())) {
destinationPw = pwMap.get(destination.getId());
} else {
Optional<VersionedProcessor> processor = processors.stream().filter(p -> destination.getId().equals(p.getIdentifier())).findFirst();
if (!processor.isPresent())
return;
destinationPw = new FnProcessorWrapper(processor.get(), sourcePw, serviceLookup, variableRegistry, materializeContent);
pwMap.put(destination.getId(), destinationPw);
}
destinationPw.incomingConnections.add(connection.getIdentifier());
if(isInputPortConnection){
inputProcessor = destinationPw;
} else {
destinationPw.parents.add(sourcePw);
//Link source and destination
for (String relationship : connection.getSelectedRelationships()) {
sourcePw.addChild(destinationPw, new Relationship.Builder().name(relationship).build());
}
}
break;
case INPUT_PORT:
throw new IllegalArgumentException("Unsupported destination type: "+destination.getType());
case REMOTE_INPUT_PORT:
throw new IllegalArgumentException("Unsupported destination type: "+destination.getType());
case REMOTE_OUTPUT_PORT:
case OUTPUT_PORT:
if(isInputPortConnection)
throw new IllegalArgumentException("Input ports cannot be mapped directly to output ports");
//Link source and destination
for (String relationship : connection.getSelectedRelationships()) {
sourcePw.addOutputPort(
new Relationship.Builder().name(relationship).build(),
failureOutputPorts.contains(destination.getId())
);
}
break;
case FUNNEL:
throw new IllegalArgumentException("Unsupported destination type: "+destination.getType());
}
}
roots = pwMap.entrySet()
.stream()
.filter(e->e.getValue().parents.isEmpty())
.map(e->e.getValue())
.collect(Collectors.toList());
}
public FnFlow(FnProcessorWrapper root){
this(Collections.singletonList(root));
}
public FnFlow(List<FnProcessorWrapper> roots){
this.roots = roots;
}
public boolean run(Queue<FnFlowFile> output){
while(!this.stopRequested){
for (FnProcessorWrapper pw : roots){
boolean successful = pw.runRecursive(output);
if(!successful)
return false;
}
}
return true;
}
public boolean runOnce(Queue<FnFlowFile> output){
for (FnProcessorWrapper pw : roots){
boolean successful = pw.runRecursive(output);
if(!successful)
return false;
}
return true;
}
public void shutdown(){
this.stopRequested = true;
this.roots.forEach(r->r.shutdown());
}
public static FnFlow createAndEnqueueFromJSON(JsonObject args) throws IllegalAccessException, InvocationTargetException, InitializationException, IOException, ProcessorInstantiationException, NiFiRegistryException {
if(args == null)
throw new IllegalArgumentException("Flow arguments cannot be null");
System.out.println("Running flow from json: "+args.toString());
if(!args.has(REGISTRY) || !args.has(BUCKETID) || !args.has(FLOWID))
throw new IllegalArgumentException("The following parameters must be provided: "+REGISTRY+", "+BUCKETID+", "+FLOWID);
String registryurl = args.getAsJsonPrimitive(REGISTRY).getAsString();
String bucketID = args.getAsJsonPrimitive(BUCKETID).getAsString();
String flowID = args.getAsJsonPrimitive(FLOWID).getAsString();
int flowVersion = -1;
if(args.has(FLOWVERSION))
flowVersion = args.getAsJsonPrimitive(FLOWVERSION).getAsInt();
boolean materializeContent = true;
if(args.has(MATERIALIZECONTENT))
materializeContent = args.getAsJsonPrimitive(MATERIALIZECONTENT).getAsBoolean();
List<String> failurePorts = new ArrayList<>();
if(args.has(FAILUREPORTS))
args.getAsJsonArray(FAILUREPORTS).forEach(port->
failurePorts.add(port.getAsString())
);
Map<VariableDescriptor,String> inputVariables = new HashMap<>();
args.entrySet().forEach(entry ->{
if(!reservedFields.contains(entry.getKey()))
inputVariables.put(new VariableDescriptor(entry.getKey()), entry.getValue().getAsString());
});
FnFlow flow = new FnFlow(registryurl,bucketID,flowID,flowVersion,()->inputVariables, failurePorts, materializeContent);
flow.enqueueFromJSON(args);
return flow;
}
public void enqueueFlowFile(byte[] content, Map<String,String> attributes){
if(inputProcessor == null)
throw new IllegalArgumentException("Flow does not have an input port...");
//enqueue data
Queue<FnFlowFile> input = new LinkedList<>();
input.add(new FnFlowFile(content,attributes,inputProcessor.materializeContent));
inputProcessor.enqueueAll(input);
}
public void enqueueFromJSON(JsonObject json){
if(inputProcessor == null)
throw new IllegalArgumentException("Flow does not have an input port...");
Queue<FnFlowFile> input = new LinkedList<>();
JsonArray flowFiles = json.getAsJsonArray(FLOWFILES);
flowFiles.forEach(f->{
JsonObject file = f.getAsJsonObject();
String content = file.getAsJsonPrimitive(CONTENT).getAsString();
Map<String,String> attributes = new HashMap<>();
file.entrySet().forEach(entry ->{
if(!CONTENT.equals(entry.getKey()))
attributes.put(entry.getKey(), entry.getValue().getAsString());
});
input.add(new FnFlowFile(content,attributes,inputProcessor.materializeContent));
});
//enqueue data
inputProcessor.enqueueAll(input);
}
}
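For illustration, a minimal sketch (not part of this commit) of building and running a flow programmatically with this class. The registry URL, bucket ID, and flow ID are placeholders reused from the README sample, and the versioned flow is assumed to expose a single input port.

```
import org.apache.nifi.fn.core.FnFlow;
import org.apache.nifi.fn.core.FnFlowFile;
import org.apache.nifi.registry.VariableDescriptor;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;

public class FnFlowUsageSketch {
    public static void main(String[] args) throws Exception {
        // Variables injected into the flow through the variable registry interface.
        final Map<VariableDescriptor, String> variables = new HashMap<>();
        variables.put(new VariableDescriptor("DestinationDirectory"), "/tmp/nififn/output2/");

        // Placeholders: registry URL, bucket ID, and flow ID from the README sample.
        final FnFlow flow = new FnFlow(
            "http://localhost:61080",
            "3aa885db-30c8-4c87-989c-d32b8ea1d3d8",
            "0d219eb8-419b-42ba-a5ee-ce07445c6fc5",
            () -> variables,          // VariableRegistry
            Collections.emptyList(),  // no failure output ports
            true);                    // materialize FlowFile content

        // Feed one FlowFile into the flow's input port and run a single pass.
        flow.enqueueFlowFile("hello".getBytes(StandardCharsets.UTF_8),
            Collections.singletonMap("filename", "test.txt"));
        final Queue<FnFlowFile> output = new LinkedList<>();
        System.out.println("completed=" + flow.runOnce(output) + ", output=" + output.size());
    }
}
```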

nifi-fn/src/main/java/org/apache/nifi/fn/core/FnFlowFile.java (new file, 332 lines)

@ -0,0 +1,332 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import com.google.gson.JsonObject;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
public class FnFlowFile implements FlowFileRecord {
private final Map<String, String> attributes = new HashMap<>();
private static AtomicLong nextID = new AtomicLong(0);
public final boolean materializeContent;
private long id;
private final long entryDate;
private final long creationTime;
private boolean penalized = false;
private List<InputStream> dataStreams = new ArrayList<>();
private byte[] data = new byte[0];
private boolean isFullyMaterialized = true;
private long lastEnqueuedDate = 0;
private long enqueuedIndex = 0;
public FnFlowFile(String content, Map<String,String> attributes, boolean materializeContent){
this(content.getBytes(StandardCharsets.UTF_8), attributes,materializeContent);
}
public FnFlowFile(byte[] content, Map<String,String> attributes, boolean materializeContent){
this(materializeContent);
this.attributes.putAll(attributes);
this.setData(content);
}
public FnFlowFile(final FnFlowFile toCopy, boolean materializeContent) {
this(materializeContent);
this.id = toCopy.id;
attributes.putAll(toCopy.getAttributes());
this.penalized = toCopy.isPenalized();
try {
this.setData(toCopy.getDataArray());
} catch (IOException e) {
throw new FlowFileAccessException("Exception creating FlowFile",e);
}
}
public FnFlowFile(final FnFlowFile toCopy, long offset, long size, boolean materializeContent) {
this(materializeContent);
this.id = toCopy.id;
attributes.putAll(toCopy.getAttributes());
this.penalized = toCopy.isPenalized();
try {
this.setData(Arrays.copyOfRange(toCopy.getDataArray(), (int) offset, (int) (offset + size)));
} catch (IOException e) {
throw new FlowFileAccessException("Exception creating FlowFile",e);
}
}
public FnFlowFile(boolean materializeContent) {
this.materializeContent = materializeContent;
this.creationTime = System.nanoTime();
this.id = nextID.getAndIncrement();
this.entryDate = System.currentTimeMillis();
this.lastEnqueuedDate = entryDate;
attributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()) + ".fnFlowFile");
attributes.put(CoreAttributes.PATH.key(), "target");
attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
}
//region SimpleMethods
void setPenalized(boolean penalized) {
this.penalized = penalized;
}
public long getCreationTime() {
return creationTime;
}
@Override
public long getLineageStartDate() {
return entryDate;
}
@Override
public int compareTo(final FlowFile o) {
return getAttribute(CoreAttributes.UUID.key()).compareTo(o.getAttribute(CoreAttributes.UUID.key()));
}
@Override
public String getAttribute(final String attrName) {
return attributes.get(attrName);
}
@Override
public Map<String, String> getAttributes() {
return Collections.unmodifiableMap(attributes);
}
@Override
public long getEntryDate() {
return entryDate;
}
@Override
public long getId() {
return id;
}
@Override
public boolean isPenalized() {
return penalized;
}
public void putAttributes(final Map<String, String> attrs) {
attributes.putAll(attrs);
}
public void removeAttributes(final Set<String> attrNames) {
for (final String attrName : attrNames) {
attributes.remove(attrName);
}
}
@Override
public int hashCode() {
return Objects.hashCode(id);
}
@Override
public boolean equals(final Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (obj instanceof FlowFile) {
return ((FlowFile) obj).getId() == this.id;
}
return false;
}
@Override
public Long getLastQueueDate() {
return lastEnqueuedDate;
}
public void setLastEnqueuedDate(long lastEnqueuedDate) {
this.lastEnqueuedDate = lastEnqueuedDate;
}
@Override
public long getPenaltyExpirationMillis() {
return -1;
}
@Override
public ContentClaim getContentClaim() {
return null;
}
@Override
public long getContentClaimOffset() {
return 0;
}
@Override
public long getLineageStartIndex() {
return 0;
}
@Override
public long getQueueDateIndex() {
return enqueuedIndex;
}
public void setEnqueuedIndex(long enqueuedIndex) {
this.enqueuedIndex = enqueuedIndex;
}
//endregion Methods
@Override
public String toString() {
JsonObject attributes = new JsonObject();
this.attributes.forEach(attributes::addProperty);
JsonObject result = new JsonObject();
result.add("attributes", attributes);
return result.toString();
}
public String toStringFull() {
JsonObject attributes = new JsonObject();
this.attributes.forEach(attributes::addProperty);
JsonObject result = new JsonObject();
result.add("attributes", attributes);
try {
result.addProperty("content", new String(this.getDataArray(),StandardCharsets.UTF_8));
} catch (IOException e) {
result.addProperty("content","Exception getting content: "+e.getMessage());
}
return result.toString();
}
@Override
public long getSize() {
if(isFullyMaterialized)
return data.length;
else
return 0;
}
public void addData(final byte[] data) {
if(materializeContent){
ByteArrayOutputStream outputStream = new ByteArrayOutputStream( );
try {
outputStream.write(this.data);
outputStream.write(data);
} catch (IOException e) {
e.printStackTrace();
}
this.data = outputStream.toByteArray();
isFullyMaterialized = true;
} else {
isFullyMaterialized = false;
this.dataStreams.add(new ByteArrayInputStream(data));
}
}
public void addData(final InputStream in) {
if(materializeContent) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream( );
try {
outputStream.write(this.data);
outputStream.write(in);
} catch (IOException e) {
e.printStackTrace();
}
this.data = outputStream.toByteArray();
isFullyMaterialized = true;
} else {
isFullyMaterialized = false;
this.dataStreams.add(in);
}
}
public void setData(final byte[] data) {
this.data = data;
isFullyMaterialized = true;
}
public void setData(final InputStream in) {
if(materializeContent) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream( );
try {
outputStream.write(in);
} catch (IOException e) {
e.printStackTrace();
}
this.data = outputStream.toByteArray();
isFullyMaterialized = true;
} else {
isFullyMaterialized = false;
this.dataStreams = new ArrayList<>();
this.dataStreams.add(in);
}
}
public InputStream getDataStream() {
if(isFullyMaterialized) {
return new ByteArrayInputStream(this.data);
} else {
return new SequenceInputStream(
new ByteArrayInputStream(this.data),
new SequenceInputStream(Collections.enumeration(this.dataStreams))
);
}
}
public byte[] getDataArray() throws IOException {
if(!isFullyMaterialized) {
materializeData();
}
return this.data;
}
public void materializeData() throws IOException {
InputStream in = this.getDataStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buffer = new byte[1024*1024];
int read = 0;
while ((read = in.read(buffer)) != -1) {
baos.write(buffer, 0, read);
}
baos.flush();
this.data = baos.toByteArray();
this.dataStreams = new ArrayList<>();
isFullyMaterialized = true;
}
}
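For illustration, a minimal sketch (not part of this commit) showing the materializeContent flag in action: with materialization disabled, appended InputStreams are held lazily and only combined into a byte array when getDataArray() is called.

```
import org.apache.nifi.fn.core.FnFlowFile;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class FnFlowFileSketch {
    public static void main(String[] args) throws Exception {
        // materializeContent = false: appended data stays as InputStreams until read.
        final FnFlowFile flowFile = new FnFlowFile("hello", Collections.emptyMap(), false);
        flowFile.addData(new ByteArrayInputStream(" world".getBytes(StandardCharsets.UTF_8)));

        // getSize() reports 0 while the content is not fully materialized.
        System.out.println(flowFile.getSize()); // 0

        // getDataArray() materializes the queued streams into a single byte[].
        System.out.println(new String(flowFile.getDataArray(), StandardCharsets.UTF_8)); // hello world
        System.out.println(flowFile.getSize()); // 11
    }
}
```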

nifi-fn/src/main/java/org/apache/nifi/fn/core/FnProcessContext.java (new file, 420 lines)

@ -0,0 +1,420 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.components.*;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SchedulingContext;
import org.apache.nifi.registry.VariableRegistry;
import java.io.File;
import java.util.*;
public class FnProcessContext implements SchedulingContext, ControllerServiceInitializationContext {
private final ConfigurableComponent component;
private final String componentName;
private final Map<PropertyDescriptor, String> properties = new HashMap<>();
private final StateManager stateManager;
private final VariableRegistry variableRegistry;
private String annotationData = null;
private boolean yieldCalled = false;
private boolean enableExpressionValidation = false;
private boolean allowExpressionValidation = true;
private volatile boolean incomingConnection = true;
private volatile boolean nonLoopConnection = true;
private volatile InputRequirement inputRequirement = null;
private int maxConcurrentTasks = 1;
private volatile Set<Relationship> connections = new HashSet<>();
private volatile Set<Relationship> unavailableRelationships = new HashSet<>();
private final String identifier;
private final SLF4JComponentLog logger;
private final FnControllerServiceLookup lookup;
public FnProcessContext(final ConfigurableComponent component, final FnControllerServiceLookup lookup, final String componentName, final StateManager stateManager, final VariableRegistry variableRegistry) {
this(component, lookup, componentName, new SLF4JComponentLog(component), stateManager, variableRegistry);
}
public FnProcessContext(final ConfigurableComponent component, final FnControllerServiceLookup lookup, final String componentName, final SLF4JComponentLog logger, final FnStateManager statemanager) {
this(component, lookup, componentName, logger, statemanager, VariableRegistry.EMPTY_REGISTRY);
}
public FnProcessContext(final ConfigurableComponent component,
final FnControllerServiceLookup lookup,
final String componentName,
final SLF4JComponentLog logger,
final StateManager stateManager,
final VariableRegistry variableRegistry) {
this.component = Objects.requireNonNull(component);
this.componentName = componentName == null ? "" : componentName;
this.inputRequirement = component.getClass().getAnnotation(InputRequirement.class);
this.lookup = lookup;
this.stateManager = stateManager;
this.variableRegistry = variableRegistry;
this.identifier = "ProcessContext-"+this.hashCode();
this.logger = logger;
}
@Override
public PropertyValue getProperty(final PropertyDescriptor descriptor) {
return getProperty(descriptor.getName());
}
@Override
public PropertyValue getProperty(final String propertyName) {
final PropertyDescriptor descriptor = component.getPropertyDescriptor(propertyName);
if (descriptor == null) {
return null;
}
final String setPropertyValue = properties.get(descriptor);
final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue;
return new FnPropertyValue(propValue, this.lookup, variableRegistry, (enableExpressionValidation && allowExpressionValidation) ? descriptor : null);
}
@Override
public PropertyValue newPropertyValue(final String rawValue) {
return new FnPropertyValue(rawValue, this.lookup, variableRegistry);
}
public ValidationResult setProperty(final String propertyName, final String propertyValue) {
return setProperty(new PropertyDescriptor.Builder().name(propertyName).build(), propertyValue);
}
public ValidationResult setProperty(final PropertyDescriptor descriptor, final String value) {
if(descriptor == null)
throw new IllegalArgumentException("descriptor can not be null");
if(value == null)
throw new IllegalArgumentException("Cannot set property to null value; if the intent is to remove the property, call removeProperty instead");
final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(descriptor.getName());
final ValidationResult result = fullyPopulatedDescriptor.validate(value, new FnValidationContext(this, lookup, stateManager, variableRegistry));
String oldValue = properties.put(fullyPopulatedDescriptor, value);
if (oldValue == null) {
oldValue = fullyPopulatedDescriptor.getDefaultValue();
}
if ((value == null && oldValue != null) || (value != null && !value.equals(oldValue))) {
component.onPropertyModified(fullyPopulatedDescriptor, oldValue, value);
}
return result;
}
public boolean removeProperty(final PropertyDescriptor descriptor) {
Objects.requireNonNull(descriptor);
return removeProperty(descriptor.getName());
}
public boolean removeProperty(final String property) {
Objects.requireNonNull(property);
final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(property);
String value = null;
if ((value = properties.remove(fullyPopulatedDescriptor)) != null) {
if (!value.equals(fullyPopulatedDescriptor.getDefaultValue())) {
component.onPropertyModified(fullyPopulatedDescriptor, value, null);
}
return true;
}
return false;
}
@Override
public void yield() {
yieldCalled = true;
}
public boolean isYieldCalled() {
return yieldCalled;
}
@Override
public int getMaxConcurrentTasks() {
return maxConcurrentTasks;
}
public void setAnnotationData(final String annotationData) {
this.annotationData = annotationData;
}
@Override
public String getAnnotationData() {
return annotationData;
}
@Override
public Map<PropertyDescriptor, String> getProperties() {
final List<PropertyDescriptor> supported = component.getPropertyDescriptors();
if (supported == null || supported.isEmpty()) {
return Collections.unmodifiableMap(properties);
} else {
final Map<PropertyDescriptor, String> props = new LinkedHashMap<>();
for (final PropertyDescriptor descriptor : supported) {
props.put(descriptor, null);
}
props.putAll(properties);
return props;
}
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
public Collection<ValidationResult> validate() {
final List<ValidationResult> results = new ArrayList<>();
final ValidationContext validationContext = new FnValidationContext(this, lookup, stateManager, variableRegistry);
final Collection<ValidationResult> componentResults = component.validate(validationContext);
results.addAll(componentResults);
final Collection<ValidationResult> serviceResults = validateReferencedControllerServices(validationContext);
results.addAll(serviceResults);
// verify all controller services are enabled
for (Map.Entry<String, FnControllerServiceConfiguration> service : this.lookup.getControllerServices().entrySet()) {
if (!service.getValue().isEnabled()) {
results.add(new ValidationResult.Builder()
.explanation("Controller service " + service.getKey() + " for " + this.getName() + " is not enabled")
.valid(false)
.build());
}
}
return results;
}
protected final Collection<ValidationResult> validateReferencedControllerServices(final ValidationContext validationContext) {
final List<PropertyDescriptor> supportedDescriptors = component.getPropertyDescriptors();
if (supportedDescriptors == null) {
return Collections.emptyList();
}
final Collection<ValidationResult> validationResults = new ArrayList<>();
for (final PropertyDescriptor descriptor : supportedDescriptors) {
if (descriptor.getControllerServiceDefinition() == null) {
// skip properties that aren't for a controller service
continue;
}
final String controllerServiceId = validationContext.getProperty(descriptor).getValue();
if (controllerServiceId == null) {
continue;
}
final ControllerService controllerService = this.lookup.getControllerService(controllerServiceId);
if (controllerService == null) {
final ValidationResult result = new ValidationResult.Builder()
.valid(false)
.subject(descriptor.getDisplayName())
.input(controllerServiceId)
.explanation("Invalid Controller Service: " + controllerServiceId + " is not a valid Controller Service Identifier")
.build();
validationResults.add(result);
continue;
}
final Class<? extends ControllerService> requiredServiceClass = descriptor.getControllerServiceDefinition();
if (!requiredServiceClass.isAssignableFrom(controllerService.getClass())) {
final ValidationResult result = new ValidationResult.Builder()
.valid(false)
.subject(descriptor.getDisplayName())
.input(controllerServiceId)
.explanation("Invalid Controller Service: " + controllerServiceId + " does not implement interface " + requiredServiceClass)
.build();
validationResults.add(result);
continue;
}
final boolean enabled = this.lookup.isControllerServiceEnabled(controllerServiceId);
if (!enabled) {
validationResults.add(new ValidationResult.Builder()
.input(controllerServiceId)
.subject(descriptor.getDisplayName())
.explanation("Controller Service with ID " + controllerServiceId + " is not enabled")
.valid(false)
.build());
}
}
return validationResults;
}
public boolean isValid() {
int failureCount = 0;
for (final ValidationResult result : validate()) {
if (!result.isValid()) {
logger.error(result.toString());
failureCount++;
}
}
return failureCount == 0;
}
@Override
public String encrypt(final String unencrypted) {
return "enc{" + unencrypted + "}";
}
@Override
public String decrypt(final String encrypted) {
if (encrypted.startsWith("enc{") && encrypted.endsWith("}")) {
return encrypted.substring(4, encrypted.length() - 1);
}
return encrypted;
}
@Override
public ControllerServiceLookup getControllerServiceLookup() {
return this.lookup;
}
@Override
public void leaseControllerService(final String identifier) {
}
@Override
public Set<Relationship> getAvailableRelationships() {
if (!(component instanceof Processor)) {
return Collections.emptySet();
}
final Set<Relationship> relationships = new HashSet<>(((Processor) component).getRelationships());
relationships.removeAll(unavailableRelationships);
return relationships;
}
public void setUnavailableRelationships(final Set<Relationship> relationships) {
this.unavailableRelationships = Collections.unmodifiableSet(new HashSet<>(relationships));
}
public Set<Relationship> getUnavailableRelationships() {
return unavailableRelationships;
}
@Override
public boolean hasIncomingConnection() {
return incomingConnection;
}
public void setIncomingConnection(final boolean hasIncomingConnection) {
this.incomingConnection = hasIncomingConnection;
}
@Override
public boolean hasConnection(Relationship relationship) {
return this.connections.contains(relationship);
}
public void setNonLoopConnection(final boolean hasNonLoopConnection) {
this.nonLoopConnection = hasNonLoopConnection;
}
@Override
public boolean hasNonLoopConnection() {
return nonLoopConnection;
}
public void addConnection(final Relationship relationship) {
this.connections.add(relationship);
}
public void removeConnection(final Relationship relationship) {
this.connections.remove(relationship);
}
public void setConnections(final Set<Relationship> connections) {
if (connections == null) {
this.connections = Collections.emptySet();
} else {
this.connections = Collections.unmodifiableSet(connections);
}
}
@Override
public boolean isExpressionLanguagePresent(final PropertyDescriptor property) {
if (property == null || !property.isExpressionLanguageSupported()) {
return false;
}
final List<Query.Range> elRanges = Query.extractExpressionRanges(getProperty(property).getValue());
return (elRanges != null && !elRanges.isEmpty());
}
@Override
public StateManager getStateManager() {
return stateManager;
}
@Override
public String getName() {
return componentName;
}
protected void setMaxConcurrentTasks(int maxConcurrentTasks) {
this.maxConcurrentTasks = maxConcurrentTasks;
}
@Override
public String getIdentifier() {
return identifier;
}
@Override
public ComponentLog getLogger() {
return logger;
}
@Override
public String getKerberosServicePrincipal() {
return null; //this needs to be wired in.
}
@Override
public File getKerberosServiceKeytab() {
return null; //this needs to be wired in.
}
@Override
public File getKerberosConfigurationFile() {
return null; //this needs to be wired in.
}
}
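/*
* Hypothetical usage sketch (not part of this commit): shows how an FnProcessContext is wired up and
* validated before a processor runs, mirroring the wiring done in FnProcessorWrapper. The example class,
* the "success" relationship name and the property name/value below are assumptions for illustration only.
*/
class FnProcessContextUsageExample {
static boolean configureAndValidate(final Processor processor, final FnControllerServiceLookup lookup, final org.apache.nifi.registry.VariableRegistry registry) {
// Build a context backed by an in-memory state manager, as FnProcessorWrapper does
final FnProcessContext context = new FnProcessContext(processor, lookup, processor.getIdentifier(), new FnStateManager(), registry);
// Connections and properties must be registered before validation can pass
context.addConnection(new Relationship.Builder().name("success").build());
context.setProperty("Example Property", "example-value");
// isValid() logs each failing ValidationResult and returns true only when everything passes
return context.isValid();
}
}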

File diff suppressed because it is too large


@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.ProcessorInitializationContext;
import java.io.File;
import java.util.UUID;
public class FnProcessorInitializationContext implements ProcessorInitializationContext {
private final ComponentLog logger;
private final String processorId;
private final ProcessContext context;
public FnProcessorInitializationContext(final Processor processor, final ProcessContext context) {
processorId = UUID.randomUUID().toString();
logger = new SLF4JComponentLog(processor);
this.context = context;
}
public String getIdentifier() {
return processorId;
}
public ComponentLog getLogger() {
return logger;
}
public ControllerServiceLookup getControllerServiceLookup() {
return context.getControllerServiceLookup();
}
public NodeTypeProvider getNodeTypeProvider() {
return new NodeTypeProvider() {
public boolean isClustered() {
return false;
}
public boolean isPrimary() {
return false;
}
};
}
public String getKerberosServicePrincipal() {
return null; //this needs to be wired in.
}
public File getKerberosServiceKeytab() {
return null; //this needs to be wired in.
}
public File getKerberosConfigurationFile() {
return null; //this needs to be wired in.
}
}
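/*
* Hypothetical usage sketch (not part of this commit): shows how a Processor is initialized against this
* lightweight context, mirroring what FnProcessorWrapper does. The example class is an assumption for
* illustration only.
*/
class FnProcessorInitializationContextUsageExample {
static ComponentLog initialize(final Processor processor, final ProcessContext context) {
final FnProcessorInitializationContext initContext = new FnProcessorInitializationContext(processor, context);
processor.initialize(initContext);
// The context supplies a randomly generated identifier and an SLF4J-backed ComponentLog
return initContext.getLogger();
}
}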


@ -0,0 +1,316 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import org.apache.nifi.annotation.lifecycle.*;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.VersionedProcessor;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class FnProcessorWrapper {
private final long runSchedule = 100;
public List<FnProcessorWrapper> parents;
public List<String> incomingConnections;
public final Map<Relationship, ArrayList<FnProcessorWrapper>> children;
private final Set<Relationship> autoTermination;
private final Set<Relationship> successOutputPorts;
private final Set<Relationship> failureOutputPorts;
public boolean materializeContent;
private final Processor processor;
private final FnProcessContext context;
private final Queue<FnFlowFile> inputQueue;
private final VariableRegistry variableRegistry;
private final Collection<ProvenanceEventRecord> provenanceEvents;
private final Set<FnProcessSession> createdSessions;
private final ComponentLog logger;
private final FnControllerServiceLookup lookup;
private volatile boolean stopRequested = false;
private volatile boolean isStopped = true;
private volatile boolean initialized = false;
FnProcessorWrapper(final VersionedProcessor processor, final FnProcessorWrapper parent, FnControllerServiceLookup lookup, VariableRegistry registry, boolean materializeContent) throws InvocationTargetException, IllegalAccessException, ProcessorInstantiationException {
this(ReflectionUtils.createProcessor(processor),parent, lookup, registry,materializeContent);
for(String relationship : processor.getAutoTerminatedRelationships()) {
this.addAutoTermination(new Relationship.Builder().name(relationship).build());
}
processor.getProperties().forEach((key, value) -> this.setProperty(key,value));
}
FnProcessorWrapper(final Processor processor, final FnProcessorWrapper parent, FnControllerServiceLookup lookup, VariableRegistry registry, boolean materializeContent) throws InvocationTargetException, IllegalAccessException {
this.processor = processor;
this.parents = new ArrayList<>();
if(parent != null)
this.parents.add(parent);
this.lookup = lookup;
this.materializeContent = materializeContent;
this.incomingConnections = new ArrayList<>();
this.children = new HashMap<>();
this.autoTermination = new HashSet<>();
this.successOutputPorts = new HashSet<>();
this.failureOutputPorts = new HashSet<>();
this.provenanceEvents = new ArrayList<>();
this.createdSessions = new CopyOnWriteArraySet<>();
this.inputQueue = new LinkedList<>();
this.variableRegistry = registry;
this.context = new FnProcessContext(processor, lookup, processor.getIdentifier(), new FnStateManager(), variableRegistry);
this.context.setMaxConcurrentTasks(1);
final FnProcessorInitializationContext initContext = new FnProcessorInitializationContext(processor, context);
processor.initialize(initContext);
logger = initContext.getLogger();
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor);
}
public Processor getProcessor(){return this.processor;}
private void initialize(){
//Validate context
Collection<ValidationResult> validationResult = context.validate();
if(validationResult.stream().anyMatch(a->!a.isValid()) || !this.validate()) {
throw new IllegalArgumentException(
"context is not valid: "+
String.join("\n",validationResult.stream().map(r->r.toString()).collect(Collectors.toList())));
}
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, context);
} catch (IllegalAccessException | InvocationTargetException e) {
logger.error("Exception: ",e);
}
initialized = true;
}
public boolean runRecursive(Queue<FnFlowFile> output) {
if(!initialized)
initialize();
AtomicBoolean processingSuccess = new AtomicBoolean(true);
Set<Relationship> outputRelationships = new HashSet<>(children.keySet());
outputRelationships.addAll(successOutputPorts);
outputRelationships.addAll(failureOutputPorts);
do {
this.isStopped = false;
AtomicBoolean nextStepCalled = new AtomicBoolean(false);
try {
logger.info("Running "+this.processor.getClass().getSimpleName()+".onTrigger with "+inputQueue.size()+" records");
processor.onTrigger(context, () -> {
final FnProcessSession session = new FnProcessSession(
inputQueue,
provenanceEvents,
processor,
outputRelationships,
materializeContent,
() -> {
if(!nextStepCalled.get()) {
nextStepCalled.set(true);
boolean successfulRun = runChildren(output);
processingSuccess.set(successfulRun);
}
});
createdSessions.add(session);
return session;
});
if(!nextStepCalled.get()) {
nextStepCalled.set(true);
boolean successfulRun = runChildren(output);
processingSuccess.set(successfulRun);
}
provenanceEvents.clear();
Thread.sleep(runSchedule);
} catch (final Exception t) {
logger.error("Exception in runRecursive "+this.processor.getIdentifier(),t);
return false;
}
} while(!stopRequested && !inputQueue.isEmpty() && processingSuccess.get());
this.isStopped = true;
return processingSuccess.get();
}
private boolean runChildren(Queue<FnFlowFile> output) {
Queue<FnFlowFile> penalizedFlowFiles = this.getPenalizedFlowFiles();
if(penalizedFlowFiles.size() > 0){
output.addAll(penalizedFlowFiles);
return false;
}
for(Relationship r : this.getProcessor().getRelationships()) {
if(this.autoTermination.contains(r))
continue;
Queue<FnFlowFile> files = this.getAndRemoveFlowFilesForRelationship(r);
if(files.size() == 0)
continue;
if(this.failureOutputPorts.contains(r)) {
output.addAll(files);
return false;
}
if(this.successOutputPorts.contains(r))
output.addAll(files);
if(children.containsKey(r)) {
for (FnProcessorWrapper child : children.get(r)) {
child.enqueueAll(files);
boolean successfulRun = child.runRecursive(output);
if (!successfulRun)
return false;
}
}
}
return true;
}
public void shutdown(){
this.stopRequested = true;
for (Relationship r : this.getProcessor().getRelationships()) {
// Output ports and auto-terminated relationships have no child processors to shut down
if (this.autoTermination.contains(r) || this.successOutputPorts.contains(r) || this.failureOutputPorts.contains(r)) {
continue;
}
if (!children.containsKey(r)) {
throw new IllegalArgumentException("No child for relationship: " + r.getName());
}
children.get(r).forEach(FnProcessorWrapper::shutdown);
}
while(!this.isStopped){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context);
ReflectionUtils.invokeMethodsWithAnnotation(OnStopped.class, processor, context);
ReflectionUtils.invokeMethodsWithAnnotation(OnShutdown.class, processor);
} catch (IllegalAccessException | InvocationTargetException e) {
logger.error("Failure on shutdown: ", e);
}
logger.info(this.processor.getClass().getSimpleName()+" shutdown");
}
public boolean validate(){
if(!context.isValid())
return false;
for(Relationship r : this.getProcessor().getRelationships()) {
boolean hasChildren = this.children.containsKey(r);
boolean hasAutoterminate = this.autoTermination.contains(r);
boolean hasFailureOutputPort = this.failureOutputPorts.contains(r);
boolean hasSuccessOutputPort = this.successOutputPorts.contains(r);
if (!(hasChildren || hasAutoterminate || hasFailureOutputPort || hasSuccessOutputPort)) {
logger.error("Processor: " + this.toString() + ", Relationship: " + r.getName() + ", needs either auto terminate, child processors, or an output port");
return false;
}
}
for( Map.Entry<Relationship, ArrayList<FnProcessorWrapper>> child : this.children.entrySet()){
for(FnProcessorWrapper n : child.getValue()){
if(!n.validate())
return false;
}
}
return true;
}
public void enqueueAll(Queue<FnFlowFile> list){
inputQueue.addAll(list);
}
public Queue<FnFlowFile> getAndRemoveFlowFilesForRelationship(final Relationship relationship) {
List<FnFlowFile> sortedList = createdSessions.stream()
.flatMap(s-> s.getAndRemoveFlowFilesForRelationship(relationship).stream())
.sorted(Comparator.comparing(f -> f.getCreationTime()))
.collect(Collectors.toList());
return new LinkedList<>(sortedList);
}
public Queue<FnFlowFile> getPenalizedFlowFiles(){
List<FnFlowFile> sortedList = createdSessions.stream()
.flatMap(s-> s.getPenalizedFlowFiles().stream())
.sorted(Comparator.comparing(f -> f.getCreationTime()))
.collect(Collectors.toList());
return new LinkedList<>(sortedList);
}
public ValidationResult setProperty(final PropertyDescriptor property, final String propertyValue) {
return context.setProperty(property,propertyValue);
}
public ValidationResult setProperty(final String propertyName, final String propertyValue) {
return context.setProperty(propertyName, propertyValue);
}
public void addOutputPort(Relationship relationship, boolean isFailurePort){
if(isFailurePort)
this.failureOutputPorts.add(relationship);
else
this.successOutputPorts.add(relationship);
}
public FnProcessorWrapper addChild(Processor p, Relationship relationship) throws InvocationTargetException, IllegalAccessException {
ArrayList<FnProcessorWrapper> list = children.computeIfAbsent(relationship, r -> new ArrayList<>());
FnProcessorWrapper child = new FnProcessorWrapper(p,this, lookup, variableRegistry, materializeContent);
list.add(child);
context.addConnection(relationship);
return child;
}
public FnProcessorWrapper addChild(FnProcessorWrapper child, Relationship relationship) {
ArrayList<FnProcessorWrapper> list = children.computeIfAbsent(relationship, r -> new ArrayList<>());
list.add(child);
context.addConnection(relationship);
return child;
}
public void addAutoTermination(Relationship relationship){
this.autoTermination.add(relationship);
context.addConnection(relationship);
}
}
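/*
* Hypothetical usage sketch (not part of this commit): wires two already-configured processors into a small
* chain and collects the output flow files. The example class, the processor parameters and the "success"
* relationship name are assumptions for illustration only.
*/
class FnProcessorWrapperUsageExample {
static Queue<FnFlowFile> runChain(final Processor source, final Processor sink, final FnControllerServiceLookup lookup, final VariableRegistry registry) throws Exception {
final FnProcessorWrapper root = new FnProcessorWrapper(source, null, lookup, registry, false);
final Relationship success = new Relationship.Builder().name("success").build();
// Route the source's "success" relationship into the sink processor
final FnProcessorWrapper child = root.addChild(sink, success);
// The terminal processor must route somewhere: here its "success" relationship becomes an output port
child.addOutputPort(success, false);
final Queue<FnFlowFile> output = new LinkedList<>();
root.runRecursive(output);
return output;
}
}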


@ -0,0 +1,303 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.expression.AttributeValueDecorator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.expression.ExpressionLanguageScope;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class FnPropertyValue implements PropertyValue {
private final String rawValue;
private final Boolean expectExpressions;
private final ExpressionLanguageScope expressionLanguageScope;
private final FnControllerServiceLookup serviceLookup;
private final PropertyDescriptor propertyDescriptor;
private final PropertyValue stdPropValue;
private final VariableRegistry variableRegistry;
private boolean expressionsEvaluated = false;
public FnPropertyValue(final String rawValue) {
this(rawValue, null);
}
public FnPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup) {
this(rawValue, serviceLookup, VariableRegistry.EMPTY_REGISTRY, null);
}
public FnPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup, final VariableRegistry variableRegistry) {
this(rawValue, serviceLookup, variableRegistry, null);
}
public FnPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup, VariableRegistry variableRegistry, final PropertyDescriptor propertyDescriptor) {
this(rawValue, serviceLookup, propertyDescriptor, false, variableRegistry);
}
private FnPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup, final PropertyDescriptor propertyDescriptor, final boolean alreadyEvaluated,
final VariableRegistry variableRegistry) {
this.stdPropValue = new StandardPropertyValue(rawValue, serviceLookup, variableRegistry);
this.rawValue = rawValue;
this.serviceLookup = (FnControllerServiceLookup) serviceLookup;
this.expectExpressions = propertyDescriptor == null ? null : propertyDescriptor.isExpressionLanguageSupported();
this.expressionLanguageScope = propertyDescriptor == null ? null : propertyDescriptor.getExpressionLanguageScope();
this.propertyDescriptor = propertyDescriptor;
this.expressionsEvaluated = alreadyEvaluated;
this.variableRegistry = variableRegistry;
}
private void ensureExpressionsEvaluated() {
if (Boolean.TRUE.equals(expectExpressions) && !expressionsEvaluated) {
throw new IllegalStateException("Attempting to retrieve value of " + propertyDescriptor
+ " without first evaluating Expressions, even though the PropertyDescriptor indicates "
+ "that the Expression Language is Supported. If you realize that this is the case and do not want "
+ "this error to occur, it can be disabled by calling TestRunner.setValidateExpressionUsage(false)");
}
}
private void validateExpressionScope(boolean attributesAvailable) {
// language scope is not null, we have attributes available but scope is not equal to FF attributes
// it means that we're not evaluating against flow file attributes even though attributes are available
if(expressionLanguageScope != null
&& (attributesAvailable && !ExpressionLanguageScope.FLOWFILE_ATTRIBUTES.equals(expressionLanguageScope))) {
throw new IllegalStateException("Attempting to evaluate expression language for " + propertyDescriptor.getName()
+ " using flow file attributes but the scope evaluation is set to " + expressionLanguageScope + ". The"
+ " proper scope should be set in the property descriptor using"
+ " PropertyDescriptor.Builder.expressionLanguageSupported(ExpressionLanguageScope)");
}
// NOTE: serviceLookup is always an FnControllerServiceLookup here (it is cast in the constructor), so when
// the scope is FLOWFILE_ATTRIBUTES this early return always applies and strict scope enforcement is skipped.
// In particular this covers validate(), where no flow file is available yet.
if (expressionLanguageScope != null
&& ExpressionLanguageScope.FLOWFILE_ATTRIBUTES.equals(expressionLanguageScope)
&& this.serviceLookup instanceof FnControllerServiceLookup) {
return;
}
// we check if the input requirement is INPUT_FORBIDDEN
// in that case, we don't care if attributes are not available even though scope is FLOWFILE_ATTRIBUTES
// it likely means that the property has been defined in a common/abstract class used by multiple processors with
// different input requirements.
//if(expressionLanguageScope != null
// && ExpressionLanguageScope.FLOWFILE_ATTRIBUTES.equals(expressionLanguageScope)
// && (this.serviceLookup.getInputRequirement() == null
// || this.serviceLookup.getInputRequirement().value().equals(InputRequirement.Requirement.INPUT_FORBIDDEN))) {
// return;
//}
// if we have a processor where input requirement is INPUT_ALLOWED, we need to check if there is an
// incoming connection or not. If not, we don't care if attributes are not available even though scope is FLOWFILE_ATTRIBUTES
//if(expressionLanguageScope != null
// && ExpressionLanguageScope.FLOWFILE_ATTRIBUTES.equals(expressionLanguageScope)
// && !((org.apache.nifi.fn.core.FnProcessContext) this.context).hasIncomingConnection()) {
// return;
//}
// we're trying to evaluate against flow files attributes but we don't have any attributes available.
if(expressionLanguageScope != null
&& (!attributesAvailable && ExpressionLanguageScope.FLOWFILE_ATTRIBUTES.equals(expressionLanguageScope))) {
throw new IllegalStateException("Attempting to evaluate expression language for " + propertyDescriptor.getName()
+ " without using flow file attributes but the scope evaluation is set to " + expressionLanguageScope + ". The"
+ " proper scope should be set in the property descriptor using"
+ " PropertyDescriptor.Builder.expressionLanguageSupported(ExpressionLanguageScope)");
}
}
@Override
public String getValue() {
ensureExpressionsEvaluated();
return stdPropValue.getValue();
}
@Override
public Integer asInteger() {
ensureExpressionsEvaluated();
return stdPropValue.asInteger();
}
@Override
public Long asLong() {
ensureExpressionsEvaluated();
return stdPropValue.asLong();
}
@Override
public Boolean asBoolean() {
ensureExpressionsEvaluated();
return stdPropValue.asBoolean();
}
@Override
public Float asFloat() {
ensureExpressionsEvaluated();
return stdPropValue.asFloat();
}
@Override
public Double asDouble() {
ensureExpressionsEvaluated();
return stdPropValue.asDouble();
}
@Override
public Long asTimePeriod(final TimeUnit timeUnit) {
ensureExpressionsEvaluated();
return stdPropValue.asTimePeriod(timeUnit);
}
@Override
public Double asDataSize(final DataUnit dataUnit) {
ensureExpressionsEvaluated();
return stdPropValue.asDataSize(dataUnit);
}
private void markEvaluated() {
if (Boolean.FALSE.equals(expectExpressions)) {
throw new IllegalStateException("Attempting to Evaluate Expressions but " + propertyDescriptor
+ " indicates that the Expression Language is not supported. If you realize that this is the case and do not want "
+ "this error to occur, it can be disabled by calling TestRunner.setValidateExpressionUsage(false)");
}
expressionsEvaluated = true;
}
@Override
public PropertyValue evaluateAttributeExpressions() throws ProcessException {
return evaluateAttributeExpressions(null, null, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException {
return evaluateAttributeExpressions(null, null, decorator);
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException {
/*
* The reason for this null check is that somewhere in the test API, a null FlowFile is automatically treated
* as though it were evaluated with the VARIABLE_REGISTRY scope instead of the flow file scope. When NiFi
* is running, it does not care that it is evaluating EL against a null flow file. However, the testing
* framework currently raises an error, which keeps it from mimicking real-world behavior.
*/
if (flowFile == null && expressionLanguageScope == ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) {
return evaluateAttributeExpressions(new HashMap<>());
} else if (flowFile == null && expressionLanguageScope == ExpressionLanguageScope.VARIABLE_REGISTRY) {
// Handles the similar edge case where a null flow file is passed and the scope is VARIABLE_REGISTRY
return evaluateAttributeExpressions();
}
return evaluateAttributeExpressions(flowFile, null, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException {
return evaluateAttributeExpressions(flowFile, null, decorator);
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map<String, String> additionalAttributes) throws ProcessException {
return evaluateAttributeExpressions(flowFile, additionalAttributes, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(final Map<String, String> attributes) throws ProcessException {
return evaluateAttributeExpressions(null, attributes, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(final Map<String, String> attributes, final AttributeValueDecorator decorator) throws ProcessException {
return evaluateAttributeExpressions(null, attributes, decorator);
}
@Override
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map<String, String> additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException {
return evaluateAttributeExpressions(flowFile, additionalAttributes, decorator, null);
}
@Override
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator, Map<String, String> stateValues)
throws ProcessException {
markEvaluated();
if (rawValue == null) {
return this;
}
validateExpressionScope(flowFile != null || additionalAttributes != null);
final PropertyValue newValue = stdPropValue.evaluateAttributeExpressions(flowFile, additionalAttributes, decorator, stateValues);
return new FnPropertyValue(newValue.getValue(), serviceLookup, propertyDescriptor, true, variableRegistry);
}
@Override
public ControllerService asControllerService() {
ensureExpressionsEvaluated();
if (rawValue == null || rawValue.equals("")) {
return null;
}
return serviceLookup.getControllerService(rawValue);
}
@Override
public <T extends ControllerService> T asControllerService(final Class<T> serviceType) throws IllegalArgumentException {
ensureExpressionsEvaluated();
if (rawValue == null || rawValue.equals("")) {
return null;
}
final ControllerService service = serviceLookup.getControllerService(rawValue);
if (serviceType.isAssignableFrom(service.getClass())) {
return serviceType.cast(service);
}
throw new IllegalArgumentException("Controller Service with identifier " + rawValue + " is of type " + service.getClass() + " and cannot be cast to " + serviceType);
}
@Override
public boolean isSet() {
return rawValue != null;
}
@Override
public String toString() {
return getValue();
}
@Override
public boolean isExpressionLanguagePresent() {
if (!Boolean.TRUE.equals(expectExpressions)) {
return false;
}
final List<Range> elRanges = Query.extractExpressionRanges(rawValue);
return (elRanges != null && !elRanges.isEmpty());
}
}
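/*
* Hypothetical usage sketch (not part of this commit): evaluates Expression Language against a map of
* attributes. The property name, the expression and the attribute values are assumptions for illustration only.
*/
class FnPropertyValueUsageExample {
static String evaluate(final FnControllerServiceLookup lookup, final VariableRegistry registry) {
final PropertyDescriptor descriptor = new PropertyDescriptor.Builder()
.name("greeting")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
final PropertyValue value = new FnPropertyValue("${name:append(', hello')}", lookup, registry, descriptor);
final Map<String, String> attributes = new HashMap<>();
attributes.put("name", "world");
// Expressions must be evaluated before getValue() may be called on an EL-enabled property
return value.evaluateAttributeExpressions(attributes).getValue();
}
}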


@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class FnStateManager implements StateManager {
private final AtomicInteger versionIndex = new AtomicInteger(0);
private final Map<Scope, FnStateMap> maps; // one state map per Scope (LOCAL and CLUSTER)
public FnStateManager() {
this.maps = new HashMap<>();
for (Scope s : Scope.values()) {
this.maps.put(s, new FnStateMap(null, -1L));
}
}
public synchronized void setState(final Map<String, String> state, final Scope scope) {
maps.put(scope, new FnStateMap(state, versionIndex.incrementAndGet()));
}
public synchronized StateMap getState(final Scope scope) {
return maps.get(scope);
}
public synchronized boolean replace(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) throws IOException {
if(oldValue == maps.get(scope)){
maps.put(scope,new FnStateMap(newValue, versionIndex.incrementAndGet()));
return true;
} else {
return false;
}
}
public synchronized void clear(final Scope scope) {
setState(Collections.<String, String> emptyMap(), scope);
}
}
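/*
* Hypothetical usage sketch (not part of this commit): demonstrates the optimistic replace() contract of
* this in-memory state manager. The "offset" key and values are assumptions for illustration only.
*/
class FnStateManagerUsageExample {
static boolean advanceOffset() throws IOException {
final FnStateManager stateManager = new FnStateManager();
stateManager.setState(Collections.singletonMap("offset", "41"), Scope.LOCAL);
// replace() succeeds only when the caller presents the StateMap that is currently stored
final StateMap current = stateManager.getState(Scope.LOCAL);
return stateManager.replace(current, Collections.singletonMap("offset", "42"), Scope.LOCAL);
}
}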


@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import org.apache.nifi.components.state.StateMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class FnStateMap implements StateMap {
private final Map<String, String> stateValues;
private final long version;
public FnStateMap(final Map<String, String> stateValues, final long version) {
this.stateValues = stateValues == null ? Collections.<String, String>emptyMap() : new HashMap<>(stateValues);
this.version = version;
}
public long getVersion() {
return version;
}
public String get(final String key) {
return stateValues.get(key);
}
public Map<String, String> toMap() {
return Collections.unmodifiableMap(stateValues);
}
}
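/*
* Hypothetical usage sketch (not part of this commit): shows that FnStateMap copies the supplied values and
* exposes them read-only. The "offset" key is an assumption for illustration only.
*/
class FnStateMapUsageExample {
static String readBack() {
final Map<String, String> values = new HashMap<>();
values.put("offset", "42");
final FnStateMap stateMap = new FnStateMap(values, 1L);
// toMap() returns an unmodifiable view of the copied values
return stateMap.toMap().get("offset");
}
}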


@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.StandardExpressionLanguageCompiler;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.expression.ExpressionLanguageCompiler;
import org.apache.nifi.registry.VariableRegistry;
import java.util.*;
public class FnValidationContext implements ValidationContext {
private final FnControllerServiceLookup lookup;
private final Map<String, Boolean> expressionLanguageSupported;
private final StateManager stateManager;
private final VariableRegistry variableRegistry;
private final FnProcessContext processContext;
public FnValidationContext(final FnProcessContext processContext, final FnControllerServiceLookup lookup, final StateManager stateManager, final VariableRegistry variableRegistry) {
this.processContext = processContext;
this.lookup = lookup;
this.stateManager = stateManager;
this.variableRegistry = variableRegistry;
final Map<PropertyDescriptor, String> properties = processContext.getProperties();
expressionLanguageSupported = new HashMap<>(properties.size());
for (final PropertyDescriptor descriptor : properties.keySet()) {
expressionLanguageSupported.put(descriptor.getName(), descriptor.isExpressionLanguageSupported());
}
}
@Override
public PropertyValue newPropertyValue(final String rawValue) {
return new FnPropertyValue(rawValue, this.lookup, variableRegistry);
}
@Override
public ExpressionLanguageCompiler newExpressionLanguageCompiler() {
return new StandardExpressionLanguageCompiler(variableRegistry);
}
@Override
public ValidationContext getControllerServiceValidationContext(final ControllerService controllerService) {
final FnProcessContext serviceProcessContext = new FnProcessContext(controllerService, lookup, null ,stateManager, variableRegistry);
return new FnValidationContext(serviceProcessContext, lookup, stateManager, variableRegistry);
}
@Override
public PropertyValue getProperty(final PropertyDescriptor property) {
return processContext.getProperty(property);
}
@Override
public Map<PropertyDescriptor, String> getProperties() {
return processContext.getProperties();
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
@Override
public String getAnnotationData() {
return processContext.getAnnotationData();
}
@Override
public boolean isExpressionLanguagePresent(final String value) {
if (value == null) {
return false;
}
final List<Query.Range> elRanges = Query.extractExpressionRanges(value);
return (elRanges != null && !elRanges.isEmpty());
}
@Override
public boolean isExpressionLanguageSupported(final String propertyName) {
final Boolean supported = expressionLanguageSupported.get(propertyName);
return Boolean.TRUE.equals(supported);
}
@Override
public String getProcessGroupIdentifier() {
return "fn";
}
@Override
public ControllerServiceLookup getControllerServiceLookup() {
return this.lookup;
}
@Override
public boolean isValidationRequired(final ControllerService service) {
return true;
}
}
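/*
* Hypothetical usage sketch (not part of this commit): builds a validation context around an existing
* process context and inspects a raw value for Expression Language. The example class and the "${filename}"
* value are assumptions for illustration only.
*/
class FnValidationContextUsageExample {
static boolean usesExpressionLanguage(final FnProcessContext processContext, final FnControllerServiceLookup lookup, final StateManager stateManager, final VariableRegistry registry) {
final ValidationContext validationContext = new FnValidationContext(processContext, lookup, stateManager, registry);
// Detects NiFi Expression Language in an arbitrary raw property value
return validationContext.isExpressionLanguagePresent("${filename}");
}
}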


@ -0,0 +1,506 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileHandlingException;
import org.apache.nifi.provenance.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class ProvenanceCollector implements ProvenanceReporter {
private static final Logger logger = LoggerFactory.getLogger(ProvenanceCollector.class);
private final FnProcessSession session;
private final String processorId;
private final String processorType;
private final Collection<ProvenanceEventRecord> events;
public ProvenanceCollector(final FnProcessSession session, final Collection<ProvenanceEventRecord> events, final String processorId, final String processorType) {
this.session = session;
this.events = events;
this.processorId = processorId;
this.processorType = processorType;
}
private void verifyFlowFileKnown(final FlowFile flowFile) {
if (session != null && !session.isFlowFileKnown(flowFile)) {
throw new FlowFileHandlingException(flowFile + " is not known to " + session);
}
}
/**
* Removes the given event from the reporter
*
* @param event
* event
*/
void remove(final ProvenanceEventRecord event) {
events.remove(event);
}
void clear() {
events.clear();
}
void migrate(final ProvenanceCollector newOwner, final Set<String> flowFileIds) {
final Set<ProvenanceEventRecord> toMove = new LinkedHashSet<>();
for (final ProvenanceEventRecord event : events) {
if (flowFileIds.contains(event.getFlowFileUuid())) {
toMove.add(event);
}
}
events.removeAll(toMove);
newOwner.events.addAll(toMove);
}
/**
* Generates a Join event for the given child and parents but does not
* register the event. This is useful so that a ProcessSession has the
* ability to de-dupe events, since one or more events may be created by the
* session itself, as well as by the Processor
*
* @param parents
* parents
* @param child
* child
* @return record
*/
ProvenanceEventRecord generateJoinEvent(final Collection<FlowFile> parents, final FlowFile child) {
final ProvenanceEventBuilder eventBuilder = build(child, ProvenanceEventType.JOIN);
eventBuilder.addChildFlowFile(child);
for (final FlowFile parent : parents) {
eventBuilder.addParentFlowFile(parent);
}
return eventBuilder.build();
}
ProvenanceEventRecord generateDropEvent(final FlowFile flowFile, final String details) {
return build(flowFile, ProvenanceEventType.DROP).setDetails(details).build();
}
@Override
public void receive(final FlowFile flowFile, final String transitUri) {
receive(flowFile, transitUri, -1L);
}
@Override
public void receive(FlowFile flowFile, String transitUri, String sourceSystemFlowFileIdentifier) {
receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, -1L);
}
@Override
public void receive(final FlowFile flowFile, final String transitUri, final long transmissionMillis) {
receive(flowFile, transitUri, null, transmissionMillis);
}
@Override
public void receive(final FlowFile flowFile, final String transitUri, final String sourceSystemFlowFileIdentifier, final long transmissionMillis) {
receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, null, transmissionMillis);
}
@Override
public void receive(final FlowFile flowFile, final String transitUri, final String sourceSystemFlowFileIdentifier, final String details, final long transmissionMillis) {
verifyFlowFileKnown(flowFile);
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE)
.setTransitUri(transitUri)
.setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier)
.setEventDuration(transmissionMillis)
.setDetails(details)
.build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void fetch(final FlowFile flowFile, final String transitUri) {
fetch(flowFile, transitUri, -1L);
}
@Override
public void fetch(final FlowFile flowFile, final String transitUri, final long transmissionMillis) {
fetch(flowFile, transitUri, null, transmissionMillis);
}
@Override
public void fetch(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis) {
verifyFlowFileKnown(flowFile);
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.FETCH)
.setTransitUri(transitUri)
.setEventDuration(transmissionMillis)
.setDetails(details)
.build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void send(final FlowFile flowFile, final String transitUri, final long transmissionMillis) {
send(flowFile, transitUri, transmissionMillis, true);
}
@Override
public void send(final FlowFile flowFile, final String transitUri) {
send(flowFile, transitUri, null, -1L, true);
}
@Override
public void send(final FlowFile flowFile, final String transitUri, final String details) {
send(flowFile, transitUri, details, -1L, true);
}
@Override
public void send(final FlowFile flowFile, final String transitUri, final long transmissionMillis, final boolean force) {
send(flowFile, transitUri, null, transmissionMillis, force);
}
@Override
public void send(final FlowFile flowFile, final String transitUri, final String details, final boolean force) {
send(flowFile, transitUri, details, -1L, force);
}
@Override
public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis) {
send(flowFile, transitUri, details, transmissionMillis, true);
}
@Override
public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis, final boolean force) {
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build();
// Events are always collected in this in-memory reporter, so "force" has no additional effect here
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void send(final FlowFile flowFile, final String transitUri, final boolean force) {
send(flowFile, transitUri, -1L, force);
}
@Override
public void invokeRemoteProcess(final FlowFile flowFile, final String transitUri) {
invokeRemoteProcess(flowFile, transitUri, null);
}
@Override
public void invokeRemoteProcess(FlowFile flowFile, String transitUri, String details) {
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.REMOTE_INVOCATION)
.setTransitUri(transitUri).setDetails(details).build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void associate(final FlowFile flowFile, final String alternateIdentifierNamespace, final String alternateIdentifier) {
try {
String trimmedNamespace = alternateIdentifierNamespace.trim();
if (trimmedNamespace.endsWith(":")) {
trimmedNamespace = trimmedNamespace.substring(0, trimmedNamespace.length() - 1);
}
String trimmedIdentifier = alternateIdentifier.trim();
if (trimmedIdentifier.startsWith(":")) {
if (trimmedIdentifier.length() == 1) {
throw new IllegalArgumentException("Illegal alternateIdentifier: " + alternateIdentifier);
}
trimmedIdentifier = trimmedIdentifier.substring(1);
}
final String alternateIdentifierUri = trimmedNamespace + ":" + trimmedIdentifier;
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ADDINFO).setAlternateIdentifierUri(alternateIdentifierUri).build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
ProvenanceEventRecord drop(final FlowFile flowFile, final String reason) {
try {
final ProvenanceEventBuilder builder = build(flowFile, ProvenanceEventType.DROP);
if (reason != null) {
builder.setDetails("Discard reason: " + reason);
}
final ProvenanceEventRecord record = builder.build();
events.add(record);
return record;
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
return null;
}
}
void expire(final FlowFile flowFile, final String details) {
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.EXPIRE).setDetails(details).build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void fork(final FlowFile parent, final Collection<FlowFile> children) {
fork(parent, children, null, -1L);
}
@Override
public void fork(final FlowFile parent, final Collection<FlowFile> children, final long forkDuration) {
fork(parent, children, null, forkDuration);
}
@Override
public void fork(final FlowFile parent, final Collection<FlowFile> children, final String details) {
fork(parent, children, details, -1L);
}
@Override
public void fork(final FlowFile parent, final Collection<FlowFile> children, final String details, final long forkDuration) {
verifyFlowFileKnown(parent);
try {
final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.FORK);
eventBuilder.addParentFlowFile(parent);
for (final FlowFile child : children) {
eventBuilder.addChildFlowFile(child);
}
if (forkDuration > -1L) {
eventBuilder.setEventDuration(forkDuration);
}
if (details != null) {
eventBuilder.setDetails(details);
}
events.add(eventBuilder.build());
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void join(final Collection<FlowFile> parents, final FlowFile child) {
join(parents, child, null, -1L);
}
@Override
public void join(final Collection<FlowFile> parents, final FlowFile child, final long joinDuration) {
join(parents, child, null, joinDuration);
}
@Override
public void join(final Collection<FlowFile> parents, final FlowFile child, final String details) {
join(parents, child, details, -1L);
}
@Override
public void join(final Collection<FlowFile> parents, final FlowFile child, final String details, final long joinDuration) {
verifyFlowFileKnown(child);
try {
final ProvenanceEventBuilder eventBuilder = build(child, ProvenanceEventType.JOIN);
eventBuilder.addChildFlowFile(child);
eventBuilder.setDetails(details);
for (final FlowFile parent : parents) {
eventBuilder.addParentFlowFile(parent);
}
events.add(eventBuilder.build());
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void clone(final FlowFile parent, final FlowFile child) {
verifyFlowFileKnown(child);
try {
final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.CLONE);
eventBuilder.addChildFlowFile(child);
eventBuilder.addParentFlowFile(parent);
events.add(eventBuilder.build());
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void modifyContent(final FlowFile flowFile) {
modifyContent(flowFile, null, -1L);
}
@Override
public void modifyContent(final FlowFile flowFile, final String details) {
modifyContent(flowFile, details, -1L);
}
@Override
public void modifyContent(final FlowFile flowFile, final long processingMillis) {
modifyContent(flowFile, null, processingMillis);
}
@Override
public void modifyContent(final FlowFile flowFile, final String details, final long processingMillis) {
verifyFlowFileKnown(flowFile);
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CONTENT_MODIFIED).setEventDuration(processingMillis).setDetails(details).build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void modifyAttributes(final FlowFile flowFile) {
modifyAttributes(flowFile, null);
}
@Override
public void modifyAttributes(final FlowFile flowFile, final String details) {
verifyFlowFileKnown(flowFile);
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).setDetails(details).build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void route(final FlowFile flowFile, final Relationship relationship) {
route(flowFile, relationship, null);
}
@Override
public void route(final FlowFile flowFile, final Relationship relationship, final long processingDuration) {
route(flowFile, relationship, null, processingDuration);
}
@Override
public void route(final FlowFile flowFile, final Relationship relationship, final String details) {
route(flowFile, relationship, details, -1L);
}
@Override
public void route(final FlowFile flowFile, final Relationship relationship, final String details, final long processingDuration) {
verifyFlowFileKnown(flowFile);
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ROUTE).setRelationship(relationship).setDetails(details).setEventDuration(processingDuration).build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
@Override
public void create(final FlowFile flowFile) {
create(flowFile, null);
}
@Override
public void create(final FlowFile flowFile, final String details) {
verifyFlowFileKnown(flowFile);
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CREATE).setDetails(details).build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) {
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventType(eventType);
builder.fromFlowFile(flowFile);
builder.setLineageStartDate(flowFile.getLineageStartDate());
builder.setComponentId(processorId);
builder.setComponentType(processorType);
return builder;
}
}
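/*
* Hypothetical usage sketch (not part of this commit): records a ROUTE provenance event for a flow file.
* Passing a null session skips the flow-file ownership check; the "success" relationship name and the
* "ExampleProcessor" component type are assumptions for illustration only.
*/
class ProvenanceCollectorUsageExample {
static Collection<ProvenanceEventRecord> recordRoute(final FlowFile flowFile, final String processorId) {
final Collection<ProvenanceEventRecord> events = new ArrayList<>();
final ProvenanceCollector reporter = new ProvenanceCollector(null, events, processorId, "ExampleProcessor");
reporter.route(flowFile, new Relationship.Builder().name("success").build());
return events;
}
}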


@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Set;
import java.util.UUID;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.registry.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ReflectionUtils {
private final static Logger LOG = LoggerFactory.getLogger(ReflectionUtils.class);
/**
* Invokes all methods on the given instance that have been annotated with
* the given Annotation. If the signature of the method that is defined in
* <code>instance</code> uses 1 or more parameters, those parameters must be
* specified by the <code>args</code> parameter. However, if more arguments
* are supplied by the <code>args</code> parameter than needed, the extra
* arguments will be ignored.
*
* @param annotation the annotation to look for
* @param instance to invoke a method of
* @param args to supply in a method call
* @throws InvocationTargetException ite
* @throws IllegalArgumentException iae
* @throws IllegalAccessException if not allowed to invoke that method
*/
public static void invokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args)
throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
for (final Method method : instance.getClass().getMethods()) {
if (method.isAnnotationPresent(annotation)) {
final boolean isAccessible = method.isAccessible();
method.setAccessible(true);
try {
final Class<?>[] argumentTypes = method.getParameterTypes();
if (argumentTypes.length > args.length) {
throw new IllegalArgumentException(String.format("Unable to invoke method %1$s on %2$s because method expects %3$s parameters but only %4$s were given",
method.getName(), instance, argumentTypes.length, args.length));
}
for (int i = 0; i < argumentTypes.length; i++) {
final Class<?> argType = argumentTypes[i];
if (!argType.isAssignableFrom(args[i].getClass())) {
throw new IllegalArgumentException(String.format(
"Unable to invoke method %1$s on %2$s because method parameter %3$s is expected to be of type %4$s but argument passed was of type %5$s",
method.getName(), instance, i, argType, args[i].getClass()));
}
}
if (argumentTypes.length == args.length) {
method.invoke(instance, args);
} else {
final Object[] argsToPass = new Object[argumentTypes.length];
for (int i = 0; i < argsToPass.length; i++) {
argsToPass[i] = args[i];
}
method.invoke(instance, argsToPass);
}
} finally {
if (!isAccessible) {
method.setAccessible(false);
}
}
}
}
}
/**
* Invokes all methods on the given instance that have been annotated with
* the given Annotation. If the signature of the method that is defined in
* <code>instance</code> uses 1 or more parameters, those parameters must be
* specified by the <code>args</code> parameter. However, if more arguments
* are supplied by the <code>args</code> parameter than needed, the extra
* arguments will be ignored.
*
* @param annotation the annotation to look for
* @param instance to invoke a method of
* @param args to supply in a method call
* @return <code>true</code> if all appropriate methods were invoked and
* returned without throwing an Exception, <code>false</code> if one of the
* methods threw an Exception or could not be invoked; if <code>false</code>
* is returned, an error will have been logged.
*/
public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args) {
for (final Method method : instance.getClass().getMethods()) {
if (method.isAnnotationPresent(annotation)) {
final boolean isAccessible = method.isAccessible();
method.setAccessible(true);
try {
final Class<?>[] argumentTypes = method.getParameterTypes();
if (argumentTypes.length > args.length) {
LOG.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
new Object[]{method.getName(), instance, argumentTypes.length, args.length});
return false;
}
for (int i = 0; i < argumentTypes.length; i++) {
final Class<?> argType = argumentTypes[i];
if (!argType.isAssignableFrom(args[i].getClass())) {
LOG.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
new Object[]{method.getName(), instance, i, argType, args[i].getClass()});
return false;
}
}
try {
if (argumentTypes.length == args.length) {
method.invoke(instance, args);
} else {
final Object[] argsToPass = new Object[argumentTypes.length];
for (int i = 0; i < argsToPass.length; i++) {
argsToPass[i] = args[i];
}
method.invoke(instance, argsToPass);
}
} catch (final IllegalAccessException | IllegalArgumentException | InvocationTargetException t) {
LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
LOG.error("", t);
return false;
}
} finally {
if (!isAccessible) {
method.setAccessible(false);
}
}
}
}
return true;
}
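// Illustrative only (not part of this change): a hedged usage sketch of quietlyInvokeMethodsWithAnnotation,
// assuming this utility class is referenced as ReflectionUtils and a component declares a lifecycle method such as
//
//     @OnScheduled
//     public void setup(final ProcessContext context) { }
//
// Every matching method is invoked without surfacing exceptions; surplus arguments are ignored, and any
// failure is logged and reported through the boolean return value:
//
//     final boolean ok = ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnScheduled.class, processor, context, "extra arg is ignored");
//     if (!ok) {
//         // the failure has already been logged; decide whether to abort or continue
//     }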
public static ControllerService createControllerService(VersionedControllerService versionedControllerService) {
//org.apache.nifi.registry.flow.Bundle bundle = versionedControllerService.getBundle();
//BundleCoordinate coordinate = new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), "1.7.1");
//final Bundle processorBundle = ExtensionManager.getBundle(coordinate);
//if (processorBundle == null) {
// throw new ProcessorInstantiationException("Unable to find bundle for coordinate " + bundle.toString());
//}
final Bundle systemBundle = SystemBundle.create(new NiFiProperties() {
@Override
public String getProperty(String s) {
if(s.equals("nifi.nar.library.directory"))
return "/usr/share/nifi-1.8.0/lib/";
return null;
}
@Override
public Set<String> getPropertyKeys() {
return null;
}
});
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
//final ClassLoader detectedClassLoaderForInstance = ExtensionManager.createInstanceClassLoader(versionedControllerService.getType(), UUID.randomUUID().toString(), systemBundle, null);
final Class<?> rawClass = Class.forName(versionedControllerService.getType(), true, ctxClassLoader);
//Thread.currentThread().setContextClassLoader(detectedClassLoaderForInstance);
final Class<? extends ControllerService> processorClass = rawClass.asSubclass(ControllerService.class);
return processorClass.newInstance();
} catch (final Throwable t) {
throw new ControllerServiceInstantiationException(versionedControllerService.getType(), t);
} finally {
if (ctxClassLoader != null) {
Thread.currentThread().setContextClassLoader(ctxClassLoader);
}
}
}
public static Processor createProcessor(VersionedProcessor versionedProcessor) throws ProcessorInstantiationException {
//org.apache.nifi.registry.flow.Bundle bundle = versionedProcessor.getBundle();
//BundleCoordinate coordinate = new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), "1.8.0");
//final Bundle processorBundle = ExtensionManager.getBundle(coordinate);
//if (processorBundle == null) {
// throw new ProcessorInstantiationException("Unable to find bundle for coordinate " + bundle.toString());
//}
final Bundle systemBundle = SystemBundle.create(new NiFiProperties() {
@Override
public String getProperty(String s) {
if(s.equals("nifi.nar.library.directory"))
return "/usr/share/nifi-1.8.0/lib/";
return null;
}
@Override
public Set<String> getPropertyKeys() {
return null;
}
});
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
//final ClassLoader detectedClassLoaderForInstance = ExtensionManager.createInstanceClassLoader(versionedProcessor.getType(), UUID.randomUUID().toString(), systemBundle, null);
final Class<?> rawClass = Class.forName(versionedProcessor.getType(), true, ctxClassLoader);
//Thread.currentThread().setContextClassLoader(detectedClassLoaderForInstance);
final Class<? extends Processor> processorClass = rawClass.asSubclass(Processor.class);
return processorClass.newInstance();
} catch (final Throwable t) {
throw new ProcessorInstantiationException(versionedProcessor.getType(), t);
} finally {
if (ctxClassLoader != null) {
Thread.currentThread().setContextClassLoader(ctxClassLoader);
}
}
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.*;
import org.apache.nifi.util.NiFiProperties;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
public class RegistryUtil {
private RestBasedFlowRegistry registry;
public RegistryUtil(String registryUrl) {
try {
registry = new RestBasedFlowRegistry(new StandardFlowRegistryClient(), "id", registryUrl, SSLContext.getDefault(), "name");
} catch (NoSuchAlgorithmException e) {
//leaving the field null would only defer the failure to the first registry call
throw new IllegalStateException("Unable to obtain the default SSLContext for the registry client", e);
}
}
public VersionedFlowSnapshot getFlowByName(String bucketName, String flowName) throws IOException, NiFiRegistryException {
return getFlowByName(bucketName, flowName,-1);
}
public VersionedFlowSnapshot getFlowByName(String bucketName, String flowName, int versionID) throws IOException, NiFiRegistryException {
//Resolve the bucket by name
Set<Bucket> buckets = this.getBuckets();
Optional<Bucket> bucketOptional = buckets.stream().filter(b -> b.getName().equals(bucketName)).findFirst();
if (!bucketOptional.isPresent()) {
throw new IllegalArgumentException("Bucket not found: " + bucketName);
}
String bucketID = bucketOptional.get().getIdentifier();
//Resolve the flow by name within that bucket
Set<VersionedFlow> flows = this.getFlows(bucketID);
Optional<VersionedFlow> flowOptional = flows.stream().filter(f -> f.getName().equals(flowName)).findFirst();
if (!flowOptional.isPresent()) {
throw new IllegalArgumentException("Flow not found: " + flowName);
}
String flowID = flowOptional.get().getIdentifier();
return getFlowByID(bucketID, flowID, versionID);
}
public VersionedFlowSnapshot getFlowByID(String bucketID, String flowID) throws IOException, NiFiRegistryException {
return getFlowByID(bucketID, flowID,-1);
}
public VersionedFlowSnapshot getFlowByID(String bucketID, String flowID, int versionID) throws IOException, NiFiRegistryException {
if(versionID == -1)
versionID = this.getLatestVersion(bucketID, flowID);
return registry.getFlowContents(bucketID, flowID, versionID,true);
}
public Map<String,String> getVariables(String bucketID, String flowID) throws IOException, NiFiRegistryException {
VersionedFlowSnapshot flow = this.getFlowByID(bucketID,flowID);
return flow.getFlowContents().getVariables();
}
public Map<String,String> getVariables(String bucketID, String flowID, int versionID) throws IOException, NiFiRegistryException {
VersionedFlowSnapshot flow = this.getFlowByID(bucketID,flowID,versionID);
return flow.getFlowContents().getVariables();
}
public Set<Bucket> getBuckets() throws IOException, NiFiRegistryException {
return registry.getBuckets(NiFiUserUtils.getNiFiUser());
}
public Set<VersionedFlow> getFlows(String bucketID) throws IOException, NiFiRegistryException {
return registry.getFlows(bucketID,NiFiUserUtils.getNiFiUser());
}
public int getLatestVersion(String bucketID, String flowID) throws IOException, NiFiRegistryException {
return registry.getLatestVersion(bucketID, flowID, NiFiUserUtils.getNiFiUser());
}
}
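// Illustrative only (not part of this change): a minimal usage sketch for RegistryUtil; the example class
// name, registry URL, bucket name and flow name below are placeholders rather than values from this commit.
class RegistryUtilExample {
    public static void main(String[] args) throws Exception {
        RegistryUtil registryUtil = new RegistryUtil("http://localhost:18080");
        // resolve the bucket and flow by name, then fetch the latest snapshot of the flow
        VersionedFlowSnapshot snapshot = registryUtil.getFlowByName("my-bucket", "my-flow");
        // variables defined on the versioned process group
        Map<String, String> variables = snapshot.getFlowContents().getVariables();
        System.out.println(variables);
    }
}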

View File

@ -0,0 +1,365 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SLF4JComponentLog implements ComponentLog {
private final Logger logger;
private final Object component;
public SLF4JComponentLog(final Object component) {
this.logger = LoggerFactory.getLogger(component.getClass());
this.component = component;
}
private Object[] addProcessor(final Object[] originalArgs) {
return prependToArgs(originalArgs, component);
}
private Object[] addProcessorAndThrowable(final Object[] os, final Throwable t) {
final Object[] modifiedArgs = new Object[os.length + 2];
modifiedArgs[0] = component.toString();
for (int i = 0; i < os.length; i++) {
modifiedArgs[i + 1] = os[i];
}
modifiedArgs[modifiedArgs.length - 1] = t.toString();
return modifiedArgs;
}
private Object[] prependToArgs(final Object[] originalArgs, final Object... toAdd) {
final Object[] newArgs = new Object[originalArgs.length + toAdd.length];
System.arraycopy(toAdd, 0, newArgs, 0, toAdd.length);
System.arraycopy(originalArgs, 0, newArgs, toAdd.length, originalArgs.length);
return newArgs;
}
private Object[] translateException(final Object[] os) {
if (os != null && os.length > 0 && (os[os.length - 1] instanceof Throwable)) {
final Object[] osCopy = new Object[os.length];
osCopy[osCopy.length - 1] = os[os.length - 1].toString();
System.arraycopy(os, 0, osCopy, 0, os.length - 1);
return osCopy;
}
return os;
}
private boolean lastArgIsException(final Object[] os) {
return (os != null && os.length > 0 && (os[os.length - 1] instanceof Throwable));
}
@Override
public void warn(final String msg, final Throwable t) {
warn("{} " + msg, new Object[]{component}, t);
}
@Override
public void warn(String msg, Object[] os) {
if (lastArgIsException(os)) {
warn(msg, translateException(os), (Throwable) os[os.length - 1]);
} else {
msg = "{} " + msg;
os = addProcessor(os);
logger.warn(msg, os);
}
}
@Override
public void warn(String msg, Object[] os, final Throwable t) {
os = addProcessorAndThrowable(os, t);
msg = "{} " + msg + ": {}";
logger.warn(msg, os);
if (logger.isDebugEnabled()) {
logger.warn("", t);
}
}
@Override
public void warn(String msg) {
msg = "{} " + msg;
logger.warn(msg, component);
}
@Override
public void trace(String msg, Throwable t) {
msg = "{} " + msg;
final Object[] os = {component};
logger.trace(msg, os, t);
}
@Override
public void trace(String msg, Object[] os) {
msg = "{} " + msg;
os = addProcessor(os);
logger.trace(msg, os);
}
@Override
public void trace(String msg) {
msg = "{} " + msg;
final Object[] os = {component};
logger.trace(msg, os);
}
@Override
public void trace(String msg, Object[] os, Throwable t) {
os = addProcessorAndThrowable(os, t);
msg = "{} " + msg + ": {}";
logger.trace(msg, os);
logger.trace("", t);
}
@Override
public boolean isWarnEnabled() {
return logger.isWarnEnabled();
}
@Override
public boolean isTraceEnabled() {
return logger.isTraceEnabled();
}
@Override
public boolean isInfoEnabled() {
return logger.isInfoEnabled();
}
@Override
public boolean isErrorEnabled() {
return logger.isErrorEnabled();
}
@Override
public boolean isDebugEnabled() {
return logger.isDebugEnabled();
}
@Override
public void info(String msg, Throwable t) {
msg = "{} " + msg;
final Object[] os = {component};
logger.info(msg, os);
if (logger.isDebugEnabled()) {
logger.info("", t);
}
}
@Override
public void info(String msg, Object[] os) {
msg = "{} " + msg;
os = addProcessor(os);
logger.info(msg, os);
}
@Override
public void info(String msg) {
msg = "{} " + msg;
final Object[] os = {component};
logger.info(msg, os);
}
@Override
public void info(String msg, Object[] os, Throwable t) {
os = addProcessorAndThrowable(os, t);
msg = "{} " + msg + ": {}";
logger.info(msg, os);
if (logger.isDebugEnabled()) {
logger.info("", t);
}
}
@Override
public String getName() {
return logger.getName();
}
@Override
public void error(String msg, Throwable t) {
msg = "{} " + msg;
final Object[] os = {component};
logger.error(msg, os, t);
if (logger.isDebugEnabled()) {
logger.error("", t);
}
}
@Override
public void error(String msg, Object[] os) {
if (lastArgIsException(os)) {
error(msg, translateException(os), (Throwable) os[os.length - 1]);
} else {
os = addProcessor(os);
msg = "{} " + msg;
logger.error(msg, os);
}
}
@Override
public void error(String msg) {
msg = "{} " + msg;
final Object[] os = {component};
logger.error(msg, os);
}
@Override
public void error(String msg, Object[] os, Throwable t) {
os = addProcessorAndThrowable(os, t);
msg = "{} " + msg + ": {}";
logger.error(msg, os);
if (logger.isDebugEnabled()) {
logger.error("", t);
}
}
@Override
public void debug(String msg, Throwable t) {
msg = "{} " + msg;
final Object[] os = {component};
logger.debug(msg, os, t);
}
@Override
public void debug(String msg, Object[] os) {
os = addProcessor(os);
msg = "{} " + msg;
logger.debug(msg, os);
}
@Override
public void debug(String msg, Object[] os, Throwable t) {
os = addProcessorAndThrowable(os, t);
msg = "{} " + msg + ": {}";
logger.debug(msg, os);
if (logger.isDebugEnabled()) {
logger.debug("", t);
}
}
@Override
public void debug(String msg) {
msg = "{} " + msg;
final Object[] os = {component};
logger.debug(msg, os);
}
@Override
public void log(LogLevel level, String msg, Throwable t) {
switch (level) {
case DEBUG:
debug(msg, t);
break;
case ERROR:
case FATAL:
error(msg, t);
break;
case INFO:
info(msg, t);
break;
case TRACE:
trace(msg, t);
break;
case WARN:
warn(msg, t);
break;
}
}
@Override
public void log(LogLevel level, String msg, Object[] os) {
switch (level) {
case DEBUG:
debug(msg, os);
break;
case ERROR:
case FATAL:
error(msg, os);
break;
case INFO:
info(msg, os);
break;
case TRACE:
trace(msg, os);
break;
case WARN:
warn(msg, os);
break;
}
}
@Override
public void log(LogLevel level, String msg) {
switch (level) {
case DEBUG:
debug(msg);
break;
case ERROR:
case FATAL:
error(msg);
break;
case INFO:
info(msg);
break;
case TRACE:
trace(msg);
break;
case WARN:
warn(msg);
break;
}
}
@Override
public void log(LogLevel level, String msg, Object[] os, Throwable t) {
switch (level) {
case DEBUG:
debug(msg, os, t);
break;
case ERROR:
case FATAL:
error(msg, os, t);
break;
case INFO:
info(msg, os, t);
break;
case TRACE:
trace(msg, os, t);
break;
case WARN:
warn(msg, os, t);
break;
}
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.runtimes.OpenWhisk;
import com.google.gson.JsonObject;
import org.apache.nifi.fn.core.FnFlow;
import org.apache.nifi.fn.core.FnFlowFile;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.LinkedList;
import java.util.Queue;
public class JavaAction {
//wsk action create nififn NiFiFn/target/NiFi-Fn-1.0-SNAPSHOT.jar --main org.apache.nifi.fn.runtimes.OpenWhisk.JavaAction#OpenWhiskJavaEntry
//wsk action invoke -br nififn -p registryurl http://172.0.0.1:61080 -p bucket e53b8a0d-5c85-4fcd-912a-1c549a586c83 -p flow 6cf8277a-c402-4957-8623-0fa9890dd45d -p variable1 val1 -p variable2 val2
public static JsonObject OpenWhiskJavaEntry(JsonObject args) {
JsonObject result = new JsonObject();
try {
FnFlow flow = FnFlow.createAndEnqueueFromJSON(args);
Queue<FnFlowFile> output = new LinkedList<>();
boolean successful = flow.runOnce(output);
StringBuilder response = new StringBuilder();
for(FnFlowFile file : output)
response.append("\n").append(file);
result.addProperty("success", successful);
result.addProperty("message", response.toString());
} catch (Exception ex){
StringWriter sw = new StringWriter();
ex.printStackTrace(new PrintWriter(sw));
result.addProperty("success",false);
result.addProperty("message", "Flow exception: "+ex.getMessage()+"--"+sw.toString());
}
return result;
}
}

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.runtimes.OpenWhisk;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.apache.nifi.fn.core.FnFlow;
import org.apache.nifi.fn.core.FnFlowFile;
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.Queue;
public class NiFiFnOpenWhiskAction {
private HttpServer server;
private boolean initialized = false;
private FnFlow flow = null;
public NiFiFnOpenWhiskAction(int port) throws IOException {
this.server = HttpServer.create(new InetSocketAddress(port), -1);
this.server.createContext("/init", new InitHandler());
this.server.createContext("/run", new RunHandler());
this.server.setExecutor(null); // creates a default executor
}
public void start() {
server.start();
}
private static void writeResponse(HttpExchange t, int code, String content) throws IOException {
byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
t.sendResponseHeaders(code, bytes.length);
OutputStream os = t.getResponseBody();
os.write(bytes);
os.close();
}
private static void writeError(HttpExchange t, String errorMessage) throws IOException {
JsonObject message = new JsonObject();
message.addProperty("error", errorMessage);
writeResponse(t, 502, message.toString());
}
private static void writeLogMarkers() {
System.out.println("XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX");
System.err.println("XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX");
System.out.flush();
System.err.flush();
}
private class InitHandler implements HttpHandler {
public void handle(HttpExchange t) throws IOException {
InputStream is = t.getRequestBody();
JsonParser parser = new JsonParser();
JsonObject body = parser.parse(new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))).getAsJsonObject();
System.out.println("Init input: " + body);
String code = body.get("value").getAsJsonObject().get("code").getAsString();
if(code.equals("GENERIC")){
initialized = true;
writeResponse(t, 200,"Initialized Generic Action");
} else {
JsonObject flowDefinition = parser.parse(code).getAsJsonObject();
try {
flow = FnFlow.createAndEnqueueFromJSON(flowDefinition);
initialized = true;
writeResponse(t, 200, "Initialized "+flow);
} catch (Exception e) {
e.printStackTrace(System.err);
writeResponse(t, 400, "Error: " + e.getMessage());
}
}
}
}
private class RunHandler implements HttpHandler {
public void handle(HttpExchange t) throws IOException {
if (!initialized) {
NiFiFnOpenWhiskAction.writeError(t, "Cannot invoke an uninitialized action.");
return;
}
try {
InputStream is = t.getRequestBody();
JsonParser parser = new JsonParser();
JsonObject body = parser.parse(new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))).getAsJsonObject();
JsonObject inputObject = body.getAsJsonObject("value");
/*
input body :
{
"activation_id":"e212d293aa73479d92d293aa73c79dc9",
"action_name":"/guest/nififn",
"deadline":"1541729057462",
"api_key":"23bc46b1-71f6-4ed5-8c54-816aa4f8c502:123zO3xZCLrMN6v2BKK1dXYFpXlPkccOFqm12CdAsMgRU4VrNZ9lyGVCGuMDGIwP",
"value":{"registry":"http://172.26.224.116:61080","SourceCluster":"hdfs://172.26.224.119:8020","SourceFile":"test.txt","SourceDirectory":"hdfs://172.26.224.119:8020/tmp/nififn/input/","flow":"6cf8277a-c402-4957-8623-0fa9890dd45d","bucket":"e53b8a0d-5c85-4fcd-912a-1c549a586c83","DestinationDirectory":"hdfs://172.26.224.119:8020/tmp/nififn/output"},
"namespace":"guest"
}
input headers:
Accept-encoding: [gzip,deflate]
Accept: [application/json]
Connection: [Keep-Alive]
Host: [10.0.0.166:8080]
User-agent: [Apache-HttpClient/4.5.5 (Java/1.8.0-adoptopenjdk)]
Content-type: [application/json]
Content-length: [595]
*/
// Run Flow
Queue<FnFlowFile> output = new LinkedList<>();
boolean successful;
if(flow == null) {
FnFlow tempFlow = FnFlow.createAndEnqueueFromJSON(inputObject);
successful = tempFlow.runOnce(output);
} else {
flow.enqueueFromJSON(inputObject);
successful = flow.runOnce(output);
}
StringBuilder response = new StringBuilder();
for(FnFlowFile file : output)
response.append("\n").append(file);
NiFiFnOpenWhiskAction.writeResponse(t, successful ? 200 : 400, response.toString());
} catch (Exception e) {
e.printStackTrace(System.err);
NiFiFnOpenWhiskAction.writeError(t, "An error has occurred (see logs for details): " + e);
} finally {
writeLogMarkers();
}
}
}
}
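// Illustrative only (not part of this change): a hedged sketch of exercising the action server locally;
// the example class name and port are arbitrary, and the JSON payload shape follows the sample documented
// in RunHandler above.
class OpenWhiskActionExample {
    public static void main(String[] args) throws IOException {
        // start the action server on a local port
        new NiFiFnOpenWhiskAction(8080).start();
        // 1) initialize as a generic action:
        //      POST {"value":{"code":"GENERIC"}} to http://localhost:8080/init
        // 2) invoke with the flow parameters under "value":
        //      POST {"value":{"registry":"<registry url>","bucket":"<bucket id>","flow":"<flow id>"}} to http://localhost:8080/run
    }
}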

View File

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.runtimes;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.nifi.fn.core.FnFlow;
import org.apache.nifi.fn.core.FnFlowFile;
import org.apache.nifi.fn.runtimes.OpenWhisk.NiFiFnOpenWhiskAction;
import org.apache.nifi.fn.runtimes.YARN.YARNServiceUtil;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.reporting.InitializationException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
public class Program {
public static final String RUN_FROM_REGISTRY = "RunFromRegistry";
public static final String RUN_YARN_SERVICE_FROM_REGISTRY = "RunYARNServiceFromRegistry";
public static final String RUN_OPENWHISK_ACTION_SERVER = "RunOpenwhiskActionServer";
public static void main(String[] args) throws InvocationTargetException, IllegalAccessException, ProcessorInstantiationException, NiFiRegistryException, IOException, InterruptedException, InitializationException {
if(args.length == 0) {
printUsage();
System.exit(1);
} else if(args.length >= 4 && args[0].equals(RUN_FROM_REGISTRY) && (args[1].equals("Once") || args[1].equals("Continuous"))) {
boolean once = args[1].equals("Once");
FnFlow flow;
if(args[2].equals("--file")){
String json = new String(Files.readAllBytes(Paths.get(args[3])));
JsonObject config = new JsonParser().parse(json).getAsJsonObject();
flow = FnFlow.createAndEnqueueFromJSON(config);
} else if (args[2].equals("--json")){
JsonObject config = new JsonParser().parse(args[3]).getAsJsonObject();
flow = FnFlow.createAndEnqueueFromJSON(config);
} else if(args.length >= 6) { //registry URL, bucket ID, flow ID and input variables are all required
//Initialize flow
String registryUrl = args[2];
String bucketID = args[3];
String flowID = args[4];
Map<VariableDescriptor, String> inputVariables = new HashMap<>();
String[] variables = args[5].split(";");
for (String v : variables) {
String[] tokens = v.split("-");
inputVariables.put(new VariableDescriptor(tokens[0]), tokens[1]);
}
String[] failureOutputPorts = args.length >= 7 ? args[6].split(";") : new String[]{};
flow = new FnFlow(registryUrl, bucketID, flowID, () -> inputVariables, Arrays.asList(failureOutputPorts), true);
//Enqueue all provided flow files
if (7 < args.length) {
int i = 7;
while (i < args.length) {
Map<String, String> attributes = new HashMap<>();
byte[] content = {};
String[] attributesArr = args[i].split(";");
for (String v : attributesArr) {
String[] tokens = v.split("-");
if (tokens[0].equals(FnFlow.CONTENT))
content = tokens[1].getBytes();
else
attributes.put(tokens[0], tokens[1]);
}
flow.enqueueFlowFile(content, attributes);
i++;
}
}
} else {
System.out.println("Invalid input: "+String.join(",",args));
printUsage();
System.exit(1);
return;
}
//Run Flow
Queue<FnFlowFile> outputFlowFiles = new LinkedList<>();
//run flow once or forever
boolean successful;
if (once)
successful = flow.runOnce(outputFlowFiles);
else
successful = flow.run(outputFlowFiles);//Run forever
if(!successful) {
System.out.println("Flow Failed");
outputFlowFiles.forEach(f->System.out.println(f.toStringFull()));
System.exit(1);
} else {
System.out.println("Flow Succeeded");
outputFlowFiles.forEach(f->System.out.println(f.toStringFull()));
}
}else if(args[0].equals(RUN_YARN_SERVICE_FROM_REGISTRY) && args.length >= 7){
String YARNUrl = args[1];
String imageName = args[2];
String serviceName = args[3];
int numberOfContainers = Integer.parseInt(args[4]);
List<String> launchCommand = new ArrayList<>(Arrays.asList(RUN_FROM_REGISTRY, "Continuous")); //Arrays.asList alone is fixed-size and cannot be appended to
if(args[5].equals("--file")){
launchCommand.add("--json");
launchCommand.add(new String(Files.readAllBytes(Paths.get(args[6]))));
} else if (args[5].equals("--json")){
launchCommand.add("--json");
launchCommand.add(args[6]);
} else if (args.length >= 9) {
for(int i = 5; i < args.length; i++){
launchCommand.add(args[i]);
}
} else {
System.out.println("Invalid input: "+String.join(",",args));
printUsage();
System.exit(1);
}
StringBuilder message = new StringBuilder();
YARNServiceUtil yarnServiceUtil = new YARNServiceUtil(YARNUrl, imageName);
yarnServiceUtil.launchYARNService(serviceName, numberOfContainers, launchCommand.toArray(new String[0]), message);
System.out.println(message);
} else if (args[0].equals(RUN_OPENWHISK_ACTION_SERVER) && args.length == 2){
NiFiFnOpenWhiskAction action = new NiFiFnOpenWhiskAction(Integer.parseInt(args[1]));
action.start();
}else{
System.out.println("Invalid input: "+String.join(",",args));
printUsage();
System.exit(1);
}
}
private static void printUsage(){
System.out.println("Usage:");
System.out.println(" 1) "+RUN_FROM_REGISTRY+" [Once|Continuous] <NiFi registry URL> <Bucket ID> <Flow ID> <Input Variables> [<Failure Output Ports>] [<Input FlowFile>]");
System.out.println(" "+RUN_FROM_REGISTRY+" [Once|Continuous] --json <JSON>");
System.out.println(" "+RUN_FROM_REGISTRY+" [Once|Continuous] --file <File Name>");
System.out.println();
System.out.println(" 2) "+RUN_YARN_SERVICE_FROM_REGISTRY+" <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> \\");
System.out.println(" <NiFi registry URL> <Bucket ID> <Flow ID> <Input Variables> [<Failure Output Ports>] [<Input FlowFile>]");
System.out.println(" "+RUN_YARN_SERVICE_FROM_REGISTRY+" <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> --json <JSON>");
System.out.println(" "+RUN_YARN_SERVICE_FROM_REGISTRY+" <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> --file <File Name>");
System.out.println();
System.out.println(" 3) "+RUN_OPENWHISK_ACTION_SERVER+" <Port>");
System.out.println();
System.out.println("Examples:");
System.out.println(" 1) "+RUN_FROM_REGISTRY+" Once http://172.0.0.1:61080 e53b8a0d-5c85-4fcd-912a-1c549a586c83 6cf8277a-c402-4957-8623-0fa9890dd45d \\");
System.out.println(" \"DestinationDirectory-/tmp/nififn/output2/\" \"\" \"absolute.path-/tmp/nififn/input/;filename-test.txt\" \"absolute.path-/tmp/nififn/input/;filename-test2.txt\"");
System.out.println(" 2) "+RUN_FROM_REGISTRY+" Once http://172.0.0.1:61080 e53b8a0d-5c85-4fcd-912a-1c549a586c83 6cf8277a-c402-4957-8623-0fa9890dd45d \\");
System.out.println(" \"DestinationDirectory-/tmp/nififn/output2/\" \"f25c9204-6c95-3aa9-b0a8-c556f5f61849\" \"absolute.path-/tmp/nififn/input/;filename-test.txt\"");
System.out.println(" 3) "+RUN_YARN_SERVICE_FROM_REGISTRY+" http://127.0.0.1:8088 nifi-fn:latest kafka-to-solr 3 --file kafka-to-solr.json");
System.out.println(" 4) "+RUN_OPENWHISK_ACTION_SERVER+" 8080");
System.out.println();
System.out.println("Notes:");
System.out.println(" 1) <Input Variables> will be split on ';' and '-' then injected into the flow using the variable registry interface.");
System.out.println(" 2) <Failure Output Ports> will be split on ';'. FlowFiles routed to matching output ports will immediately fail the flow.");
System.out.println(" 3) <Input FlowFile> will be split on ';' and '-' then injected into the flow using the \""+FnFlow.CONTENT+"\" field as the FlowFile content.");
System.out.println(" 4) Multiple <Input FlowFile> arguments can be provided.");
System.out.println(" 5) The configuration file must be in JSON format. ");
System.out.println(" 6) When providing configurations via JSON, the following attributes must be provided: "+ FnFlow.REGISTRY+", "+ FnFlow.BUCKETID+", "+ FnFlow.FLOWID+".");
System.out.println(" All other attributes will be passed to the flow using the variable registry interface");
System.out.println();
}
}
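// Illustrative only (not part of this change): a worked example of the <Input FlowFile> parsing described in
// the notes above. Each argument is split on ';' into pairs and on '-' into key and value, with a pair keyed
// by the FnFlow content field becoming the FlowFile content; values therefore must not contain '-'. The
// example class name is hypothetical and the argument value is taken from usage example 1.
class InputFlowFileParsingExample {
    public static void main(String[] args) {
        String arg = "absolute.path-/tmp/nififn/input/;filename-test.txt";
        Map<String, String> attributes = new HashMap<>();
        byte[] content = {};
        for (String pair : arg.split(";")) {
            String[] tokens = pair.split("-");
            if (tokens[0].equals(FnFlow.CONTENT)) {
                content = tokens[1].getBytes();       // a content pair becomes the FlowFile body
            } else {
                attributes.put(tokens[0], tokens[1]); // {absolute.path=/tmp/nififn/input/, filename=test.txt}
            }
        }
        System.out.println(attributes + ", content bytes: " + content.length);
    }
}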

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.runtimes.YARN;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.HttpClientBuilder;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class YARNServiceUtil {
private final String YARNUrl;
private final String imageName;
public YARNServiceUtil(String YARNUrl, String imageName){
this.YARNUrl = YARNUrl;
this.imageName = imageName;
}
public boolean launchYARNService(String name, int containerCount, String[] launchCommand, StringBuilder outMessage){
JsonObject spec = new JsonObject();
spec.addProperty("name", name.substring(0,25));
spec.addProperty("version", "1.0.0");
spec.addProperty("description", "NiFi-Fn service launched with the following command: "+String.join(",",launchCommand));
JsonObject component = new JsonObject();
component.addProperty("name","mc");
component.addProperty("number_of_containers",containerCount);
JsonObject artifact = new JsonObject();
artifact.addProperty("id",this.imageName);
artifact.addProperty("type","DOCKER");
component.add("artifact",artifact);
component.addProperty("launch_command",String.join(",",launchCommand));
JsonObject resource = new JsonObject();
resource.addProperty("cpus",1);
resource.addProperty("memory","256");
component.add("resource",resource);
JsonObject env = new JsonObject();
env.addProperty("YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE","true");
JsonObject configuration = new JsonObject();
configuration.add("env",env);
component.add("configuration",configuration);
JsonArray components = new JsonArray();
components.add(component);
spec.add("components", components);
HttpPost request = new HttpPost(
this.YARNUrl+"/app/v1/services?user.name="+System.getProperty("user.name")
);
try {
request.setEntity(new StringEntity(spec.toString(), "application/json", StandardCharsets.UTF_8.name()));
HttpResponse response = HttpClientBuilder.create().build().execute(request);
outMessage.append(new BasicResponseHandler().handleResponse(response));
return true;
} catch (IOException e) {
e.printStackTrace();
outMessage.append(e.getMessage());
return false;
}
}
}
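// Illustrative only (not part of this change): a minimal usage sketch for YARNServiceUtil, reusing the values
// from usage example 3 in Program.printUsage; the example class name, RM address, image and service name are
// placeholders.
class YARNServiceUtilExample {
    public static void main(String[] args) {
        YARNServiceUtil yarn = new YARNServiceUtil("http://127.0.0.1:8088", "nifi-fn:latest");
        String[] launchCommand = {"RunFromRegistry", "Continuous", "--file", "kafka-to-solr.json"};
        StringBuilder response = new StringBuilder();
        // POSTs a single-component DOCKER service spec to <RM URL>/app/v1/services
        boolean submitted = yarn.launchYARNService("kafka-to-solr", 3, launchCommand, response);
        System.out.println(submitted + ": " + response);
    }
}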

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import org.apache.nifi.processors.standard.GetFile;
import org.apache.nifi.processors.standard.PutFile;
import org.apache.nifi.processors.standard.ReplaceText;
import org.apache.nifi.processors.standard.SplitText;
import org.apache.nifi.registry.VariableRegistry;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class BatchTest {
@org.junit.Test
public void testScenario1_Test() throws Exception {
///////////////////////////////////////////
// Setup
///////////////////////////////////////////
VariableRegistry registry = VariableRegistry.EMPTY_REGISTRY;
boolean materializeData = true;
FnControllerServiceLookup serviceLookup = new FnControllerServiceLookup();
File file = new File("/tmp/nififn/input/test.txt");
file.getParentFile().mkdirs();
file.createNewFile();
try (PrintStream out = new PrintStream(new FileOutputStream(file))) {
out.print("hello world");
}
///////////////////////////////////////////
// Build Flow
///////////////////////////////////////////
FnProcessorWrapper getFile = new FnProcessorWrapper(new GetFile(), null, serviceLookup, registry, materializeData);
getFile.setProperty(GetFile.DIRECTORY,"/tmp/nififn/input/");
getFile.setProperty(GetFile.FILE_FILTER,"test.txt");
getFile.setProperty(GetFile.KEEP_SOURCE_FILE,"true");
FnProcessorWrapper splitText = getFile.addChild(new SplitText(), GetFile.REL_SUCCESS);
splitText.setProperty(SplitText.LINE_SPLIT_COUNT,"1");
splitText.addAutoTermination(SplitText.REL_FAILURE);
splitText.addAutoTermination(SplitText.REL_ORIGINAL);
FnProcessorWrapper replaceText = splitText.addChild(new ReplaceText(), SplitText.REL_SPLITS);
replaceText.setProperty(ReplaceText.REPLACEMENT_VALUE,"$1!!!");
replaceText.addAutoTermination(ReplaceText.REL_FAILURE);
FnProcessorWrapper putFile = replaceText.addChild(new PutFile(), ReplaceText.REL_SUCCESS);
putFile.addAutoTermination(PutFile.REL_FAILURE);
putFile.addAutoTermination(PutFile.REL_SUCCESS);
putFile.setProperty(PutFile.DIRECTORY,"/tmp/nififn/output");
putFile.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION);
///////////////////////////////////////////
// Run Flow
///////////////////////////////////////////
FnFlow flow = new FnFlow(getFile);
Queue<FnFlowFile> output = new LinkedList<>();
boolean successful = flow.runOnce(output);
///////////////////////////////////////////
// Validate
///////////////////////////////////////////
String outputFile = "/tmp/nififn/output/test.txt";
assertTrue(new File(outputFile).isFile());
List<String> lines = Files.readAllLines(Paths.get(outputFile), StandardCharsets.UTF_8);
assertTrue(successful);
assertTrue(output.isEmpty());
assertEquals(1,lines.size());
assertEquals("hello world!!!", lines.get(0));
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
public class RegistryTest {
@org.junit.Test
public void testScenario1_Test() throws Exception {
/*VariableRegistry variableRegistry = () -> {
Map<VariableDescriptor,String> map = new HashMap<>();
map.put(new VariableDescriptor("SourceCluster"),"hdfs://172.16.96.132:8020");
map.put(new VariableDescriptor("SourceDirectory"),"hdfs://172.16.96.132:8020/tmp/nififn/input");
map.put(new VariableDescriptor("SourceFile"),"test.txt");
map.put(new VariableDescriptor("DestinationDirectory"),"hdfs://172.16.96.132:8020/tmp/nififn/output");
return map;
};
FnFlow flow = new FnFlow("http://172.16.96.132:61080","bucket1","HDFS_to_HDFS", variableRegistry);
flow.runOnce();
String outputFile = "/tmp/nififn/output2/test2.txt";
assertTrue(new File(outputFile).isFile());
List<String> lines = Files.readAllLines(Paths.get(outputFile), StandardCharsets.UTF_8);
assertEquals(1,lines.size());
assertEquals("hello world2", lines.get(0));*/
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.fn.core;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.PutFile;
import org.apache.nifi.processors.standard.ReplaceText;
import org.apache.nifi.processors.standard.SplitText;
import org.apache.nifi.processors.standard.TailFile;
import org.apache.nifi.registry.VariableRegistry;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class StreamingTest {
@org.junit.Test
public void Scenario1_Test() throws InvocationTargetException, IllegalAccessException, IOException, InterruptedException {
///////////////////////////////////////////
// Setup
///////////////////////////////////////////
VariableRegistry registry = VariableRegistry.EMPTY_REGISTRY;
boolean materializeData = true;
FnControllerServiceLookup serviceLookup = new FnControllerServiceLookup();
File file = new File("/tmp/nififn/input/test.txt");
file.getParentFile().mkdirs();
file.createNewFile();
try (PrintStream out = new PrintStream(new FileOutputStream(file))) {
out.print("hello world");
}
///////////////////////////////////////////
// Build Flow
///////////////////////////////////////////
FnProcessorWrapper tailFile = new FnProcessorWrapper(new TailFile(), null, serviceLookup, registry, materializeData);
tailFile.setProperty("File to Tail","/tmp/nififn/input/test.txt");
Set<Relationship> relationships = tailFile.getProcessor().getRelationships();
Relationship tailFile_Success = relationships.stream().filter(r->r.getName().equals("success")).findFirst().get();
FnProcessorWrapper splitText = tailFile.addChild(new SplitText(), tailFile_Success);
splitText.setProperty(SplitText.LINE_SPLIT_COUNT,"1");
splitText.addAutoTermination(SplitText.REL_FAILURE);
splitText.addAutoTermination(SplitText.REL_ORIGINAL);
FnProcessorWrapper replaceText = splitText.addChild(new ReplaceText(), SplitText.REL_SPLITS);
replaceText.setProperty(ReplaceText.REPLACEMENT_VALUE,"$1!!!");
replaceText.addAutoTermination(ReplaceText.REL_FAILURE);
FnProcessorWrapper putFile = replaceText.addChild(new PutFile(), ReplaceText.REL_SUCCESS);
putFile.addAutoTermination(PutFile.REL_FAILURE);
putFile.addAutoTermination(PutFile.REL_SUCCESS);
putFile.setProperty(PutFile.DIRECTORY,"/tmp/nififn/output/");
putFile.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION);
///////////////////////////////////////////
// Run Flow
///////////////////////////////////////////
FnFlow flow = new FnFlow(tailFile);
Queue<FnFlowFile> output = new LinkedList<>();
AtomicBoolean successful = new AtomicBoolean(true);
Thread t = new Thread(()->
successful.set(flow.run(output))
);
t.start();
//give the continuously running flow time to tail, split, replace and write the file
Thread.sleep(5000);
///////////////////////////////////////////
// Validate
///////////////////////////////////////////
String outputFile = "/tmp/nififn/output/test.txt";
assertTrue(new File(outputFile).isFile());
List<String> lines = Files.readAllLines(Paths.get(outputFile), StandardCharsets.UTF_8);
assertEquals(1,lines.size());
System.out.println("data: "+lines.get(0));
assertEquals("hello world!!!", lines.get(0));
System.out.println("Stopping...");
flow.shutdown();
t.join();
assertTrue(new File(outputFile).isFile());
lines = Files.readAllLines(Paths.get(outputFile), StandardCharsets.UTF_8);
assertTrue(successful.get());
assertTrue(output.isEmpty());
assertEquals(1,lines.size());
assertEquals("hello world!!!", lines.get(0));
}
}