NIFI-1690 Changed MonitorMemory to use allowable values for pool names

- removed dead code from MonitorMemory
- added MonitorMemoryTest
- minor refactoring in MonitorMemory
- initial fix for NIFI-1731 (WARN/INFO logging) that was required by MonitorMemoryTest

NIFI-1690 polishing

NIFI-1690 address PR comments, removed default value for MEMORY_POOL_PROPERTY

NIFI-1690 addressed latest PR comments

NIFI-1690 fixed breaking changes

Fixed checkstyle issue in StandardProcessScheduler. (+2 squashed commits)
Squashed commits:
[03829c4] Fixed checkstyle issues.
[cb20fe6]

This closes #328.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Oleg Zhurakousky 2016-04-05 14:24:46 -04:00 committed by Andy LoPresto
parent 2ce785766d
commit 8e4a4532df
No known key found for this signature in database
GPG Key ID: 3C6EF65B2F7DEF69
10 changed files with 317 additions and 110 deletions

View File

@ -6,6 +6,7 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.Marker;
import org.slf4j.helpers.MessageFormatter;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
@ -215,25 +216,23 @@ public class CapturingLogger implements Logger {
@Override
public void info(String msg) {
infoMessages.add(new LogMessage(null, msg, null));
logger.info(msg);
this.info(msg, (Object) null);
}
@Override
public void info(String format, Object arg) {
infoMessages.add(new LogMessage(null, format, null, arg));
logger.info(format, arg);
this.info(format, arg, null);
}
@Override
public void info(String format, Object arg1, Object arg2) {
infoMessages.add(new LogMessage(null, format, null, arg1, arg2));
logger.info(format, arg1, arg2);
this.info(format, new Object[] { arg1, arg2 });
}
@Override
public void info(String format, Object... arguments) {
infoMessages.add(new LogMessage(null, format, null, arguments));
String message = MessageFormatter.arrayFormat(format, arguments).getMessage();
infoMessages.add(new LogMessage(null, message, null, arguments));
logger.info(format, arguments);
}
@ -287,25 +286,23 @@ public class CapturingLogger implements Logger {
@Override
public void warn(String msg) {
warnMessages.add(new LogMessage(null, msg, null));
logger.warn(msg);
this.warn(msg, (Object) null);
}
@Override
public void warn(String format, Object arg) {
warnMessages.add(new LogMessage(null, format, null, arg));
logger.warn(format, arg);
this.warn(format, arg, null);
}
@Override
public void warn(String format, Object arg1, Object arg2) {
warnMessages.add(new LogMessage(null, format, null, arg1, arg2));
logger.warn(format, arg1, arg2);
this.warn(format, new Object[] { arg1, arg2 });
}
@Override
public void warn(String format, Object... arguments) {
warnMessages.add(new LogMessage(null, format, null, arguments));
String message = MessageFormatter.arrayFormat(format, arguments).getMessage();
warnMessages.add(new LogMessage(null, message, null, arguments));
logger.warn(format, arguments);
}

View File

@ -42,4 +42,9 @@ public class LogMessage {
public Object[] getArgs() {
return args;
}
@Override
public String toString() {
return this.msg;
}
}

View File

@ -43,7 +43,6 @@ import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.SchedulingAgentCallback;
import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.StringEncryptor;
@ -203,7 +202,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, OnConfigured.class, reportingTask, taskNode.getConfigurationContext());
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
}
agent.schedule(taskNode, scheduleState);
@ -254,7 +253,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
try {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotations(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext);
}
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
@ -274,7 +273,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
agent.unschedule(taskNode, scheduleState);
if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, configurationContext);
}
}
}

View File

