NIFI-1858 Adding site-to-site reporting bundle

This closes #436.
This commit is contained in:
Bryan Bende 2016-05-12 09:59:27 -04:00
parent 8bb56fca68
commit 17c5496c04
11 changed files with 927 additions and 1 deletions

View File

@ -312,6 +312,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-hive-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-reporting-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
<properties>

View File

@ -0,0 +1,40 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-reporting-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-site-to-site-reporting-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-reporting-task</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,15 @@
nifi-site-to-site-reporting-nar
Copyright 2015-2016 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
************************
Common Development and Distribution License 1.1
************************
The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
(CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net)
(CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net)

View File

@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-reporting-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-site-to-site-reporting-task</artifactId>
<description>Publishes NiFi metrics and provenance events via S2S</description>
<version>1.0.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-client</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.json</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>javax.json</groupId>
<artifactId>javax.json-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-data-provenance-utils</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Base class for ReportingTasks that send data over site-to-site.
*/
public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingTask {
static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
.name("Destination URL")
.description("The URL of the destination NiFi instance to send the Provenance Events to, " +
"should be in the format http(s)://host:port/nifi.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(new NiFiUrlValidator())
.build();
static final PropertyDescriptor PORT_NAME = new PropertyDescriptor.Builder()
.name("Input Port Name")
.description("The name of the Input Port to delivery Provenance Events to.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service to use when communicating with the destination. If not specified, communications will not be secure.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
static final PropertyDescriptor INSTANCE_URL = new PropertyDescriptor.Builder()
.name("Instance URL")
.description("The URL of this instance to use in the Content URI of each event.")
.required(true)
.expressionLanguageSupported(true)
.defaultValue("http://${hostname(true)}:8080/nifi")
.addValidator(new NiFiUrlValidator())
.build();
static final PropertyDescriptor COMPRESS = new PropertyDescriptor.Builder()
.name("Compress Events")
.description("Indicates whether or not to compress the events when being sent.")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout")
.description("Specifies how long to wait to a response from the destination before deciding that an error has occurred and canceling the transaction")
.required(true)
.defaultValue("30 secs")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("Specifies how many records to send in a single batch, at most.")
.required(true)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
protected volatile SiteToSiteClient siteToSiteClient;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DESTINATION_URL);
properties.add(PORT_NAME);
properties.add(SSL_CONTEXT);
properties.add(INSTANCE_URL);
properties.add(COMPRESS);
properties.add(TIMEOUT);
properties.add(BATCH_SIZE);
return properties;
}
@OnScheduled
public void setup(final ConfigurationContext context) throws IOException {
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
final EventReporter eventReporter = new EventReporter() {
@Override
public void reportEvent(final Severity severity, final String category, final String message) {
switch (severity) {
case WARNING:
getLogger().warn(message);
break;
case ERROR:
getLogger().error(message);
break;
default:
break;
}
}
};
final String destinationUrl = context.getProperty(DESTINATION_URL).evaluateAttributeExpressions().getValue();
siteToSiteClient = new SiteToSiteClient.Builder()
.url(destinationUrl)
.portName(context.getProperty(PORT_NAME).getValue())
.useCompression(context.getProperty(COMPRESS).asBoolean())
.eventReporter(eventReporter)
.sslContext(sslContext)
.timeout(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
.build();
}
@OnStopped
public void shutdown() throws IOException {
final SiteToSiteClient client = getClient();
if (client != null) {
client.close();
}
}
// this getter is intended explicitly for testing purposes
protected SiteToSiteClient getClient() {
return this.siteToSiteClient;
}
static class NiFiUrlValidator implements Validator {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final String value = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
URL url;
try {
url = new URL(value);
} catch (final Exception e) {
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(false)
.explanation("Not a valid URL")
.build();
}
if (url != null && !url.getPath().endsWith("/nifi")) {
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(false)
.explanation("URL path must be /nifi")
.build();
}
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(true)
.build();
}
}
}

