diff --git a/nifi-nar-bundles/nifi-stateful-analysis-bundle/nifi-stateful-analysis-processors/pom.xml b/nifi-nar-bundles/nifi-stateful-analysis-bundle/nifi-stateful-analysis-processors/pom.xml index 72f03ed227..9ac252d749 100644 --- a/nifi-nar-bundles/nifi-stateful-analysis-bundle/nifi-stateful-analysis-processors/pom.xml +++ b/nifi-nar-bundles/nifi-stateful-analysis-bundle/nifi-stateful-analysis-processors/pom.xml @@ -38,6 +38,11 @@ + + org.apache.commons + commons-math3 + 3.6.1 + diff --git a/nifi-nar-bundles/nifi-stateful-analysis-bundle/nifi-stateful-analysis-processors/src/main/java/org/apache/nifi/processors/stateful/analysis/AttributeRollingWindow.java b/nifi-nar-bundles/nifi-stateful-analysis-bundle/nifi-stateful-analysis-processors/src/main/java/org/apache/nifi/processors/stateful/analysis/AttributeRollingWindow.java index ba1bd11182..c4b794ff99 100644 --- a/nifi-nar-bundles/nifi-stateful-analysis-bundle/nifi-stateful-analysis-processors/src/main/java/org/apache/nifi/processors/stateful/analysis/AttributeRollingWindow.java +++ b/nifi-nar-bundles/nifi-stateful-analysis-bundle/nifi-stateful-analysis-processors/src/main/java/org/apache/nifi/processors/stateful/analysis/AttributeRollingWindow.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.stateful.analysis; +import org.apache.commons.math3.stat.descriptive.moment.Variance; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.TriggerSerially; @@ -49,7 +50,9 @@ import java.util.concurrent.TimeUnit; import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_COUNT_KEY; import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_MEAN_KEY; +import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_STDDEV_KEY; import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_VALUE_KEY; +import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_VARIANCE_KEY; @TriggerSerially @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @@ -59,7 +62,9 @@ import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindo @WritesAttributes({ @WritesAttribute(attribute = ROLLING_WINDOW_VALUE_KEY, description = "The rolling window value (sum of all the values stored)."), @WritesAttribute(attribute = ROLLING_WINDOW_COUNT_KEY, description = "The count of the number of FlowFiles seen in the rolling window."), - @WritesAttribute(attribute = ROLLING_WINDOW_MEAN_KEY, description = "The mean of the FlowFiles seen in the rolling window.") + @WritesAttribute(attribute = ROLLING_WINDOW_MEAN_KEY, description = "The mean of the FlowFiles seen in the rolling window."), + @WritesAttribute(attribute = ROLLING_WINDOW_VARIANCE_KEY, description = "The variance of the FlowFiles seen in the rolling window."), + @WritesAttribute(attribute = ROLLING_WINDOW_STDDEV_KEY, description = "The standard deviation (positive square root of the variance) of the FlowFiles seen in the rolling window.") }) @Stateful(scopes = {Scope.LOCAL}, description = "Store the values backing the rolling window. This includes storing the individual values and their time-stamps or the batches of values and their " + "counts.") @@ -69,6 +74,8 @@ public class AttributeRollingWindow extends AbstractProcessor { public static final String ROLLING_WINDOW_VALUE_KEY = "rolling_window_value"; public static final String ROLLING_WINDOW_COUNT_KEY = "rolling_window_count"; public static final String ROLLING_WINDOW_MEAN_KEY = "rolling_window_mean"; + public static final String ROLLING_WINDOW_VARIANCE_KEY = "rolling_window_variance"; + public static final String ROLLING_WINDOW_STDDEV_KEY = "rolling_window_stddev"; public static final String CURRENT_MICRO_BATCH_STATE_TS_KEY = "start_curr_batch_ts"; public static final String BATCH_APPEND_KEY = "_batch"; @@ -218,16 +225,18 @@ public class AttributeRollingWindow extends AbstractProcessor { } Double aggregateValue = 0.0D; + Variance variance = new Variance(); for(Map.Entry entry: state.entrySet()){ if(!entry.getKey().equals(COUNT_KEY)){ - aggregateValue += Double.valueOf(entry.getValue()); + final Double value = Double.valueOf(entry.getValue()); + variance.increment(value); + aggregateValue += value ; } } - final Double currentFlowFileValue = context.getProperty(VALUE_TO_TRACK).evaluateAttributeExpressions(flowFile).asDouble(); - + variance.increment(currentFlowFileValue); aggregateValue += currentFlowFileValue; state.put(String.valueOf(currTime), String.valueOf(currentFlowFileValue)); @@ -250,6 +259,9 @@ public class AttributeRollingWindow extends AbstractProcessor { attributesToAdd.put(ROLLING_WINDOW_VALUE_KEY, String.valueOf(aggregateValue)); attributesToAdd.put(ROLLING_WINDOW_COUNT_KEY, String.valueOf(count)); attributesToAdd.put(ROLLING_WINDOW_MEAN_KEY, String.valueOf(mean)); + double varianceValue = variance.getResult(); + attributesToAdd.put(ROLLING_WINDOW_VARIANCE_KEY, String.valueOf(varianceValue)); + attributesToAdd.put(ROLLING_WINDOW_STDDEV_KEY, String.valueOf(Math.sqrt(varianceValue))); flowFile = session.putAllAttributes(flowFile, attributesToAdd); @@ -315,6 +327,7 @@ public class AttributeRollingWindow extends AbstractProcessor { Double aggregateValue = 0.0D; Double currentBatchValue = 0.0D; Long currentBatchCount = 0L; + Variance variance = new Variance(); for(Map.Entry entry: state.entrySet()){ String key = entry.getKey(); @@ -322,7 +335,7 @@ public class AttributeRollingWindow extends AbstractProcessor { String timeStampString = key.substring(0, key.length() - COUNT_APPEND_KEY_LENGTH); Double batchValue = Double.valueOf(entry.getValue()); - Long batchCount = Long.valueOf(state.get(timeStampString+COUNT_APPEND_KEY)); + Long batchCount = Long.valueOf(state.get(timeStampString + COUNT_APPEND_KEY)); if (!newBatch && timeStampString.equals(currBatchStart)) { final Double currentFlowFileValue = context.getProperty(VALUE_TO_TRACK).evaluateAttributeExpressions(flowFile).asDouble(); @@ -334,17 +347,18 @@ public class AttributeRollingWindow extends AbstractProcessor { } aggregateValue += batchValue; - + variance.increment(batchValue); } } - if(newBatch) { + if (newBatch) { final Double currentFlowFileValue = context.getProperty(VALUE_TO_TRACK).evaluateAttributeExpressions(flowFile).asDouble(); currentBatchValue += currentFlowFileValue; currentBatchCount = 1L; aggregateValue += currentBatchValue; + variance.increment(currentBatchValue); } state.put(currBatchStart + BATCH_APPEND_KEY, String.valueOf(currentBatchValue)); @@ -365,6 +379,9 @@ public class AttributeRollingWindow extends AbstractProcessor { attributesToAdd.put(ROLLING_WINDOW_VALUE_KEY, String.valueOf(aggregateValue)); attributesToAdd.put(ROLLING_WINDOW_COUNT_KEY, String.valueOf(count)); attributesToAdd.put(ROLLING_WINDOW_MEAN_KEY, String.valueOf(mean)); + double varianceValue = variance.getResult(); + attributesToAdd.put(ROLLING_WINDOW_VARIANCE_KEY, String.valueOf(varianceValue)); + attributesToAdd.put(ROLLING_WINDOW_STDDEV_KEY, String.valueOf(Math.sqrt(varianceValue))); flowFile = session.putAllAttributes(flowFile, attributesToAdd); diff --git a/nifi-nar-bundles/nifi-stateful-analysis-bundle/nifi-stateful-analysis-processors/src/test/java/org/apache/nifi/processors/stateful/analysis/AttributeRollingWindowIT.java b/nifi-nar-bundles/nifi-stateful-analysis-bundle/nifi-stateful-analysis-processors/src/test/java/org/apache/nifi/processors/stateful/analysis/AttributeRollingWindowIT.java index cc821c2a37..8bb5c7d44c 100644 --- a/nifi-nar-bundles/nifi-stateful-analysis-bundle/nifi-stateful-analysis-processors/src/test/java/org/apache/nifi/processors/stateful/analysis/AttributeRollingWindowIT.java +++ b/nifi-nar-bundles/nifi-stateful-analysis-bundle/nifi-stateful-analysis-processors/src/test/java/org/apache/nifi/processors/stateful/analysis/AttributeRollingWindowIT.java @@ -17,6 +17,8 @@ package org.apache.nifi.processors.stateful.analysis; +import org.apache.commons.math3.stat.descriptive.moment.Variance; +import org.apache.commons.math3.util.Precision; import org.apache.nifi.components.state.Scope; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.state.MockStateManager; @@ -27,6 +29,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import org.junit.jupiter.api.condition.OS; +import org.opentest4j.AssertionFailedError; import java.io.IOException; import java.util.HashMap; @@ -35,10 +38,14 @@ import java.util.Map; import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.REL_FAILED_SET_STATE; import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_COUNT_KEY; import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_MEAN_KEY; +import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_STDDEV_KEY; import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_VALUE_KEY; +import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_VARIANCE_KEY; +import static org.junit.jupiter.api.Assertions.assertTrue; public class AttributeRollingWindowIT { + private static final double EPSILON = 0.000001d; // "Error" threshold for floating-point arithmetic @Test public void testFailureDueToBadAttribute() { final TestRunner runner = TestRunners.newTestRunner(AttributeRollingWindow.class); @@ -91,6 +98,8 @@ public class AttributeRollingWindowIT { mockFlowFile.assertAttributeNotExists(ROLLING_WINDOW_VALUE_KEY); mockFlowFile.assertAttributeNotExists(ROLLING_WINDOW_COUNT_KEY); mockFlowFile.assertAttributeNotExists(ROLLING_WINDOW_MEAN_KEY); + mockFlowFile.assertAttributeNotExists(ROLLING_WINDOW_VARIANCE_KEY); + mockFlowFile.assertAttributeNotExists(ROLLING_WINDOW_STDDEV_KEY); } private boolean isWindowsEnvironment() { @@ -117,6 +126,8 @@ public class AttributeRollingWindowIT { flowFile.assertAttributeEquals(ROLLING_WINDOW_VALUE_KEY, "1.0"); flowFile.assertAttributeEquals(ROLLING_WINDOW_COUNT_KEY, "1"); flowFile.assertAttributeEquals(ROLLING_WINDOW_MEAN_KEY, "1.0"); + flowFile.assertAttributeEquals(ROLLING_WINDOW_VARIANCE_KEY, "0.0"); + flowFile.assertAttributeEquals(ROLLING_WINDOW_STDDEV_KEY, "0.0"); runner.enqueue("2".getBytes(), attributes); runner.run(1); @@ -126,6 +137,8 @@ public class AttributeRollingWindowIT { flowFile.assertAttributeEquals(ROLLING_WINDOW_VALUE_KEY, "2.0"); flowFile.assertAttributeEquals(ROLLING_WINDOW_COUNT_KEY, "2"); flowFile.assertAttributeEquals(ROLLING_WINDOW_MEAN_KEY, "1.0"); + flowFile.assertAttributeEquals(ROLLING_WINDOW_VARIANCE_KEY, "0.0"); + flowFile.assertAttributeEquals(ROLLING_WINDOW_STDDEV_KEY, "0.0"); Thread.sleep(500L); @@ -137,10 +150,11 @@ public class AttributeRollingWindowIT { flowFile.assertAttributeEquals(ROLLING_WINDOW_VALUE_KEY, "1.0"); flowFile.assertAttributeEquals(ROLLING_WINDOW_COUNT_KEY, "1"); flowFile.assertAttributeEquals(ROLLING_WINDOW_MEAN_KEY, "1.0"); + flowFile.assertAttributeEquals(ROLLING_WINDOW_VARIANCE_KEY, "0.0"); + flowFile.assertAttributeEquals(ROLLING_WINDOW_STDDEV_KEY, "0.0"); } - @Test @DisabledOnOs(OS.WINDOWS) public void testVerifyCount() throws InterruptedException { @@ -153,9 +167,8 @@ public class AttributeRollingWindowIT { final Map attributes = new HashMap<>(); attributes.put("value", "1"); - for(int i = 1; i<61; i++){ + for (int i = 1; i < 61; i++) { runner.enqueue(String.valueOf(i).getBytes(), attributes); - runner.run(); flowFile = runner.getFlowFilesForRelationship(AttributeRollingWindow.REL_SUCCESS).get(0); @@ -166,11 +179,11 @@ public class AttributeRollingWindowIT { flowFile.assertAttributeEquals(ROLLING_WINDOW_VALUE_KEY, String.valueOf(value)); flowFile.assertAttributeEquals(ROLLING_WINDOW_COUNT_KEY, String.valueOf(i)); flowFile.assertAttributeEquals(ROLLING_WINDOW_MEAN_KEY, String.valueOf(mean)); + flowFile.assertAttributeEquals(ROLLING_WINDOW_VARIANCE_KEY, "0.0"); + flowFile.assertAttributeEquals(ROLLING_WINDOW_STDDEV_KEY, "0.0"); Thread.sleep(10L); } - - runner.setProperty(AttributeRollingWindow.VALUE_TO_TRACK, "${value}"); runner.setProperty(AttributeRollingWindow.SUB_WINDOW_LENGTH, "500 ms"); runner.setProperty(AttributeRollingWindow.TIME_WINDOW, "10 sec"); @@ -188,10 +201,48 @@ public class AttributeRollingWindowIT { flowFile.assertAttributeEquals(ROLLING_WINDOW_VALUE_KEY, String.valueOf(Double.valueOf(i))); flowFile.assertAttributeEquals(ROLLING_WINDOW_COUNT_KEY, String.valueOf(i)); flowFile.assertAttributeEquals(ROLLING_WINDOW_MEAN_KEY, String.valueOf(mean)); + flowFile.assertAttributeEquals(ROLLING_WINDOW_VARIANCE_KEY, "0.0"); + flowFile.assertAttributeEquals(ROLLING_WINDOW_STDDEV_KEY, "0.0"); Thread.sleep(10L); } + } + @Test + @DisabledOnOs(OS.WINDOWS) + public void testVerifyVarianceAndStandardDeviation() throws InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(AttributeRollingWindow.class); + + runner.setProperty(AttributeRollingWindow.VALUE_TO_TRACK, "${value}"); + runner.setProperty(AttributeRollingWindow.TIME_WINDOW, "10 sec"); + + MockFlowFile flowFile; + + final Map attributes = new HashMap<>(); + final Variance variance = new Variance(); + double sum = 0.0D; + for (int i = 1; i < 61; i++) { + attributes.put("value", String.valueOf(i)); + runner.enqueue(new byte[]{}, attributes); + runner.run(); + + flowFile = runner.getFlowFilesForRelationship(AttributeRollingWindow.REL_SUCCESS).get(0); + runner.clearTransferState(); + sum += i; + double mean = sum / i; + variance.increment(i); + + flowFile.assertAttributeEquals(ROLLING_WINDOW_VALUE_KEY, String.valueOf(sum)); + flowFile.assertAttributeEquals(ROLLING_WINDOW_COUNT_KEY, String.valueOf(i)); + assertTrue(Precision.equals(mean, Double.parseDouble(flowFile.getAttribute(ROLLING_WINDOW_MEAN_KEY)), EPSILON)); + try { + assertTrue(Precision.equals(variance.getResult(), Double.parseDouble(flowFile.getAttribute(ROLLING_WINDOW_VARIANCE_KEY)), EPSILON)); + } catch (AssertionFailedError ae) { + System.err.println("Error at " + i + ": " + variance.getResult() + " != " + Double.parseDouble(flowFile.getAttribute(ROLLING_WINDOW_VARIANCE_KEY))); + } + assertTrue(Precision.equals(Math.sqrt(variance.getResult()), Double.parseDouble(flowFile.getAttribute(ROLLING_WINDOW_STDDEV_KEY)), EPSILON)); + Thread.sleep(10L); + } } @EnabledIfSystemProperty(named = "nifi.test.unstable",