@ -241,7 +241,7 @@ public abstract class ApplicationResource {
/**
* Generates a 401 Not Authorized response with no content.
*
* @return The response to be built
*/
protected ResponseBuilder generateNotAuthorizedResponse() {
@ -549,7 +549,6 @@ public abstract class ApplicationResource {
* @param method the HTTP method
* @param nodeUuid the UUID of the node to replicate the request to
* @return the response from the node
*
* @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster
*/
protected Response replicate(final String method, final String nodeUuid) {
@ -563,7 +562,6 @@ public abstract class ApplicationResource {
* @param entity the Entity to replicate
* @param nodeUuid the UUID of the node to replicate the request to
* @return the response from the node
*
* @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster
*/
protected Response replicate(final String method, final Object entity, final String nodeUuid) {
@ -577,7 +575,6 @@ public abstract class ApplicationResource {
* @param entity the Entity to replicate
* @param nodeUuid the UUID of the node to replicate the request to
* @return the response from the node
*
* @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster
*/
protected Response replicate(final String method, final Object entity, final String nodeUuid, final Map<String, String> headersToOverride) {

View File

@ -511,7 +511,6 @@ public class EncryptContent extends AbstractProcessor {
} catch (final ProcessException e) {
logger.error("Cannot {}crypt {} - ", new Object[]{encrypt ? "en" : "de", flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
}

View File

@ -56,5 +56,16 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-utils</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -16,11 +16,11 @@
*/
package org.apache.nifi.controller;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@ -28,6 +28,7 @@ import java.util.regex.Pattern;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -91,15 +92,26 @@ import org.slf4j.LoggerFactory;
+ " that the memory pool is exceeding this threshold.")
public class MonitorMemory extends AbstractReportingTask {
private static final AllowableValue[] memPoolAllowableValues;
static {
List<MemoryPoolMXBean> memoryPoolBeans = ManagementFactory.getMemoryPoolMXBeans();
memPoolAllowableValues = new AllowableValue[memoryPoolBeans.size()];
for (int i = 0; i < memPoolAllowableValues.length; i++) {
memPoolAllowableValues[i] = new AllowableValue(memoryPoolBeans.get(i).getName());
}
}
public static final PropertyDescriptor MEMORY_POOL_PROPERTY = new PropertyDescriptor.Builder()
.name("Memory Pool")
.displayName("Memory Pool")
.description("The name of the JVM Memory Pool to monitor")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue(null)
.allowableValues(memPoolAllowableValues)
.build();
public static final PropertyDescriptor THRESHOLD_PROPERTY = new PropertyDescriptor.Builder()
.name("Usage Threshold")
.displayName("Usage Threshold")
.description("Indicates the threshold at which warnings should be generated")
.required(true)
.addValidator(new ThresholdValidator())
@ -107,6 +119,7 @@ public class MonitorMemory extends AbstractReportingTask {
.build();
public static final PropertyDescriptor REPORTING_INTERVAL = new PropertyDescriptor.Builder()
.name("Reporting Interval")
.displayName("Reporting Interval")
.description("Indicates how often this reporting task should report bulletins while the memory utilization exceeds the configured threshold")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
@ -121,22 +134,23 @@ public class MonitorMemory extends AbstractReportingTask {
private volatile MemoryPoolMXBean monitoredBean;
private volatile String threshold = "65%";
private volatile long lastReportTime = 0L;
private volatile long lastReportTime;
private volatile long reportingIntervalMillis;
private volatile boolean lastValueWasExceeded = false;
private volatile boolean lastValueWasExceeded;
private final List<GarbageCollectorMXBean> garbageCollectorBeans = new ArrayList<>();
private final static List<PropertyDescriptor> propertyDescriptors;
public MonitorMemory() {
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(MEMORY_POOL_PROPERTY);
_propertyDescriptors.add(THRESHOLD_PROPERTY);
_propertyDescriptors.add(REPORTING_INTERVAL);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>(3);
descriptors.add(MEMORY_POOL_PROPERTY);
descriptors.add(THRESHOLD_PROPERTY);
descriptors.add(REPORTING_INTERVAL);
return descriptors;
return propertyDescriptors;
}
@OnScheduled
@ -145,7 +159,6 @@ public class MonitorMemory extends AbstractReportingTask {
final String thresholdValue = config.getProperty(THRESHOLD_PROPERTY).getValue().trim();
threshold = thresholdValue;
// validate reporting interval
final Long reportingIntervalValue = config.getProperty(REPORTING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
if (reportingIntervalValue == null) {
reportingIntervalMillis = config.getSchedulingPeriod(TimeUnit.MILLISECONDS);
@ -154,30 +167,21 @@ public class MonitorMemory extends AbstractReportingTask {
}
final List<MemoryPoolMXBean> memoryPoolBeans = ManagementFactory.getMemoryPoolMXBeans();
for (final MemoryPoolMXBean bean : memoryPoolBeans) {
final String memoryPoolName = bean.getName();
for (int i = 0; i < memoryPoolBeans.size() && monitoredBean == null; i++) {
MemoryPoolMXBean memoryPoolBean = memoryPoolBeans.get(i);
String memoryPoolName = memoryPoolBean.getName();
if (desiredMemoryPoolName.equals(memoryPoolName)) {
monitoredBean = bean;
monitoredBean = memoryPoolBean;
if (memoryPoolBean.isCollectionUsageThresholdSupported()) {
long calculatedThreshold;
if (DATA_SIZE_PATTERN.matcher(thresholdValue).matches()) {
final long bytes = DataUnit.parseDataSize(thresholdValue, DataUnit.B).longValue();
if (bean.isCollectionUsageThresholdSupported()) {
bean.setCollectionUsageThreshold(bytes);
}
calculatedThreshold = DataUnit.parseDataSize(thresholdValue, DataUnit.B).longValue();
} else {
final String percentage = thresholdValue.substring(0, thresholdValue.length() - 1);
final double pct = Double.parseDouble(percentage) / 100D;
final long calculatedThreshold = (long) (bean.getUsage().getMax() * pct);
if (bean.isCollectionUsageThresholdSupported()) {
bean.setCollectionUsageThreshold(calculatedThreshold);
calculatedThreshold = (long) (monitoredBean.getUsage().getMax() * pct);
}
}
}
}
for (final GarbageCollectorMXBean bean : ManagementFactory.getGarbageCollectorMXBeans()) {
for (final String memoryPoolName : bean.getMemoryPoolNames()) {
if (desiredMemoryPoolName.equals(memoryPoolName)) {
garbageCollectorBeans.add(bean);
monitoredBean.setUsageThreshold(calculatedThreshold);
}
}
}
@ -187,16 +191,6 @@ public class MonitorMemory extends AbstractReportingTask {
}
}
private long calculateThresholdBytes(final long maxBytes) {
if (DATA_SIZE_PATTERN.matcher(threshold).matches()) {
return DataUnit.parseDataSize(threshold, DataUnit.B).longValue();
} else {
final String percentage = threshold.substring(0, threshold.length() - 1);
final double pct = Double.parseDouble(percentage) / 100D;
return (long) (maxBytes * pct);
}
}
@Override
public void onTrigger(final ReportingContext context) {
final MemoryPoolMXBean bean = monitoredBean;
@ -204,17 +198,15 @@ public class MonitorMemory extends AbstractReportingTask {
return;
}
final MemoryUsage usage = bean.getCollectionUsage();
final MemoryUsage usage = bean.getUsage();
if (usage == null) {
logger.warn("{} could not determine memory usage for pool with name {}", this,
context.getProperty(MEMORY_POOL_PROPERTY));
return;
}
final boolean exceeded = usage.getUsed() > calculateThresholdBytes(usage.getMax());
final double percentageUsed = (double) usage.getUsed() / (double) usage.getMax() * 100D;
if (exceeded) {
if (bean.isUsageThresholdExceeded()) {
if (System.currentTimeMillis() < reportingIntervalMillis + lastReportTime && lastReportTime > 0L) {
return;
}

View File

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.File;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.KeyService;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.provenance.MockProvenanceEventRepository;
import org.apache.nifi.util.CapturingLogger;
import org.apache.nifi.util.NiFiProperties;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
public class MonitorMemoryTest {
private FlowController fc;
@Before
public void before() throws Exception {
System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
NiFiProperties.getInstance().setProperty(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec");
NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml");
NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, "local-provider");
fc = this.buildFlowControllerForTest();
}
@After
public void after() throws Exception {
FileUtils.deleteDirectory(new File("./target/test-repo"));
FileUtils.deleteDirectory(new File("./target/content_repository"));
fc.shutdown(true);
}
@Test(expected = IllegalStateException.class)
public void validatevalidationKicksInOnWrongPoolNames() throws Exception {
ReportingTaskNode reportingTask = fc.createReportingTask(MonitorMemory.class.getName());
reportingTask.setProperty(MonitorMemory.MEMORY_POOL_PROPERTY.getName(), "foo");
ProcessScheduler ps = fc.getProcessScheduler();
ps.schedule(reportingTask);
}
@Test
public void validateWarnWhenPercentThresholdReached() throws Exception {
this.doValidate("10%");
}
/*
* We're ignoring this tests as it is practically impossible to run it
* reliably together with automated Maven build since we can't control the
* state of the JVM on each machine during the build. However, you can run
* it selectively for further validation
*/
@Test
@Ignore
public void validateWarnWhenSizeThresholdReached() throws Exception {
this.doValidate("10 MB");
}
public void doValidate(String threshold) throws Exception {
CapturingLogger capturingLogger = this.wrapAndReturnCapturingLogger();
ReportingTaskNode reportingTask = fc.createReportingTask(MonitorMemory.class.getName());
reportingTask.setScheduldingPeriod("1 sec");
reportingTask.setProperty(MonitorMemory.MEMORY_POOL_PROPERTY.getName(), "PS Old Gen");
reportingTask.setProperty(MonitorMemory.REPORTING_INTERVAL.getName(), "100 millis");
reportingTask.setProperty(MonitorMemory.THRESHOLD_PROPERTY.getName(), threshold);
ProcessScheduler ps = fc.getProcessScheduler();
ps.schedule(reportingTask);
Thread.sleep(2000);
// ensure no memory warning were issued
assertTrue(capturingLogger.getWarnMessages().size() == 0);
// throw something on the heap
@SuppressWarnings("unused")
byte[] b = new byte[Integer.MAX_VALUE / 3];
Thread.sleep(200);
assertTrue(capturingLogger.getWarnMessages().size() > 0);
assertTrue(capturingLogger.getWarnMessages().get(0).getMsg()
.startsWith("Memory Pool 'PS Old Gen' has exceeded the configured Threshold of " + threshold));
// now try to clear the heap and see memory being reclaimed
b = null;
System.gc();
Thread.sleep(1000);
assertTrue(capturingLogger.getInfoMessages().get(0).getMsg().startsWith(
"Memory Pool 'PS Old Gen' is no longer exceeding the configured Threshold of " + threshold));
}
private CapturingLogger wrapAndReturnCapturingLogger() throws Exception {
Field loggerField = MonitorMemory.class.getDeclaredField("logger");
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(loggerField, loggerField.getModifiers() & ~Modifier.FINAL);
loggerField.setAccessible(true);
CapturingLogger capturingLogger = new CapturingLogger((Logger) loggerField.get(null));
loggerField.set(null, capturingLogger);
return capturingLogger;
}
private FlowController buildFlowControllerForTest() throws Exception {
NiFiProperties properties = NiFiProperties.getInstance();
properties.setProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS,
MockProvenanceEventRepository.class.getName());
properties.setProperty("nifi.remote.input.socket.port", "");
properties.setProperty("nifi.remote.input.secure", "");
return FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), properties,
mock(KeyService.class), mock(AuditService.class), null, null);
}
}

View File

@ -0,0 +1,31 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Core Properties #
nifi.version=nifi-test 0.*
nifi.flow.configuration.file=./target/flow.xml.gz
nifi.flow.configuration.archive.dir=./target/archive/
nifi.reporting.task.configuration.file=./target/reporting-tasks.xml
nifi.controller.service.configuration.file=./target/controller-services.xml
nifi.templates.directory=./target/templates
nifi.nar.library.directory=./target/lib
nifi.nar.working.directory=./target/work/nar/
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo
# Content Repository
nifi.content.repository.directory.default=./target/content_repository

View File

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
This file lists the authority providers to use when running securely. In order
to use a specific provider it must be configured here and it's identifier
must be specified in the nifi.properties file.
-->
<stateManagement>
<!--
This file provides a mechanism for defining and configuring the State Providers
that should be used for storing state locally and across a NiFi cluster.
-->
<!--
State Provider that stores state locally in a configurable directory. This Provider requires the following properties:
Directory - the directory to store components' state in. If the directory being used is a sub-directory of the NiFi installation, it
is important that the directory be copied over to the new version when upgrading NiFi.
-->
<local-provider>
<id>local-provider</id>
<class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
<property name="Directory">target/test-classes/access-control/state-management</property>
</local-provider>
</stateManagement>