diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/CapturingLogger.java b/nifi-mock/src/main/java/org/apache/nifi/util/CapturingLogger.java index c627f3efd7..25b37a050e 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/CapturingLogger.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/CapturingLogger.java @@ -6,6 +6,7 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.Marker; +import org.slf4j.helpers.MessageFormatter; /* * Licensed to the Apache Software Foundation (ASF) under one or more @@ -215,25 +216,23 @@ public class CapturingLogger implements Logger { @Override public void info(String msg) { - infoMessages.add(new LogMessage(null, msg, null)); - logger.info(msg); + this.info(msg, (Object) null); } @Override public void info(String format, Object arg) { - infoMessages.add(new LogMessage(null, format, null, arg)); - logger.info(format, arg); + this.info(format, arg, null); } @Override public void info(String format, Object arg1, Object arg2) { - infoMessages.add(new LogMessage(null, format, null, arg1, arg2)); - logger.info(format, arg1, arg2); + this.info(format, new Object[] { arg1, arg2 }); } @Override public void info(String format, Object... arguments) { - infoMessages.add(new LogMessage(null, format, null, arguments)); + String message = MessageFormatter.arrayFormat(format, arguments).getMessage(); + infoMessages.add(new LogMessage(null, message, null, arguments)); logger.info(format, arguments); } @@ -287,25 +286,23 @@ public class CapturingLogger implements Logger { @Override public void warn(String msg) { - warnMessages.add(new LogMessage(null, msg, null)); - logger.warn(msg); + this.warn(msg, (Object) null); } @Override public void warn(String format, Object arg) { - warnMessages.add(new LogMessage(null, format, null, arg)); - logger.warn(format, arg); + this.warn(format, arg, null); } @Override public void warn(String format, Object arg1, Object arg2) { - warnMessages.add(new LogMessage(null, format, null, arg1, arg2)); - logger.warn(format, arg1, arg2); + this.warn(format, new Object[] { arg1, arg2 }); } @Override public void warn(String format, Object... arguments) { - warnMessages.add(new LogMessage(null, format, null, arguments)); + String message = MessageFormatter.arrayFormat(format, arguments).getMessage(); + warnMessages.add(new LogMessage(null, message, null, arguments)); logger.warn(format, arguments); } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/LogMessage.java b/nifi-mock/src/main/java/org/apache/nifi/util/LogMessage.java index 49f800b71f..2dc8630f38 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/LogMessage.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/LogMessage.java @@ -42,4 +42,9 @@ public class LogMessage { public Object[] getArgs() { return args; } + + @Override + public String toString() { + return this.msg; + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index b80cc4b037..1729d4ac65 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -43,7 +43,6 @@ import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.SchedulingAgentCallback; import org.apache.nifi.controller.StandardProcessorNode; -import org.apache.nifi.controller.annotation.OnConfigured; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.encrypt.StringEncryptor; @@ -203,7 +202,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { } try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, OnConfigured.class, reportingTask, taskNode.getConfigurationContext()); + ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext()); } agent.schedule(taskNode, scheduleState); @@ -254,7 +253,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { try { try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotations(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext); + ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext); } } catch (final Exception e) { final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; @@ -274,7 +273,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { agent.unschedule(taskNode, scheduleState); if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, configurationContext); } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 1b84fe3de1..f11cde96fd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -230,7 +230,7 @@ public abstract class ApplicationResource { /** * Generates a 201 Created response with the specified content. * - * @param uri The URI + * @param uri The URI * @param entity entity * @return The response to be built */ @@ -241,7 +241,7 @@ public abstract class ApplicationResource { /** * Generates a 401 Not Authorized response with no content. - + * * @return The response to be built */ protected ResponseBuilder generateNotAuthorizedResponse() { @@ -364,7 +364,7 @@ public abstract class ApplicationResource { * * @param httpServletRequest the request * @return true if the request represents a two-phase commit style request and is the - * first of the two phases. + * first of the two phases. */ protected boolean isValidationPhase(HttpServletRequest httpServletRequest) { return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER) != null; @@ -420,8 +420,8 @@ public abstract class ApplicationResource { * Authorizes the specified Snippet with the specified request action. * * @param authorizer authorizer - * @param lookup lookup - * @param action action + * @param lookup lookup + * @param action action */ protected void authorizeSnippet(final Snippet snippet, final Authorizer authorizer, final AuthorizableLookup lookup, final RequestAction action) { final Consumer authorize = authorizable -> authorizable.authorize(authorizer, action); @@ -440,8 +440,8 @@ public abstract class ApplicationResource { * Authorizes the specified Snippet with the specified request action. * * @param authorizer authorizer - * @param lookup lookup - * @param action action + * @param lookup lookup + * @param action action */ protected void authorizeSnippet(final SnippetDTO snippet, final Authorizer authorizer, final AuthorizableLookup lookup, final RequestAction action) { final Consumer authorize = authorizable -> authorizable.authorize(authorizer, action); @@ -460,57 +460,57 @@ public abstract class ApplicationResource { * Executes an action through the service facade using the specified revision. * * @param serviceFacade service facade - * @param revision revision - * @param authorizer authorizer - * @param verifier verifier - * @param action executor + * @param revision revision + * @param authorizer authorizer + * @param verifier verifier + * @param action executor * @return the response */ protected Response withWriteLock(final NiFiServiceFacade serviceFacade, final Revision revision, final AuthorizeAccess authorizer, - final Runnable verifier, final Supplier action) { + final Runnable verifier, final Supplier action) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); return withWriteLock(serviceFacade, authorizer, verifier, action, - () -> serviceFacade.claimRevision(revision, user), - () -> serviceFacade.cancelRevision(revision), - () -> serviceFacade.releaseRevisionClaim(revision, user)); + () -> serviceFacade.claimRevision(revision, user), + () -> serviceFacade.cancelRevision(revision), + () -> serviceFacade.releaseRevisionClaim(revision, user)); } /** * Executes an action through the service facade using the specified revision. * * @param serviceFacade service facade - * @param revisions revisions - * @param authorizer authorizer - * @param verifier verifier - * @param action executor + * @param revisions revisions + * @param authorizer authorizer + * @param verifier verifier + * @param action executor * @return the response */ protected Response withWriteLock(final NiFiServiceFacade serviceFacade, final Set revisions, final AuthorizeAccess authorizer, - final Runnable verifier, final Supplier action) { + final Runnable verifier, final Supplier action) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); return withWriteLock(serviceFacade, authorizer, verifier, action, - () -> serviceFacade.claimRevisions(revisions, user), - () -> serviceFacade.cancelRevisions(revisions), - () -> serviceFacade.releaseRevisionClaims(revisions, user)); + () -> serviceFacade.claimRevisions(revisions, user), + () -> serviceFacade.cancelRevisions(revisions), + () -> serviceFacade.releaseRevisionClaims(revisions, user)); } /** * Executes an action through the service facade using the specified revision. * - * @param serviceFacade service facade - * @param authorizer authorizer - * @param verifier verifier - * @param action the action to execute - * @param claimRevision a callback that will claim the necessary revisions for the operation + * @param serviceFacade service facade + * @param authorizer authorizer + * @param verifier verifier + * @param action the action to execute + * @param claimRevision a callback that will claim the necessary revisions for the operation * @param cancelRevision a callback that will cancel the necessary revisions if the operation fails - * @param releaseClaim a callback that will release any previously claimed revision if the operation is canceled after the first phase + * @param releaseClaim a callback that will release any previously claimed revision if the operation is canceled after the first phase * @return the response */ private Response withWriteLock( - final NiFiServiceFacade serviceFacade, final AuthorizeAccess authorizer, final Runnable verifier, final Supplier action, - final Runnable claimRevision, final Runnable cancelRevision, final Runnable releaseClaim) { + final NiFiServiceFacade serviceFacade, final AuthorizeAccess authorizer, final Runnable verifier, final Supplier action, + final Runnable claimRevision, final Runnable cancelRevision, final Runnable releaseClaim) { if (isClaimCancelationPhase(httpServletRequest)) { releaseClaim.run(); @@ -546,10 +546,9 @@ public abstract class ApplicationResource { /** * Replicates the request to the given node * - * @param method the HTTP method + * @param method the HTTP method * @param nodeUuid the UUID of the node to replicate the request to * @return the response from the node - * * @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster */ protected Response replicate(final String method, final String nodeUuid) { @@ -559,11 +558,10 @@ public abstract class ApplicationResource { /** * Replicates the request to the given node * - * @param method the HTTP method - * @param entity the Entity to replicate + * @param method the HTTP method + * @param entity the Entity to replicate * @param nodeUuid the UUID of the node to replicate the request to * @return the response from the node - * * @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster */ protected Response replicate(final String method, final Object entity, final String nodeUuid) { @@ -573,11 +571,10 @@ public abstract class ApplicationResource { /** * Replicates the request to the given node * - * @param method the HTTP method - * @param entity the Entity to replicate + * @param method the HTTP method + * @param entity the Entity to replicate * @param nodeUuid the UUID of the node to replicate the request to * @return the response from the node - * * @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster */ protected Response replicate(final String method, final Object entity, final String nodeUuid, final Map headersToOverride) { @@ -630,8 +627,8 @@ public abstract class ApplicationResource { * used will be those provided by the {@link #getHeaders()} method. The URI that will be used will be * that provided by the {@link #getAbsolutePath()} method * - * @param method the HTTP method to use - * @param entity the entity to replicate + * @param method the HTTP method to use + * @param entity the entity to replicate * @param headersToOverride the headers to override * @return the response from the request */ @@ -647,7 +644,7 @@ public abstract class ApplicationResource { /** * @return true if connected to a cluster, false - * if running in standalone mode or disconnected from cluster + * if running in standalone mode or disconnected from cluster */ boolean isConnectedToCluster() { return clusterCoordinator != null && clusterCoordinator.isConnected(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java index b4f8c49385..103790eb39 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java @@ -511,7 +511,6 @@ public class EncryptContent extends AbstractProcessor { } catch (final ProcessException e) { logger.error("Cannot {}crypt {} - ", new Object[]{encrypt ? "en" : "de", flowFile, e}); session.transfer(flowFile, REL_FAILURE); - return; } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml index 5d5d0c848b..d1ebd4fea6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml @@ -56,5 +56,16 @@ mockito-all test + + org.apache.nifi + nifi-framework-core + ${project.version} + test + + + org.apache.nifi + nifi-nar-utils + test + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/MonitorMemory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/MonitorMemory.java index 83795b2d37..a7ee39fdb4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/MonitorMemory.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/MonitorMemory.java @@ -16,11 +16,11 @@ */ package org.apache.nifi.controller; -import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; import java.lang.management.MemoryPoolMXBean; import java.lang.management.MemoryUsage; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -91,15 +92,26 @@ import org.slf4j.LoggerFactory; + " that the memory pool is exceeding this threshold.") public class MonitorMemory extends AbstractReportingTask { + private static final AllowableValue[] memPoolAllowableValues; + + static { + List memoryPoolBeans = ManagementFactory.getMemoryPoolMXBeans(); + memPoolAllowableValues = new AllowableValue[memoryPoolBeans.size()]; + for (int i = 0; i < memPoolAllowableValues.length; i++) { + memPoolAllowableValues[i] = new AllowableValue(memoryPoolBeans.get(i).getName()); + } + } + public static final PropertyDescriptor MEMORY_POOL_PROPERTY = new PropertyDescriptor.Builder() .name("Memory Pool") + .displayName("Memory Pool") .description("The name of the JVM Memory Pool to monitor") .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue(null) + .allowableValues(memPoolAllowableValues) .build(); public static final PropertyDescriptor THRESHOLD_PROPERTY = new PropertyDescriptor.Builder() .name("Usage Threshold") + .displayName("Usage Threshold") .description("Indicates the threshold at which warnings should be generated") .required(true) .addValidator(new ThresholdValidator()) @@ -107,6 +119,7 @@ public class MonitorMemory extends AbstractReportingTask { .build(); public static final PropertyDescriptor REPORTING_INTERVAL = new PropertyDescriptor.Builder() .name("Reporting Interval") + .displayName("Reporting Interval") .description("Indicates how often this reporting task should report bulletins while the memory utilization exceeds the configured threshold") .required(false) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) @@ -121,22 +134,23 @@ public class MonitorMemory extends AbstractReportingTask { private volatile MemoryPoolMXBean monitoredBean; private volatile String threshold = "65%"; - private volatile long lastReportTime = 0L; + private volatile long lastReportTime; private volatile long reportingIntervalMillis; - private volatile boolean lastValueWasExceeded = false; + private volatile boolean lastValueWasExceeded; - private final List garbageCollectorBeans = new ArrayList<>(); + private final static List propertyDescriptors; - public MonitorMemory() { + static { + List _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(MEMORY_POOL_PROPERTY); + _propertyDescriptors.add(THRESHOLD_PROPERTY); + _propertyDescriptors.add(REPORTING_INTERVAL); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); } @Override protected List getSupportedPropertyDescriptors() { - final List descriptors = new ArrayList<>(3); - descriptors.add(MEMORY_POOL_PROPERTY); - descriptors.add(THRESHOLD_PROPERTY); - descriptors.add(REPORTING_INTERVAL); - return descriptors; + return propertyDescriptors; } @OnScheduled @@ -145,7 +159,6 @@ public class MonitorMemory extends AbstractReportingTask { final String thresholdValue = config.getProperty(THRESHOLD_PROPERTY).getValue().trim(); threshold = thresholdValue; - // validate reporting interval final Long reportingIntervalValue = config.getProperty(REPORTING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS); if (reportingIntervalValue == null) { reportingIntervalMillis = config.getSchedulingPeriod(TimeUnit.MILLISECONDS); @@ -154,30 +167,21 @@ public class MonitorMemory extends AbstractReportingTask { } final List memoryPoolBeans = ManagementFactory.getMemoryPoolMXBeans(); - for (final MemoryPoolMXBean bean : memoryPoolBeans) { - final String memoryPoolName = bean.getName(); + for (int i = 0; i < memoryPoolBeans.size() && monitoredBean == null; i++) { + MemoryPoolMXBean memoryPoolBean = memoryPoolBeans.get(i); + String memoryPoolName = memoryPoolBean.getName(); if (desiredMemoryPoolName.equals(memoryPoolName)) { - monitoredBean = bean; - if (DATA_SIZE_PATTERN.matcher(thresholdValue).matches()) { - final long bytes = DataUnit.parseDataSize(thresholdValue, DataUnit.B).longValue(); - if (bean.isCollectionUsageThresholdSupported()) { - bean.setCollectionUsageThreshold(bytes); + monitoredBean = memoryPoolBean; + if (memoryPoolBean.isCollectionUsageThresholdSupported()) { + long calculatedThreshold; + if (DATA_SIZE_PATTERN.matcher(thresholdValue).matches()) { + calculatedThreshold = DataUnit.parseDataSize(thresholdValue, DataUnit.B).longValue(); + } else { + final String percentage = thresholdValue.substring(0, thresholdValue.length() - 1); + final double pct = Double.parseDouble(percentage) / 100D; + calculatedThreshold = (long) (monitoredBean.getUsage().getMax() * pct); } - } else { - final String percentage = thresholdValue.substring(0, thresholdValue.length() - 1); - final double pct = Double.parseDouble(percentage) / 100D; - final long calculatedThreshold = (long) (bean.getUsage().getMax() * pct); - if (bean.isCollectionUsageThresholdSupported()) { - bean.setCollectionUsageThreshold(calculatedThreshold); - } - } - } - } - - for (final GarbageCollectorMXBean bean : ManagementFactory.getGarbageCollectorMXBeans()) { - for (final String memoryPoolName : bean.getMemoryPoolNames()) { - if (desiredMemoryPoolName.equals(memoryPoolName)) { - garbageCollectorBeans.add(bean); + monitoredBean.setUsageThreshold(calculatedThreshold); } } } @@ -187,16 +191,6 @@ public class MonitorMemory extends AbstractReportingTask { } } - private long calculateThresholdBytes(final long maxBytes) { - if (DATA_SIZE_PATTERN.matcher(threshold).matches()) { - return DataUnit.parseDataSize(threshold, DataUnit.B).longValue(); - } else { - final String percentage = threshold.substring(0, threshold.length() - 1); - final double pct = Double.parseDouble(percentage) / 100D; - return (long) (maxBytes * pct); - } - } - @Override public void onTrigger(final ReportingContext context) { final MemoryPoolMXBean bean = monitoredBean; @@ -204,17 +198,15 @@ public class MonitorMemory extends AbstractReportingTask { return; } - final MemoryUsage usage = bean.getCollectionUsage(); + final MemoryUsage usage = bean.getUsage(); if (usage == null) { logger.warn("{} could not determine memory usage for pool with name {}", this, context.getProperty(MEMORY_POOL_PROPERTY)); return; } - final boolean exceeded = usage.getUsed() > calculateThresholdBytes(usage.getMax()); final double percentageUsed = (double) usage.getUsed() / (double) usage.getMax() * 100D; - - if (exceeded) { + if (bean.isUsageThresholdExceeded()) { if (System.currentTimeMillis() < reportingIntervalMillis + lastReportTime && lastReportTime > 0L) { return; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java new file mode 100644 index 0000000000..1d133e1f72 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.File; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; + +import org.apache.commons.io.FileUtils; +import org.apache.nifi.admin.service.AuditService; +import org.apache.nifi.admin.service.KeyService; +import org.apache.nifi.controller.repository.FlowFileEventRepository; +import org.apache.nifi.provenance.MockProvenanceEventRepository; +import org.apache.nifi.util.CapturingLogger; +import org.apache.nifi.util.NiFiProperties; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; + +public class MonitorMemoryTest { + + private FlowController fc; + + @Before + public void before() throws Exception { + System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties"); + NiFiProperties.getInstance().setProperty(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec"); + NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml"); + NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, "local-provider"); + fc = this.buildFlowControllerForTest(); + } + + @After + public void after() throws Exception { + FileUtils.deleteDirectory(new File("./target/test-repo")); + FileUtils.deleteDirectory(new File("./target/content_repository")); + fc.shutdown(true); + } + + @Test(expected = IllegalStateException.class) + public void validatevalidationKicksInOnWrongPoolNames() throws Exception { + ReportingTaskNode reportingTask = fc.createReportingTask(MonitorMemory.class.getName()); + reportingTask.setProperty(MonitorMemory.MEMORY_POOL_PROPERTY.getName(), "foo"); + ProcessScheduler ps = fc.getProcessScheduler(); + ps.schedule(reportingTask); + } + + @Test + public void validateWarnWhenPercentThresholdReached() throws Exception { + this.doValidate("10%"); + } + + /* + * We're ignoring this tests as it is practically impossible to run it + * reliably together with automated Maven build since we can't control the + * state of the JVM on each machine during the build. However, you can run + * it selectively for further validation + */ + @Test + @Ignore + public void validateWarnWhenSizeThresholdReached() throws Exception { + this.doValidate("10 MB"); + } + + public void doValidate(String threshold) throws Exception { + CapturingLogger capturingLogger = this.wrapAndReturnCapturingLogger(); + ReportingTaskNode reportingTask = fc.createReportingTask(MonitorMemory.class.getName()); + reportingTask.setScheduldingPeriod("1 sec"); + reportingTask.setProperty(MonitorMemory.MEMORY_POOL_PROPERTY.getName(), "PS Old Gen"); + reportingTask.setProperty(MonitorMemory.REPORTING_INTERVAL.getName(), "100 millis"); + reportingTask.setProperty(MonitorMemory.THRESHOLD_PROPERTY.getName(), threshold); + + ProcessScheduler ps = fc.getProcessScheduler(); + ps.schedule(reportingTask); + + Thread.sleep(2000); + // ensure no memory warning were issued + assertTrue(capturingLogger.getWarnMessages().size() == 0); + + // throw something on the heap + @SuppressWarnings("unused") + byte[] b = new byte[Integer.MAX_VALUE / 3]; + Thread.sleep(200); + assertTrue(capturingLogger.getWarnMessages().size() > 0); + assertTrue(capturingLogger.getWarnMessages().get(0).getMsg() + .startsWith("Memory Pool 'PS Old Gen' has exceeded the configured Threshold of " + threshold)); + + // now try to clear the heap and see memory being reclaimed + b = null; + System.gc(); + Thread.sleep(1000); + assertTrue(capturingLogger.getInfoMessages().get(0).getMsg().startsWith( + "Memory Pool 'PS Old Gen' is no longer exceeding the configured Threshold of " + threshold)); + } + + private CapturingLogger wrapAndReturnCapturingLogger() throws Exception { + Field loggerField = MonitorMemory.class.getDeclaredField("logger"); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(loggerField, loggerField.getModifiers() & ~Modifier.FINAL); + + loggerField.setAccessible(true); + CapturingLogger capturingLogger = new CapturingLogger((Logger) loggerField.get(null)); + loggerField.set(null, capturingLogger); + return capturingLogger; + } + + private FlowController buildFlowControllerForTest() throws Exception { + NiFiProperties properties = NiFiProperties.getInstance(); + + properties.setProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, + MockProvenanceEventRepository.class.getName()); + properties.setProperty("nifi.remote.input.socket.port", ""); + properties.setProperty("nifi.remote.input.secure", ""); + + return FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), properties, + mock(KeyService.class), mock(AuditService.class), null, null); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/resources/nifi.properties b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/resources/nifi.properties new file mode 100644 index 0000000000..5344646b72 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/resources/nifi.properties @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Core Properties # +nifi.version=nifi-test 0.* +nifi.flow.configuration.file=./target/flow.xml.gz +nifi.flow.configuration.archive.dir=./target/archive/ + +nifi.reporting.task.configuration.file=./target/reporting-tasks.xml +nifi.controller.service.configuration.file=./target/controller-services.xml +nifi.templates.directory=./target/templates +nifi.nar.library.directory=./target/lib +nifi.nar.working.directory=./target/work/nar/ + +# FlowFile Repository +nifi.flowfile.repository.directory=./target/test-repo + +# Content Repository +nifi.content.repository.directory.default=./target/content_repository diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/resources/state-management.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/resources/state-management.xml new file mode 100644 index 0000000000..1714c7dcbb --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/resources/state-management.xml @@ -0,0 +1,38 @@ + + + + + + + + + local-provider + org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider + target/test-classes/access-control/state-management + + \ No newline at end of file