mirror of https://github.com/apache/nifi.git
NIFI-719: merged with develop
This commit is contained in:
commit
a09180799d
|
@ -17,13 +17,14 @@
|
||||||
package org.apache.nifi.controller;
|
package org.apache.nifi.controller;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.PropertyValue;
|
import org.apache.nifi.components.PropertyValue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This context is passed to ControllerServices after the service has been
|
* This context is passed to ControllerServices and Reporting Tasks in order
|
||||||
* initialized.
|
* to expose their configuration to them.
|
||||||
*/
|
*/
|
||||||
public interface ConfigurationContext {
|
public interface ConfigurationContext {
|
||||||
|
|
||||||
|
@ -39,4 +40,22 @@ public interface ConfigurationContext {
|
||||||
*/
|
*/
|
||||||
Map<PropertyDescriptor, String> getProperties();
|
Map<PropertyDescriptor, String> getProperties();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return a String representation of the scheduling period, or <code>null</code> if
|
||||||
|
* the component does not have a scheduling period (e.g., for ControllerServices)
|
||||||
|
*/
|
||||||
|
String getSchedulingPeriod();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the amount of time, in the given {@link TimeUnit} that will
|
||||||
|
* elapsed between the return of one execution of the
|
||||||
|
* component's <code>onTrigger</code> method and
|
||||||
|
* the time at which the method is invoked again. This method will return
|
||||||
|
* null if the component does not have a scheduling period (e.g., for ControllerServices)
|
||||||
|
*
|
||||||
|
* @param timeUnit unit of time for scheduling
|
||||||
|
* @return period of time or <code>null</code> if component does not have a scheduling
|
||||||
|
* period
|
||||||
|
*/
|
||||||
|
Long getSchedulingPeriod(TimeUnit timeUnit);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.nifi.util;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.PropertyValue;
|
import org.apache.nifi.components.PropertyValue;
|
||||||
|
@ -63,4 +64,14 @@ public class MockConfigurationContext implements ConfigurationContext {
|
||||||
final PropertyDescriptor resolved = service.getPropertyDescriptor(property.getName());
|
final PropertyDescriptor resolved = service.getPropertyDescriptor(property.getName());
|
||||||
return resolved == null ? property : resolved;
|
return resolved == null ? property : resolved;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getSchedulingPeriod() {
|
||||||
|
return "0 secs";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long getSchedulingPeriod(final TimeUnit timeUnit) {
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.nifi.documentation.mock;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.PropertyValue;
|
import org.apache.nifi.components.PropertyValue;
|
||||||
|
@ -35,4 +36,13 @@ public class MockConfigurationContext implements ConfigurationContext {
|
||||||
return Collections.emptyMap();
|
return Collections.emptyMap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getSchedulingPeriod() {
|
||||||
|
return "0 secs";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long getSchedulingPeriod(final TimeUnit timeUnit) {
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -593,7 +593,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
if (startDelayedComponents) {
|
if (startDelayedComponents) {
|
||||||
LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size()));
|
LOG.info("Starting {} processors/ports/funnels", startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size());
|
||||||
for (final Connectable connectable : startConnectablesAfterInitialization) {
|
for (final Connectable connectable : startConnectablesAfterInitialization) {
|
||||||
if (connectable.getScheduledState() == ScheduledState.DISABLED) {
|
if (connectable.getScheduledState() == ScheduledState.DISABLED) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -1012,7 +1012,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
public boolean isTerminated() {
|
public boolean isTerminated() {
|
||||||
this.readLock.lock();
|
this.readLock.lock();
|
||||||
try {
|
try {
|
||||||
return (null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated());
|
return null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated();
|
||||||
} finally {
|
} finally {
|
||||||
this.readLock.unlock();
|
this.readLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -1054,7 +1054,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
// invoke any methods annotated with @OnShutdown on Controller Services
|
// invoke any methods annotated with @OnShutdown on Controller Services
|
||||||
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
|
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
|
||||||
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
|
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
|
||||||
final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider);
|
final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider, null);
|
||||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext);
|
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1828,9 +1828,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
} else if (id1.equals(id2)) {
|
} else if (id1.equals(id2)) {
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
final String comparable1 = (id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1);
|
final String comparable1 = id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1;
|
||||||
final String comparable2 = (id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2);
|
final String comparable2 = id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2;
|
||||||
return (comparable1.equals(comparable2));
|
return comparable1.equals(comparable2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1964,7 +1964,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
}
|
}
|
||||||
|
|
||||||
final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id;
|
final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id;
|
||||||
return (root == null) ? null : root.findProcessGroup(searchId);
|
return root == null ? null : root.findProcessGroup(searchId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -2079,8 +2079,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut());
|
connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut());
|
||||||
connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut());
|
connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut());
|
||||||
|
|
||||||
flowFilesTransferred += (connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut());
|
flowFilesTransferred += connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut();
|
||||||
bytesTransferred += (connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut());
|
bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (StringUtils.isNotBlank(conn.getName())) {
|
if (StringUtils.isNotBlank(conn.getName())) {
|
||||||
|
|
|
@ -104,7 +104,7 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ConfigurationContext getConfigurationContext() {
|
public ConfigurationContext getConfigurationContext() {
|
||||||
return new StandardConfigurationContext(this, serviceLookup);
|
return new StandardConfigurationContext(this, serviceLookup, getSchedulingPeriod());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -146,7 +146,7 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
|
||||||
// We need to invoke any method annotation with the OnConfigured annotation in order to
|
// We need to invoke any method annotation with the OnConfigured annotation in order to
|
||||||
// maintain backward compatibility. This will be removed when we remove the old, deprecated annotations.
|
// maintain backward compatibility. This will be removed when we remove the old, deprecated annotations.
|
||||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||||
final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceLookup);
|
final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceLookup, getSchedulingPeriod());
|
||||||
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, configContext);
|
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, configContext);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
throw new ComponentLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + reportingTask, e);
|
throw new ComponentLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + reportingTask, e);
|
||||||
|
|
|
@ -185,7 +185,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
|
|
||||||
break;
|
break;
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
|
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
|
||||||
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
|
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
|
||||||
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
|
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
|
||||||
|
|
||||||
|
@ -230,7 +230,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
ReflectionUtils.invokeMethodsWithAnnotations(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
|
ReflectionUtils.invokeMethodsWithAnnotations(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
|
||||||
}
|
}
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
|
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
|
||||||
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
|
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
|
||||||
componentLog.error("Failed to invoke @OnUnscheduled method due to {}", cause);
|
componentLog.error("Failed to invoke @OnUnscheduled method due to {}", cause);
|
||||||
|
|
||||||
|
@ -284,7 +284,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
public void run() {
|
public void run() {
|
||||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||||
long lastStopTime = scheduleState.getLastStopTime();
|
final long lastStopTime = scheduleState.getLastStopTime();
|
||||||
final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor);
|
final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor);
|
||||||
|
|
||||||
final Set<String> serviceIds = new HashSet<>();
|
final Set<String> serviceIds = new HashSet<>();
|
||||||
|
@ -330,7 +330,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
|
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
|
||||||
final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
|
final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
|
||||||
|
|
||||||
procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {}",
|
procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {}",
|
||||||
|
@ -569,7 +569,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
@Override
|
@Override
|
||||||
public boolean isScheduled(final Object scheduled) {
|
public boolean isScheduled(final Object scheduled) {
|
||||||
final ScheduleState scheduleState = scheduleStates.get(scheduled);
|
final ScheduleState scheduleState = scheduleStates.get(scheduled);
|
||||||
return (scheduleState == null) ? false : scheduleState.isScheduled();
|
return scheduleState == null ? false : scheduleState.isScheduled();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -582,7 +582,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
ScheduleState scheduleState = scheduleStates.get(schedulable);
|
ScheduleState scheduleState = scheduleStates.get(schedulable);
|
||||||
if (scheduleState == null) {
|
if (scheduleState == null) {
|
||||||
scheduleState = new ScheduleState();
|
scheduleState = new ScheduleState();
|
||||||
ScheduleState previous = scheduleStates.putIfAbsent(schedulable, scheduleState);
|
final ScheduleState previous = scheduleStates.putIfAbsent(schedulable, scheduleState);
|
||||||
if (previous != null) {
|
if (previous != null) {
|
||||||
scheduleState = previous;
|
scheduleState = previous;
|
||||||
}
|
}
|
||||||
|
@ -599,8 +599,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||||
long lastStopTime = scheduleState.getLastStopTime();
|
final long lastStopTime = scheduleState.getLastStopTime();
|
||||||
final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider);
|
final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider, null);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
|
@ -622,7 +622,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
|
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
|
||||||
|
|
||||||
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
|
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
|
||||||
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
|
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
|
||||||
|
@ -637,7 +637,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
final Throwable cause = (t instanceof InvocationTargetException) ? t.getCause() : t;
|
final Throwable cause = t instanceof InvocationTargetException ? t.getCause() : t;
|
||||||
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
|
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
|
||||||
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
|
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
|
||||||
|
|
||||||
|
@ -666,7 +666,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||||
final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider);
|
final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider, null);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
|
@ -675,7 +675,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
service.setState(ControllerServiceState.DISABLED);
|
service.setState(ControllerServiceState.DISABLED);
|
||||||
return;
|
return;
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
|
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
|
||||||
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
|
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
|
||||||
componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
|
componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.nifi.controller.service;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.nifi.attribute.expression.language.PreparedQuery;
|
import org.apache.nifi.attribute.expression.language.PreparedQuery;
|
||||||
import org.apache.nifi.attribute.expression.language.Query;
|
import org.apache.nifi.attribute.expression.language.Query;
|
||||||
|
@ -27,16 +28,29 @@ import org.apache.nifi.controller.ConfigurationContext;
|
||||||
import org.apache.nifi.controller.ConfiguredComponent;
|
import org.apache.nifi.controller.ConfiguredComponent;
|
||||||
import org.apache.nifi.controller.ControllerServiceLookup;
|
import org.apache.nifi.controller.ControllerServiceLookup;
|
||||||
import org.apache.nifi.processor.StandardPropertyValue;
|
import org.apache.nifi.processor.StandardPropertyValue;
|
||||||
|
import org.apache.nifi.util.FormatUtils;
|
||||||
|
|
||||||
public class StandardConfigurationContext implements ConfigurationContext {
|
public class StandardConfigurationContext implements ConfigurationContext {
|
||||||
|
|
||||||
private final ConfiguredComponent component;
|
private final ConfiguredComponent component;
|
||||||
private final ControllerServiceLookup serviceLookup;
|
private final ControllerServiceLookup serviceLookup;
|
||||||
private final Map<PropertyDescriptor, PreparedQuery> preparedQueries;
|
private final Map<PropertyDescriptor, PreparedQuery> preparedQueries;
|
||||||
|
private final String schedulingPeriod;
|
||||||
|
private final Long schedulingNanos;
|
||||||
|
|
||||||
public StandardConfigurationContext(final ConfiguredComponent component, final ControllerServiceLookup serviceLookup) {
|
public StandardConfigurationContext(final ConfiguredComponent component, final ControllerServiceLookup serviceLookup, final String schedulingPeriod) {
|
||||||
this.component = component;
|
this.component = component;
|
||||||
this.serviceLookup = serviceLookup;
|
this.serviceLookup = serviceLookup;
|
||||||
|
this.schedulingPeriod = schedulingPeriod;
|
||||||
|
if (schedulingPeriod == null) {
|
||||||
|
schedulingNanos = null;
|
||||||
|
} else {
|
||||||
|
if (FormatUtils.TIME_DURATION_PATTERN.matcher(schedulingPeriod).matches()) {
|
||||||
|
schedulingNanos = FormatUtils.getTimeDuration(schedulingPeriod, TimeUnit.NANOSECONDS);
|
||||||
|
} else {
|
||||||
|
schedulingNanos = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
preparedQueries = new HashMap<>();
|
preparedQueries = new HashMap<>();
|
||||||
for (final Map.Entry<PropertyDescriptor, String> entry : component.getProperties().entrySet()) {
|
for (final Map.Entry<PropertyDescriptor, String> entry : component.getProperties().entrySet()) {
|
||||||
|
@ -61,4 +75,14 @@ public class StandardConfigurationContext implements ConfigurationContext {
|
||||||
public Map<PropertyDescriptor, String> getProperties() {
|
public Map<PropertyDescriptor, String> getProperties() {
|
||||||
return component.getProperties();
|
return component.getProperties();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getSchedulingPeriod() {
|
||||||
|
return schedulingPeriod;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long getSchedulingPeriod(final TimeUnit timeUnit) {
|
||||||
|
return schedulingNanos == null ? null : timeUnit.convert(schedulingNanos, TimeUnit.NANOSECONDS);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,7 +125,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
private void onConfigured() {
|
private void onConfigured() {
|
||||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||||
final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceProvider);
|
final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceProvider, null);
|
||||||
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, implementation, configContext);
|
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, implementation, configContext);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
throw new ComponentLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + implementation, e);
|
throw new ComponentLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + implementation, e);
|
||||||
|
|
|
@ -142,7 +142,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
|
|
||||||
final ControllerServiceNode node = serviceNodeHolder.get();
|
final ControllerServiceNode node = serviceNodeHolder.get();
|
||||||
final ControllerServiceState state = node.getState();
|
final ControllerServiceState state = node.getState();
|
||||||
final boolean disabled = (state != ControllerServiceState.ENABLED); // only allow method call if service state is ENABLED.
|
final boolean disabled = state != ControllerServiceState.ENABLED; // only allow method call if service state is ENABLED.
|
||||||
if (disabled && !validDisabledMethods.contains(method)) {
|
if (disabled && !validDisabledMethods.contains(method)) {
|
||||||
// Use nar class loader here because we are implicitly calling toString() on the original implementation.
|
// Use nar class loader here because we are implicitly calling toString() on the original implementation.
|
||||||
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
|
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
|
||||||
|
@ -435,7 +435,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
@Override
|
@Override
|
||||||
public ControllerService getControllerService(final String serviceIdentifier) {
|
public ControllerService getControllerService(final String serviceIdentifier) {
|
||||||
final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
|
final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
|
||||||
return (node == null) ? null : node.getProxiedControllerService();
|
return node == null ? null : node.getProxiedControllerService();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -446,13 +446,13 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
@Override
|
@Override
|
||||||
public boolean isControllerServiceEnabled(final String serviceIdentifier) {
|
public boolean isControllerServiceEnabled(final String serviceIdentifier) {
|
||||||
final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
|
final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
|
||||||
return (node == null) ? false : (ControllerServiceState.ENABLED == node.getState());
|
return node == null ? false : ControllerServiceState.ENABLED == node.getState();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isControllerServiceEnabling(final String serviceIdentifier) {
|
public boolean isControllerServiceEnabling(final String serviceIdentifier) {
|
||||||
final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
|
final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
|
||||||
return (node == null) ? false : (ControllerServiceState.ENABLING == node.getState());
|
return node == null ? false : ControllerServiceState.ENABLING == node.getState();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -488,7 +488,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
serviceNode.verifyCanDelete();
|
serviceNode.verifyCanDelete();
|
||||||
|
|
||||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||||
final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this);
|
final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this, null);
|
||||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, serviceNode.getControllerServiceImplementation(), configurationContext);
|
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, serviceNode.getControllerServiceImplementation(), configurationContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.nifi.reporting.AbstractReportingTask;
|
||||||
import org.apache.nifi.reporting.Bulletin;
|
import org.apache.nifi.reporting.Bulletin;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
import org.apache.nifi.reporting.ReportingContext;
|
import org.apache.nifi.reporting.ReportingContext;
|
||||||
import org.apache.nifi.reporting.ReportingInitializationContext;
|
|
||||||
import org.apache.nifi.reporting.Severity;
|
import org.apache.nifi.reporting.Severity;
|
||||||
import org.apache.nifi.util.FormatUtils;
|
import org.apache.nifi.util.FormatUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -131,13 +130,6 @@ public class MonitorMemory extends AbstractReportingTask {
|
||||||
public MonitorMemory() {
|
public MonitorMemory() {
|
||||||
}
|
}
|
||||||
|
|
||||||
private long schedulingPeriodMillis;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void init(final ReportingInitializationContext config) throws InitializationException {
|
|
||||||
schedulingPeriodMillis = config.getSchedulingPeriod(TimeUnit.MILLISECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
final List<PropertyDescriptor> descriptors = new ArrayList<>(3);
|
final List<PropertyDescriptor> descriptors = new ArrayList<>(3);
|
||||||
|
@ -156,7 +148,7 @@ public class MonitorMemory extends AbstractReportingTask {
|
||||||
// validate reporting interval
|
// validate reporting interval
|
||||||
final Long reportingIntervalValue = config.getProperty(REPORTING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
|
final Long reportingIntervalValue = config.getProperty(REPORTING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
|
||||||
if (reportingIntervalValue == null) {
|
if (reportingIntervalValue == null) {
|
||||||
reportingIntervalMillis = schedulingPeriodMillis;
|
reportingIntervalMillis = config.getSchedulingPeriod(TimeUnit.MILLISECONDS);
|
||||||
} else {
|
} else {
|
||||||
reportingIntervalMillis = reportingIntervalValue;
|
reportingIntervalMillis = reportingIntervalValue;
|
||||||
}
|
}
|
||||||
|
@ -174,7 +166,7 @@ public class MonitorMemory extends AbstractReportingTask {
|
||||||
} else {
|
} else {
|
||||||
final String percentage = thresholdValue.substring(0, thresholdValue.length() - 1);
|
final String percentage = thresholdValue.substring(0, thresholdValue.length() - 1);
|
||||||
final double pct = Double.parseDouble(percentage) / 100D;
|
final double pct = Double.parseDouble(percentage) / 100D;
|
||||||
final long calculatedThreshold = (long) ((double) bean.getUsage().getMax() * pct);
|
final long calculatedThreshold = (long) (bean.getUsage().getMax() * pct);
|
||||||
if (bean.isCollectionUsageThresholdSupported()) {
|
if (bean.isCollectionUsageThresholdSupported()) {
|
||||||
bean.setCollectionUsageThreshold(calculatedThreshold);
|
bean.setCollectionUsageThreshold(calculatedThreshold);
|
||||||
}
|
}
|
||||||
|
@ -201,7 +193,7 @@ public class MonitorMemory extends AbstractReportingTask {
|
||||||
} else {
|
} else {
|
||||||
final String percentage = threshold.substring(0, threshold.length() - 1);
|
final String percentage = threshold.substring(0, threshold.length() - 1);
|
||||||
final double pct = Double.parseDouble(percentage) / 100D;
|
final double pct = Double.parseDouble(percentage) / 100D;
|
||||||
return (long) ((double) maxBytes * pct);
|
return (long) (maxBytes * pct);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue