Merge branch 'develop' into nifi-27

This commit is contained in:
Matt Gilman 2014-12-16 09:44:24 -05:00
commit f10bb3ccad
27 changed files with 651 additions and 210 deletions

1
.gitignore vendored
View File

@ -3,6 +3,7 @@ target
.settings .settings
.classpath .classpath
nbactions.xml nbactions.xml
nb-configuration.xml
.DS_Store .DS_Store
# Intellij # Intellij

View File

@ -27,6 +27,18 @@
<scope>runtime</scope> <scope>runtime</scope>
<outputDirectory>lib</outputDirectory> <outputDirectory>lib</outputDirectory>
<useTransitiveFiltering>true</useTransitiveFiltering> <useTransitiveFiltering>true</useTransitiveFiltering>
<excludes>
<exclude>nifi-bootstrap</exclude>
</excludes>
</dependencySet>
<dependencySet>
<scope>runtime</scope>
<outputDirectory>lib/bootstrap</outputDirectory>
<useTransitiveFiltering>true</useTransitiveFiltering>
<includes>
<include>nifi-bootstrap</include>
</includes>
</dependencySet> </dependencySet>
</dependencySets> </dependencySets>

View File

@ -5,7 +5,9 @@ cd misc/nar-maven-plugin && \
mvn $MAVEN_FLAGS install && \ mvn $MAVEN_FLAGS install && \
cd ../../commons/nifi-parent && \ cd ../../commons/nifi-parent && \
mvn $MAVEN_FLAGS install && \ mvn $MAVEN_FLAGS install && \
cd ../../nifi-api && \ cd ../../nifi-bootstrap && \
mvn $MAVEN_FLAGS install && \
cd ../nifi-api && \
mvn $MAVEN_FLAGS install && \ mvn $MAVEN_FLAGS install && \
cd ../commons/ && \ cd ../commons/ && \
cd nifi-stream-utils && \ cd nifi-stream-utils && \

View File

@ -42,6 +42,22 @@
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-plugin-plugin</artifactId> <artifactId>maven-plugin-plugin</artifactId>
<version>3.3</version> <version>3.3</version>
<executions>
<execution>
<id>default-descriptor</id>
<goals>
<goal>descriptor</goal>
</goals>
<phase>process-classes</phase>
</execution>
<execution>
<id>help-descriptor</id>
<goals>
<goal>helpmojo</goal>
</goals>
<phase>process-classes</phase>
</execution>
</executions>
</plugin> </plugin>
</plugins> </plugins>
</build> </build>
@ -68,6 +84,7 @@
<groupId>org.apache.maven.plugin-tools</groupId> <groupId>org.apache.maven.plugin-tools</groupId>
<artifactId>maven-plugin-annotations</artifactId> <artifactId>maven-plugin-annotations</artifactId>
<version>3.3</version> <version>3.3</version>
<scope>provided</scope>
</dependency> </dependency>
</dependencies> </dependencies>
<distributionManagement> <distributionManagement>

View File

