NIFI-12389: Add variance and standard deviation to AttributeRollingWindow

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8048.
This commit is contained in:
Matt Burgess 2023-11-17 16:58:52 -05:00 committed by Pierre Villard
parent 63364687d8
commit e81d7254ec
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
3 changed files with 85 additions and 12 deletions

View File

@ -38,6 +38,11 @@
</dependency>
<!-- External dependencies -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>
<!-- Test dependencies -->
<dependency>

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.stateful.analysis;
import org.apache.commons.math3.stat.descriptive.moment.Variance;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
@ -49,7 +50,9 @@ import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_COUNT_KEY;
import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_MEAN_KEY;
import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_STDDEV_KEY;
import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_VALUE_KEY;
import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_VARIANCE_KEY;
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@ -59,7 +62,9 @@ import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindo
@WritesAttributes({
@WritesAttribute(attribute = ROLLING_WINDOW_VALUE_KEY, description = "The rolling window value (sum of all the values stored)."),
@WritesAttribute(attribute = ROLLING_WINDOW_COUNT_KEY, description = "The count of the number of FlowFiles seen in the rolling window."),
@WritesAttribute(attribute = ROLLING_WINDOW_MEAN_KEY, description = "The mean of the FlowFiles seen in the rolling window.")
@WritesAttribute(attribute = ROLLING_WINDOW_MEAN_KEY, description = "The mean of the FlowFiles seen in the rolling window."),
@WritesAttribute(attribute = ROLLING_WINDOW_VARIANCE_KEY, description = "The variance of the FlowFiles seen in the rolling window."),
@WritesAttribute(attribute = ROLLING_WINDOW_STDDEV_KEY, description = "The standard deviation (positive square root of the variance) of the FlowFiles seen in the rolling window.")
})
@Stateful(scopes = {Scope.LOCAL}, description = "Store the values backing the rolling window. This includes storing the individual values and their time-stamps or the batches of values and their " +
"counts.")
@ -69,6 +74,8 @@ public class AttributeRollingWindow extends AbstractProcessor {
public static final String ROLLING_WINDOW_VALUE_KEY = "rolling_window_value";
public static final String ROLLING_WINDOW_COUNT_KEY = "rolling_window_count";
public static final String ROLLING_WINDOW_MEAN_KEY = "rolling_window_mean";
public static final String ROLLING_WINDOW_VARIANCE_KEY = "rolling_window_variance";
public static final String ROLLING_WINDOW_STDDEV_KEY = "rolling_window_stddev";
public static final String CURRENT_MICRO_BATCH_STATE_TS_KEY = "start_curr_batch_ts";
public static final String BATCH_APPEND_KEY = "_batch";
@ -218,16 +225,18 @@ public class AttributeRollingWindow extends AbstractProcessor {
}
Double aggregateValue = 0.0D;
Variance variance = new Variance();
for(Map.Entry<String,String> entry: state.entrySet()){
if(!entry.getKey().equals(COUNT_KEY)){
aggregateValue += Double.valueOf(entry.getValue());
final Double value = Double.valueOf(entry.getValue());
variance.increment(value);
aggregateValue += value ;
}
}
final Double currentFlowFileValue = context.getProperty(VALUE_TO_TRACK).evaluateAttributeExpressions(flowFile).asDouble();
variance.increment(currentFlowFileValue);
aggregateValue += currentFlowFileValue;
state.put(String.valueOf(currTime), String.valueOf(currentFlowFileValue));
@ -250,6 +259,9 @@ public class AttributeRollingWindow extends AbstractProcessor {
attributesToAdd.put(ROLLING_WINDOW_VALUE_KEY, String.valueOf(aggregateValue));
attributesToAdd.put(ROLLING_WINDOW_COUNT_KEY, String.valueOf(count));
attributesToAdd.put(ROLLING_WINDOW_MEAN_KEY, String.valueOf(mean));
double varianceValue = variance.getResult();
attributesToAdd.put(ROLLING_WINDOW_VARIANCE_KEY, String.valueOf(varianceValue));
attributesToAdd.put(ROLLING_WINDOW_STDDEV_KEY, String.valueOf(Math.sqrt(varianceValue)));
flowFile = session.putAllAttributes(flowFile, attributesToAdd);
@ -315,6 +327,7 @@ public class AttributeRollingWindow extends AbstractProcessor {
Double aggregateValue = 0.0D;
Double currentBatchValue = 0.0D;
Long currentBatchCount = 0L;
Variance variance = new Variance();
for(Map.Entry<String,String> entry: state.entrySet()){
String key = entry.getKey();
@ -322,7 +335,7 @@ public class AttributeRollingWindow extends AbstractProcessor {
String timeStampString = key.substring(0, key.length() - COUNT_APPEND_KEY_LENGTH);
Double batchValue = Double.valueOf(entry.getValue());
Long batchCount = Long.valueOf(state.get(timeStampString+COUNT_APPEND_KEY));
Long batchCount = Long.valueOf(state.get(timeStampString + COUNT_APPEND_KEY));
if (!newBatch && timeStampString.equals(currBatchStart)) {
final Double currentFlowFileValue = context.getProperty(VALUE_TO_TRACK).evaluateAttributeExpressions(flowFile).asDouble();
@ -334,17 +347,18 @@ public class AttributeRollingWindow extends AbstractProcessor {
}
aggregateValue += batchValue;
variance.increment(batchValue);
}
}
if(newBatch) {
if (newBatch) {
final Double currentFlowFileValue = context.getProperty(VALUE_TO_TRACK).evaluateAttributeExpressions(flowFile).asDouble();
currentBatchValue += currentFlowFileValue;
currentBatchCount = 1L;
aggregateValue += currentBatchValue;
variance.increment(currentBatchValue);
}
state.put(currBatchStart + BATCH_APPEND_KEY, String.valueOf(currentBatchValue));
@ -365,6 +379,9 @@ public class AttributeRollingWindow extends AbstractProcessor {
attributesToAdd.put(ROLLING_WINDOW_VALUE_KEY, String.valueOf(aggregateValue));
attributesToAdd.put(ROLLING_WINDOW_COUNT_KEY, String.valueOf(count));
attributesToAdd.put(ROLLING_WINDOW_MEAN_KEY, String.valueOf(mean));
double varianceValue = variance.getResult();
attributesToAdd.put(ROLLING_WINDOW_VARIANCE_KEY, String.valueOf(varianceValue));
attributesToAdd.put(ROLLING_WINDOW_STDDEV_KEY, String.valueOf(Math.sqrt(varianceValue)));
flowFile = session.putAllAttributes(flowFile, attributesToAdd);

View File

@ -17,6 +17,8 @@
package org.apache.nifi.processors.stateful.analysis;
import org.apache.commons.math3.stat.descriptive.moment.Variance;
import org.apache.commons.math3.util.Precision;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.state.MockStateManager;
@ -27,6 +29,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.condition.OS;
import org.opentest4j.AssertionFailedError;
import java.io.IOException;
import java.util.HashMap;
@ -35,10 +38,14 @@ import java.util.Map;
import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.REL_FAILED_SET_STATE;
import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_COUNT_KEY;
import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_MEAN_KEY;
import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_STDDEV_KEY;
import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_VALUE_KEY;
import static org.apache.nifi.processors.stateful.analysis.AttributeRollingWindow.ROLLING_WINDOW_VARIANCE_KEY;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AttributeRollingWindowIT {
private static final double EPSILON = 0.000001d; // "Error" threshold for floating-point arithmetic
@Test
public void testFailureDueToBadAttribute() {
final TestRunner runner = TestRunners.newTestRunner(AttributeRollingWindow.class);
@ -91,6 +98,8 @@ public class AttributeRollingWindowIT {
mockFlowFile.assertAttributeNotExists(ROLLING_WINDOW_VALUE_KEY);
mockFlowFile.assertAttributeNotExists(ROLLING_WINDOW_COUNT_KEY);
mockFlowFile.assertAttributeNotExists(ROLLING_WINDOW_MEAN_KEY);
mockFlowFile.assertAttributeNotExists(ROLLING_WINDOW_VARIANCE_KEY);
mockFlowFile.assertAttributeNotExists(ROLLING_WINDOW_STDDEV_KEY);
}
private boolean isWindowsEnvironment() {
@ -117,6 +126,8 @@ public class AttributeRollingWindowIT {
flowFile.assertAttributeEquals(ROLLING_WINDOW_VALUE_KEY, "1.0");
flowFile.assertAttributeEquals(ROLLING_WINDOW_COUNT_KEY, "1");
flowFile.assertAttributeEquals(ROLLING_WINDOW_MEAN_KEY, "1.0");
flowFile.assertAttributeEquals(ROLLING_WINDOW_VARIANCE_KEY, "0.0");
flowFile.assertAttributeEquals(ROLLING_WINDOW_STDDEV_KEY, "0.0");
runner.enqueue("2".getBytes(), attributes);
runner.run(1);
@ -126,6 +137,8 @@ public class AttributeRollingWindowIT {
flowFile.assertAttributeEquals(ROLLING_WINDOW_VALUE_KEY, "2.0");
flowFile.assertAttributeEquals(ROLLING_WINDOW_COUNT_KEY, "2");
flowFile.assertAttributeEquals(ROLLING_WINDOW_MEAN_KEY, "1.0");
flowFile.assertAttributeEquals(ROLLING_WINDOW_VARIANCE_KEY, "0.0");
flowFile.assertAttributeEquals(ROLLING_WINDOW_STDDEV_KEY, "0.0");
Thread.sleep(500L);
@ -137,10 +150,11 @@ public class AttributeRollingWindowIT {
flowFile.assertAttributeEquals(ROLLING_WINDOW_VALUE_KEY, "1.0");
flowFile.assertAttributeEquals(ROLLING_WINDOW_COUNT_KEY, "1");
flowFile.assertAttributeEquals(ROLLING_WINDOW_MEAN_KEY, "1.0");
flowFile.assertAttributeEquals(ROLLING_WINDOW_VARIANCE_KEY, "0.0");
flowFile.assertAttributeEquals(ROLLING_WINDOW_STDDEV_KEY, "0.0");
}
@Test
@DisabledOnOs(OS.WINDOWS)
public void testVerifyCount() throws InterruptedException {
@ -153,9 +167,8 @@ public class AttributeRollingWindowIT {
final Map<String, String> attributes = new HashMap<>();
attributes.put("value", "1");
for(int i = 1; i<61; i++){
for (int i = 1; i < 61; i++) {
runner.enqueue(String.valueOf(i).getBytes(), attributes);
runner.run();
flowFile = runner.getFlowFilesForRelationship(AttributeRollingWindow.REL_SUCCESS).get(0);
@ -166,11 +179,11 @@ public class AttributeRollingWindowIT {
flowFile.assertAttributeEquals(ROLLING_WINDOW_VALUE_KEY, String.valueOf(value));
flowFile.assertAttributeEquals(ROLLING_WINDOW_COUNT_KEY, String.valueOf(i));
flowFile.assertAttributeEquals(ROLLING_WINDOW_MEAN_KEY, String.valueOf(mean));
flowFile.assertAttributeEquals(ROLLING_WINDOW_VARIANCE_KEY, "0.0");
flowFile.assertAttributeEquals(ROLLING_WINDOW_STDDEV_KEY, "0.0");
Thread.sleep(10L);
}
runner.setProperty(AttributeRollingWindow.VALUE_TO_TRACK, "${value}");
runner.setProperty(AttributeRollingWindow.SUB_WINDOW_LENGTH, "500 ms");
runner.setProperty(AttributeRollingWindow.TIME_WINDOW, "10 sec");
@ -188,10 +201,48 @@ public class AttributeRollingWindowIT {
flowFile.assertAttributeEquals(ROLLING_WINDOW_VALUE_KEY, String.valueOf(Double.valueOf(i)));
flowFile.assertAttributeEquals(ROLLING_WINDOW_COUNT_KEY, String.valueOf(i));
flowFile.assertAttributeEquals(ROLLING_WINDOW_MEAN_KEY, String.valueOf(mean));
flowFile.assertAttributeEquals(ROLLING_WINDOW_VARIANCE_KEY, "0.0");
flowFile.assertAttributeEquals(ROLLING_WINDOW_STDDEV_KEY, "0.0");
Thread.sleep(10L);
}
}
/**
 * Feeds the values 1 through 60 through the processor (via the "value" attribute, with a
 * 10 second time window) and checks after every FlowFile that the emitted sum, count, mean,
 * variance and standard deviation attributes agree with a reference computed locally using
 * commons-math's {@link Variance}. Floating-point results are compared within {@code EPSILON}.
 */
@Test
@DisabledOnOs(OS.WINDOWS)
public void testVerifyVarianceAndStandardDeviation() throws InterruptedException {
    final TestRunner runner = TestRunners.newTestRunner(AttributeRollingWindow.class);

    runner.setProperty(AttributeRollingWindow.VALUE_TO_TRACK, "${value}");
    runner.setProperty(AttributeRollingWindow.TIME_WINDOW, "10 sec");

    final Map<String, String> attributes = new HashMap<>();
    // Reference accumulators that mirror what the processor is expected to report.
    final Variance variance = new Variance();
    double sum = 0.0D;

    for (int i = 1; i < 61; i++) {
        attributes.put("value", String.valueOf(i));
        runner.enqueue(new byte[]{}, attributes);
        runner.run();

        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(AttributeRollingWindow.REL_SUCCESS).get(0);
        runner.clearTransferState();

        sum += i;
        final double mean = sum / i;
        variance.increment(i);
        final double expectedVariance = variance.getResult();

        flowFile.assertAttributeEquals(ROLLING_WINDOW_VALUE_KEY, String.valueOf(sum));
        flowFile.assertAttributeEquals(ROLLING_WINDOW_COUNT_KEY, String.valueOf(i));
        assertTrue(Precision.equals(mean, Double.parseDouble(flowFile.getAttribute(ROLLING_WINDOW_MEAN_KEY)), EPSILON));

        // A variance mismatch must fail the test. Previously the assertion error was caught and
        // only printed to stderr, so a wrong variance could slip through whenever the square
        // root happened to stay within EPSILON. Report the iteration and both values instead.
        final double actualVariance = Double.parseDouble(flowFile.getAttribute(ROLLING_WINDOW_VARIANCE_KEY));
        if (!Precision.equals(expectedVariance, actualVariance, EPSILON)) {
            throw new AssertionFailedError("Error at " + i + ": " + expectedVariance + " != " + actualVariance,
                    expectedVariance, actualVariance);
        }

        assertTrue(Precision.equals(Math.sqrt(expectedVariance), Double.parseDouble(flowFile.getAttribute(ROLLING_WINDOW_STDDEV_KEY)), EPSILON));

        // NOTE(review): presumably spaces out FlowFiles so their state timestamps differ — confirm against the processor.
        Thread.sleep(10L);
    }
}
@EnabledIfSystemProperty(named = "nifi.test.unstable",