mirror of https://github.com/apache/nifi.git
Merge branch 'develop' into NIFI-169
This commit is contained in:
commit
3a4c6ed887
|
@ -61,11 +61,12 @@ import org.apache.nifi.controller.repository.StandardFlowFileRecord;
|
||||||
import org.apache.nifi.controller.repository.claim.ContentClaim;
|
import org.apache.nifi.controller.repository.claim.ContentClaim;
|
||||||
import org.apache.nifi.controller.repository.claim.ContentClaimManager;
|
import org.apache.nifi.controller.repository.claim.ContentClaimManager;
|
||||||
import org.apache.nifi.engine.FlowEngine;
|
import org.apache.nifi.engine.FlowEngine;
|
||||||
|
import org.apache.nifi.events.EventReporter;
|
||||||
import org.apache.nifi.io.BufferedOutputStream;
|
import org.apache.nifi.io.BufferedOutputStream;
|
||||||
import org.apache.nifi.processor.QueueSize;
|
import org.apache.nifi.processor.QueueSize;
|
||||||
|
import org.apache.nifi.reporting.Severity;
|
||||||
import org.apache.nifi.util.FormatUtils;
|
import org.apache.nifi.util.FormatUtils;
|
||||||
import org.apache.nifi.util.NiFiProperties;
|
import org.apache.nifi.util.NiFiProperties;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -80,10 +81,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
||||||
public static final int MINIMUM_SWAP_COUNT = 10000;
|
public static final int MINIMUM_SWAP_COUNT = 10000;
|
||||||
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
|
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
|
||||||
public static final int SWAP_ENCODING_VERSION = 6;
|
public static final int SWAP_ENCODING_VERSION = 6;
|
||||||
|
public static final String EVENT_CATEGORY = "Swap FlowFiles";
|
||||||
|
|
||||||
private final ScheduledExecutorService swapQueueIdentifierExecutor;
|
private final ScheduledExecutorService swapQueueIdentifierExecutor;
|
||||||
private final ScheduledExecutorService swapInExecutor;
|
private final ScheduledExecutorService swapInExecutor;
|
||||||
private volatile FlowFileRepository flowFileRepository;
|
private volatile FlowFileRepository flowFileRepository;
|
||||||
|
private volatile EventReporter eventReporter;
|
||||||
|
|
||||||
// Maintains a mapping of FlowFile Queue to the a QueueLockWrapper, which provides queue locking and necessary state for swapping back in
|
// Maintains a mapping of FlowFile Queue to the a QueueLockWrapper, which provides queue locking and necessary state for swapping back in
|
||||||
private final ConcurrentMap<FlowFileQueue, QueueLockWrapper> swapMap = new ConcurrentHashMap<>();
|
private final ConcurrentMap<FlowFileQueue, QueueLockWrapper> swapMap = new ConcurrentHashMap<>();
|
||||||
|
@ -129,9 +132,10 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager) {
|
public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager, final EventReporter eventReporter) {
|
||||||
this.claimManager = claimManager;
|
this.claimManager = claimManager;
|
||||||
this.flowFileRepository = flowFileRepository;
|
this.flowFileRepository = flowFileRepository;
|
||||||
|
this.eventReporter = eventReporter;
|
||||||
swapQueueIdentifierExecutor.scheduleWithFixedDelay(new QueueIdentifier(connectionProvider), swapOutMillis, swapOutMillis, TimeUnit.MILLISECONDS);
|
swapQueueIdentifierExecutor.scheduleWithFixedDelay(new QueueIdentifier(connectionProvider), swapOutMillis, swapOutMillis, TimeUnit.MILLISECONDS);
|
||||||
swapInExecutor.scheduleWithFixedDelay(new SwapInTask(), swapInMillis, swapInMillis, TimeUnit.MILLISECONDS);
|
swapInExecutor.scheduleWithFixedDelay(new SwapInTask(), swapInMillis, swapInMillis, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
@ -437,10 +441,15 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!swapFile.delete()) {
|
if (!swapFile.delete()) {
|
||||||
logger.warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file can be cleaned up manually");
|
final String errMsg = "Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually";
|
||||||
|
logger.warn(errMsg);
|
||||||
|
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, errMsg);
|
||||||
}
|
}
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
logger.error("Failed to Swap In FlowFiles for {} due to {}", new Object[]{flowFileQueue, e.toString()}, e);
|
final String errMsg = "Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e;
|
||||||
|
logger.error(errMsg);
|
||||||
|
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
|
||||||
|
|
||||||
if (swapFile != null) {
|
if (swapFile != null) {
|
||||||
queue.add(swapFile);
|
queue.add(swapFile);
|
||||||
}
|
}
|
||||||
|
@ -488,7 +497,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
recordsSwapped = 0;
|
recordsSwapped = 0;
|
||||||
flowFileQueue.putSwappedRecords(toSwap);
|
flowFileQueue.putSwappedRecords(toSwap);
|
||||||
logger.error("Failed to swap out {} FlowFiles from {} to Swap File {} due to {}", new Object[]{toSwap.size(), flowFileQueue, swapLocation, ioe.toString()}, ioe);
|
final String errMsg = "Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe;
|
||||||
|
logger.error(errMsg);
|
||||||
|
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (recordsSwapped > 0) {
|
if (recordsSwapped > 0) {
|
||||||
|
@ -549,14 +560,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
||||||
|
|
||||||
final int swapEncodingVersion = in.readInt();
|
final int swapEncodingVersion = in.readInt();
|
||||||
if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
|
if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
|
||||||
throw new IOException("Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
|
final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
|
||||||
+ swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
|
+ swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
|
||||||
|
|
||||||
|
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
|
||||||
|
throw new IOException(errMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
final String connectionId = in.readUTF();
|
final String connectionId = in.readUTF();
|
||||||
final FlowFileQueue queue = queueMap.get(connectionId);
|
final FlowFileQueue queue = queueMap.get(connectionId);
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
logger.error("Cannot recover Swapped FlowFiles from Swap File {} because the FlowFiles belong to a Connection with ID {} and that Connection does not exist", swapFile, connectionId);
|
logger.error("Cannot recover Swapped FlowFiles from Swap File {} because the FlowFiles belong to a Connection with ID {} and that Connection does not exist", swapFile, connectionId);
|
||||||
|
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -579,7 +594,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
||||||
maxRecoveredId = maxId;
|
maxRecoveredId = maxId;
|
||||||
}
|
}
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
logger.error("Cannot recover Swapped FlowFiles from Swap File {} due to {}", swapFile, ioe.toString());
|
final String errMsg = "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe;
|
||||||
|
logger.error(errMsg);
|
||||||
|
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.error("", ioe);
|
logger.error("", ioe);
|
||||||
}
|
}
|
||||||
|
|
|
@ -388,13 +388,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.provenanceEventRepository = createProvenanceRepository(properties);
|
this.provenanceEventRepository = createProvenanceRepository(properties);
|
||||||
this.provenanceEventRepository.initialize(new EventReporter() {
|
this.provenanceEventRepository.initialize(createEventReporter(bulletinRepository));
|
||||||
@Override
|
|
||||||
public void reportEvent(final Severity severity, final String category, final String message) {
|
|
||||||
final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
|
|
||||||
bulletinRepository.addBulletin(bulletin);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
this.contentRepository = createContentRepository(properties);
|
this.contentRepository = createContentRepository(properties);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
|
@ -516,6 +510,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static EventReporter createEventReporter(final BulletinRepository bulletinRepository) {
|
||||||
|
return new EventReporter() {
|
||||||
|
@Override
|
||||||
|
public void reportEvent(final Severity severity, final String category, final String message) {
|
||||||
|
final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
|
||||||
|
bulletinRepository.addBulletin(bulletin);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
public void initializeFlow() throws IOException {
|
public void initializeFlow() throws IOException {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
|
@ -537,7 +541,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
|
||||||
contentRepository.cleanup();
|
contentRepository.cleanup();
|
||||||
|
|
||||||
if (flowFileSwapManager != null) {
|
if (flowFileSwapManager != null) {
|
||||||
flowFileSwapManager.start(flowFileRepository, this, contentClaimManager);
|
flowFileSwapManager.start(flowFileRepository, this, contentClaimManager, createEventReporter(bulletinRepository));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (externalSiteListener != null) {
|
if (externalSiteListener != null) {
|
||||||
|
|
|
@ -42,43 +42,51 @@ public class ReflectionUtils {
|
||||||
* @throws IllegalAccessException
|
* @throws IllegalAccessException
|
||||||
*/
|
*/
|
||||||
public static void invokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
|
public static void invokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
|
||||||
for (final Method method : instance.getClass().getMethods()) {
|
try {
|
||||||
if (method.isAnnotationPresent(annotation)) {
|
for (final Method method : instance.getClass().getMethods()) {
|
||||||
final boolean isAccessible = method.isAccessible();
|
if (method.isAnnotationPresent(annotation)) {
|
||||||
method.setAccessible(true);
|
final boolean isAccessible = method.isAccessible();
|
||||||
|
method.setAccessible(true);
|
||||||
try {
|
|
||||||
final Class<?>[] argumentTypes = method.getParameterTypes();
|
try {
|
||||||
if (argumentTypes.length > args.length) {
|
final Class<?>[] argumentTypes = method.getParameterTypes();
|
||||||
throw new IllegalArgumentException(String.format("Unable to invoke method %1$s on %2$s because method expects %3$s parameters but only %4$s were given",
|
if (argumentTypes.length > args.length) {
|
||||||
method.getName(), instance, argumentTypes.length, args.length));
|
throw new IllegalArgumentException(String.format("Unable to invoke method %1$s on %2$s because method expects %3$s parameters but only %4$s were given",
|
||||||
}
|
method.getName(), instance, argumentTypes.length, args.length));
|
||||||
|
|
||||||
for (int i = 0; i < argumentTypes.length; i++) {
|
|
||||||
final Class<?> argType = argumentTypes[i];
|
|
||||||
if (!argType.isAssignableFrom(args[i].getClass())) {
|
|
||||||
throw new IllegalArgumentException(String.format(
|
|
||||||
"Unable to invoke method %1$s on %2$s because method parameter %3$s is expected to be of type %4$s but argument passed was of type %5$s",
|
|
||||||
method.getName(), instance, i, argType, args[i].getClass()));
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
for (int i = 0; i < argumentTypes.length; i++) {
|
||||||
if (argumentTypes.length == args.length) {
|
final Class<?> argType = argumentTypes[i];
|
||||||
method.invoke(instance, args);
|
if (!argType.isAssignableFrom(args[i].getClass())) {
|
||||||
} else {
|
throw new IllegalArgumentException(String.format(
|
||||||
final Object[] argsToPass = new Object[argumentTypes.length];
|
"Unable to invoke method %1$s on %2$s because method parameter %3$s is expected to be of type %4$s but argument passed was of type %5$s",
|
||||||
for (int i = 0; i < argsToPass.length; i++) {
|
method.getName(), instance, i, argType, args[i].getClass()));
|
||||||
argsToPass[i] = args[i];
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (argumentTypes.length == args.length) {
|
||||||
|
method.invoke(instance, args);
|
||||||
|
} else {
|
||||||
|
final Object[] argsToPass = new Object[argumentTypes.length];
|
||||||
|
for (int i = 0; i < argsToPass.length; i++) {
|
||||||
|
argsToPass[i] = args[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
method.invoke(instance, argsToPass);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (!isAccessible) {
|
||||||
|
method.setAccessible(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
method.invoke(instance, argsToPass);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (!isAccessible) {
|
|
||||||
method.setAccessible(false);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (final InvocationTargetException ite) {
|
||||||
|
if ( ite.getCause() instanceof RuntimeException ) {
|
||||||
|
throw (RuntimeException) ite.getCause();
|
||||||
|
} else {
|
||||||
|
throw ite;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -55,7 +55,7 @@ public class TestScanContent {
|
||||||
Files.write(dictionaryPath, termBytes, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
|
Files.write(dictionaryPath, termBytes, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
|
||||||
|
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new ScanContent());
|
final TestRunner runner = TestRunners.newTestRunner(new ScanContent());
|
||||||
runner.setThreadCount(3);
|
runner.setThreadCount(1);
|
||||||
runner.setProperty(ScanContent.DICTIONARY, dictionaryPath.toString());
|
runner.setProperty(ScanContent.DICTIONARY, dictionaryPath.toString());
|
||||||
runner.setProperty(ScanContent.DICTIONARY_ENCODING, ScanContent.BINARY_ENCODING);
|
runner.setProperty(ScanContent.DICTIONARY_ENCODING, ScanContent.BINARY_ENCODING);
|
||||||
|
|
||||||
|
|
|
@ -425,7 +425,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
|
||||||
throw new IllegalStateException("Must specify a name");
|
throw new IllegalStateException("Must specify a name");
|
||||||
}
|
}
|
||||||
if (!isValueAllowed(defaultValue)) {
|
if (!isValueAllowed(defaultValue)) {
|
||||||
throw new IllegalStateException("Default value is not in the set of allowable values");
|
throw new IllegalStateException("Default value ["+ defaultValue +"] is not in the set of allowable values");
|
||||||
}
|
}
|
||||||
|
|
||||||
return new PropertyDescriptor(this);
|
return new PropertyDescriptor(this);
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.nifi.controller.repository;
|
package org.apache.nifi.controller.repository;
|
||||||
|
|
||||||
import org.apache.nifi.controller.repository.claim.ContentClaimManager;
|
import org.apache.nifi.controller.repository.claim.ContentClaimManager;
|
||||||
|
import org.apache.nifi.events.EventReporter;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Defines a mechanism by which FlowFiles can be move into external storage or
|
* Defines a mechanism by which FlowFiles can be move into external storage or
|
||||||
|
@ -34,8 +35,10 @@ public interface FlowFileSwapManager {
|
||||||
* can be obtained and restored
|
* can be obtained and restored
|
||||||
* @param claimManager the ContentClaimManager to use for interacting with
|
* @param claimManager the ContentClaimManager to use for interacting with
|
||||||
* Content Claims
|
* Content Claims
|
||||||
|
* @param reporter the EventReporter that can be used for notifying users of
|
||||||
|
* important events
|
||||||
*/
|
*/
|
||||||
void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager);
|
void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager, EventReporter reporter);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shuts down the manager
|
* Shuts down the manager
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.components;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor.Builder;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Regression test for issue NIFI-49, to ensure that if a Processor's Property's Default Value is not allowed,
|
||||||
|
* the Exception thrown should indicate what the default value is
|
||||||
|
*/
|
||||||
|
public class TestPropertyDescriptor {
|
||||||
|
|
||||||
|
private static Builder invalidDescriptorBuilder;
|
||||||
|
private static Builder validDescriptorBuilder;
|
||||||
|
private static String DEFAULT_VALUE = "Default Value";
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public ExpectedException thrown = ExpectedException.none();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() {
|
||||||
|
validDescriptorBuilder = new PropertyDescriptor.Builder().name("").allowableValues("Allowable Value", "Another Allowable Value").defaultValue("Allowable Value");
|
||||||
|
invalidDescriptorBuilder = new PropertyDescriptor.Builder().name("").allowableValues("Allowable Value", "Another Allowable Value").defaultValue(DEFAULT_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExceptionThrownByDescriptorWithInvalidDefaultValue() {
|
||||||
|
thrown.expect(IllegalStateException.class);
|
||||||
|
thrown.expectMessage("["+ DEFAULT_VALUE +"]");
|
||||||
|
|
||||||
|
invalidDescriptorBuilder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoExceptionThrownByPropertyDescriptorWithValidDefaultValue() {
|
||||||
|
assertNotNull(validDescriptorBuilder.build());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue