NIFI-9293: Ensure that we properly set the scheduled flag in the LifecycleState when stopping processors. Added system test to verify that @OnScheduled, onTrigger, @OnUnscheduled, @OnStopped are all called and in the expected order

This closes #5706.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Mark Payne 2022-01-24 11:42:03 -05:00 committed by Peter Turcsanyi
parent ec26ec9904
commit b5dd35431e
5 changed files with 190 additions and 3 deletions

View File

@ -27,6 +27,7 @@ import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.SchedulingAgentCallback; import org.apache.nifi.controller.SchedulingAgentCallback;
import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceProvider;
@ -149,7 +150,8 @@ public class StatelessProcessScheduler implements ProcessScheduler {
logger.info("Stopping {}", procNode); logger.info("Stopping {}", procNode);
final ProcessContext processContext = processContextFactory.createProcessContext(procNode); final ProcessContext processContext = processContextFactory.createProcessContext(procNode);
final LifecycleState lifecycleState = new LifecycleState(); final LifecycleState lifecycleState = new LifecycleState();
lifecycleState.setScheduled(false); final boolean scheduled = procNode.getScheduledState() == ScheduledState.RUNNING || procNode.getActiveThreadCount() > 0;
lifecycleState.setScheduled(scheduled);
return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, schedulingAgent, lifecycleState); return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, schedulingAgent, lifecycleState);
} }

View File

@ -62,7 +62,7 @@ public class StatelessSystemIT {
public TestName name = new TestName(); public TestName name = new TestName();
@Rule @Rule
public Timeout defaultTimeout = new Timeout(30, TimeUnit.MINUTES); public Timeout defaultTimeout = new Timeout(5, TimeUnit.MINUTES);
@Before @Before

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.basics;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.stateless.StatelessSystemIT;
import org.apache.nifi.stateless.VersionedFlowBuilder;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ProcessorLifecycleIT extends StatelessSystemIT {
private static final Logger logger = LoggerFactory.getLogger(ProcessorLifecycleIT.class);
@Test
public void testRunProcessorShutdown() throws StatelessConfigurationException, IOException, InterruptedException {
final File eventsFile = new File("target/events.txt");
Files.deleteIfExists(eventsFile.toPath());
final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
final VersionedProcessor generate = flowBuilder.createSimpleProcessor("GenerateFlowFile");
final VersionedProcessor writeLifecycleEvents = flowBuilder.createSimpleProcessor("WriteLifecycleEvents");
flowBuilder.createConnection(generate, writeLifecycleEvents, "success");
writeLifecycleEvents.setAutoTerminatedRelationships(Collections.singleton("success"));
writeLifecycleEvents.setProperties(Collections.singletonMap("Event File", eventsFile.getAbsolutePath()));
final StatelessDataflow dataflow = loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
final DataflowTrigger trigger = dataflow.trigger();
final TriggerResult result = trigger.getResult();
result.acknowledge();
dataflow.shutdown();
List<String> events = Files.readAllLines(eventsFile.toPath());
// Because the processors may be stopped in the background, we want to wait until we receive the events that we expect.
while (events.size() < 4) {
logger.info("Expecting to find 4 events written to {} but currently found only {}; will wait 100 milliseconds and check again", eventsFile.getAbsolutePath(), events.size());
Thread.sleep(100L);
events = Files.readAllLines(eventsFile.toPath());
}
assertEquals(Arrays.asList("OnScheduled", "OnTrigger", "OnUnscheduled", "OnStopped"), events);
}
}

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.tests.system;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Set;
public class WriteLifecycleEvents extends AbstractProcessor {
static final PropertyDescriptor EVENT_FILE = new PropertyDescriptor.Builder()
.name("Event File")
.displayName("Event File")
.description("Specifies the file to write to that contains a line of text for each lifecycle event that occurs")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("target/CountLifecycleEvents.events")
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles go here")
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.singletonList(EVENT_FILE);
}
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
writeEvent(context, "OnScheduled");
}
@OnUnscheduled
public void onUnscheduled(final ProcessContext context) throws IOException {
writeEvent(context, "OnUnscheduled");
}
@OnStopped
public void onStopped(final ProcessContext context) throws IOException {
writeEvent(context, "OnStopped");
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
try {
writeEvent(context, "OnTrigger");
} catch (IOException e) {
throw new ProcessException(e);
}
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
session.transfer(flowFile, REL_SUCCESS);
}
private void writeEvent(final ProcessContext context, final String event) throws IOException {
final File file = new File(context.getProperty(EVENT_FILE).getValue());
final byte[] eventBytes = (event + "\n").getBytes(StandardCharsets.UTF_8);
try (final OutputStream fos = new FileOutputStream(file, true)) {
fos.write(eventBytes);
}
}
}

View File

@ -43,4 +43,5 @@ org.apache.nifi.processors.tests.system.UpdateContent
org.apache.nifi.processors.tests.system.ValidateFileExists org.apache.nifi.processors.tests.system.ValidateFileExists
org.apache.nifi.processors.tests.system.VerifyContents org.apache.nifi.processors.tests.system.VerifyContents
org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile
org.apache.nifi.processors.tests.system.WriteLifecycleEvents
org.apache.nifi.processors.tests.system.WriteToFile org.apache.nifi.processors.tests.system.WriteToFile