View File

@ -0,0 +1,344 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Tags({"provenance", "lineage", "tracking", "site", "site to site"})
@CapabilityDescription("Publishes Provenance events using the Site To Site protocol.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReportingTask {
private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
private static final String LAST_EVENT_ID_KEY = "last_event_id";
static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
.name("Platform")
.description("The value to use for the platform field in each provenance event.")
.required(true)
.expressionLanguageSupported(true)
.defaultValue("nifi")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private volatile long firstEventId = -1L;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(PLATFORM);
return properties;
}
private Map<String,String> createComponentMap(final ProcessGroupStatus status) {
final Map<String,String> componentMap = new HashMap<>();
if (status != null) {
componentMap.put(status.getId(), status.getName());
for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
componentMap.put(procStatus.getId(), procStatus.getName());
}
for (final PortStatus portStatus : status.getInputPortStatus()) {
componentMap.put(portStatus.getId(), portStatus.getName());
}
for (final PortStatus portStatus : status.getOutputPortStatus()) {
componentMap.put(portStatus.getId(), portStatus.getName());
}
for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) {
componentMap.put(rpgStatus.getId(), rpgStatus.getName());
}
for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) {
componentMap.put(childGroup.getId(), childGroup.getName());
}
}
return componentMap;
}
@Override
public void onTrigger(final ReportingContext context) {
final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus();
final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName();
final Map<String,String> componentMap = createComponentMap(procGroupStatus);
Long currMaxId = context.getEventAccess().getProvenanceRepository().getMaxEventId();
if(currMaxId == null) {
getLogger().debug("No events to send because no events have been created yet.");
return;
}
if (firstEventId < 0) {
Map<String, String> state;
try {
state = context.getStateManager().getState(Scope.LOCAL).toMap();
} catch (IOException e) {
getLogger().error("Failed to get state at start up due to {}:"+e.getMessage(), e);
return;
}
if (state.containsKey(LAST_EVENT_ID_KEY)) {
firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1;
}
if(currMaxId < firstEventId){
getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
"ids. Restarting querying from the beginning.", new Object[]{currMaxId, firstEventId});
firstEventId = -1;
}
}
if (currMaxId == (firstEventId - 1)) {
getLogger().debug("No events to send due to the current max id being equal to the last id that was queried.");
return;
}
List<ProvenanceEventRecord> events;
try {
events = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger());
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
return;
}
if (events == null || events.isEmpty()) {
getLogger().debug("No events to send due to 'events' being null or empty.");
return;
}
final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
URL url;
try {
url = new URL(nifiUrl);
} catch (final MalformedURLException e1) {
// already validated
throw new AssertionError();
}
final String hostname = url.getHost();
final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
final Map<String, ?> config = Collections.emptyMap();
final JsonBuilderFactory factory = Json.createBuilderFactory(config);
final JsonObjectBuilder builder = factory.createObjectBuilder();
final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
df.setTimeZone(TimeZone.getTimeZone("Z"));
while (events != null && !events.isEmpty()) {
final long start = System.nanoTime();
// Create a JSON array of all the events in the current batch
final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
for (final ProvenanceEventRecord event : events) {
final String componentName = componentMap.get(event.getComponentId());
arrayBuilder.add(serialize(factory, builder, event, df, componentName, hostname, url, rootGroupName, platform));
}
final JsonArray jsonArray = arrayBuilder.build();
// Send the JSON document for the current batch
try {
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
getLogger().debug("All destination nodes are penalized; will attempt to send data later");
return;
}
final Map<String, String> attributes = new HashMap<>();
final String transactionId = UUID.randomUUID().toString();
attributes.put("reporting.task.transaction.id", transactionId);
final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8);
transaction.send(data, attributes);
transaction.confirm();
transaction.complete();
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}",
new Object[]{events.size(), transferMillis, transactionId, events.get(0).getEventId()});
} catch (final IOException e) {
throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e);
}
// Store the id of the last event so we know where we left off
final ProvenanceEventRecord lastEvent = events.get(events.size() - 1);
final String lastEventId = String.valueOf(lastEvent.getEventId());
try {
StateManager stateManager = context.getStateManager();
Map<String, String> newMapOfState = new HashMap<>();
newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId);
stateManager.setState(newMapOfState, Scope.LOCAL);
} catch (final IOException ioe) {
getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart. The message of {} was: {}",
new Object[]{lastEventId, ioe, ioe, ioe.getMessage()}, ioe);
}
firstEventId = lastEvent.getEventId() + 1;
// Retrieve the next batch
try {
events = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger());
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
return;
}
}
}
static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df,
final String componentName, final String hostname, final URL nifiUrl, final String applicationName, final String platform) {
addField(builder, "eventId", UUID.randomUUID().toString());
addField(builder, "eventOrdinal", event.getEventId());
addField(builder, "eventType", event.getEventType().name());
addField(builder, "timestampMillis", event.getEventTime());
addField(builder, "timestamp", df.format(event.getEventTime()));
addField(builder, "durationMillis", event.getEventDuration());
addField(builder, "lineageStart", event.getLineageStartDate());
final Set<String> lineageIdentifiers = new HashSet<>();
if (event.getLineageIdentifiers() != null) {
lineageIdentifiers.addAll(event.getLineageIdentifiers());
}
lineageIdentifiers.add(event.getFlowFileUuid());
addField(builder, factory, "lineageIdentifiers", lineageIdentifiers);
addField(builder, "details", event.getDetails());
addField(builder, "componentId", event.getComponentId());
addField(builder, "componentType", event.getComponentType());
addField(builder, "componentName", componentName);
addField(builder, "entityId", event.getFlowFileUuid());
addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile");
addField(builder, "entitySize", event.getFileSize());
addField(builder, "previousEntitySize", event.getPreviousFileSize());
addField(builder, factory, "updatedAttributes", event.getUpdatedAttributes());
addField(builder, factory, "previousAttributes", event.getPreviousAttributes());
addField(builder, "actorHostname", hostname);
if (nifiUrl != null) {
final String urlPrefix = nifiUrl.toString().replace(nifiUrl.getPath(), "");
final String contentUriBase = urlPrefix + "/nifi-api/controller/provenance/events/" + event.getEventId() + "/content/";
addField(builder, "contentURI", contentUriBase + "output");
addField(builder, "previousContentURI", contentUriBase + "input");
}
addField(builder, factory, "parentIds", event.getParentUuids());
addField(builder, factory, "childIds", event.getChildUuids());
addField(builder, "transitUri", event.getTransitUri());
addField(builder, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier());
addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri());
addField(builder, "platform", platform);
addField(builder, "application", applicationName);
return builder.build();
}
private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map<String, String> values) {
if (values == null) {
return;
}
final JsonObjectBuilder mapBuilder = factory.createObjectBuilder();
for (final Map.Entry<String, String> entry : values.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
}
mapBuilder.add(entry.getKey(), entry.getValue());
}
builder.add(key, mapBuilder);
}
private static void addField(final JsonObjectBuilder builder, final String key, final Long value) {
if (value != null) {
builder.add(key, value.longValue());
}
}
private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection<String> values) {
if (values == null) {
return;
}
builder.add(key, createJsonArray(factory, values));
}
private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
if (value == null) {
return;
}
builder.add(key, value);
}
private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection<String> values) {
final JsonArrayBuilder builder = factory.createArrayBuilder();
for (final String value : values) {
if (value != null) {
builder.add(value);
}
}
return builder;
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask

View File

@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
public class TestSiteToSiteProvenanceReportingTask {
@Test
public void testSerializedForm() throws IOException, InitializationException {
final String uuid = "10000000-0000-0000-0000-000000000000";
final Map<String, String> attributes = new HashMap<>();
attributes.put("abc", "xyz");
attributes.put("xyz", "abc");
attributes.put("filename", "file-" + uuid);
final Map<String, String> prevAttrs = new HashMap<>();
attributes.put("filename", "1234.xyz");
final Set<String> lineageIdentifiers = new HashSet<>();
lineageIdentifiers.add("123");
lineageIdentifiers.add("321");
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
attributes.put("uuid", uuid);
builder.fromFlowFile(createFlowFile(3L, attributes));
builder.setAttributes(prevAttrs, attributes);
builder.setComponentId("1234");
builder.setComponentType("dummy processor");
builder.setLineageIdentifiers(lineageIdentifiers);
final ProvenanceEventRecord event = builder.build();
final List<byte[]> dataSent = new ArrayList<>();
final SiteToSiteProvenanceReportingTask task = new SiteToSiteProvenanceReportingTask() {
@SuppressWarnings("unchecked")
@Override
protected SiteToSiteClient getClient() {
final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
final Transaction transaction = Mockito.mock(Transaction.class);
try {
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
final byte[] data = invocation.getArgumentAt(0, byte[].class);
dataSent.add(data);
return null;
}
}).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
}
return client;
}
};
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
final ReportingContext context = Mockito.mock(ReportingContext.class);
Mockito.when(context.getStateManager())
.thenReturn(new MockStateManager(task));
Mockito.doAnswer(new Answer<PropertyValue>() {
@Override
public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor), null);
}
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
final long maxEventId = 2500;
final AtomicInteger totalEvents = new AtomicInteger(0);
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
Mockito.doAnswer(new Answer<List<ProvenanceEventRecord>>() {
@Override
public List<ProvenanceEventRecord> answer(final InvocationOnMock invocation) throws Throwable {
final long startId = invocation.getArgumentAt(0, long.class);
final int maxRecords = invocation.getArgumentAt(1, int.class);
final List<ProvenanceEventRecord> eventsToReturn = new ArrayList<>();
for (int i = (int) Math.max(0, startId); i < (int) (startId + maxRecords) && totalEvents.get() < maxEventId; i++) {
eventsToReturn.add(event);
totalEvents.getAndIncrement();
}
return eventsToReturn;
}
}).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt());
final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class);
Mockito.doAnswer(new Answer<Long>() {
@Override
public Long answer(final InvocationOnMock invocation) throws Throwable {
return maxEventId;
}
}).when(provenanceRepository).getMaxEventId();
Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
final ComponentLog logger = Mockito.mock(ComponentLog.class);
final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
task.initialize(initContext);
task.onTrigger(context);
assertEquals(3, dataSent.size());
final String msg = new String(dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject msgArray = jsonReader.readArray().getJsonObject(0).getJsonObject("updatedAttributes");
assertEquals(msgArray.getString("abc"), event.getAttributes().get("abc"));
}
public static FlowFile createFlowFile(final long id, final Map<String, String> attributes) {
MockFlowFile mockFlowFile = new MockFlowFile(id);
mockFlowFile.putAttributes(attributes);
return mockFlowFile;
}
}

View File

@ -0,0 +1,46 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-site-to-site-reporting-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-site-to-site-reporting-task</module>
<module>nifi-site-to-site-reporting-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-reporting-task</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
<version>2.19</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -60,6 +60,7 @@
<module>nifi-cassandra-bundle</module>
<module>nifi-spring-bundle</module>
<module>nifi-hive-bundle</module>
<module>nifi-site-to-site-reporting-bundle</module>
</modules>
<dependencyManagement>
<dependencies>
@ -145,4 +146,4 @@
</dependency>
</dependencies>
</dependencyManagement>
</project>
</project>

View File

@ -1126,6 +1126,12 @@ language governing permissions and limitations under the License. -->
<version>1.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-reporting-nar</artifactId>
<version>1.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>