@ -86,10 +86,10 @@ public class NarMojo extends AbstractMojo {
* POM * POM
* *
*/ */
@Parameter(property = "project", readonly = true, required = true) @Parameter(defaultValue = "${project}", readonly = true, required = true)
protected MavenProject project; protected MavenProject project;
@Parameter(property = "session", readonly = true, required = true) @Parameter(defaultValue = "${session}", readonly = true, required = true)
protected MavenSession session; protected MavenSession session;
/** /**

View File

@ -61,11 +61,12 @@ import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ContentClaimManager; import org.apache.nifi.controller.repository.claim.ContentClaimManager;
import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.io.BufferedOutputStream; import org.apache.nifi.io.BufferedOutputStream;
import org.apache.nifi.processor.QueueSize; import org.apache.nifi.processor.QueueSize;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -80,10 +81,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
public static final int MINIMUM_SWAP_COUNT = 10000; public static final int MINIMUM_SWAP_COUNT = 10000;
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap"); private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
public static final int SWAP_ENCODING_VERSION = 6; public static final int SWAP_ENCODING_VERSION = 6;
public static final String EVENT_CATEGORY = "Swap FlowFiles";
private final ScheduledExecutorService swapQueueIdentifierExecutor; private final ScheduledExecutorService swapQueueIdentifierExecutor;
private final ScheduledExecutorService swapInExecutor; private final ScheduledExecutorService swapInExecutor;
private volatile FlowFileRepository flowFileRepository; private volatile FlowFileRepository flowFileRepository;
private volatile EventReporter eventReporter;
// Maintains a mapping of FlowFile Queue to the a QueueLockWrapper, which provides queue locking and necessary state for swapping back in // Maintains a mapping of FlowFile Queue to the a QueueLockWrapper, which provides queue locking and necessary state for swapping back in
private final ConcurrentMap<FlowFileQueue, QueueLockWrapper> swapMap = new ConcurrentHashMap<>(); private final ConcurrentMap<FlowFileQueue, QueueLockWrapper> swapMap = new ConcurrentHashMap<>();
@ -129,9 +132,10 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} }
} }
public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager) { public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager, final EventReporter eventReporter) {
this.claimManager = claimManager; this.claimManager = claimManager;
this.flowFileRepository = flowFileRepository; this.flowFileRepository = flowFileRepository;
this.eventReporter = eventReporter;
swapQueueIdentifierExecutor.scheduleWithFixedDelay(new QueueIdentifier(connectionProvider), swapOutMillis, swapOutMillis, TimeUnit.MILLISECONDS); swapQueueIdentifierExecutor.scheduleWithFixedDelay(new QueueIdentifier(connectionProvider), swapOutMillis, swapOutMillis, TimeUnit.MILLISECONDS);
swapInExecutor.scheduleWithFixedDelay(new SwapInTask(), swapInMillis, swapInMillis, TimeUnit.MILLISECONDS); swapInExecutor.scheduleWithFixedDelay(new SwapInTask(), swapInMillis, swapInMillis, TimeUnit.MILLISECONDS);
} }
@ -437,10 +441,15 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} }
if (!swapFile.delete()) { if (!swapFile.delete()) {
logger.warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file can be cleaned up manually"); final String errMsg = "Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually";
logger.warn(errMsg);
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, errMsg);
} }
} catch (final Exception e) { } catch (final Exception e) {
logger.error("Failed to Swap In FlowFiles for {} due to {}", new Object[]{flowFileQueue, e.toString()}, e); final String errMsg = "Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e;
logger.error(errMsg);
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
if (swapFile != null) { if (swapFile != null) {
queue.add(swapFile); queue.add(swapFile);
} }
@ -488,7 +497,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} catch (final IOException ioe) { } catch (final IOException ioe) {
recordsSwapped = 0; recordsSwapped = 0;
flowFileQueue.putSwappedRecords(toSwap); flowFileQueue.putSwappedRecords(toSwap);
logger.error("Failed to swap out {} FlowFiles from {} to Swap File {} due to {}", new Object[]{toSwap.size(), flowFileQueue, swapLocation, ioe.toString()}, ioe); final String errMsg = "Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe;
logger.error(errMsg);
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
} }
if (recordsSwapped > 0) { if (recordsSwapped > 0) {
@ -549,14 +560,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final int swapEncodingVersion = in.readInt(); final int swapEncodingVersion = in.readInt();
if (swapEncodingVersion > SWAP_ENCODING_VERSION) { if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
throw new IOException("Cannot swap FlowFiles in from " + swapFile + " because the encoding version is " final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
+ swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"); + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
throw new IOException(errMsg);
} }
final String connectionId = in.readUTF(); final String connectionId = in.readUTF();
final FlowFileQueue queue = queueMap.get(connectionId); final FlowFileQueue queue = queueMap.get(connectionId);
if (queue == null) { if (queue == null) {
logger.error("Cannot recover Swapped FlowFiles from Swap File {} because the FlowFiles belong to a Connection with ID {} and that Connection does not exist", swapFile, connectionId); logger.error("Cannot recover Swapped FlowFiles from Swap File {} because the FlowFiles belong to a Connection with ID {} and that Connection does not exist", swapFile, connectionId);
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist");
continue; continue;
} }
@ -579,7 +594,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
maxRecoveredId = maxId; maxRecoveredId = maxId;
} }
} catch (final IOException ioe) { } catch (final IOException ioe) {
logger.error("Cannot recover Swapped FlowFiles from Swap File {} due to {}", swapFile, ioe.toString()); final String errMsg = "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe;
logger.error(errMsg);
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.error("", ioe); logger.error("", ioe);
} }

View File

@ -388,13 +388,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
try { try {
this.provenanceEventRepository = createProvenanceRepository(properties); this.provenanceEventRepository = createProvenanceRepository(properties);
this.provenanceEventRepository.initialize(new EventReporter() { this.provenanceEventRepository.initialize(createEventReporter(bulletinRepository));
@Override
public void reportEvent(final Severity severity, final String category, final String message) {
final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
bulletinRepository.addBulletin(bulletin);
}
});
this.contentRepository = createContentRepository(properties); this.contentRepository = createContentRepository(properties);
} catch (final Exception e) { } catch (final Exception e) {
@ -516,6 +510,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
} }
} }
private static EventReporter createEventReporter(final BulletinRepository bulletinRepository) {
return new EventReporter() {
@Override
public void reportEvent(final Severity severity, final String category, final String message) {
final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
bulletinRepository.addBulletin(bulletin);
}
};
}
public void initializeFlow() throws IOException { public void initializeFlow() throws IOException {
writeLock.lock(); writeLock.lock();
try { try {
@ -537,7 +541,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
contentRepository.cleanup(); contentRepository.cleanup();
if (flowFileSwapManager != null) { if (flowFileSwapManager != null) {
flowFileSwapManager.start(flowFileRepository, this, contentClaimManager); flowFileSwapManager.start(flowFileRepository, this, contentClaimManager, createEventReporter(bulletinRepository));
} }
if (externalSiteListener != null) { if (externalSiteListener != null) {
@ -1050,6 +1054,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
processScheduler.shutdown(); processScheduler.shutdown();
} }
if ( contentRepository != null ) {
contentRepository.shutdown();
}
if ( provenanceEventRepository != null ) { if ( provenanceEventRepository != null ) {
try { try {
provenanceEventRepository.close(); provenanceEventRepository.close();

View File

@ -223,6 +223,12 @@ public class FileSystemRepository implements ContentRepository {
this.contentClaimManager = claimManager; this.contentClaimManager = claimManager;
} }
@Override
public void shutdown() {
executor.shutdown();
containerCleanupExecutor.shutdown();
}
private static double getRatio(final String value) { private static double getRatio(final String value) {
final String trimmed = value.trim(); final String trimmed = value.trim();
final String percentage = trimmed.substring(0, trimmed.length() - 1); final String percentage = trimmed.substring(0, trimmed.length() - 1);

View File

@ -75,7 +75,6 @@ import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -480,7 +479,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
context.getFlowFileEventRepository().updateRepository(flowFileEvent); context.getFlowFileEventRepository().updateRepository(flowFileEvent);
for (final FlowFileEvent connectionEvent : connectionCounts.values()) { for (final FlowFileEvent connectionEvent : checkpoint.connectionCounts.values()) {
context.getFlowFileEventRepository().updateRepository(connectionEvent); context.getFlowFileEventRepository().updateRepository(connectionEvent);
} }
} catch (final IOException ioe) { } catch (final IOException ioe) {
@ -488,6 +487,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
} }
private void addEventType(final Map<String, Set<ProvenanceEventType>> map, final String id, final ProvenanceEventType eventType) {
Set<ProvenanceEventType> eventTypes = map.get(id);
if ( eventTypes == null ) {
eventTypes = new HashSet<>();
map.put(id, eventTypes);
}
eventTypes.add(eventType);
}
private void updateProvenanceRepo(final Checkpoint checkpoint) { private void updateProvenanceRepo(final Checkpoint checkpoint) {
// Update Provenance Repository // Update Provenance Repository
final ProvenanceEventRepository provenanceRepo = context.getProvenanceRepository(); final ProvenanceEventRepository provenanceRepo = context.getProvenanceRepository();
@ -496,6 +505,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// in case the Processor developer submitted the same events to the reporter. So we use a LinkedHashSet // in case the Processor developer submitted the same events to the reporter. So we use a LinkedHashSet
// for this, so that we are able to ensure that the events are submitted in the proper order. // for this, so that we are able to ensure that the events are submitted in the proper order.
final Set<ProvenanceEventRecord> recordsToSubmit = new LinkedHashSet<>(); final Set<ProvenanceEventRecord> recordsToSubmit = new LinkedHashSet<>();
final Map<String, Set<ProvenanceEventType>> eventTypesPerFlowFileId = new HashMap<>();
final Set<ProvenanceEventRecord> processorGenerated = checkpoint.reportedEvents; final Set<ProvenanceEventRecord> processorGenerated = checkpoint.reportedEvents;
@ -513,6 +523,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles) && !processorGenerated.contains(event)) { if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles) && !processorGenerated.contains(event)) {
recordsToSubmit.add(event); recordsToSubmit.add(event);
for ( final String childUuid : event.getChildUuids() ) {
addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType());
}
for ( final String parentUuid : event.getParentUuids() ) {
addEventType(eventTypesPerFlowFileId, parentUuid, event.getEventType());
}
} }
} }
@ -521,8 +538,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) { if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
continue; continue;
} }
if ( isSpuriousRouteEvent(event, checkpoint.records) ) {
continue;
}
// Check if the event indicates that the FlowFile was routed to the same
// connection from which it was pulled (and only this connection). If so, discard the event.
isSpuriousRouteEvent(event, checkpoint.records);
recordsToSubmit.add(event); recordsToSubmit.add(event);
addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
} }
// Finally, add any other events that we may have generated. // Finally, add any other events that we may have generated.
@ -533,6 +558,68 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
recordsToSubmit.add(event); recordsToSubmit.add(event);
addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
}
}
// Check if content or attributes changed. If so, register the appropriate events.
for (final StandardRepositoryRecord repoRecord : checkpoint.records.values() ) {
final ContentClaim original = repoRecord.getOriginalClaim();
final ContentClaim current = repoRecord.getCurrentClaim();
boolean contentChanged = false;
if ( original == null && current != null ) {
contentChanged = true;
}
if ( original != null && current == null ) {
contentChanged = true;
}
if ( original != null && current != null && !original.equals(current) ) {
contentChanged = true;
}
final FlowFileRecord curFlowFile = repoRecord.getCurrent();
final String flowFileId = curFlowFile.getAttribute(CoreAttributes.UUID.key());
boolean eventAdded = false;
if (checkpoint.removedFlowFiles.contains(flowFileId)) {
continue;
}
final boolean newFlowFile = repoRecord.getOriginal() == null;
if ( contentChanged && !newFlowFile ) {
recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build());
addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED);
eventAdded = true;
}
if ( checkpoint.createdFlowFiles.contains(flowFileId) ) {
final Set<ProvenanceEventType> registeredTypes = eventTypesPerFlowFileId.get(flowFileId);
boolean creationEventRegistered = false;
if ( registeredTypes != null ) {
if ( registeredTypes.contains(ProvenanceEventType.CREATE) ||
registeredTypes.contains(ProvenanceEventType.FORK) ||
registeredTypes.contains(ProvenanceEventType.JOIN) ||
registeredTypes.contains(ProvenanceEventType.RECEIVE) ) {
creationEventRegistered = true;
}
}
if ( !creationEventRegistered ) {
recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CREATE).build());
eventAdded = true;
}
}
if ( !eventAdded && !repoRecord.getUpdatedAttributes().isEmpty() ) {
// We generate an ATTRIBUTES_MODIFIED event only if no other event has been
// created for the FlowFile. We do this because all events contain both the
// newest and the original attributes, so generating an ATTRIBUTES_MODIFIED
// event is redundant if another already exists.
if ( !eventTypesPerFlowFileId.containsKey(flowFileId) ) {
recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).build());
addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.ATTRIBUTES_MODIFIED);
}
} }
} }
@ -696,6 +783,45 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return false; return false;
} }
/**
* Checks if the given event is a spurious ROUTE, meaning that the ROUTE indicates that a FlowFile
* was routed to a relationship with only 1 connection and that Connection is the Connection from which
* the FlowFile was pulled. I.e., the FlowFile was really routed nowhere.
*
* @param event
* @param records
* @return
*/
private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map<FlowFileRecord, StandardRepositoryRecord> records) {
if ( event.getEventType() == ProvenanceEventType.ROUTE ) {
final String relationshipName = event.getRelationship();
final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
final Collection<Connection> connectionsForRelationship = this.context.getConnections(relationship);
// If the number of connections for this relationship is not 1, then we can't ignore this ROUTE event,
// as it may be cloning the FlowFile and adding to multiple connections.
if ( connectionsForRelationship.size() == 1 ) {
for ( final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : records.entrySet() ) {
final FlowFileRecord flowFileRecord = entry.getKey();
if ( event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key())) ) {
final StandardRepositoryRecord repoRecord = entry.getValue();
if ( repoRecord.getOriginalQueue() == null ) {
return false;
}
final String originalQueueId = repoRecord.getOriginalQueue().getIdentifier();
final Connection destinationConnection = connectionsForRelationship.iterator().next();
final String destinationQueueId = destinationConnection.getFlowFileQueue().getIdentifier();
return originalQueueId.equals(destinationQueueId);
}
}
}
}
return false;
}
@Override @Override
public void rollback() { public void rollback() {
rollback(false); rollback(false);

View File

@ -421,7 +421,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
} }
} }
private ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) { ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) {
final ProvenanceEventBuilder builder = repository.eventBuilder(); final ProvenanceEventBuilder builder = repository.eventBuilder();
builder.setEventType(eventType); builder.setEventType(eventType);
builder.fromFlowFile(flowFile); builder.fromFlowFile(flowFile);

View File

@ -138,6 +138,11 @@ public class VolatileContentRepository implements ContentRepository {
this.claimManager = claimManager; this.claimManager = claimManager;
} }
@Override
public void shutdown() {
executor.shutdown();
}
/** /**
* Specifies a Backup Repository where data should be written if this * Specifies a Backup Repository where data should be written if this
* Repository fills up * Repository fills up

View File

@ -54,6 +54,7 @@ import java.util.regex.Pattern;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.security.cert.CertificateExpiredException; import javax.security.cert.CertificateExpiredException;
import javax.security.cert.CertificateNotYetValidException; import javax.security.cert.CertificateNotYetValidException;
import javax.ws.rs.core.Response;
import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Connection;
@ -88,7 +89,6 @@ import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.dto.ControllerDTO; import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.PortDTO; import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.entity.ControllerEntity; import org.apache.nifi.web.api.entity.ControllerEntity;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -110,6 +110,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
public static final String ROOT_GROUP_STATUS_URI_PATH = "/controller/process-groups/root/status"; public static final String ROOT_GROUP_STATUS_URI_PATH = "/controller/process-groups/root/status";
public static final long LISTENING_PORT_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES); public static final long LISTENING_PORT_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
// status codes
public static final int OK_STATUS_CODE = Status.OK.getStatusCode();
public static final int UNAUTHORIZED_STATUS_CODE = Status.UNAUTHORIZED.getStatusCode();
public static final int FORBIDDEN_STATUS_CODE = Status.FORBIDDEN.getStatusCode();
private final String id; private final String id;
private final URI targetUri; private final URI targetUri;
@ -860,7 +865,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
try { try {
// perform the request // perform the request
final ClientResponse response = utils.get(uri, getCommunicationsTimeout(TimeUnit.MILLISECONDS)); final ClientResponse response = utils.get(uri, getCommunicationsTimeout(TimeUnit.MILLISECONDS));
if (!Status.OK.equals(response.getClientResponseStatus())) {
if (!Response.Status.Family.SUCCESSFUL.equals(response.getStatusInfo().getFamily())) {
writeLock.lock(); writeLock.lock();
try { try {
for (final Iterator<StandardRemoteGroupPort> iter = inputPorts.values().iterator(); iter.hasNext();) { for (final Iterator<StandardRemoteGroupPort> iter = inputPorts.values().iterator(); iter.hasNext();) {
@ -882,7 +888,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
// consume the entity entirely // consume the entity entirely
response.getEntity(String.class); response.getEntity(String.class);
throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + uriVal + ". Got HTTP Error Code " + response.getClientResponseStatus().getStatusCode() + ": " + response.getClientResponseStatus().getReasonPhrase()); throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + uriVal + ". Got HTTP Error Code " + response.getStatus() + ": " + response.getStatusInfo().getReasonPhrase());
} }
final ControllerEntity entity = response.getEntity(ControllerEntity.class); final ControllerEntity entity = response.getEntity(ControllerEntity.class);
@ -1303,8 +1309,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
try { try {
final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null); final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
final ClientResponse response = utils.get(new URI(apiUri + CONTROLLER_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS)); final ClientResponse response = utils.get(new URI(apiUri + CONTROLLER_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS));
switch (response.getClientResponseStatus()) {
case OK: final int statusCode = response.getStatus();
if ( statusCode == OK_STATUS_CODE ) {
final ControllerEntity entity = response.getEntity(ControllerEntity.class); final ControllerEntity entity = response.getEntity(ControllerEntity.class);
final ControllerDTO dto = entity.getController(); final ControllerDTO dto = entity.getController();
@ -1325,15 +1333,14 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
final String remoteInstanceId = dto.getInstanceId(); final String remoteInstanceId = dto.getInstanceId();
boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId); boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId);
pointsToCluster.set(isPointingToCluster); pointsToCluster.set(isPointingToCluster);
break; } else if ( statusCode == UNAUTHORIZED_STATUS_CODE ) {
case UNAUTHORIZED:
try { try {
final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString()); final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString());
if (requestAccountResponse.getClientResponseStatus() == Status.OK) { if (Response.Status.Family.SUCCESSFUL.equals(requestAccountResponse.getStatusInfo().getFamily()) ) {
logger.info("{} Issued a Request to communicate with remote instance", this); logger.info("{} Issued a Request to communicate with remote instance", this);
} else { } else {
logger.error("{} Failed to request account: got unexpected response code of {}:{}", new Object[]{ logger.error("{} Failed to request account: got unexpected response code of {}:{}", new Object[]{
this, requestAccountResponse.getClientResponseStatus().getStatusCode(), requestAccountResponse.getClientResponseStatus().getReasonPhrase()}); this, requestAccountResponse.getStatus(), requestAccountResponse.getStatusInfo().getReasonPhrase()});
} }
} catch (final Exception e) { } catch (final Exception e) {
logger.error("{} Failed to request account due to {}", this, e.toString()); logger.error("{} Failed to request account due to {}", this, e.toString());
@ -1343,16 +1350,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
} }
authorizationIssue = response.getEntity(String.class); authorizationIssue = response.getEntity(String.class);
break; } else if ( statusCode == FORBIDDEN_STATUS_CODE ) {
case FORBIDDEN:
authorizationIssue = response.getEntity(String.class); authorizationIssue = response.getEntity(String.class);
break; } else {
default:
final String message = response.getEntity(String.class); final String message = response.getEntity(String.class);
logger.warn("{} When communicating with remote instance, got unexpected response code {}:{} with entity: {}", logger.warn("{} When communicating with remote instance, got unexpected response code {}:{} with entity: {}",
new Object[]{this, response.getClientResponseStatus().getStatusCode(), response.getClientResponseStatus().getReasonPhrase(), message}); new Object[]{this, response.getStatus(), response.getStatusInfo().getReasonPhrase(), message});
authorizationIssue = "Unable to determine Site-to-Site availability."; authorizationIssue = "Unable to determine Site-to-Site availability.";
break;
} }
} catch (Exception e) { } catch (Exception e) {
logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e)); logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e));

View File

@ -42,6 +42,7 @@ public class ReflectionUtils {
* @throws IllegalAccessException * @throws IllegalAccessException
*/ */
public static void invokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { public static void invokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
try {
for (final Method method : instance.getClass().getMethods()) { for (final Method method : instance.getClass().getMethods()) {
if (method.isAnnotationPresent(annotation)) { if (method.isAnnotationPresent(annotation)) {
final boolean isAccessible = method.isAccessible(); final boolean isAccessible = method.isAccessible();
@ -80,6 +81,13 @@ public class ReflectionUtils {
} }
} }
} }
} catch (final InvocationTargetException ite) {
if ( ite.getCause() instanceof RuntimeException ) {
throw (RuntimeException) ite.getCause();
} else {
throw ite;
}
}
} }
/** /**

View File

@ -17,6 +17,7 @@
package org.apache.nifi.controller.repository; package org.apache.nifi.controller.repository;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -64,7 +65,6 @@ import org.apache.nifi.provenance.MockProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceEventType;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -267,7 +267,7 @@ public class TestStandardProcessSession {
} }
@Test @Test
public void testSpawnsNotEmittedIfFilesDeleted() throws IOException { public void testForksNotEmittedIfFilesDeleted() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis()) .entryDate(System.currentTimeMillis())
@ -283,8 +283,9 @@ public class TestStandardProcessSession {
assertEquals(0, provenanceRepo.getEvents(0L, 100000).size()); assertEquals(0, provenanceRepo.getEvents(0L, 100000).size());
} }
@Test @Test
public void testProvenanceEventsEmittedForSpawnIfNotRemoved() throws IOException { public void testProvenanceEventsEmittedForForkIfNotRemoved() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis()) .entryDate(System.currentTimeMillis())
@ -319,6 +320,79 @@ public class TestStandardProcessSession {
assertEquals(1, provenanceRepo.getEvents(0L, 100000).size()); assertEquals(1, provenanceRepo.getEvents(0L, 100000).size());
} }
@Test
public void testUpdateAttributesThenJoin() throws IOException {
final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
.id(1L)
.addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
.entryDate(System.currentTimeMillis())
.build();
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
.id(2L)
.addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord1);
flowFileQueue.put(flowFileRecord2);
FlowFile ff1 = session.get();
FlowFile ff2 = session.get();
ff1 = session.putAttribute(ff1, "index", "1");
ff2 = session.putAttribute(ff2, "index", "2");
final List<FlowFile> parents = new ArrayList<>(2);
parents.add(ff1);
parents.add(ff2);
final FlowFile child = session.create(parents);
final Relationship rel = new Relationship.Builder().name("A").build();
session.transfer(ff1, rel);
session.transfer(ff2, rel);
session.transfer(child, rel);
session.commit();
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000);
// We should have a JOIN and 2 ATTRIBUTE_MODIFIED's
assertEquals(3, events.size());
int joinCount = 0;
int ff1UpdateCount = 0;
int ff2UpdateCount = 0;
for ( final ProvenanceEventRecord event : events ) {
switch (event.getEventType()) {
case JOIN:
assertEquals(child.getAttribute("uuid"), event.getFlowFileUuid());
joinCount++;
break;
case ATTRIBUTES_MODIFIED:
if ( event.getFlowFileUuid().equals(ff1.getAttribute("uuid")) ) {
ff1UpdateCount++;
} else if ( event.getFlowFileUuid().equals(ff2.getAttribute("uuid")) ) {
ff2UpdateCount++;
} else {
Assert.fail("Got ATTRIBUTE_MODIFIED for wrong FlowFile: " + event.getFlowFileUuid());
}
break;
default:
Assert.fail("Unexpected event type: " + event);
}
}
assertEquals(1, joinCount);
assertEquals(1, ff1UpdateCount);
assertEquals(1, ff2UpdateCount);
assertEquals(1, joinCount);
}
@Test @Test
public void testForkOneToOneReported() throws IOException { public void testForkOneToOneReported() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
@ -696,9 +770,7 @@ public class TestStandardProcessSession {
return true; return true;
} }
}) })
.contentClaimOffset(1000L) .contentClaimOffset(1000L).size(1L).build();
.size(1L)
.build();
flowFileQueue.put(flowFileRecord2); flowFileQueue.put(flowFileRecord2);
// attempt to read the data. // attempt to read the data.
@ -759,6 +831,89 @@ public class TestStandardProcessSession {
} }
} }
@Test
public void testCreateEmitted() throws IOException {
FlowFile newFlowFile = session.create();
session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
session.commit();
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
assertFalse(events.isEmpty());
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.CREATE, event.getEventType());
}
@Test
public void testContentModifiedNotEmittedForCreate() throws IOException {
FlowFile newFlowFile = session.create();
newFlowFile = session.write(newFlowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
}
});
session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
session.commit();
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
assertFalse(events.isEmpty());
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.CREATE, event.getEventType());
}
@Test
public void testContentModifiedEmittedAndNotAttributesModified() throws IOException {
final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
.id(1L)
.addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
.build();
this.flowFileQueue.put(flowFile);
FlowFile existingFlowFile = session.get();
existingFlowFile = session.write(existingFlowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
}
});
existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a");
session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
session.commit();
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
assertFalse(events.isEmpty());
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.CONTENT_MODIFIED, event.getEventType());
}
@Test
public void testAttributesModifiedEmitted() throws IOException {
final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
.id(1L)
.addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
.build();
this.flowFileQueue.put(flowFile);
FlowFile existingFlowFile = session.get();
existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a");
session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
session.commit();
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
assertFalse(events.isEmpty());
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, event.getEventType());
}
private static class MockFlowFileRepository implements FlowFileRepository { private static class MockFlowFileRepository implements FlowFileRepository {
private final AtomicLong idGenerator = new AtomicLong(0L); private final AtomicLong idGenerator = new AtomicLong(0L);
@ -822,6 +977,10 @@ public class TestStandardProcessSession {
private ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>(); private ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>();
@Override
public void shutdown() {
}
public Set<ContentClaim> getExistingClaims() { public Set<ContentClaim> getExistingClaims() {
final Set<ContentClaim> claims = new HashSet<>(); final Set<ContentClaim> claims = new HashSet<>();

View File

@ -1,3 +1,5 @@
@echo off
rem rem
rem Licensed to the Apache Software Foundation (ASF) under one or more rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with rem contributor license agreements. See the NOTICE file distributed with
@ -15,18 +17,16 @@ rem See the License for the specific language governing permissions and
rem limitations under the License. rem limitations under the License.
rem rem
@echo off
rem Use JAVA_HOME if it's set; otherwise, just use java rem Use JAVA_HOME if it's set; otherwise, just use java
IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe) IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
SET LIB_DIR=%~dp0..\lib SET LIB_DIR=%~dp0..\lib\bootstrap
SET CONF_DIR=%~dp0..\conf SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=status SET BOOTSTRAP_ACTION=status
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%

View File

@ -165,8 +165,6 @@ run() {
BOOTSTRAP_CONF=`cygpath --path --windows "$BOOTSTRAP_CONF"` BOOTSTRAP_CONF=`cygpath --path --windows "$BOOTSTRAP_CONF"`
fi fi
echo
echo "Classpath: $CLASSPATH"
echo echo
echo "Java home: $JAVA_HOME" echo "Java home: $JAVA_HOME"
echo "NiFi home: $NIFI_HOME" echo "NiFi home: $NIFI_HOME"
@ -174,7 +172,7 @@ run() {
echo "Bootstrap Config File: $BOOTSTRAP_CONF" echo "Bootstrap Config File: $BOOTSTRAP_CONF"
echo echo
exec "$JAVA" -cp "$NIFI_HOME"/lib/nifi-bootstrap*.jar -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $1 exec "$JAVA" -cp "$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $1
} }
main() { main() {

View File

@ -1,3 +1,4 @@
@echo off
rem rem
rem Licensed to the Apache Software Foundation (ASF) under one or more rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with rem contributor license agreements. See the NOTICE file distributed with
@ -15,18 +16,17 @@ rem See the License for the specific language governing permissions and
rem limitations under the License. rem limitations under the License.
rem rem
@echo off
rem Use JAVA_HOME if it's set; otherwise, just use java rem Use JAVA_HOME if it's set; otherwise, just use java
IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe) IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
SET LIB_DIR=%~dp0..\lib SET LIB_DIR=%~dp0..\lib\bootstrap
SET CONF_DIR=%~dp0..\conf SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=run SET BOOTSTRAP_ACTION=run
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%

View File

@ -1,3 +1,5 @@
@echo off
rem rem
rem Licensed to the Apache Software Foundation (ASF) under one or more rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with rem contributor license agreements. See the NOTICE file distributed with
@ -15,18 +17,17 @@ rem See the License for the specific language governing permissions and
rem limitations under the License. rem limitations under the License.
rem rem
@echo off
rem Use JAVA_HOME if it's set; otherwise, just use java rem Use JAVA_HOME if it's set; otherwise, just use java
IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe) IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
SET LIB_DIR=%~dp0..\lib SET LIB_DIR=%~dp0..\lib\bootstrap
SET CONF_DIR=%~dp0..\conf SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=start SET BOOTSTRAP_ACTION=start
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%

View File

@ -1,3 +1,5 @@
@echo off
rem rem
rem Licensed to the Apache Software Foundation (ASF) under one or more rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with rem contributor license agreements. See the NOTICE file distributed with
@ -15,18 +17,16 @@ rem See the License for the specific language governing permissions and
rem limitations under the License. rem limitations under the License.
rem rem
@echo off
rem Use JAVA_HOME if it's set; otherwise, just use java rem Use JAVA_HOME if it's set; otherwise, just use java
IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe) IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
SET LIB_DIR=%~dp0..\lib SET LIB_DIR=%~dp0..\lib\bootstrap
SET CONF_DIR=%~dp0..\conf SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=stop SET BOOTSTRAP_ACTION=stop
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%

View File

@ -1,18 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project-shared-configuration>
<!--
This file contains additional configuration written by modules in the NetBeans IDE.
The configuration is intended to be shared among all the users of project and
therefore it is assumed to be part of version control checkout.
Without this configuration present, some functionality in the IDE may be limited or fail altogether.
-->
<properties xmlns="http://www.netbeans.org/ns/maven-properties-data/1">
<!--
Properties that influence various parts of the IDE, especially code formatting and the like.
You can copy and paste the single properties, into the pom.xml file and the IDE will pick them up.
That way multiple projects can share the same settings (useful for formatting rules for example).
Any value defined here will override the pom.xml file value but is only applicable to the current project.
-->
<org-netbeans-modules-maven-jaxws.rest_2e_config_2e_type>ide</org-netbeans-modules-maven-jaxws.rest_2e_config_2e_type>
</properties>
</project-shared-configuration>

View File

@ -36,6 +36,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.xml.namespace.QName; import javax.xml.namespace.QName;
import javax.xml.transform.ErrorListener;
import javax.xml.transform.OutputKeys; import javax.xml.transform.OutputKeys;
import javax.xml.transform.Source; import javax.xml.transform.Source;
import javax.xml.transform.Transformer; import javax.xml.transform.Transformer;
@ -50,6 +51,7 @@ import javax.xml.xpath.XPathFactoryConfigurationException;
import net.sf.saxon.lib.NamespaceConstant; import net.sf.saxon.lib.NamespaceConstant;
import net.sf.saxon.xpath.XPathEvaluator; import net.sf.saxon.xpath.XPathEvaluator;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
@ -73,7 +75,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.ObjectHolder;
import org.xml.sax.InputSource; import org.xml.sax.InputSource;
@EventDriven @EventDriven
@ -356,8 +357,7 @@ public class EvaluateXPath extends AbstractProcessor {
session.getProvenanceReporter().modifyContent(flowFile); session.getProvenanceReporter().modifyContent(flowFile);
} }
} else { } else {
logger.error("Failed to write XPath result for {} due to {}; routing original to 'failure'", new Object[]{ logger.error("Failed to write XPath result for {} due to {}; routing original to 'failure'", new Object[]{flowFile, error.get()});
flowFile, error.get()});
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
} }
} }
@ -377,7 +377,32 @@ public class EvaluateXPath extends AbstractProcessor {
props.setProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); props.setProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
transformer.setOutputProperties(props); transformer.setOutputProperties(props);
final ProcessorLog logger = getLogger();
final ObjectHolder<TransformerException> error = new ObjectHolder<>(null);
transformer.setErrorListener(new ErrorListener() {
@Override
public void warning(final TransformerException exception) throws TransformerException {
logger.warn("Encountered warning from XPath Engine: ", new Object[] {exception.toString(), exception});
}
@Override
public void error(final TransformerException exception) throws TransformerException {
logger.error("Encountered error from XPath Engine: ", new Object[] {exception.toString(), exception});
error.set(exception);
}
@Override
public void fatalError(final TransformerException exception) throws TransformerException {
logger.error("Encountered warning from XPath Engine: ", new Object[] {exception.toString(), exception});
error.set(exception);
}
});
transformer.transform(sourceNode, new StreamResult(out)); transformer.transform(sourceNode, new StreamResult(out));
if ( error.get() != null ) {
throw error.get();
}
} }
private static class XPathValidator implements Validator { private static class XPathValidator implements Validator {

View File

@ -55,7 +55,7 @@ public class TestScanContent {
Files.write(dictionaryPath, termBytes, StandardOpenOption.CREATE, StandardOpenOption.WRITE); Files.write(dictionaryPath, termBytes, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
final TestRunner runner = TestRunners.newTestRunner(new ScanContent()); final TestRunner runner = TestRunners.newTestRunner(new ScanContent());
runner.setThreadCount(3); runner.setThreadCount(1);
runner.setProperty(ScanContent.DICTIONARY, dictionaryPath.toString()); runner.setProperty(ScanContent.DICTIONARY, dictionaryPath.toString());
runner.setProperty(ScanContent.DICTIONARY_ENCODING, ScanContent.BINARY_ENCODING); runner.setProperty(ScanContent.DICTIONARY_ENCODING, ScanContent.BINARY_ENCODING);

View File

@ -169,6 +169,8 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
queryExecService.shutdownNow();
scheduledExecService.shutdown();
} }
@Override @Override

View File

@ -425,7 +425,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
throw new IllegalStateException("Must specify a name"); throw new IllegalStateException("Must specify a name");
} }
if (!isValueAllowed(defaultValue)) { if (!isValueAllowed(defaultValue)) {
throw new IllegalStateException("Default value is not in the set of allowable values"); throw new IllegalStateException("Default value ["+ defaultValue +"] is not in the set of allowable values");
} }
return new PropertyDescriptor(this); return new PropertyDescriptor(this);

View File

@ -45,6 +45,12 @@ public interface ContentRepository {
*/ */
void initialize(ContentClaimManager claimManager) throws IOException; void initialize(ContentClaimManager claimManager) throws IOException;
/**
* Shuts down the Content Repository, freeing any resources that may be held.
* This is called when an administrator shuts down NiFi.
*/
void shutdown();
/** /**
* Returns the names of all Containers that exist for this Content * Returns the names of all Containers that exist for this Content
* Repository * Repository

View File

@ -17,6 +17,7 @@
package org.apache.nifi.controller.repository; package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.repository.claim.ContentClaimManager; import org.apache.nifi.controller.repository.claim.ContentClaimManager;
import org.apache.nifi.events.EventReporter;
/** /**
* Defines a mechanism by which FlowFiles can be move into external storage or * Defines a mechanism by which FlowFiles can be move into external storage or
@ -34,8 +35,10 @@ public interface FlowFileSwapManager {
* can be obtained and restored * can be obtained and restored
* @param claimManager the ContentClaimManager to use for interacting with * @param claimManager the ContentClaimManager to use for interacting with
* Content Claims * Content Claims
* @param reporter the EventReporter that can be used for notifying users of
* important events
*/ */
void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager); void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager, EventReporter reporter);
/** /**
* Shuts down the manager * Shuts down the manager

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.components;
import static org.junit.Assert.assertNotNull;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
/**
* Regression test for issue NIFI-49, to ensure that if a Processor's Property's Default Value is not allowed,
* the Exception thrown should indicate what the default value is
*/
public class TestPropertyDescriptor {
private static Builder invalidDescriptorBuilder;
private static Builder validDescriptorBuilder;
private static String DEFAULT_VALUE = "Default Value";
@Rule
public ExpectedException thrown = ExpectedException.none();
@BeforeClass
public static void setUp() {
validDescriptorBuilder = new PropertyDescriptor.Builder().name("").allowableValues("Allowable Value", "Another Allowable Value").defaultValue("Allowable Value");
invalidDescriptorBuilder = new PropertyDescriptor.Builder().name("").allowableValues("Allowable Value", "Another Allowable Value").defaultValue(DEFAULT_VALUE);
}
@Test
public void testExceptionThrownByDescriptorWithInvalidDefaultValue() {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("["+ DEFAULT_VALUE +"]");
invalidDescriptorBuilder.build();
}
@Test
public void testNoExceptionThrownByPropertyDescriptorWithValidDefaultValue() {
assertNotNull(validDescriptorBuilder.build());
}
}