mirror of https://github.com/apache/nifi.git
NIFI-4809 - Implement a SiteToSiteMetricsReportingTask
Fixed dependency issue by providing a local JSON reader Rebased + fixed conflict + updated versions in pom + EL scope Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #2575
This commit is contained in:
parent
ce0855e988
commit
6fbe1515ee
|
@ -29,21 +29,11 @@
|
||||||
<groupId>org.glassfish.jersey.core</groupId>
|
<groupId>org.glassfish.jersey.core</groupId>
|
||||||
<artifactId>jersey-client</artifactId>
|
<artifactId>jersey-client</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.glassfish</groupId>
|
|
||||||
<artifactId>javax.json</artifactId>
|
|
||||||
<version>1.0.4</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>javax.json</groupId>
|
<groupId>javax.json</groupId>
|
||||||
<artifactId>javax.json-api</artifactId>
|
<artifactId>javax.json-api</artifactId>
|
||||||
<version>1.0</version>
|
<version>1.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>com.yammer.metrics</groupId>
|
|
||||||
<artifactId>metrics-core</artifactId>
|
|
||||||
<version>2.2.0</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-api</artifactId>
|
<artifactId>nifi-api</artifactId>
|
||||||
|
@ -53,6 +43,11 @@
|
||||||
<artifactId>nifi-utils</artifactId>
|
<artifactId>nifi-utils</artifactId>
|
||||||
<version>1.7.0-SNAPSHOT</version>
|
<version>1.7.0-SNAPSHOT</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-reporting-utils</artifactId>
|
||||||
|
<version>1.7.0-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
<!-- test dependencies -->
|
<!-- test dependencies -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
|
|
@ -29,8 +29,8 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.reporting.AbstractReportingTask;
|
import org.apache.nifi.reporting.AbstractReportingTask;
|
||||||
import org.apache.nifi.reporting.ReportingContext;
|
import org.apache.nifi.reporting.ReportingContext;
|
||||||
import org.apache.nifi.reporting.ambari.api.MetricsBuilder;
|
import org.apache.nifi.reporting.util.metrics.MetricsService;
|
||||||
import org.apache.nifi.reporting.ambari.metrics.MetricsService;
|
import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
|
||||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||||
|
|
||||||
import javax.json.Json;
|
import javax.json.Json;
|
||||||
|
|
|
@ -1,131 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.reporting.ambari.metrics;
|
|
||||||
|
|
||||||
import com.yammer.metrics.core.VirtualMachineMetrics;
|
|
||||||
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
|
||||||
import org.apache.nifi.controller.status.ProcessorStatus;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A service used to produce key/value metrics based on a given input.
|
|
||||||
*/
|
|
||||||
public class MetricsService {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Generates a Map of metrics for a ProcessGroupStatus instance.
|
|
||||||
*
|
|
||||||
* @param status a ProcessGroupStatus to get metrics from
|
|
||||||
* @param appendPgId if true, the process group ID will be appended at the end of the metric name
|
|
||||||
* @return a map of metrics for the given status
|
|
||||||
*/
|
|
||||||
public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) {
|
|
||||||
final Map<String,String> metrics = new HashMap<>();
|
|
||||||
metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), String.valueOf(status.getFlowFilesReceived()));
|
|
||||||
metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), String.valueOf(status.getBytesReceived()));
|
|
||||||
metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), String.valueOf(status.getFlowFilesSent()));
|
|
||||||
metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), String.valueOf(status.getBytesSent()));
|
|
||||||
metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedCount()));
|
|
||||||
metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedContentSize()));
|
|
||||||
metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), String.valueOf(status.getBytesRead()));
|
|
||||||
metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), String.valueOf(status.getBytesWritten()));
|
|
||||||
metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), String.valueOf(status.getActiveThreadCount()));
|
|
||||||
|
|
||||||
final long durationNanos = calculateProcessingNanos(status);
|
|
||||||
metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), String.valueOf(durationNanos));
|
|
||||||
|
|
||||||
final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
|
|
||||||
metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), String.valueOf(durationSeconds));
|
|
||||||
|
|
||||||
return metrics;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Generates a Map of metrics for VirtualMachineMetrics.
|
|
||||||
*
|
|
||||||
* @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from
|
|
||||||
* @return a map of metrics from the given VirtualMachineStatus
|
|
||||||
*/
|
|
||||||
public Map<String,String> getMetrics(VirtualMachineMetrics virtualMachineMetrics) {
|
|
||||||
final Map<String,String> metrics = new HashMap<>();
|
|
||||||
metrics.put(MetricNames.JVM_UPTIME, String.valueOf(virtualMachineMetrics.uptime()));
|
|
||||||
metrics.put(MetricNames.JVM_HEAP_USED, String.valueOf(virtualMachineMetrics.heapUsed()));
|
|
||||||
metrics.put(MetricNames.JVM_HEAP_USAGE, String.valueOf(virtualMachineMetrics.heapUsage()));
|
|
||||||
metrics.put(MetricNames.JVM_NON_HEAP_USAGE, String.valueOf(virtualMachineMetrics.nonHeapUsage()));
|
|
||||||
metrics.put(MetricNames.JVM_THREAD_COUNT, String.valueOf(virtualMachineMetrics.threadCount()));
|
|
||||||
metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, String.valueOf(virtualMachineMetrics.daemonThreadCount()));
|
|
||||||
metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, String.valueOf(virtualMachineMetrics.fileDescriptorUsage()));
|
|
||||||
|
|
||||||
for (Map.Entry<Thread.State,Double> entry : virtualMachineMetrics.threadStatePercentages().entrySet()) {
|
|
||||||
final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue()));
|
|
||||||
switch(entry.getKey()) {
|
|
||||||
case BLOCKED:
|
|
||||||
metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, String.valueOf(normalizedValue));
|
|
||||||
break;
|
|
||||||
case RUNNABLE:
|
|
||||||
metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, String.valueOf(normalizedValue));
|
|
||||||
break;
|
|
||||||
case TERMINATED:
|
|
||||||
metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, String.valueOf(normalizedValue));
|
|
||||||
break;
|
|
||||||
case TIMED_WAITING:
|
|
||||||
metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, String.valueOf(normalizedValue));
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (Map.Entry<String,VirtualMachineMetrics.GarbageCollectorStats> entry : virtualMachineMetrics.garbageCollectors().entrySet()) {
|
|
||||||
final String gcName = entry.getKey().replace(" ", "");
|
|
||||||
final long runs = entry.getValue().getRuns();
|
|
||||||
final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS);
|
|
||||||
metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, String.valueOf(runs));
|
|
||||||
metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, String.valueOf(timeMS));
|
|
||||||
}
|
|
||||||
|
|
||||||
return metrics;
|
|
||||||
}
|
|
||||||
|
|
||||||
// calculates the total processing time of all processors in nanos
|
|
||||||
protected long calculateProcessingNanos(final ProcessGroupStatus status) {
|
|
||||||
long nanos = 0L;
|
|
||||||
|
|
||||||
for (final ProcessorStatus procStats : status.getProcessorStatus()) {
|
|
||||||
nanos += procStats.getProcessingNanos();
|
|
||||||
}
|
|
||||||
|
|
||||||
for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
|
|
||||||
nanos += calculateProcessingNanos(childGroupStatus);
|
|
||||||
}
|
|
||||||
|
|
||||||
return nanos;
|
|
||||||
}
|
|
||||||
|
|
||||||
// append the process group ID if necessary
|
|
||||||
private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) {
|
|
||||||
if(appendPgId) {
|
|
||||||
return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId();
|
|
||||||
} else {
|
|
||||||
return name;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -16,6 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.reporting.ambari.api;
|
package org.apache.nifi.reporting.ambari.api;
|
||||||
|
|
||||||
|
import org.apache.nifi.reporting.util.metrics.api.MetricFields;
|
||||||
|
import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.nifi.reporting.ambari.metrics;
|
||||||
import com.yammer.metrics.core.VirtualMachineMetrics;
|
import com.yammer.metrics.core.VirtualMachineMetrics;
|
||||||
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||||
import org.apache.nifi.controller.status.ProcessorStatus;
|
import org.apache.nifi.controller.status.ProcessorStatus;
|
||||||
|
import org.apache.nifi.reporting.util.metrics.MetricNames;
|
||||||
|
import org.apache.nifi.reporting.util.metrics.MetricsService;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,16 @@
|
||||||
<artifactId>commons-lang3</artifactId>
|
<artifactId>commons-lang3</artifactId>
|
||||||
<version>3.7</version>
|
<version>3.7</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.yammer.metrics</groupId>
|
||||||
|
<artifactId>metrics-core</artifactId>
|
||||||
|
<version>2.2.0</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.glassfish</groupId>
|
||||||
|
<artifactId>javax.json</artifactId>
|
||||||
|
<version>1.0.4</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
</project>
|
</project>
|
|
@ -14,7 +14,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.reporting.ambari.metrics;
|
package org.apache.nifi.reporting.util.metrics;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The Metric names to send to Ambari.
|
* The Metric names to send to Ambari.
|
||||||
|
@ -52,4 +52,8 @@ public interface MetricNames {
|
||||||
String JVM_GC_RUNS = "jvm.gc.runs";
|
String JVM_GC_RUNS = "jvm.gc.runs";
|
||||||
String JVM_GC_TIME = "jvm.gc.time";
|
String JVM_GC_TIME = "jvm.gc.time";
|
||||||
|
|
||||||
|
// OS Metrics
|
||||||
|
String LOAD1MN = "loadAverage1min";
|
||||||
|
String CORES = "availableCores";
|
||||||
|
|
||||||
}
|
}
|
|
@ -0,0 +1,230 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.reporting.util.metrics;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.json.JsonBuilderFactory;
|
||||||
|
import javax.json.JsonObject;
|
||||||
|
import javax.json.JsonObjectBuilder;
|
||||||
|
|
||||||
|
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||||
|
import org.apache.nifi.controller.status.ProcessorStatus;
|
||||||
|
import org.apache.nifi.reporting.util.metrics.api.MetricFields;
|
||||||
|
|
||||||
|
import com.yammer.metrics.core.VirtualMachineMetrics;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A service used to produce key/value metrics based on a given input.
|
||||||
|
*/
|
||||||
|
public class MetricsService {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates a Map of metrics for a ProcessGroupStatus instance.
|
||||||
|
*
|
||||||
|
* @param status a ProcessGroupStatus to get metrics from
|
||||||
|
* @param appendPgId if true, the process group ID will be appended at the end of the metric name
|
||||||
|
* @return a map of metrics for the given status
|
||||||
|
*/
|
||||||
|
public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) {
|
||||||
|
final Map<String,String> metrics = new HashMap<>();
|
||||||
|
|
||||||
|
Map<String,Long> longMetrics = getLongMetrics(status, appendPgId);
|
||||||
|
for (String key : longMetrics.keySet()) {
|
||||||
|
metrics.put(key, String.valueOf(longMetrics.get(key)));
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String,Integer> integerMetrics = getIntegerMetrics(status, appendPgId);
|
||||||
|
for (String key : integerMetrics.keySet()) {
|
||||||
|
metrics.put(key, String.valueOf(integerMetrics.get(key)));
|
||||||
|
}
|
||||||
|
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String,Integer> getIntegerMetrics(ProcessGroupStatus status, boolean appendPgId) {
|
||||||
|
final Map<String,Integer> metrics = new HashMap<>();
|
||||||
|
metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), status.getFlowFilesReceived());
|
||||||
|
metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), status.getFlowFilesSent());
|
||||||
|
metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), status.getQueuedCount());
|
||||||
|
metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), status.getActiveThreadCount());
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String,Long> getLongMetrics(ProcessGroupStatus status, boolean appendPgId) {
|
||||||
|
final Map<String,Long> metrics = new HashMap<>();
|
||||||
|
metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), status.getBytesReceived());
|
||||||
|
metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), status.getBytesSent());
|
||||||
|
metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), status.getQueuedContentSize());
|
||||||
|
metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), status.getBytesRead());
|
||||||
|
metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), status.getBytesWritten());
|
||||||
|
|
||||||
|
final long durationNanos = calculateProcessingNanos(status);
|
||||||
|
metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), durationNanos);
|
||||||
|
|
||||||
|
final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
|
||||||
|
metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), durationSeconds);
|
||||||
|
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates a Map of metrics for VirtualMachineMetrics.
|
||||||
|
*
|
||||||
|
* @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from
|
||||||
|
* @return a map of metrics from the given VirtualMachineStatus
|
||||||
|
*/
|
||||||
|
public Map<String,String> getMetrics(VirtualMachineMetrics virtualMachineMetrics) {
|
||||||
|
final Map<String,String> metrics = new HashMap<>();
|
||||||
|
|
||||||
|
Map<String,Integer> integerMetrics = getIntegerMetrics(virtualMachineMetrics);
|
||||||
|
for (String key : integerMetrics.keySet()) {
|
||||||
|
metrics.put(key, String.valueOf(integerMetrics.get(key)));
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String,Long> longMetrics = getLongMetrics(virtualMachineMetrics);
|
||||||
|
for (String key : longMetrics.keySet()) {
|
||||||
|
metrics.put(key, String.valueOf(longMetrics.get(key)));
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String,Double> doubleMetrics = getDoubleMetrics(virtualMachineMetrics);
|
||||||
|
for (String key : doubleMetrics.keySet()) {
|
||||||
|
metrics.put(key, String.valueOf(doubleMetrics.get(key)));
|
||||||
|
}
|
||||||
|
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
// calculates the total processing time of all processors in nanos
|
||||||
|
protected long calculateProcessingNanos(final ProcessGroupStatus status) {
|
||||||
|
long nanos = 0L;
|
||||||
|
|
||||||
|
for (final ProcessorStatus procStats : status.getProcessorStatus()) {
|
||||||
|
nanos += procStats.getProcessingNanos();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
|
||||||
|
nanos += calculateProcessingNanos(childGroupStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
return nanos;
|
||||||
|
}
|
||||||
|
|
||||||
|
// append the process group ID if necessary
|
||||||
|
private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) {
|
||||||
|
if(appendPgId) {
|
||||||
|
return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId();
|
||||||
|
} else {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String,Double> getDoubleMetrics(VirtualMachineMetrics virtualMachineMetrics) {
|
||||||
|
final Map<String,Double> metrics = new HashMap<>();
|
||||||
|
metrics.put(MetricNames.JVM_HEAP_USED, virtualMachineMetrics.heapUsed());
|
||||||
|
metrics.put(MetricNames.JVM_HEAP_USAGE, virtualMachineMetrics.heapUsage());
|
||||||
|
metrics.put(MetricNames.JVM_NON_HEAP_USAGE, virtualMachineMetrics.nonHeapUsage());
|
||||||
|
metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, virtualMachineMetrics.fileDescriptorUsage());
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String,Long> getLongMetrics(VirtualMachineMetrics virtualMachineMetrics) {
|
||||||
|
final Map<String,Long> metrics = new HashMap<>();
|
||||||
|
metrics.put(MetricNames.JVM_UPTIME, virtualMachineMetrics.uptime());
|
||||||
|
|
||||||
|
for (Map.Entry<String,VirtualMachineMetrics.GarbageCollectorStats> entry : virtualMachineMetrics.garbageCollectors().entrySet()) {
|
||||||
|
final String gcName = entry.getKey().replace(" ", "");
|
||||||
|
final long runs = entry.getValue().getRuns();
|
||||||
|
final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS);
|
||||||
|
metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, runs);
|
||||||
|
metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, timeMS);
|
||||||
|
}
|
||||||
|
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String,Integer> getIntegerMetrics(VirtualMachineMetrics virtualMachineMetrics) {
|
||||||
|
final Map<String,Integer> metrics = new HashMap<>();
|
||||||
|
metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, virtualMachineMetrics.daemonThreadCount());
|
||||||
|
metrics.put(MetricNames.JVM_THREAD_COUNT, virtualMachineMetrics.threadCount());
|
||||||
|
|
||||||
|
for (Map.Entry<Thread.State,Double> entry : virtualMachineMetrics.threadStatePercentages().entrySet()) {
|
||||||
|
final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue()));
|
||||||
|
switch(entry.getKey()) {
|
||||||
|
case BLOCKED:
|
||||||
|
metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, normalizedValue);
|
||||||
|
break;
|
||||||
|
case RUNNABLE:
|
||||||
|
metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, normalizedValue);
|
||||||
|
break;
|
||||||
|
case TERMINATED:
|
||||||
|
metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, normalizedValue);
|
||||||
|
break;
|
||||||
|
case TIMED_WAITING:
|
||||||
|
metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, normalizedValue);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
public JsonObject getMetrics(JsonBuilderFactory factory, ProcessGroupStatus status, VirtualMachineMetrics virtualMachineMetrics,
|
||||||
|
String applicationId, String id, String hostname, long currentTimeMillis, int availableProcessors, double systemLoad) {
|
||||||
|
JsonObjectBuilder objectBuilder = factory.createObjectBuilder()
|
||||||
|
.add(MetricFields.APP_ID, applicationId)
|
||||||
|
.add(MetricFields.HOSTNAME, hostname)
|
||||||
|
.add(MetricFields.INSTANCE_ID, status.getId())
|
||||||
|
.add(MetricFields.TIMESTAMP, currentTimeMillis);
|
||||||
|
|
||||||
|
objectBuilder
|
||||||
|
.add(MetricNames.CORES, availableProcessors)
|
||||||
|
.add(MetricNames.LOAD1MN, systemLoad);
|
||||||
|
|
||||||
|
Map<String,Integer> integerMetrics = getIntegerMetrics(virtualMachineMetrics);
|
||||||
|
for (String key : integerMetrics.keySet()) {
|
||||||
|
objectBuilder.add(key.replaceAll("\\.", ""), integerMetrics.get(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String,Long> longMetrics = getLongMetrics(virtualMachineMetrics);
|
||||||
|
for (String key : longMetrics.keySet()) {
|
||||||
|
objectBuilder.add(key.replaceAll("\\.", ""), longMetrics.get(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String,Double> doubleMetrics = getDoubleMetrics(virtualMachineMetrics);
|
||||||
|
for (String key : doubleMetrics.keySet()) {
|
||||||
|
objectBuilder.add(key.replaceAll("\\.", ""), doubleMetrics.get(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String,Long> longPgMetrics = getLongMetrics(status, false);
|
||||||
|
for (String key : longPgMetrics.keySet()) {
|
||||||
|
objectBuilder.add(key, longPgMetrics.get(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String,Integer> integerPgMetrics = getIntegerMetrics(status, false);
|
||||||
|
for (String key : integerPgMetrics.keySet()) {
|
||||||
|
objectBuilder.add(key, integerPgMetrics.get(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
return objectBuilder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -14,7 +14,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.reporting.ambari.api;
|
package org.apache.nifi.reporting.util.metrics.api;
|
||||||
|
|
||||||
import javax.json.JsonBuilderFactory;
|
import javax.json.JsonBuilderFactory;
|
||||||
import javax.json.JsonObject;
|
import javax.json.JsonObject;
|
|
@ -14,7 +14,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.reporting.ambari.api;
|
package org.apache.nifi.reporting.util.metrics.api;
|
||||||
|
|
||||||
public interface MetricFields {
|
public interface MetricFields {
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.reporting.ambari.api;
|
package org.apache.nifi.reporting.util.metrics.api;
|
||||||
|
|
||||||
import javax.json.JsonArrayBuilder;
|
import javax.json.JsonArrayBuilder;
|
||||||
import javax.json.JsonBuilderFactory;
|
import javax.json.JsonBuilderFactory;
|
|
@ -54,6 +54,23 @@
|
||||||
<artifactId>nifi-site-to-site-client</artifactId>
|
<artifactId>nifi-site-to-site-client</artifactId>
|
||||||
<version>1.7.0-SNAPSHOT</version>
|
<version>1.7.0-SNAPSHOT</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-record-serialization-service-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-schema-registry-service-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-record</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-avro-record-utils</artifactId>
|
||||||
|
<version>1.7.0-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.glassfish</groupId>
|
<groupId>org.glassfish</groupId>
|
||||||
<artifactId>javax.json</artifactId>
|
<artifactId>javax.json</artifactId>
|
||||||
|
@ -82,11 +99,31 @@
|
||||||
<version>1.7.0-SNAPSHOT</version>
|
<version>1.7.0-SNAPSHOT</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-mock-record-utils</artifactId>
|
||||||
|
<version>1.7.0-SNAPSHOT</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>junit</groupId>
|
<groupId>junit</groupId>
|
||||||
<artifactId>junit</artifactId>
|
<artifactId>junit</artifactId>
|
||||||
<version>4.12</version>
|
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.rat</groupId>
|
||||||
|
<artifactId>apache-rat-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<excludes combine.children="append">
|
||||||
|
<exclude>src/main/resources/schema-metrics.avsc</exclude>
|
||||||
|
</excludes>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -16,6 +16,23 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.reporting;
|
package org.apache.nifi.reporting;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.text.DateFormat;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
import javax.json.JsonObjectBuilder;
|
||||||
|
import javax.json.JsonValue;
|
||||||
|
import javax.net.ssl.SSLContext;
|
||||||
|
|
||||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
@ -25,27 +42,51 @@ import org.apache.nifi.components.Validator;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
import org.apache.nifi.events.EventReporter;
|
import org.apache.nifi.events.EventReporter;
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.remote.client.SiteToSiteClient;
|
import org.apache.nifi.remote.client.SiteToSiteClient;
|
||||||
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
|
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
|
||||||
import org.apache.nifi.remote.protocol.http.HttpProxy;
|
import org.apache.nifi.remote.protocol.http.HttpProxy;
|
||||||
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
|
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
|
||||||
|
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||||
|
import org.apache.nifi.serialization.MalformedRecordException;
|
||||||
|
import org.apache.nifi.serialization.RecordReader;
|
||||||
|
import org.apache.nifi.serialization.RecordSetWriter;
|
||||||
|
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||||
|
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||||
|
import org.apache.nifi.serialization.WriteResult;
|
||||||
|
import org.apache.nifi.serialization.record.DataType;
|
||||||
|
import org.apache.nifi.serialization.record.MapRecord;
|
||||||
|
import org.apache.nifi.serialization.record.Record;
|
||||||
|
import org.apache.nifi.serialization.record.RecordField;
|
||||||
|
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||||
|
import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
|
import org.apache.nifi.serialization.record.SerializedForm;
|
||||||
|
import org.apache.nifi.serialization.record.type.ArrayDataType;
|
||||||
|
import org.apache.nifi.serialization.record.type.MapDataType;
|
||||||
|
import org.apache.nifi.serialization.record.type.RecordDataType;
|
||||||
|
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
||||||
import org.apache.nifi.ssl.SSLContextService;
|
import org.apache.nifi.ssl.SSLContextService;
|
||||||
import org.apache.nifi.util.StringUtils;
|
import org.apache.nifi.util.StringUtils;
|
||||||
|
import org.codehaus.jackson.JsonFactory;
|
||||||
import javax.net.ssl.SSLContext;
|
import org.codehaus.jackson.JsonNode;
|
||||||
import java.io.IOException;
|
import org.codehaus.jackson.JsonParseException;
|
||||||
import java.util.ArrayList;
|
import org.codehaus.jackson.JsonParser;
|
||||||
import java.util.List;
|
import org.codehaus.jackson.JsonToken;
|
||||||
import java.util.concurrent.TimeUnit;
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
import org.codehaus.jackson.node.ArrayNode;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class for ReportingTasks that send data over site-to-site.
|
* Base class for ReportingTasks that send data over site-to-site.
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingTask {
|
public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingTask {
|
||||||
|
|
||||||
|
protected static final String LAST_EVENT_ID_KEY = "last_event_id";
|
||||||
protected static final String DESTINATION_URL_PATH = "/nifi";
|
protected static final String DESTINATION_URL_PATH = "/nifi";
|
||||||
|
protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
|
||||||
|
|
||||||
static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
|
||||||
.name("Destination URL")
|
.name("Destination URL")
|
||||||
|
@ -141,8 +182,16 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
|
||||||
.sensitive(true)
|
.sensitive(true)
|
||||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
|
||||||
|
.name("record-writer")
|
||||||
|
.displayName("Record Writer")
|
||||||
|
.description("Specifies the Controller Service to use for writing out the records.")
|
||||||
|
.identifiesControllerService(RecordSetWriterFactory.class)
|
||||||
|
.required(false)
|
||||||
|
.build();
|
||||||
|
|
||||||
protected volatile SiteToSiteClient siteToSiteClient;
|
protected volatile SiteToSiteClient siteToSiteClient;
|
||||||
|
protected volatile RecordSchema recordSchema;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
@ -188,7 +237,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
|
||||||
final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
|
final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
|
||||||
final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(context.getProperty(HTTP_PROXY_HOSTNAME).getValue()) ? null
|
final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(context.getProperty(HTTP_PROXY_HOSTNAME).getValue()) ? null
|
||||||
: new HttpProxy(context.getProperty(HTTP_PROXY_HOSTNAME).getValue(), context.getProperty(HTTP_PROXY_PORT).asInteger(),
|
: new HttpProxy(context.getProperty(HTTP_PROXY_HOSTNAME).getValue(), context.getProperty(HTTP_PROXY_PORT).asInteger(),
|
||||||
context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue());
|
context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue());
|
||||||
|
|
||||||
siteToSiteClient = new SiteToSiteClient.Builder()
|
siteToSiteClient = new SiteToSiteClient.Builder()
|
||||||
.urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl))
|
.urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl))
|
||||||
|
@ -215,6 +264,33 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
|
||||||
return this.siteToSiteClient;
|
return this.siteToSiteClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected byte[] getData(final ReportingContext context, InputStream in, Map<String, String> attributes) {
|
||||||
|
try (final JsonRecordReader reader = new JsonRecordReader(in, recordSchema)) {
|
||||||
|
|
||||||
|
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
|
||||||
|
final RecordSchema writeSchema = writerFactory.getSchema(null, recordSchema);
|
||||||
|
final ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||||
|
|
||||||
|
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
|
||||||
|
writer.beginRecordSet();
|
||||||
|
|
||||||
|
Record record;
|
||||||
|
while ((record = reader.nextRecord()) != null) {
|
||||||
|
writer.write(record);
|
||||||
|
}
|
||||||
|
|
||||||
|
final WriteResult writeResult = writer.finishRecordSet();
|
||||||
|
|
||||||
|
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
|
||||||
|
attributes.putAll(writeResult.getAttributes());
|
||||||
|
}
|
||||||
|
|
||||||
|
return out.toByteArray();
|
||||||
|
} catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
|
||||||
|
throw new ProcessException("Failed to write metrics using record writer: " + e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static class NiFiUrlValidator implements Validator {
|
static class NiFiUrlValidator implements Validator {
|
||||||
@Override
|
@Override
|
||||||
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
|
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
|
||||||
|
@ -236,4 +312,334 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void addField(final JsonObjectBuilder builder, final String key, final Long value) {
|
||||||
|
if (value != null) {
|
||||||
|
builder.add(key, value.longValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addField(final JsonObjectBuilder builder, final String key, final Integer value) {
|
||||||
|
if (value != null) {
|
||||||
|
builder.add(key, value.intValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addField(final JsonObjectBuilder builder, final String key, final String value) {
|
||||||
|
if (value == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
builder.add(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) {
|
||||||
|
if (value == null) {
|
||||||
|
if (allowNullValues) {
|
||||||
|
builder.add(key, JsonValue.NULL);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
builder.add(key, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class JsonRecordReader implements RecordReader {
|
||||||
|
|
||||||
|
private RecordSchema recordSchema;
|
||||||
|
private final JsonParser jsonParser;
|
||||||
|
private final boolean array;
|
||||||
|
private final JsonNode firstJsonNode;
|
||||||
|
private boolean firstObjectConsumed = false;
|
||||||
|
|
||||||
|
private final Supplier<DateFormat> dateFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat());
|
||||||
|
private final Supplier<DateFormat> timeFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.TIME.getDefaultFormat());
|
||||||
|
private final Supplier<DateFormat> timestampFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat());
|
||||||
|
|
||||||
|
public JsonRecordReader(final InputStream in, RecordSchema recordSchema) throws IOException, MalformedRecordException {
|
||||||
|
this.recordSchema = recordSchema;
|
||||||
|
try {
|
||||||
|
jsonParser = new JsonFactory().createJsonParser(in);
|
||||||
|
jsonParser.setCodec(new ObjectMapper());
|
||||||
|
JsonToken token = jsonParser.nextToken();
|
||||||
|
if (token == JsonToken.START_ARRAY) {
|
||||||
|
array = true;
|
||||||
|
token = jsonParser.nextToken();
|
||||||
|
} else {
|
||||||
|
array = false;
|
||||||
|
}
|
||||||
|
if (token == JsonToken.START_OBJECT) {
|
||||||
|
firstJsonNode = jsonParser.readValueAsTree();
|
||||||
|
} else {
|
||||||
|
firstJsonNode = null;
|
||||||
|
}
|
||||||
|
} catch (final JsonParseException e) {
|
||||||
|
throw new MalformedRecordException("Could not parse data as JSON", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
jsonParser.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException {
|
||||||
|
if (firstObjectConsumed && !array) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return convertJsonNodeToRecord(getNextJsonNode(), getSchema(), null, coerceTypes, dropUnknownFields);
|
||||||
|
} catch (final MalformedRecordException mre) {
|
||||||
|
throw mre;
|
||||||
|
} catch (final IOException ioe) {
|
||||||
|
throw ioe;
|
||||||
|
} catch (final Exception e) {
|
||||||
|
throw new MalformedRecordException("Failed to convert data into a Record object with the given schema", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RecordSchema getSchema() throws MalformedRecordException {
|
||||||
|
return recordSchema;
|
||||||
|
}
|
||||||
|
|
||||||
|
private JsonNode getNextJsonNode() throws JsonParseException, IOException, MalformedRecordException {
|
||||||
|
if (!firstObjectConsumed) {
|
||||||
|
firstObjectConsumed = true;
|
||||||
|
return firstJsonNode;
|
||||||
|
}
|
||||||
|
while (true) {
|
||||||
|
final JsonToken token = jsonParser.nextToken();
|
||||||
|
if (token == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
switch (token) {
|
||||||
|
case END_OBJECT:
|
||||||
|
continue;
|
||||||
|
case START_OBJECT:
|
||||||
|
return jsonParser.readValueAsTree();
|
||||||
|
case END_ARRAY:
|
||||||
|
case START_ARRAY:
|
||||||
|
return null;
|
||||||
|
default:
|
||||||
|
throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + token.name());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix,
|
||||||
|
final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException {
|
||||||
|
|
||||||
|
final Map<String, Object> values = new HashMap<>(schema.getFieldCount() * 2);
|
||||||
|
|
||||||
|
if (dropUnknown) {
|
||||||
|
for (final RecordField recordField : schema.getFields()) {
|
||||||
|
final JsonNode childNode = getChildNode(jsonNode, recordField);
|
||||||
|
if (childNode == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String fieldName = recordField.getFieldName();
|
||||||
|
final Object value;
|
||||||
|
|
||||||
|
if (coerceTypes) {
|
||||||
|
final DataType desiredType = recordField.getDataType();
|
||||||
|
final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
|
||||||
|
value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
|
||||||
|
} else {
|
||||||
|
value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType());
|
||||||
|
}
|
||||||
|
|
||||||
|
values.put(fieldName, value);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
final Iterator<String> fieldNames = jsonNode.getFieldNames();
|
||||||
|
while (fieldNames.hasNext()) {
|
||||||
|
final String fieldName = fieldNames.next();
|
||||||
|
final JsonNode childNode = jsonNode.get(fieldName);
|
||||||
|
final RecordField recordField = schema.getField(fieldName).orElse(null);
|
||||||
|
final Object value;
|
||||||
|
|
||||||
|
if (coerceTypes && recordField != null) {
|
||||||
|
final DataType desiredType = recordField.getDataType();
|
||||||
|
final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
|
||||||
|
value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
|
||||||
|
} else {
|
||||||
|
value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType());
|
||||||
|
}
|
||||||
|
|
||||||
|
values.put(fieldName, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final Supplier<String> supplier = () -> jsonNode.toString();
|
||||||
|
return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"), false, dropUnknown);
|
||||||
|
}
|
||||||
|
|
||||||
|
private JsonNode getChildNode(final JsonNode jsonNode, final RecordField field) {
|
||||||
|
if (jsonNode.has(field.getFieldName())) {
|
||||||
|
return jsonNode.get(field.getFieldName());
|
||||||
|
}
|
||||||
|
for (final String alias : field.getAliases()) {
|
||||||
|
if (jsonNode.has(alias)) {
|
||||||
|
return jsonNode.get(alias);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType, final boolean dropUnknown) throws IOException, MalformedRecordException {
|
||||||
|
if (fieldNode == null || fieldNode.isNull()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (desiredType.getFieldType()) {
|
||||||
|
case BOOLEAN:
|
||||||
|
case BYTE:
|
||||||
|
case CHAR:
|
||||||
|
case DOUBLE:
|
||||||
|
case FLOAT:
|
||||||
|
case INT:
|
||||||
|
case BIGINT:
|
||||||
|
case LONG:
|
||||||
|
case SHORT:
|
||||||
|
case STRING:
|
||||||
|
case DATE:
|
||||||
|
case TIME:
|
||||||
|
case TIMESTAMP: {
|
||||||
|
final Object rawValue = getRawNodeValue(fieldNode, null);
|
||||||
|
final Object converted = DataTypeUtils.convertType(rawValue, desiredType, dateFormat, timeFormat, timestampFormat, fieldName);
|
||||||
|
return converted;
|
||||||
|
}
|
||||||
|
case MAP: {
|
||||||
|
final DataType valueType = ((MapDataType) desiredType).getValueType();
|
||||||
|
|
||||||
|
final Map<String, Object> map = new HashMap<>();
|
||||||
|
final Iterator<String> fieldNameItr = fieldNode.getFieldNames();
|
||||||
|
while (fieldNameItr.hasNext()) {
|
||||||
|
final String childName = fieldNameItr.next();
|
||||||
|
final JsonNode childNode = fieldNode.get(childName);
|
||||||
|
final Object childValue = convertField(childNode, fieldName, valueType, dropUnknown);
|
||||||
|
map.put(childName, childValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
case ARRAY: {
|
||||||
|
final ArrayNode arrayNode = (ArrayNode) fieldNode;
|
||||||
|
final int numElements = arrayNode.size();
|
||||||
|
final Object[] arrayElements = new Object[numElements];
|
||||||
|
int count = 0;
|
||||||
|
for (final JsonNode node : arrayNode) {
|
||||||
|
final DataType elementType = ((ArrayDataType) desiredType).getElementType();
|
||||||
|
final Object converted = convertField(node, fieldName, elementType, dropUnknown);
|
||||||
|
arrayElements[count++] = converted;
|
||||||
|
}
|
||||||
|
|
||||||
|
return arrayElements;
|
||||||
|
}
|
||||||
|
case RECORD: {
|
||||||
|
if (fieldNode.isObject()) {
|
||||||
|
RecordSchema childSchema;
|
||||||
|
if (desiredType instanceof RecordDataType) {
|
||||||
|
childSchema = ((RecordDataType) desiredType).getChildSchema();
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (childSchema == null) {
|
||||||
|
final List<RecordField> fields = new ArrayList<>();
|
||||||
|
final Iterator<String> fieldNameItr = fieldNode.getFieldNames();
|
||||||
|
while (fieldNameItr.hasNext()) {
|
||||||
|
fields.add(new RecordField(fieldNameItr.next(), RecordFieldType.STRING.getDataType()));
|
||||||
|
}
|
||||||
|
|
||||||
|
childSchema = new SimpleRecordSchema(fields);
|
||||||
|
}
|
||||||
|
|
||||||
|
return convertJsonNodeToRecord(fieldNode, childSchema, fieldName + ".", true, dropUnknown);
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case CHOICE: {
|
||||||
|
return DataTypeUtils.convertType(getRawNodeValue(fieldNode, null), desiredType, fieldName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Object getRawNodeValue(final JsonNode fieldNode, final DataType dataType) throws IOException {
|
||||||
|
if (fieldNode == null || fieldNode.isNull()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fieldNode.isNumber()) {
|
||||||
|
return fieldNode.getNumberValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fieldNode.isBinary()) {
|
||||||
|
return fieldNode.getBinaryValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fieldNode.isBoolean()) {
|
||||||
|
return fieldNode.getBooleanValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fieldNode.isTextual()) {
|
||||||
|
return fieldNode.getTextValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fieldNode.isArray()) {
|
||||||
|
final ArrayNode arrayNode = (ArrayNode) fieldNode;
|
||||||
|
final int numElements = arrayNode.size();
|
||||||
|
final Object[] arrayElements = new Object[numElements];
|
||||||
|
int count = 0;
|
||||||
|
|
||||||
|
final DataType elementDataType;
|
||||||
|
if (dataType != null && dataType.getFieldType() == RecordFieldType.ARRAY) {
|
||||||
|
final ArrayDataType arrayDataType = (ArrayDataType) dataType;
|
||||||
|
elementDataType = arrayDataType.getElementType();
|
||||||
|
} else {
|
||||||
|
elementDataType = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (final JsonNode node : arrayNode) {
|
||||||
|
final Object value = getRawNodeValue(node, elementDataType);
|
||||||
|
arrayElements[count++] = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
return arrayElements;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fieldNode.isObject()) {
|
||||||
|
RecordSchema childSchema;
|
||||||
|
if (dataType != null && RecordFieldType.RECORD == dataType.getFieldType()) {
|
||||||
|
final RecordDataType recordDataType = (RecordDataType) dataType;
|
||||||
|
childSchema = recordDataType.getChildSchema();
|
||||||
|
} else {
|
||||||
|
childSchema = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (childSchema == null) {
|
||||||
|
childSchema = new SimpleRecordSchema(Collections.emptyList());
|
||||||
|
}
|
||||||
|
|
||||||
|
final Iterator<String> fieldNames = fieldNode.getFieldNames();
|
||||||
|
final Map<String, Object> childValues = new HashMap<>();
|
||||||
|
while (fieldNames.hasNext()) {
|
||||||
|
final String childFieldName = fieldNames.next();
|
||||||
|
final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), dataType);
|
||||||
|
childValues.put(childFieldName, childValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
final MapRecord record = new MapRecord(childSchema, childValues);
|
||||||
|
return record;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,9 +68,6 @@ import java.util.concurrent.TimeUnit;
|
||||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||||
public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
|
public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
|
||||||
|
|
||||||
static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
|
|
||||||
static final String LAST_EVENT_ID_KEY = "last_event_id";
|
|
||||||
|
|
||||||
static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
|
||||||
.name("Platform")
|
.name("Platform")
|
||||||
.description("The value to use for the platform field in each provenance event.")
|
.description("The value to use for the platform field in each provenance event.")
|
||||||
|
@ -195,7 +192,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
|
||||||
lastSentBulletinId = currMaxId;
|
lastSentBulletinId = currMaxId;
|
||||||
}
|
}
|
||||||
|
|
||||||
static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
|
private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
|
||||||
final String platform, final String nodeIdentifier) {
|
final String platform, final String nodeIdentifier) {
|
||||||
|
|
||||||
addField(builder, "objectId", UUID.randomUUID().toString());
|
addField(builder, "objectId", UUID.randomUUID().toString());
|
||||||
|
@ -216,17 +213,4 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void addField(final JsonObjectBuilder builder, final String key, final Long value) {
|
|
||||||
if (value != null) {
|
|
||||||
builder.add(key, value.longValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
|
|
||||||
if (value == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
builder.add(key, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,222 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.reporting;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.lang.management.OperatingSystemMXBean;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.json.Json;
|
||||||
|
import javax.json.JsonBuilderFactory;
|
||||||
|
import javax.json.JsonObject;
|
||||||
|
|
||||||
|
import org.apache.avro.Schema;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.avro.AvroTypeUtil;
|
||||||
|
import org.apache.nifi.components.AllowableValue;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
|
import org.apache.nifi.components.Validator;
|
||||||
|
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||||
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.remote.Transaction;
|
||||||
|
import org.apache.nifi.remote.TransferDirection;
|
||||||
|
import org.apache.nifi.reporting.util.metrics.MetricNames;
|
||||||
|
import org.apache.nifi.reporting.util.metrics.MetricsService;
|
||||||
|
import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
|
||||||
|
|
||||||
|
import com.yammer.metrics.core.VirtualMachineMetrics;
|
||||||
|
|
||||||
|
@Tags({"status", "metrics", "site", "site to site"})
|
||||||
|
@CapabilityDescription("Publishes same metrics as the Ambari Reporting task using the Site To Site protocol.")
|
||||||
|
public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingTask {
|
||||||
|
|
||||||
|
static final AllowableValue AMBARI_FORMAT = new AllowableValue("ambari-format", "Ambari Format", "Metrics will be formatted"
|
||||||
|
+ " according to the Ambari Metrics API. See Additional Details in Usage documentation.");
|
||||||
|
static final AllowableValue RECORD_FORMAT = new AllowableValue("record-format", "Record Format", "Metrics will be formatted"
|
||||||
|
+ " using the Record Writer property of this reporting task. See Additional Details in Usage documentation to"
|
||||||
|
+ " have the description of the default schema.");
|
||||||
|
|
||||||
|
static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
|
||||||
|
.name("s2s-metrics-application-id")
|
||||||
|
.displayName("Application ID")
|
||||||
|
.description("The Application ID to be included in the metrics")
|
||||||
|
.required(true)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
.defaultValue("nifi")
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
|
||||||
|
.name("s2s-metrics-hostname")
|
||||||
|
.displayName("Hostname")
|
||||||
|
.description("The Hostname of this NiFi instance to be included in the metrics")
|
||||||
|
.required(true)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
.defaultValue("${hostname(true)}")
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
|
||||||
|
.name("s2s-metrics-format")
|
||||||
|
.displayName("Output Format")
|
||||||
|
.description("The output format that will be used for the metrics. If " + RECORD_FORMAT.getDisplayName() + " is selected, "
|
||||||
|
+ "a Record Writer must be provided. If " + AMBARI_FORMAT.getDisplayName() + " is selected, the Record Writer property "
|
||||||
|
+ "should be empty.")
|
||||||
|
.required(true)
|
||||||
|
.allowableValues(AMBARI_FORMAT, RECORD_FORMAT)
|
||||||
|
.defaultValue(AMBARI_FORMAT.getValue())
|
||||||
|
.addValidator(Validator.VALID)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
private final MetricsService metricsService = new MetricsService();
|
||||||
|
|
||||||
|
public SiteToSiteMetricsReportingTask() throws IOException {
|
||||||
|
final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-metrics.avsc");
|
||||||
|
recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
|
||||||
|
properties.add(HOSTNAME);
|
||||||
|
properties.add(APPLICATION_ID);
|
||||||
|
properties.add(FORMAT);
|
||||||
|
properties.add(RECORD_WRITER);
|
||||||
|
properties.remove(BATCH_SIZE);
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||||
|
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
|
||||||
|
|
||||||
|
final boolean isWriterSet = validationContext.getProperty(RECORD_WRITER).isSet();
|
||||||
|
if (validationContext.getProperty(FORMAT).getValue().equals(RECORD_FORMAT.getValue()) && !isWriterSet) {
|
||||||
|
problems.add(new ValidationResult.Builder()
|
||||||
|
.input("Record Writer")
|
||||||
|
.valid(false)
|
||||||
|
.explanation("If using " + RECORD_FORMAT.getDisplayName() + ", a record writer needs to be set.")
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
if (validationContext.getProperty(FORMAT).getValue().equals(AMBARI_FORMAT.getValue()) && isWriterSet) {
|
||||||
|
problems.add(new ValidationResult.Builder()
|
||||||
|
.input("Record Writer")
|
||||||
|
.valid(false)
|
||||||
|
.explanation("If using " + AMBARI_FORMAT.getDisplayName() + ", no record writer should be set.")
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
return problems;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(final ReportingContext context) {
|
||||||
|
final boolean isClustered = context.isClustered();
|
||||||
|
final String nodeId = context.getClusterNodeIdentifier();
|
||||||
|
if (nodeId == null && isClustered) {
|
||||||
|
getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
|
||||||
|
+ "Will wait for Node Identifier to be established.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance();
|
||||||
|
final Map<String, ?> config = Collections.emptyMap();
|
||||||
|
final JsonBuilderFactory factory = Json.createBuilderFactory(config);
|
||||||
|
|
||||||
|
final String applicationId = context.getProperty(APPLICATION_ID).evaluateAttributeExpressions().getValue();
|
||||||
|
final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
|
||||||
|
final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
|
||||||
|
|
||||||
|
if(status != null) {
|
||||||
|
final Map<String,String> statusMetrics = metricsService.getMetrics(status, false);
|
||||||
|
final Map<String,String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
|
||||||
|
|
||||||
|
final MetricsBuilder metricsBuilder = new MetricsBuilder(factory);
|
||||||
|
final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
|
||||||
|
final double systemLoad = os.getSystemLoadAverage();
|
||||||
|
|
||||||
|
byte[] data;
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
|
||||||
|
if(context.getProperty(FORMAT).getValue().equals(AMBARI_FORMAT.getValue())) {
|
||||||
|
final JsonObject metricsObject = metricsBuilder
|
||||||
|
.applicationId(applicationId)
|
||||||
|
.instanceId(status.getId())
|
||||||
|
.hostname(hostname)
|
||||||
|
.timestamp(System.currentTimeMillis())
|
||||||
|
.addAllMetrics(statusMetrics)
|
||||||
|
.addAllMetrics(jvmMetrics)
|
||||||
|
.metric(MetricNames.CORES, String.valueOf(os.getAvailableProcessors()))
|
||||||
|
.metric(MetricNames.LOAD1MN, String.valueOf(systemLoad >= 0 ? systemLoad : -1))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
data = metricsObject.toString().getBytes(StandardCharsets.UTF_8);
|
||||||
|
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
|
||||||
|
} else {
|
||||||
|
final JsonObject metricsObject = metricsService.getMetrics(factory, status, virtualMachineMetrics, applicationId, status.getId(),
|
||||||
|
hostname, System.currentTimeMillis(), os.getAvailableProcessors(), systemLoad >= 0 ? systemLoad : -1);
|
||||||
|
data = getData(context, new ByteArrayInputStream(metricsObject.toString().getBytes(StandardCharsets.UTF_8)), attributes);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
long start = System.nanoTime();
|
||||||
|
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
|
||||||
|
if (transaction == null) {
|
||||||
|
getLogger().debug("All destination nodes are penalized; will attempt to send data later");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String transactionId = UUID.randomUUID().toString();
|
||||||
|
attributes.put("reporting.task.transaction.id", transactionId);
|
||||||
|
attributes.put("reporting.task.name", getName());
|
||||||
|
attributes.put("reporting.task.uuid", getIdentifier());
|
||||||
|
attributes.put("reporting.task.type", this.getClass().getSimpleName());
|
||||||
|
|
||||||
|
transaction.send(data, attributes);
|
||||||
|
transaction.confirm();
|
||||||
|
transaction.complete();
|
||||||
|
|
||||||
|
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
||||||
|
getLogger().info("Successfully sent metrics to destination in {}ms; Transaction ID = {}", new Object[]{transferMillis, transactionId});
|
||||||
|
} catch (final Exception e) {
|
||||||
|
throw new ProcessException("Failed to send metrics to destination due to:" + e.getMessage(), e);
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
getLogger().error("No process group status to retrieve metrics");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -46,7 +46,6 @@ import javax.json.JsonArrayBuilder;
|
||||||
import javax.json.JsonBuilderFactory;
|
import javax.json.JsonBuilderFactory;
|
||||||
import javax.json.JsonObject;
|
import javax.json.JsonObject;
|
||||||
import javax.json.JsonObjectBuilder;
|
import javax.json.JsonObjectBuilder;
|
||||||
import javax.json.JsonValue;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
@ -76,9 +75,6 @@ import java.util.concurrent.TimeUnit;
|
||||||
)
|
)
|
||||||
public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReportingTask {
|
public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReportingTask {
|
||||||
|
|
||||||
static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
|
|
||||||
static final String LAST_EVENT_ID_KEY = "last_event_id";
|
|
||||||
|
|
||||||
static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream", "Beginning of Stream",
|
static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream", "Beginning of Stream",
|
||||||
"Start reading provenance Events from the beginning of the stream (the oldest event first)");
|
"Start reading provenance Events from the beginning of the stream (the oldest event first)");
|
||||||
static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream",
|
static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream",
|
||||||
|
@ -307,7 +303,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df,
|
private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df,
|
||||||
final String componentName, final String processGroupId, final String processGroupName, final String hostname, final URL nifiUrl, final String applicationName,
|
final String componentName, final String processGroupId, final String processGroupName, final String hostname, final URL nifiUrl, final String applicationName,
|
||||||
final String platform, final String nodeIdentifier) {
|
final String platform, final String nodeIdentifier) {
|
||||||
addField(builder, "eventId", UUID.randomUUID().toString());
|
addField(builder, "eventId", UUID.randomUUID().toString());
|
||||||
|
@ -371,13 +367,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
|
||||||
builder.add(key, mapBuilder);
|
builder.add(key, mapBuilder);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void addField(final JsonObjectBuilder builder, final String key, final Long value) {
|
private void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection<String> values) {
|
||||||
if (value != null) {
|
|
||||||
builder.add(key, value.longValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection<String> values) {
|
|
||||||
if (values == null) {
|
if (values == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -385,20 +375,6 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
|
||||||
builder.add(key, createJsonArray(factory, values));
|
builder.add(key, createJsonArray(factory, values));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
|
|
||||||
addField(builder, key, value, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) {
|
|
||||||
if (value == null) {
|
|
||||||
if (allowNullValues) {
|
|
||||||
builder.add(key, JsonValue.NULL);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
builder.add(key, value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection<String> values) {
|
private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection<String> values) {
|
||||||
final JsonArrayBuilder builder = factory.createArrayBuilder();
|
final JsonArrayBuilder builder = factory.createArrayBuilder();
|
||||||
for (final String value : values) {
|
for (final String value : values) {
|
||||||
|
|
|
@ -61,8 +61,6 @@ import org.apache.nifi.remote.TransferDirection;
|
||||||
+ "However, all process groups are recursively searched for matching components, regardless of whether the process group matches the component filters.")
|
+ "However, all process groups are recursively searched for matching components, regardless of whether the process group matches the component filters.")
|
||||||
public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTask {
|
public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTask {
|
||||||
|
|
||||||
static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
|
|
||||||
|
|
||||||
static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
|
||||||
.name("Platform")
|
.name("Platform")
|
||||||
.description("The value to use for the platform field in each status record.")
|
.description("The value to use for the platform field in each status record.")
|
||||||
|
@ -71,6 +69,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
.defaultValue("nifi")
|
.defaultValue("nifi")
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
static final PropertyDescriptor COMPONENT_TYPE_FILTER_REGEX = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor COMPONENT_TYPE_FILTER_REGEX = new PropertyDescriptor.Builder()
|
||||||
.name("Component Type Filter Regex")
|
.name("Component Type Filter Regex")
|
||||||
.description("A regex specifying which component types to report. Any component type matching this regex will be included. "
|
.description("A regex specifying which component types to report. Any component type matching this regex will be included. "
|
||||||
|
@ -80,6 +79,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
.defaultValue("(Processor|ProcessGroup|RemoteProcessGroup|RootProcessGroup|Connection|InputPort|OutputPort)")
|
.defaultValue("(Processor|ProcessGroup|RemoteProcessGroup|RootProcessGroup|Connection|InputPort|OutputPort)")
|
||||||
.addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
|
.addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
static final PropertyDescriptor COMPONENT_NAME_FILTER_REGEX = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor COMPONENT_NAME_FILTER_REGEX = new PropertyDescriptor.Builder()
|
||||||
.name("Component Name Filter Regex")
|
.name("Component Name Filter Regex")
|
||||||
.description("A regex specifying which component names to report. Any component name matching this regex will be included.")
|
.description("A regex specifying which component names to report. Any component name matching this regex will be included.")
|
||||||
|
@ -198,7 +198,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
* The component name
|
* The component name
|
||||||
* @return Whether the component matches both filters
|
* @return Whether the component matches both filters
|
||||||
*/
|
*/
|
||||||
boolean componentMatchesFilters(final String componentType, final String componentName) {
|
private boolean componentMatchesFilters(final String componentType, final String componentName) {
|
||||||
return componentTypeFilter.matcher(componentType).matches()
|
return componentTypeFilter.matcher(componentType).matches()
|
||||||
&& componentNameFilter.matcher(componentName).matches();
|
&& componentNameFilter.matcher(componentName).matches();
|
||||||
}
|
}
|
||||||
|
@ -222,7 +222,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
* @param parentId
|
* @param parentId
|
||||||
* The parent's component id
|
* The parent's component id
|
||||||
*/
|
*/
|
||||||
void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory,
|
private void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory,
|
||||||
final ProcessGroupStatus status, final DateFormat df,
|
final ProcessGroupStatus status, final DateFormat df,
|
||||||
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
|
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
|
||||||
final JsonObjectBuilder builder = factory.createObjectBuilder();
|
final JsonObjectBuilder builder = factory.createObjectBuilder();
|
||||||
|
@ -279,7 +279,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory,
|
private void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory,
|
||||||
final RemoteProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName,
|
final RemoteProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName,
|
||||||
final String platform, final String parentId, final Date currentDate) {
|
final String platform, final String parentId, final Date currentDate) {
|
||||||
final JsonObjectBuilder builder = factory.createObjectBuilder();
|
final JsonObjectBuilder builder = factory.createObjectBuilder();
|
||||||
|
@ -304,7 +304,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final PortStatus status,
|
private void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final PortStatus status,
|
||||||
final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
|
final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
|
||||||
final JsonObjectBuilder builder = factory.createObjectBuilder();
|
final JsonObjectBuilder builder = factory.createObjectBuilder();
|
||||||
final String componentName = status.getName();
|
final String componentName = status.getName();
|
||||||
|
@ -328,7 +328,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status, final DateFormat df,
|
private void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status, final DateFormat df,
|
||||||
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
|
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
|
||||||
final JsonObjectBuilder builder = factory.createObjectBuilder();
|
final JsonObjectBuilder builder = factory.createObjectBuilder();
|
||||||
final String componentType = "Connection";
|
final String componentType = "Connection";
|
||||||
|
@ -356,7 +356,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessorStatus status, final DateFormat df,
|
private void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessorStatus status, final DateFormat df,
|
||||||
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
|
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
|
||||||
final JsonObjectBuilder builder = factory.createObjectBuilder();
|
final JsonObjectBuilder builder = factory.createObjectBuilder();
|
||||||
final String componentType = "Processor";
|
final String componentType = "Processor";
|
||||||
|
@ -387,7 +387,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void addCommonFields(final JsonObjectBuilder builder, final DateFormat df, final String hostname,
|
private void addCommonFields(final JsonObjectBuilder builder, final DateFormat df, final String hostname,
|
||||||
final String applicationName, final String platform, final String parentId, final Date currentDate,
|
final String applicationName, final String platform, final String parentId, final Date currentDate,
|
||||||
final String componentType, final String componentName) {
|
final String componentType, final String componentName) {
|
||||||
addField(builder, "statusId", UUID.randomUUID().toString());
|
addField(builder, "statusId", UUID.randomUUID().toString());
|
||||||
|
@ -401,23 +401,4 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
|
||||||
addField(builder, "application", applicationName);
|
addField(builder, "application", applicationName);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void addField(final JsonObjectBuilder builder, final String key, final Long value) {
|
|
||||||
if (value != null) {
|
|
||||||
builder.add(key, value.longValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void addField(final JsonObjectBuilder builder, final String key, final Integer value) {
|
|
||||||
if (value != null) {
|
|
||||||
builder.add(key, value.intValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
|
|
||||||
if (value == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
builder.add(key, value);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,3 +16,4 @@
|
||||||
org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
|
org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
|
||||||
org.apache.nifi.reporting.SiteToSiteBulletinReportingTask
|
org.apache.nifi.reporting.SiteToSiteBulletinReportingTask
|
||||||
org.apache.nifi.reporting.SiteToSiteStatusReportingTask
|
org.apache.nifi.reporting.SiteToSiteStatusReportingTask
|
||||||
|
org.apache.nifi.reporting.SiteToSiteMetricsReportingTask
|
|
@ -0,0 +1,178 @@
|
||||||
|
<!DOCTYPE html>
|
||||||
|
<html lang="en">
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<head>
|
||||||
|
<meta charset="utf-8" />
|
||||||
|
<title>SiteToSiteMetricsReportingTask</title>
|
||||||
|
|
||||||
|
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
|
||||||
|
</head>
|
||||||
|
|
||||||
|
<body>
|
||||||
|
<p>
|
||||||
|
The Site-to-Site Metrics Reporting Task allows the user to publish NiFi's metrics (as in the Ambari reporting task) to the
|
||||||
|
same NiFi instance or another NiFi instance. This provides a great deal of power because it allows the user to make use of
|
||||||
|
all of the different Processors that are available in NiFi in order to process or distribute that data.
|
||||||
|
</p>
|
||||||
|
|
||||||
|
<h2>Ambari format</h2>
|
||||||
|
|
||||||
|
<p>
|
||||||
|
There are two available output formats. The first one is the Ambari format as defined in the Ambari Metrics Collector
|
||||||
|
API which is a JSON with dynamic keys. If using this format you might be interested by the below Jolt specification to
|
||||||
|
transform the data.
|
||||||
|
</p>
|
||||||
|
|
||||||
|
<pre>
|
||||||
|
<code>
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"operation": "shift",
|
||||||
|
"spec": {
|
||||||
|
"metrics": {
|
||||||
|
"*": {
|
||||||
|
"metrics": {
|
||||||
|
"*": {
|
||||||
|
"$": "metrics.[#4].metrics.time",
|
||||||
|
"@": "metrics.[#4].metrics.value"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"*": "metrics.[&1].&"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
</code>
|
||||||
|
</pre>
|
||||||
|
|
||||||
|
<p>
|
||||||
|
This would transform the below sample:
|
||||||
|
</p>
|
||||||
|
|
||||||
|
<pre>
|
||||||
|
<code>
|
||||||
|
{
|
||||||
|
"metrics": [{
|
||||||
|
"metricname": "jvm.gc.time.G1OldGeneration",
|
||||||
|
"appid": "nifi",
|
||||||
|
"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
|
||||||
|
"hostname": "localhost",
|
||||||
|
"timestamp": "1520456854361",
|
||||||
|
"starttime": "1520456854361",
|
||||||
|
"metrics": {
|
||||||
|
"1520456854361": "0"
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
"metricname": "jvm.thread_states.terminated",
|
||||||
|
"appid": "nifi",
|
||||||
|
"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
|
||||||
|
"hostname": "localhost",
|
||||||
|
"timestamp": "1520456854361",
|
||||||
|
"starttime": "1520456854361",
|
||||||
|
"metrics": {
|
||||||
|
"1520456854361": "0"
|
||||||
|
}
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
</code>
|
||||||
|
</pre>
|
||||||
|
|
||||||
|
<p>
|
||||||
|
into:
|
||||||
|
</p>
|
||||||
|
|
||||||
|
<pre>
|
||||||
|
<code>
|
||||||
|
{
|
||||||
|
"metrics": [{
|
||||||
|
"metricname": "jvm.gc.time.G1OldGeneration",
|
||||||
|
"appid": "nifi",
|
||||||
|
"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
|
||||||
|
"hostname": "localhost",
|
||||||
|
"timestamp": "1520456854361",
|
||||||
|
"starttime": "1520456854361",
|
||||||
|
"metrics": {
|
||||||
|
"time": "1520456854361",
|
||||||
|
"value": "0"
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
"metricname": "jvm.thread_states.terminated",
|
||||||
|
"appid": "nifi",
|
||||||
|
"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
|
||||||
|
"hostname": "localhost",
|
||||||
|
"timestamp": "1520456854361",
|
||||||
|
"starttime": "1520456854361",
|
||||||
|
"metrics": {
|
||||||
|
"time": "1520456854361",
|
||||||
|
"value": "0"
|
||||||
|
}
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
</code>
|
||||||
|
</pre>
|
||||||
|
|
||||||
|
<h2>Record format</h2>
|
||||||
|
|
||||||
|
<p>
|
||||||
|
The second format is leveraging the record framework of NiFi so that the user can define a Record Writer and directly
|
||||||
|
specify the output format and data with the assumption that the input schema is the following:
|
||||||
|
</p>
|
||||||
|
|
||||||
|
<pre>
|
||||||
|
<code>
|
||||||
|
{
|
||||||
|
"type" : "record",
|
||||||
|
"name" : "metrics",
|
||||||
|
"namespace" : "metrics",
|
||||||
|
"fields" : [
|
||||||
|
{ "name" : "appid", "type" : "string" },
|
||||||
|
{ "name" : "instanceid", "type" : "string" },
|
||||||
|
{ "name" : "hostname", "type" : "string" },
|
||||||
|
{ "name" : "timestamp", "type" : "long" },
|
||||||
|
{ "name" : "loadAverage1min", "type" : "double" },
|
||||||
|
{ "name" : "availableCores", "type" : "int" },
|
||||||
|
{ "name" : "FlowFilesReceivedLast5Minutes", "type" : "int" },
|
||||||
|
{ "name" : "BytesReceivedLast5Minutes", "type" : "long" },
|
||||||
|
{ "name" : "FlowFilesSentLast5Minutes", "type" : "int" },
|
||||||
|
{ "name" : "BytesSentLast5Minutes", "type" : "long" },
|
||||||
|
{ "name" : "FlowFilesQueued", "type" : "int" },
|
||||||
|
{ "name" : "BytesQueued", "type" : "long" },
|
||||||
|
{ "name" : "BytesReadLast5Minutes", "type" : "long" },
|
||||||
|
{ "name" : "BytesWrittenLast5Minutes", "type" : "long" },
|
||||||
|
{ "name" : "ActiveThreads", "type" : "int" },
|
||||||
|
{ "name" : "TotalTaskDurationSeconds", "type" : "long" },
|
||||||
|
{ "name" : "TotalTaskDurationNanoSeconds", "type" : "long" },
|
||||||
|
{ "name" : "jvmuptime", "type" : "long" },
|
||||||
|
{ "name" : "jvmheap_used", "type" : "double" },
|
||||||
|
{ "name" : "jvmheap_usage", "type" : "double" },
|
||||||
|
{ "name" : "jvmnon_heap_usage", "type" : "double" },
|
||||||
|
{ "name" : "jvmthread_statesrunnable", "type" : ["int", "null"] },
|
||||||
|
{ "name" : "jvmthread_statesblocked", "type" : ["int", "null"] },
|
||||||
|
{ "name" : "jvmthread_statestimed_waiting", "type" : ["int", "null"] },
|
||||||
|
{ "name" : "jvmthread_statesterminated", "type" : ["int", "null"] },
|
||||||
|
{ "name" : "jvmthread_count", "type" : "int" },
|
||||||
|
{ "name" : "jvmdaemon_thread_count", "type" : "int" },
|
||||||
|
{ "name" : "jvmfile_descriptor_usage", "type" : "double" },
|
||||||
|
{ "name" : "jvmgcruns", "type" : ["long", "null"] },
|
||||||
|
{ "name" : "jvmgctime", "type" : ["long", "null"] }
|
||||||
|
]
|
||||||
|
}
|
||||||
|
</code>
|
||||||
|
</pre>
|
||||||
|
|
||||||
|
</body>
|
||||||
|
</html>
|
|
@ -25,7 +25,7 @@
|
||||||
<p>
|
<p>
|
||||||
The Site-to-Site Provenance Reporting Task allows the user to publish all of the Provenance Events from a NiFi instance back to
|
The Site-to-Site Provenance Reporting Task allows the user to publish all of the Provenance Events from a NiFi instance back to
|
||||||
the same NiFi instance or another NiFi instance. This provides a great deal of power because it allows the user to make use of
|
the same NiFi instance or another NiFi instance. This provides a great deal of power because it allows the user to make use of
|
||||||
all of the different Processors that are available in NiFi in order to processor or distribute that data. When possible, it is
|
all of the different Processors that are available in NiFi in order to process or distribute that data. When possible, it is
|
||||||
advisable to send the Provenance data to a different NiFi instance than the one that this Reporting Task is running on, because
|
advisable to send the Provenance data to a different NiFi instance than the one that this Reporting Task is running on, because
|
||||||
when the data is received over Site-to-Site and processed, that in and of itself will generate Provenance events. As a result, there
|
when the data is received over Site-to-Site and processed, that in and of itself will generate Provenance events. As a result, there
|
||||||
is a cycle that is created. However, the data is sent in batches (1,000 by default). This means that for each batch of Provenance events
|
is a cycle that is created. However, the data is sent in batches (1,000 by default). This means that for each batch of Provenance events
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
{
|
||||||
|
"type" : "record",
|
||||||
|
"name" : "metrics",
|
||||||
|
"namespace" : "metrics",
|
||||||
|
"fields" : [
|
||||||
|
{ "name" : "appid", "type" : "string" },
|
||||||
|
{ "name" : "instanceid", "type" : "string" },
|
||||||
|
{ "name" : "hostname", "type" : "string" },
|
||||||
|
{ "name" : "timestamp", "type" : "long" },
|
||||||
|
{ "name" : "loadAverage1min", "type" : "double" },
|
||||||
|
{ "name" : "availableCores", "type" : "int" },
|
||||||
|
{ "name" : "FlowFilesReceivedLast5Minutes", "type" : "int" },
|
||||||
|
{ "name" : "BytesReceivedLast5Minutes", "type" : "long" },
|
||||||
|
{ "name" : "FlowFilesSentLast5Minutes", "type" : "int" },
|
||||||
|
{ "name" : "BytesSentLast5Minutes", "type" : "long" },
|
||||||
|
{ "name" : "FlowFilesQueued", "type" : "int" },
|
||||||
|
{ "name" : "BytesQueued", "type" : "long" },
|
||||||
|
{ "name" : "BytesReadLast5Minutes", "type" : "long" },
|
||||||
|
{ "name" : "BytesWrittenLast5Minutes", "type" : "long" },
|
||||||
|
{ "name" : "ActiveThreads", "type" : "int" },
|
||||||
|
{ "name" : "TotalTaskDurationSeconds", "type" : "long" },
|
||||||
|
{ "name" : "TotalTaskDurationNanoSeconds", "type" : "long" },
|
||||||
|
{ "name" : "jvmuptime", "type" : "long" },
|
||||||
|
{ "name" : "jvmheap_used", "type" : "double" },
|
||||||
|
{ "name" : "jvmheap_usage", "type" : "double" },
|
||||||
|
{ "name" : "jvmnon_heap_usage", "type" : "double" },
|
||||||
|
{ "name" : "jvmthread_statesrunnable", "type" : ["int", "null"] },
|
||||||
|
{ "name" : "jvmthread_statesblocked", "type" : ["int", "null"] },
|
||||||
|
{ "name" : "jvmthread_statestimed_waiting", "type" : ["int", "null"] },
|
||||||
|
{ "name" : "jvmthread_statesterminated", "type" : ["int", "null"] },
|
||||||
|
{ "name" : "jvmthread_count", "type" : "int" },
|
||||||
|
{ "name" : "jvmdaemon_thread_count", "type" : "int" },
|
||||||
|
{ "name" : "jvmfile_descriptor_usage", "type" : "double" },
|
||||||
|
{ "name" : "jvmgcruns", "type" : ["long", "null"] },
|
||||||
|
{ "name" : "jvmgctime", "type" : ["long", "null"] }
|
||||||
|
]
|
||||||
|
}
|
|
@ -0,0 +1,296 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.reporting;
|
||||||
|
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import javax.json.Json;
|
||||||
|
import javax.json.JsonArray;
|
||||||
|
import javax.json.JsonObject;
|
||||||
|
import javax.json.JsonReader;
|
||||||
|
import javax.json.JsonValue;
|
||||||
|
|
||||||
|
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.PropertyValue;
|
||||||
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
|
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||||
|
import org.apache.nifi.controller.status.ProcessorStatus;
|
||||||
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
|
import org.apache.nifi.remote.Transaction;
|
||||||
|
import org.apache.nifi.remote.TransferDirection;
|
||||||
|
import org.apache.nifi.remote.client.SiteToSiteClient;
|
||||||
|
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||||
|
import org.apache.nifi.serialization.record.MockRecordWriter;
|
||||||
|
import org.apache.nifi.state.MockStateManager;
|
||||||
|
import org.apache.nifi.util.MockPropertyValue;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
public class TestSiteToSiteMetricsReportingTask {
|
||||||
|
|
||||||
|
private ReportingContext context;
|
||||||
|
private ProcessGroupStatus status;
|
||||||
|
private TestRunner runner;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
status = new ProcessGroupStatus();
|
||||||
|
status.setId("1234");
|
||||||
|
status.setFlowFilesReceived(5);
|
||||||
|
status.setBytesReceived(10000);
|
||||||
|
status.setFlowFilesSent(10);
|
||||||
|
status.setBytesSent(20000);
|
||||||
|
status.setQueuedCount(100);
|
||||||
|
status.setQueuedContentSize(1024L);
|
||||||
|
status.setBytesRead(60000L);
|
||||||
|
status.setBytesWritten(80000L);
|
||||||
|
status.setActiveThreadCount(5);
|
||||||
|
|
||||||
|
// create a processor status with processing time
|
||||||
|
ProcessorStatus procStatus = new ProcessorStatus();
|
||||||
|
procStatus.setProcessingNanos(123456789);
|
||||||
|
|
||||||
|
Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
|
||||||
|
processorStatuses.add(procStatus);
|
||||||
|
status.setProcessorStatus(processorStatuses);
|
||||||
|
|
||||||
|
// create a group status with processing time
|
||||||
|
ProcessGroupStatus groupStatus = new ProcessGroupStatus();
|
||||||
|
groupStatus.setProcessorStatus(processorStatuses);
|
||||||
|
|
||||||
|
Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
|
||||||
|
groupStatuses.add(groupStatus);
|
||||||
|
status.setProcessGroupStatus(groupStatuses);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MockSiteToSiteMetricsReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException {
|
||||||
|
|
||||||
|
final MockSiteToSiteMetricsReportingTask task = new MockSiteToSiteMetricsReportingTask();
|
||||||
|
Map<PropertyDescriptor, String> properties = new HashMap<>();
|
||||||
|
for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
|
||||||
|
properties.put(descriptor, descriptor.getDefaultValue());
|
||||||
|
}
|
||||||
|
properties.putAll(customProperties);
|
||||||
|
|
||||||
|
context = Mockito.mock(ReportingContext.class);
|
||||||
|
Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(task));
|
||||||
|
Mockito.doAnswer(new Answer<PropertyValue>() {
|
||||||
|
@Override
|
||||||
|
public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
|
||||||
|
final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
|
||||||
|
return new MockPropertyValue(properties.get(descriptor));
|
||||||
|
}
|
||||||
|
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
|
||||||
|
|
||||||
|
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
|
||||||
|
Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
|
||||||
|
Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
|
||||||
|
|
||||||
|
final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class);
|
||||||
|
MockRecordWriter writer = new MockRecordWriter();
|
||||||
|
Mockito.when(context.getProperty(MockSiteToSiteMetricsReportingTask.RECORD_WRITER)).thenReturn(pValue);
|
||||||
|
Mockito.when(pValue.asControllerService(RecordSetWriterFactory.class)).thenReturn(writer);
|
||||||
|
|
||||||
|
final ComponentLog logger = Mockito.mock(ComponentLog.class);
|
||||||
|
final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
|
||||||
|
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
|
||||||
|
Mockito.when(initContext.getLogger()).thenReturn(logger);
|
||||||
|
task.initialize(initContext);
|
||||||
|
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidationBothAmbariFormatRecordWriter() throws IOException {
|
||||||
|
ValidationContext validationContext = Mockito.mock(ValidationContext.class);
|
||||||
|
final String urlEL = "http://${hostname(true)}:8080/nifi";
|
||||||
|
final String url = "http://localhost:8080/nifi";
|
||||||
|
|
||||||
|
final MockSiteToSiteMetricsReportingTask task = new MockSiteToSiteMetricsReportingTask();
|
||||||
|
Map<PropertyDescriptor, String> properties = new HashMap<>();
|
||||||
|
for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
|
||||||
|
properties.put(descriptor, descriptor.getDefaultValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.AMBARI_FORMAT.getValue());
|
||||||
|
properties.put(SiteToSiteMetricsReportingTask.DESTINATION_URL, url);
|
||||||
|
properties.put(SiteToSiteMetricsReportingTask.INSTANCE_URL, url);
|
||||||
|
properties.put(SiteToSiteMetricsReportingTask.PORT_NAME, "port");
|
||||||
|
|
||||||
|
final PropertyValue pValueUrl = Mockito.mock(StandardPropertyValue.class);
|
||||||
|
Mockito.when(validationContext.newPropertyValue(url)).thenReturn(pValueUrl);
|
||||||
|
Mockito.when(validationContext.newPropertyValue(urlEL)).thenReturn(pValueUrl);
|
||||||
|
Mockito.when(pValueUrl.evaluateAttributeExpressions()).thenReturn(pValueUrl);
|
||||||
|
Mockito.when(pValueUrl.getValue()).thenReturn(url);
|
||||||
|
|
||||||
|
Mockito.doAnswer(new Answer<PropertyValue>() {
|
||||||
|
@Override
|
||||||
|
public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
|
||||||
|
final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
|
||||||
|
return new MockPropertyValue(properties.get(descriptor));
|
||||||
|
}
|
||||||
|
}).when(validationContext).getProperty(Mockito.any(PropertyDescriptor.class));
|
||||||
|
|
||||||
|
final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class);
|
||||||
|
Mockito.when(validationContext.getProperty(MockSiteToSiteMetricsReportingTask.RECORD_WRITER)).thenReturn(pValue);
|
||||||
|
Mockito.when(pValue.isSet()).thenReturn(true);
|
||||||
|
|
||||||
|
// should be invalid because both ambari format and record writer are set
|
||||||
|
Collection<ValidationResult> list = task.validate(validationContext);
|
||||||
|
Assert.assertEquals(1, list.size());
|
||||||
|
Assert.assertEquals(SiteToSiteMetricsReportingTask.RECORD_WRITER.getDisplayName(), list.iterator().next().getInput());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidationRecordFormatNoRecordWriter() throws IOException {
|
||||||
|
ValidationContext validationContext = Mockito.mock(ValidationContext.class);
|
||||||
|
final String urlEL = "http://${hostname(true)}:8080/nifi";
|
||||||
|
final String url = "http://localhost:8080/nifi";
|
||||||
|
|
||||||
|
final MockSiteToSiteMetricsReportingTask task = new MockSiteToSiteMetricsReportingTask();
|
||||||
|
Map<PropertyDescriptor, String> properties = new HashMap<>();
|
||||||
|
for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
|
||||||
|
properties.put(descriptor, descriptor.getDefaultValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.RECORD_FORMAT.getValue());
|
||||||
|
properties.put(SiteToSiteMetricsReportingTask.DESTINATION_URL, url);
|
||||||
|
properties.put(SiteToSiteMetricsReportingTask.INSTANCE_URL, url);
|
||||||
|
properties.put(SiteToSiteMetricsReportingTask.PORT_NAME, "port");
|
||||||
|
|
||||||
|
final PropertyValue pValueUrl = Mockito.mock(StandardPropertyValue.class);
|
||||||
|
Mockito.when(validationContext.newPropertyValue(url)).thenReturn(pValueUrl);
|
||||||
|
Mockito.when(validationContext.newPropertyValue(urlEL)).thenReturn(pValueUrl);
|
||||||
|
Mockito.when(pValueUrl.evaluateAttributeExpressions()).thenReturn(pValueUrl);
|
||||||
|
Mockito.when(pValueUrl.getValue()).thenReturn(url);
|
||||||
|
|
||||||
|
Mockito.doAnswer(new Answer<PropertyValue>() {
|
||||||
|
@Override
|
||||||
|
public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
|
||||||
|
final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
|
||||||
|
return new MockPropertyValue(properties.get(descriptor));
|
||||||
|
}
|
||||||
|
}).when(validationContext).getProperty(Mockito.any(PropertyDescriptor.class));
|
||||||
|
|
||||||
|
final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class);
|
||||||
|
Mockito.when(validationContext.getProperty(MockSiteToSiteMetricsReportingTask.RECORD_WRITER)).thenReturn(pValue);
|
||||||
|
Mockito.when(pValue.isSet()).thenReturn(false);
|
||||||
|
|
||||||
|
// should be invalid because both ambari format and record writer are set
|
||||||
|
Collection<ValidationResult> list = task.validate(validationContext);
|
||||||
|
Assert.assertEquals(1, list.size());
|
||||||
|
Assert.assertEquals(SiteToSiteMetricsReportingTask.RECORD_WRITER.getDisplayName(), list.iterator().next().getInput());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAmbariFormat() throws IOException, InitializationException {
|
||||||
|
|
||||||
|
final Map<PropertyDescriptor, String> properties = new HashMap<>();
|
||||||
|
properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.AMBARI_FORMAT.getValue());
|
||||||
|
|
||||||
|
MockSiteToSiteMetricsReportingTask task = initTask(properties);
|
||||||
|
task.onTrigger(context);
|
||||||
|
|
||||||
|
assertEquals(1, task.dataSent.size());
|
||||||
|
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
|
||||||
|
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
|
||||||
|
JsonArray array = jsonReader.readObject().getJsonArray("metrics");
|
||||||
|
for(int i = 0; i < array.size(); i++) {
|
||||||
|
JsonObject object = array.getJsonObject(i);
|
||||||
|
assertEquals("nifi", object.getString("appid"));
|
||||||
|
assertEquals("1234", object.getString("instanceid"));
|
||||||
|
if(object.getString("metricname").equals("FlowFilesQueued")) {
|
||||||
|
for(Entry<String, JsonValue> kv : object.getJsonObject("metrics").entrySet()) {
|
||||||
|
assertEquals("\"100\"", kv.getValue().toString());
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRecordFormat() throws IOException, InitializationException {
|
||||||
|
final Map<PropertyDescriptor, String> properties = new HashMap<>();
|
||||||
|
properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.RECORD_FORMAT.getValue());
|
||||||
|
properties.put(SiteToSiteMetricsReportingTask.RECORD_WRITER, "record-writer");
|
||||||
|
MockSiteToSiteMetricsReportingTask task = initTask(properties);
|
||||||
|
|
||||||
|
task.onTrigger(context);
|
||||||
|
|
||||||
|
assertEquals(1, task.dataSent.size());
|
||||||
|
String[] data = new String(task.dataSent.get(0)).split(",");
|
||||||
|
assertEquals("\"nifi\"", data[0]);
|
||||||
|
assertEquals("\"1234\"", data[1]);
|
||||||
|
assertEquals("\"100\"", data[10]); // FlowFilesQueued
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class MockSiteToSiteMetricsReportingTask extends SiteToSiteMetricsReportingTask {
|
||||||
|
|
||||||
|
public MockSiteToSiteMetricsReportingTask() throws IOException {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
final List<byte[]> dataSent = new ArrayList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SiteToSiteClient getClient() {
|
||||||
|
final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
|
||||||
|
final Transaction transaction = Mockito.mock(Transaction.class);
|
||||||
|
|
||||||
|
try {
|
||||||
|
Mockito.doAnswer(new Answer<Object>() {
|
||||||
|
@Override
|
||||||
|
public Object answer(final InvocationOnMock invocation) throws Throwable {
|
||||||
|
final byte[] data = invocation.getArgumentAt(0, byte[].class);
|
||||||
|
dataSent.add(data);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
|
||||||
|
|
||||||
|
Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
|
||||||
|
} catch (final Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
Assert.fail(e.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue