NIFI-3034 This closes #1323. Update Counter and test case

This commit is contained in:
Peddy 2017-03-11 11:50:52 -05:00 committed by joewitt
parent 977aa6919c
commit ccd11816e4
3 changed files with 169 additions and 0 deletions

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.HashSet;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"counter","debug", "instrumentation"})
@CapabilityDescription("This processor allows users to set specific counters and key points in their flow. It is useful for debugging and basic counting functions.")
@ReadsAttribute(attribute = "counterName", description = "The name of the counter to update/get.")
public class UpdateCounter extends AbstractProcessor {
static final PropertyDescriptor COUNTER_NAME = new PropertyDescriptor.Builder()
.name("counter-name")
.displayName("Counter Name")
.description("The name of the counter you want to set the value off - supports expression language like ${counterName}")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.expressionLanguageSupported(true)
.build();
static final PropertyDescriptor DELTA = new PropertyDescriptor.Builder()
.name("delta")
.displayName("Delta")
.description("Adjusts the counter by the specified delta for each flow file received. May be a positive or negative integer.")
.required(true)
.defaultValue("1")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.expressionLanguageSupported(true)
.build();
static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("Counter was updated/retrieved")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(COUNTER_NAME);
descriptors.add(DELTA);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
session.adjustCounter(context.getProperty(COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue(),
Long.parseLong(context.getProperty(DELTA).evaluateAttributeExpressions(flowFile).getValue()),
false
);
session.transfer(flowFile, SUCCESS);
}
}

View File

@ -96,3 +96,4 @@ org.apache.nifi.processors.standard.ExecuteSQL
org.apache.nifi.processors.standard.FetchDistributedMapCache org.apache.nifi.processors.standard.FetchDistributedMapCache
org.apache.nifi.processors.standard.ListFTP org.apache.nifi.processors.standard.ListFTP
org.apache.nifi.processors.standard.FetchFTP org.apache.nifi.processors.standard.FetchFTP
org.apache.nifi.processors.standard.UpdateCounter

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TestUpdateCounter {
@Test
public void testwithFileName() throws Exception {
final TestRunner firstrunner = TestRunners.newTestRunner(new UpdateCounter());
firstrunner.setProperty(UpdateCounter.COUNTER_NAME,"firewall");
firstrunner.setProperty(UpdateCounter.DELTA,"1");
Map<String,String> attributes = new HashMap<String,String>();
firstrunner.enqueue("",attributes);
firstrunner.run();
firstrunner.assertAllFlowFilesTransferred(UpdateCounter.SUCCESS, 1);
}
@Test
public void testExpressionLanguage() throws Exception {
final TestRunner firstrunner = TestRunners.newTestRunner(new UpdateCounter());
firstrunner.setProperty(UpdateCounter.COUNTER_NAME,"${filename}");
firstrunner.setProperty(UpdateCounter.DELTA,"${num}");
final Map<String, String> attributes = new HashMap<>();
attributes.put("filename", "test");
attributes.put("num", "40");
firstrunner.enqueue(new byte[0],attributes);
firstrunner.run();
Long counter = firstrunner.getCounterValue("test");
assertEquals(java.util.Optional.ofNullable(counter), java.util.Optional.ofNullable(40L));
firstrunner.assertAllFlowFilesTransferred(UpdateCounter.SUCCESS, 1);
}
}