This commit is contained in:
Mark Payne 2015-01-09 08:20:07 -05:00
commit 3b16d010c0
22 changed files with 354 additions and 186 deletions

View File

@ -185,12 +185,12 @@ public class RemoteProcessGroupUtils {
private ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException { private ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException {
final ClientResponse response = get(uri, timeoutMillis); final ClientResponse response = get(uri, timeoutMillis);
if (Status.OK.equals(response.getClientResponseStatus())) { if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) {
final ControllerEntity entity = response.getEntity(ControllerEntity.class); final ControllerEntity entity = response.getEntity(ControllerEntity.class);
return entity.getController(); return entity.getController();
} else { } else {
final String responseMessage = response.getEntity(String.class); final String responseMessage = response.getEntity(String.class);
throw new IOException("Got HTTP response Code " + response.getClientResponseStatus().getStatusCode() + ": " + response.getClientResponseStatus().getReasonPhrase() + " with explanation: " + responseMessage); throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage);
} }
} }

View File

@ -86,7 +86,8 @@
<artifactId>maven-surefire-plugin</artifactId> <artifactId>maven-surefire-plugin</artifactId>
<configuration> <configuration>
<argLine>-Xms512m -Xmx512m</argLine> <argLine>-Xms512m -Xmx512m</argLine>
<forkMode>always</forkMode> <forkCount>1</forkCount>
<reuseForks>false</reuseForks>
</configuration> </configuration>
</plugin> </plugin>
</plugins> </plugins>

View File

@ -113,6 +113,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.UserService; import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.authorization.DownloadAuthorization; import org.apache.nifi.authorization.DownloadAuthorization;
import org.apache.nifi.processor.DataUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.security.access.AccessDeniedException; import org.springframework.security.access.AccessDeniedException;
@ -1255,6 +1256,28 @@ public class ControllerFacade implements ControllerServiceProvider {
addIfAppropriate(searchStr, comparator.getClass().getName(), "Prioritizer", matches); addIfAppropriate(searchStr, comparator.getClass().getName(), "Prioritizer", matches);
} }
// search expiration
if (StringUtils.containsIgnoreCase("expires", searchStr) || StringUtils.containsIgnoreCase("expiration", searchStr)) {
final int expirationMillis = connection.getFlowFileQueue().getFlowFileExpiration(TimeUnit.MILLISECONDS);
if (expirationMillis > 0) {
matches.add("FlowFile expiration: " + connection.getFlowFileQueue().getFlowFileExpiration());
}
}
// search back pressure
if (StringUtils.containsIgnoreCase("back pressure", searchStr) || StringUtils.containsIgnoreCase("pressure", searchStr)) {
final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
final Double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
if (backPressureBytes > 0) {
matches.add("Back pressure data size: " + backPressureDataSize);
}
final long backPressureCount = connection.getFlowFileQueue().getBackPressureObjectThreshold();
if (backPressureCount > 0) {
matches.add("Back pressure count: " + backPressureCount);
}
}
// search the source // search the source
final Connectable source = connection.getSource(); final Connectable source = connection.getSource();
addIfAppropriate(searchStr, source.getIdentifier(), "Source id", matches); addIfAppropriate(searchStr, source.getIdentifier(), "Source id", matches);

View File

@ -78,7 +78,7 @@ public class NiFiWebApiTest {
ClientResponse response = dfm.testPost(baseUrl + "/controller/process-groups/root/processors", processorEntity); ClientResponse response = dfm.testPost(baseUrl + "/controller/process-groups/root/processors", processorEntity);
// ensure a successful response // ensure a successful response
if (!Status.CREATED.equals(response.getClientResponseStatus())) { if (Status.CREATED.getStatusCode() != response.getStatusInfo().getStatusCode()) {
// since it was unable to create the component attempt to extract an // since it was unable to create the component attempt to extract an
// error message from the response body // error message from the response body
final String responseEntity = response.getEntity(String.class); final String responseEntity = response.getEntity(String.class);
@ -107,7 +107,7 @@ public class NiFiWebApiTest {
response = dfm.testPost(baseUrl + "/controller/process-groups/root/processors", processorEntity); response = dfm.testPost(baseUrl + "/controller/process-groups/root/processors", processorEntity);
// ensure a successful response // ensure a successful response
if (!Status.CREATED.equals(response.getClientResponseStatus())) { if (Status.CREATED.getStatusCode() != response.getStatusInfo().getStatusCode()) {
// since it was unable to create the component attempt to extract an // since it was unable to create the component attempt to extract an
// error message from the response body // error message from the response body
final String responseEntity = response.getEntity(String.class); final String responseEntity = response.getEntity(String.class);
@ -149,7 +149,7 @@ public class NiFiWebApiTest {
response = dfm.testPost(baseUrl + "/controller/process-groups/root/connections", connectionEntity); response = dfm.testPost(baseUrl + "/controller/process-groups/root/connections", connectionEntity);
// ensure a successful response // ensure a successful response
if (!Status.CREATED.equals(response.getClientResponseStatus())) { if (Status.CREATED.getStatusCode() != response.getStatusInfo().getStatusCode()) {
// since it was unable to create the component attempt to extract an // since it was unable to create the component attempt to extract an
// error message from the response body // error message from the response body
final String responseEntity = response.getEntity(String.class); final String responseEntity = response.getEntity(String.class);
@ -172,7 +172,7 @@ public class NiFiWebApiTest {
response = dfm.testPost(baseUrl + "/controller/process-groups/root/labels", labelEntity); response = dfm.testPost(baseUrl + "/controller/process-groups/root/labels", labelEntity);
// ensure a successful response // ensure a successful response
if (!Status.CREATED.equals(response.getClientResponseStatus())) { if (Status.CREATED.getStatusCode() != response.getStatusInfo().getStatusCode()) {
// since it was unable to create the component attempt to extract an // since it was unable to create the component attempt to extract an
// error message from the response body // error message from the response body
final String responseEntity = response.getEntity(String.class); final String responseEntity = response.getEntity(String.class);
@ -195,7 +195,7 @@ public class NiFiWebApiTest {
response = dfm.testPost(baseUrl + "/controller/process-groups/root/process-group-references", processGroupEntity); response = dfm.testPost(baseUrl + "/controller/process-groups/root/process-group-references", processGroupEntity);
// ensure a successful response // ensure a successful response
if (!Status.CREATED.equals(response.getClientResponseStatus())) { if (Status.CREATED.getStatusCode() != response.getStatusInfo().getStatusCode()) {
// since it was unable to create the component attempt to extract an // since it was unable to create the component attempt to extract an
// error message from the response body // error message from the response body
final String responseEntity = response.getEntity(String.class); final String responseEntity = response.getEntity(String.class);
@ -218,7 +218,7 @@ public class NiFiWebApiTest {
response = dfm.testPost(baseUrl + "/controller/process-groups/root/input-ports", inputPortEntity); response = dfm.testPost(baseUrl + "/controller/process-groups/root/input-ports", inputPortEntity);
// ensure a successful response // ensure a successful response
if (!Status.CREATED.equals(response.getClientResponseStatus())) { if (Status.CREATED.getStatusCode() != response.getStatusInfo().getStatusCode()) {
// since it was unable to create the component attempt to extract an // since it was unable to create the component attempt to extract an
// error message from the response body // error message from the response body
final String responseEntity = response.getEntity(String.class); final String responseEntity = response.getEntity(String.class);
@ -241,7 +241,7 @@ public class NiFiWebApiTest {
response = dfm.testPost(baseUrl + "/controller/process-groups/root/output-ports", outputPortEntity); response = dfm.testPost(baseUrl + "/controller/process-groups/root/output-ports", outputPortEntity);
// ensure a successful response // ensure a successful response
if (!Status.CREATED.equals(response.getClientResponseStatus())) { if (Status.CREATED.getStatusCode() != response.getStatusInfo().getStatusCode()) {
// since it was unable to create the component attempt to extract an // since it was unable to create the component attempt to extract an
// error message from the response body // error message from the response body
final String responseEntity = response.getEntity(String.class); final String responseEntity = response.getEntity(String.class);
@ -266,7 +266,7 @@ public class NiFiWebApiTest {
// response = dfm.testPost(baseUrl + "/controller/process-groups/root/remote-process-groups", remoteProcessGroupEntity); // response = dfm.testPost(baseUrl + "/controller/process-groups/root/remote-process-groups", remoteProcessGroupEntity);
// //
// // ensure a successful response // // ensure a successful response
// if (!Status.CREATED.equals(response.getClientResponseStatus())) { // if (Status.CREATED.getStatusCode() != response.getStatusInfo().getStatusCode()) {
// throw new Exception("Unable to populate initial flow."); // throw new Exception("Unable to populate initial flow.");
// } // }
} }

View File

@ -20,7 +20,6 @@ import com.sun.jersey.api.client.ClientResponse;
import java.io.File; import java.io.File;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import junit.framework.Assert;
import org.apache.nifi.integration.NiFiWebApiTest; import org.apache.nifi.integration.NiFiWebApiTest;
import org.apache.nifi.integration.util.NiFiTestServer; import org.apache.nifi.integration.util.NiFiTestServer;
import org.apache.nifi.integration.util.NiFiTestUser; import org.apache.nifi.integration.util.NiFiTestUser;
@ -52,6 +51,7 @@ import org.apache.nifi.web.api.entity.UserEntity;
import org.apache.nifi.web.api.entity.UsersEntity; import org.apache.nifi.web.api.entity.UsersEntity;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;

View File

@ -20,7 +20,6 @@ import com.sun.jersey.api.client.ClientResponse;
import java.io.File; import java.io.File;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import junit.framework.Assert;
import org.apache.nifi.integration.NiFiWebApiTest; import org.apache.nifi.integration.NiFiWebApiTest;
import org.apache.nifi.integration.util.NiFiTestServer; import org.apache.nifi.integration.util.NiFiTestServer;
import org.apache.nifi.integration.util.NiFiTestUser; import org.apache.nifi.integration.util.NiFiTestUser;
@ -48,6 +47,7 @@ import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorTypesEntity; import org.apache.nifi.web.api.entity.ProcessorTypesEntity;
import org.apache.nifi.web.api.entity.ProcessorsEntity; import org.apache.nifi.web.api.entity.ProcessorsEntity;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;

View File

@ -81,6 +81,8 @@ public class ConvertSvg extends HttpServlet {
String filename = request.getParameter("filename"); String filename = request.getParameter("filename");
if (filename == null) { if (filename == null) {
filename = "image.png"; filename = "image.png";
} else if (!filename.endsWith(".png")) {
filename += ".png";
} }
final StringReader reader = new StringReader(svg); final StringReader reader = new StringReader(svg);

View File

@ -35,7 +35,7 @@
</div> </div>
<div class="setting"> <div class="setting">
<div class="setting-name"> <div class="setting-name">
File expiration FlowFile expiration
<img class="setting-icon icon-info" src="images/iconInfo.png" alt="Info" title="The maximum amount of time an object may be in the flow before it will be automatically aged out of the flow."/> <img class="setting-icon icon-info" src="images/iconInfo.png" alt="Info" title="The maximum amount of time an object may be in the flow before it will be automatically aged out of the flow."/>
</div> </div>
<div class="setting-field"> <div class="setting-field">

View File

@ -75,7 +75,7 @@
</div> </div>
<div class="setting"> <div class="setting">
<div class="setting-name"> <div class="setting-name">
File expiration FlowFile expiration
<img class="setting-icon icon-info" src="images/iconInfo.png" alt="Info" title="The maximum amount of time an object may be in the flow before it will be automatically aged out of the flow."/> <img class="setting-icon icon-info" src="images/iconInfo.png" alt="Info" title="The maximum amount of time an object may be in the flow before it will be automatically aged out of the flow."/>
</div> </div>
<div class="setting-field"> <div class="setting-field">

View File

@ -312,6 +312,28 @@ nf.Canvas = (function () {
'stop-color': '#ffffff' 'stop-color': '#ffffff'
}); });
// define the gradient for the expiration icon
var expirationBackground = defs.append('linearGradient')
.attr({
'id': 'expiration',
'x1': '0%',
'y1': '0%',
'x2': '0%',
'y2': '100%'
});
expirationBackground.append('stop')
.attr({
'offset': '0%',
'stop-color': '#aeafb1'
});
expirationBackground.append('stop')
.attr({
'offset': '100%',
'stop-color': '#87888a'
});
// create the canvas element // create the canvas element
canvas = svg.append('g') canvas = svg.append('g')
.attr({ .attr({
@ -802,17 +824,21 @@ nf.Canvas = (function () {
}; };
return { return {
CANVAS_OFFSET: 0, CANVAS_OFFSET: 0,
/** /**
* Determines if the current broswer supports SVG. * Determines if the current broswer supports SVG.
*/ */
SUPPORTS_SVG: !!document.createElementNS && !!document.createElementNS('http://www.w3.org/2000/svg', 'svg').createSVGRect, SUPPORTS_SVG: !!document.createElementNS && !!document.createElementNS('http://www.w3.org/2000/svg', 'svg').createSVGRect,
/** /**
* Hides the splash that is displayed while the application is loading. * Hides the splash that is displayed while the application is loading.
*/ */
hideSplash: function () { hideSplash: function () {
$('#splash').fadeOut(); $('#splash').fadeOut();
}, },
/** /**
* Stop polling for revision. * Stop polling for revision.
*/ */
@ -820,6 +846,7 @@ nf.Canvas = (function () {
// set polling flag // set polling flag
revisionPolling = false; revisionPolling = false;
}, },
/** /**
* Remove the status poller. * Remove the status poller.
*/ */
@ -827,6 +854,7 @@ nf.Canvas = (function () {
// set polling flag // set polling flag
statusPolling = false; statusPolling = false;
}, },
/** /**
* Reloads the flow from the server based on the currently specified group id. * Reloads the flow from the server based on the currently specified group id.
* To load another group, update nf.Canvas.setGroupId and call nf.Canvas.reload. * To load another group, update nf.Canvas.setGroupId and call nf.Canvas.reload.
@ -865,6 +893,7 @@ nf.Canvas = (function () {
}); });
}).promise(); }).promise();
}, },
/** /**
* Reloads the status. * Reloads the status.
*/ */
@ -878,6 +907,7 @@ nf.Canvas = (function () {
}); });
}).promise(); }).promise();
}, },
/** /**
* Initialize NiFi. * Initialize NiFi.
*/ */
@ -999,6 +1029,7 @@ nf.Canvas = (function () {
}).fail(nf.Common.handleAjaxError); }).fail(nf.Common.handleAjaxError);
}).fail(nf.Common.handleAjaxError); }).fail(nf.Common.handleAjaxError);
}, },
/** /**
* Defines the gradient colors used to render processors. * Defines the gradient colors used to render processors.
* *
@ -1007,6 +1038,7 @@ nf.Canvas = (function () {
defineProcessorColors: function (colors) { defineProcessorColors: function (colors) {
setColors(colors, 'processor'); setColors(colors, 'processor');
}, },
/** /**
* Defines the gradient colors used to render label. * Defines the gradient colors used to render label.
* *
@ -1015,6 +1047,7 @@ nf.Canvas = (function () {
defineLabelColors: function (colors) { defineLabelColors: function (colors) {
setColors(colors, 'label'); setColors(colors, 'label');
}, },
/** /**
* Return whether this instance of NiFi is clustered. * Return whether this instance of NiFi is clustered.
* *
@ -1023,12 +1056,14 @@ nf.Canvas = (function () {
isClustered: function () { isClustered: function () {
return clustered === true; return clustered === true;
}, },
/** /**
* Returns whether site to site communications is secure. * Returns whether site to site communications is secure.
*/ */
isSecureSiteToSite: function () { isSecureSiteToSite: function () {
return secureSiteToSite; return secureSiteToSite;
}, },
/** /**
* Set the group id. * Set the group id.
* *
@ -1043,6 +1078,7 @@ nf.Canvas = (function () {
getGroupId: function () { getGroupId: function () {
return groupId; return groupId;
}, },
/** /**
* Set the group name. * Set the group name.
* *
@ -1051,12 +1087,14 @@ nf.Canvas = (function () {
setGroupName: function (gn) { setGroupName: function (gn) {
groupName = gn; groupName = gn;
}, },
/** /**
* Get the group name. * Get the group name.
*/ */
getGroupName: function () { getGroupName: function () {
return groupName; return groupName;
}, },
/** /**
* Set the parent group id. * Set the parent group id.
* *
@ -1065,13 +1103,16 @@ nf.Canvas = (function () {
setParentGroupId: function (pgi) { setParentGroupId: function (pgi) {
parentGroupId = pgi; parentGroupId = pgi;
}, },
/** /**
* Get the parent group id. * Get the parent group id.
*/ */
getParentGroupId: function () { getParentGroupId: function () {
return parentGroupId; return parentGroupId;
}, },
View: (function () { View: (function () {
/** /**
* Updates component visibility based on their proximity to the screen's viewport. * Updates component visibility based on their proximity to the screen's viewport.
*/ */

View File

@ -187,6 +187,24 @@ nf.Connection = (function () {
return terminal.groupId !== nf.Canvas.getGroupId() && (isInputPortType(terminal.type) || isOutputPortType(terminal.type)); return terminal.groupId !== nf.Canvas.getGroupId() && (isInputPortType(terminal.type) || isOutputPortType(terminal.type));
}; };
/**
* Determines whether expiration is configured for the specified connection.
*
* @param {object} connection
* @return {boolean} Whether expiration is configured
*/
var isExpirationConfigured = function (connection) {
if (nf.Common.isDefinedAndNotNull(connection.flowFileExpiration)) {
var match = connection.flowFileExpiration.match(/^(\d+).*/);
if (match !== null && match.length > 0) {
if (parseInt(match[0], 10) > 0) {
return true;
}
}
}
return false;
};
/** /**
* Sorts the specified connections according to the z index. * Sorts the specified connections according to the z index.
* *
@ -669,7 +687,7 @@ nf.Connection = (function () {
// update the label text // update the label text
connectionFrom.select('text.connection-from') connectionFrom.select('text.connection-from')
.each(function (d) { .each(function () {
var connectionFromLabel = d3.select(this); var connectionFromLabel = d3.select(this);
// reset the label name to handle any previous state // reset the label name to handle any previous state
@ -677,7 +695,7 @@ nf.Connection = (function () {
// apply ellipsis to the label as necessary // apply ellipsis to the label as necessary
nf.CanvasUtils.ellipsis(connectionFromLabel, d.component.source.name); nf.CanvasUtils.ellipsis(connectionFromLabel, d.component.source.name);
}).append('title').text(function (d) { }).append('title').text(function () {
return d.component.source.name; return d.component.source.name;
}); });
@ -821,7 +839,7 @@ nf.Connection = (function () {
// update the connection name // update the connection name
connectionName.select('text.connection-name') connectionName.select('text.connection-name')
.each(function (d) { .each(function () {
var connectionToLabel = d3.select(this); var connectionToLabel = d3.select(this);
// reset the label name to handle any previous state // reset the label name to handle any previous state
@ -829,7 +847,7 @@ nf.Connection = (function () {
// apply ellipsis to the label as necessary // apply ellipsis to the label as necessary
nf.CanvasUtils.ellipsis(connectionToLabel, connectionNameValue); nf.CanvasUtils.ellipsis(connectionToLabel, connectionNameValue);
}).append('title').text(function (d) { }).append('title').text(function () {
return connectionNameValue; return connectionNameValue;
}); });
} else { } else {
@ -866,6 +884,44 @@ nf.Connection = (function () {
'x': 46, 'x': 46,
'y': 10 'y': 10
}); });
var expiration = queued.append('g')
.attr({
'class': 'expiration-icon',
'transform': 'translate(167, 2)'
});
expiration.append('circle')
.attr({
'cx': 5,
'cy': 5,
'r': 4.75,
'stroke-width': 0.5,
'stroke': '#87888a',
'fill': 'url(#expiration)'
});
expiration.append('line')
.attr({
'x1': 6,
'y1': 5,
'x2': 3,
'y2': 4,
'stroke': '#fff',
'stroke-width': 1
});
expiration.append('line')
.attr({
'x1': 6,
'y1': 5,
'x2': 3,
'y2': 7,
'stroke': '#fff',
'stroke-width': 1
});
expiration.append('title');
} }
// update the queued vertical positioning as necessary // update the queued vertical positioning as necessary
@ -880,6 +936,15 @@ nf.Connection = (function () {
return 5 + (15 * labelCount) + 3; return 5 + (15 * labelCount) + 3;
}); });
// determine whether or not to show the expiration icon
connectionLabelContainer.select('g.expiration-icon')
.classed('hidden', function () {
return !isExpirationConfigured(d.component);
})
.select('title').text(function () {
return 'Expires FlowFiles older than ' + d.component.flowFileExpiration;
});
if (nf.Common.isDFM()) { if (nf.Common.isDFM()) {
// only support dragging the label when appropriate // only support dragging the label when appropriate
connectionLabelContainer.call(labelDrag); connectionLabelContainer.call(labelDrag);

View File

@ -489,7 +489,7 @@ nf.GoTo = (function () {
} }
// show the dialog // show the dialog
$('#connections-dialog').modal('setHeaderText', 'Downstream Connections').modal('show'); $('#connections-dialog').modal('setHeaderText', 'Upstream Connections').modal('show');
}).fail(nf.Common.handleAjaxError); }).fail(nf.Common.handleAjaxError);
}, },

View File

@ -23,10 +23,6 @@ import org.springframework.security.core.AuthenticationException;
*/ */
public class UntrustedProxyException extends AuthenticationException { public class UntrustedProxyException extends AuthenticationException {
public UntrustedProxyException(String msg, Object extraInformation) {
super(msg, extraInformation);
}
public UntrustedProxyException(String msg) { public UntrustedProxyException(String msg) {
super(msg); super(msg);
} }

View File

@ -302,7 +302,7 @@ public class OcspCertificateValidator {
final ClientResponse response = resource.header(CONTENT_TYPE_HEADER, OCSP_REQUEST_CONTENT_TYPE).post(ClientResponse.class, ocspRequest.getEncoded()); final ClientResponse response = resource.header(CONTENT_TYPE_HEADER, OCSP_REQUEST_CONTENT_TYPE).post(ClientResponse.class, ocspRequest.getEncoded());
// ensure the request was completed successfully // ensure the request was completed successfully
if (!ClientResponse.Status.OK.equals(response.getClientResponseStatus())) { if (ClientResponse.Status.OK.getStatusCode() != response.getStatusInfo().getStatusCode()) {
logger.warn(String.format("OCSP request was unsuccessful (%s).", response.getStatus())); logger.warn(String.format("OCSP request was unsuccessful (%s).", response.getStatus()));
return ocspStatus; return ocspStatus;
} }

View File

@ -16,8 +16,6 @@
*/ */
package org.apache.nifi.processors.hadoop; package org.apache.nifi.processors.hadoop;
import org.apache.nifi.processors.hadoop.GetHDFS;
import org.apache.nifi.processors.hadoop.PutHDFS;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -25,7 +23,6 @@ import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import junit.framework.Assert;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -35,6 +32,7 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class GetHDFSTest { public class GetHDFSTest {

View File

@ -16,10 +16,8 @@
*/ */
package org.apache.nifi.prioritizer; package org.apache.nifi.prioritizer;
import org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import junit.framework.Assert;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
@ -28,6 +26,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession; import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState; import org.apache.nifi.util.SharedSessionState;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;

View File

@ -16,10 +16,8 @@
*/ */
package org.apache.nifi.prioritizer; package org.apache.nifi.prioritizer;
import org.apache.nifi.prioritizer.OldestFlowFileFirstPrioritizer;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import junit.framework.Assert;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
@ -28,6 +26,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession; import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState; import org.apache.nifi.util.SharedSessionState;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.4 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.6 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 192 KiB

View File

@ -20,8 +20,8 @@ Apache NiFi Team <dev@nifi.incubator.apache.org>
:homepage: http://nifi.incubator.apache.org :homepage: http://nifi.incubator.apache.org
Overview Introduction
-------- ------------
Apache NiFi (Incubating) is a dataflow system based on the concepts of flow-based programming. It supports Apache NiFi (Incubating) is a dataflow system based on the concepts of flow-based programming. It supports
powerful and scalable directed graphs of data routing, transformation, and system mediation logic. NiFi has powerful and scalable directed graphs of data routing, transformation, and system mediation logic. NiFi has
a web-based user interface for design, control, feedback, and monitoring of dataflows. It is highly configurable a web-based user interface for design, control, feedback, and monitoring of dataflows. It is highly configurable
@ -144,15 +144,14 @@ image::status-bar.png["NiFi Status Bar"]
Building a DataFlow Building a DataFlow
------------------- -------------------
A DataFlow Manager (DFM) is able to build an automated dataflow using the NiFi User Interface (UI). This is accomplished A DataFlow Manager (DFM) is able to build an automated dataflow using the NiFi User Interface (UI). Simply drag components from the toolbar to the canvas, configure the components to meet specific needs, and connect
by dragging components from the toolbar to the canvas, configuring the components to meet specific needs, and connecting
the components together. the components together.
=== Adding Components to the Canvas === Adding Components to the Canvas
In the User Interface section above, we outlined the different segments of the UI and pointed out a Components Toolbar. In the User Interface section above outlined the different segments of the UI and pointed out a Components Toolbar.
Here, we will look at each of the Components in that toolbar: This section looks at each of the Components in that toolbar:
image::components.png["Components"] image::components.png["Components"]
@ -176,13 +175,13 @@ image::add-processor-with-tag-cloud.png["Add Processor with Tag Cloud"]
Clicking the `Add` button or double-clicking on a Processor Type will add the selected Processor to the canvas at the Clicking the `Add` button or double-clicking on a Processor Type will add the selected Processor to the canvas at the
location that it was dropped. location that it was dropped.
*Note*: For any component added to the graph, it is possible to select it with the mouse and move it anywhere on the graph. Also, it is possible to select multiple items at once by either holding down the Shift key and selecting each item or by holding down the Shift key and dragging a selection box around the desired components.
image:iconInputPort.png["Input Port", width=32] image:iconInputPort.png["Input Port", width=32]
*Input Port*: Input Ports provide a mechanism for transferring data into a Process Group. When an Input Port is dragged *Input Port*: Input Ports provide a mechanism for transferring data into a Process Group. When an Input Port is dragged
onto the canvas, the DFM is prompted to name the Port. All Ports within a Process Group must have unique names. onto the canvas, the DFM is prompted to name the Port. All Ports within a Process Group must have unique names.
All components exist only within a Process Group. When a user navigates to the NiFi page, the user is placed in the All components exist only within a Process Group. When a user initially navigates to the NiFi page, the user is placed in the
Root Process Group. If the Input Port is dragged onto the Root Process Group, the Input Port provides a mechanism Root Process Group. If the Input Port is dragged onto the Root Process Group, the Input Port provides a mechanism
to receive data from remote instances of NiFi. In this case, the Input Port can be configured to restrict access to to receive data from remote instances of NiFi. In this case, the Input Port can be configured to restrict access to
appropriate users. appropriate users.
@ -204,7 +203,7 @@ that data is removed from the queues of the incoming Connections.
image:iconProcessGroup.png["Process Group", width=32] image:iconProcessGroup.png["Process Group", width=32]
*Process Group*: Process Groups can be used to logically group a set of components so that the dataflow is easier to understand *Process Group*: Process Groups can be used to logically group a set of components so that the dataflow is easier to understand
and maintain. When a Process Group is dragged onto the canvas, the DFM is prompted to name the Process Group. All Process and maintain. When a Process Group is dragged onto the canvas, the DFM is prompted to name the Process Group. All Process
Groups within the same parent group must have unique names. Groups within the same parent group must have unique names. The Process Group will then be nested within that parent group.
@ -322,7 +321,7 @@ The first configuration option is the Scheduling Strategy. There are three optio
- *Timer driven*: This is the default mode. The Processor will be scheduled to run on a regular interval. The interval - *Timer driven*: This is the default mode. The Processor will be scheduled to run on a regular interval. The interval
at which the Processor is run is defined by the `Run schedule' option (see below). at which the Processor is run is defined by the `Run schedule' option (see below).
- *Event driven*: When this mode is selected, the Processor will be triggered to run by an event, and that event occurs when FlowFiles enter Connections - *Event driven*: When this mode is selected, the Processor will be triggered to run by an event, and that event occurs when FlowFiles enter Connections
that have this Processor as their destination. This mode is not supported by all Processors. When this mode is feeding this Processor. This mode is currently considered experimental and is not supported by all Processors. When this mode is
selected, the `Run schedule' option is not configurable, as the Processor is not triggered to run periodically but selected, the `Run schedule' option is not configurable, as the Processor is not triggered to run periodically but
as the result of an event. Additionally, this is the only mode for which the `Concurrent tasks' as the result of an event. Additionally, this is the only mode for which the `Concurrent tasks'
option can be set to 0. In this case, the number of threads is limited only by the size of the Event-Driven Thread Pool that option can be set to 0. In this case, the number of threads is limited only by the size of the Event-Driven Thread Pool that
@ -408,7 +407,7 @@ Note that after a User-Defined property has been added, an icon will appear on t
image:iconDelete.png["Delete Icon"] image:iconDelete.png["Delete Icon"]
). Clicking this button will remove the User-Defined property from the Processor. ). Clicking this button will remove the User-Defined property from the Processor.
Some processors also have an Advanced User Interface (UI) built into them. For example, the UpdateAttribute processor has an Advanced UI. To access the Advanced UI, click the `Advanced` button that appears at the bottom of the Configure Processor window. Only processors that have an Advanced UI will have this button.
==== Comments Tab ==== Comments Tab
@ -428,7 +427,7 @@ for all the Processors that are available. Clicking on the desired Processor in
=== Connecting Components === Connecting Components
Once processors have been added to the graph and configured, the next step is to connect them Once processors and other components have been added to the graph and configured, the next step is to connect them
to one another so that NiFi knows what to do with each FlowFile after it has been processed. This is accomplished by creating a to one another so that NiFi knows what to do with each FlowFile after it has been processed. This is accomplished by creating a
Connection between each component. When the user hovers the mouse over the center of a component, a new Connection icon ( Connection between each component. When the user hovers the mouse over the center of a component, a new Connection icon (
image:addConnect.png["Connection Bubble"] image:addConnect.png["Connection Bubble"]
@ -437,11 +436,14 @@ image:addConnect.png["Connection Bubble"]
image:processor-connection-bubble.png["Processor with Connection Bubble"] image:processor-connection-bubble.png["Processor with Connection Bubble"]
The user drags the Connection bubble from one component to another until the second component is highlighted. When the user The user drags the Connection bubble from one component to another until the second component is highlighted. When the user
releases the mouse, a `Create Connection' dialog appears. This dialog consists of two tabs: `Details' and `Settings'. releases the mouse, a `Create Connection' dialog appears. This dialog consists of two tabs: `Details' and `Settings'. They are
discussed in detail below. Note that it is possible to draw a connection so that it loops back on the same processor. This can be
useful if the DFM wants the processor to try to re-process FlowFiles if they go down a failure Relationship. To create this type of looping
connection, simply drag the connection bubble away and then back to the same processor until it is highlighted. Then release the mouse and the same 'Create Connection' dialog appears.
==== Details Tab ==== Details Tab
The Details Tab provides information about the source and destination components, including the component name, the The Details Tab of the 'Create Connection' dialog provides information about the source and destination components, including the component name, the
component type, and the Process Group in which the component lives: component type, and the Process Group in which the component lives:
image::create-connection.png["Create Connection"] image::create-connection.png["Create Connection"]
@ -485,7 +487,14 @@ priority. If two FlowFiles have the same value according to this prioritizer, th
FlowFile to process first, and so on. If a prioritizer is no longer desired, it can then be dragged from the `Selected FlowFile to process first, and so on. If a prioritizer is no longer desired, it can then be dragged from the `Selected
prioritizers' list to the `Available prioritizers' list. prioritizers' list to the `Available prioritizers' list.
The following prioritizers are available:
- *FirstInFirstOutPrioritizer*: Given two FlowFiles, the on that reached the connection first will be processed first.
- *NewestFlowFileFirstPrioritizer*: Given two FlowFiles, the one that is newest in the dataflow will be processed first.
- *OldestFlowFileFirstPrioritizer*: Given two FlowFiles, the on that is oldest in the dataflow will be processed first. This is the default scheme that is used if no prioritizers are selected.
- *PriorityAttributePrioritizer*: Given two FlowFiles that both have a "priority" attribute, the one that has the highest priority value will be prprocessed first. Note that an UpdateAttribute processor should be used to add the "priority" attribute to the FlowFiles before they reach a connection that has this prioritizer set. Values for the "priority" attribute may be alphanumeric, where "a" is a higher priority than "z", and "1" is a higher priority than "9", for example.
*Note*: After a connection has been drawn between two components, the connection's configuration may be changed, and the connection may be moved to a new destination; however, the processors on either side of the connection must be stopped before a configuration or destination change may be made.
=== Processor Validation === Processor Validation
@ -501,6 +510,28 @@ to a Stop icon, indicating that the Processor is valid and ready to be started b
image::valid-processor.png["Valid Processor"] image::valid-processor.png["Valid Processor"]
=== Example Dataflow
This section has described the steps required to build a dataflow. Now, to put it all together. The following example dataflow
consists of just two processors: GenerateFlowFile and LogAttribute. These processors are normally used for testing, but they can also be used
to build a quick flow for demonstration purposes and see NiFi in action.
After you drag the GenerateFlowFile and LogAttribute processors to the graph and connect them (using the guidelines provided above), configure them as follows:
* Generate FlowFile
** On the Scheduling tab, set Run schedule to: 5 sec. Note that the GenerateFlowFile processor can create many FlowFiles very quickly; that's why setting the Run schedule is important so that this flow does not overwhelm the system NiFi is running on.
** On the Properties tab, set File Size to: 10 kb
* Log Attribute
** On the Settings tab, under Auto-terminate relationships, select the checkbox next to Success. This will terminate FlowFiles after this processor has successfully processed them.
** Also on the Settings tab, set the Bulletin level to Info. This way, when the dataflow is running, this processor will display the bulletin icon (see <<processor_anatomy>>), and the user may hover over it with the mouse to see the attributes that the processor is logging.
The dataflow should look like the following:
image::simple-flow.png["Simple Flow", width=900]
Now see the following section on how to start and stop the dataflow. When the dataflow is running, be sure to note the statistical information that is displayed on the face of each processor (see <<processor_anatomy>>).
@ -640,7 +671,11 @@ or not compression should be used when transmitting data to or from this Port.
[[navigating]]
== Navigating within a DataFlow
NiFi provides various mechanisms for getting around a dataflow. The <<User_Interface>> section discussed various ways to navigate around
the NiFi graph; however, once a flow exists on the graph, there are additional ways to get from one component to another. The <<User Interface>> section showed that when multiple Process Groups exist in a flow, breadcrumbs appear under the toolbar, providing a way to navigate between them. In addition, to enter a Process Group that is currently visible on the graph, simply double-click it, thereby "drilling down" into it. Connections also provide a way to jump from one location to another within the flow. Right-click on a connection and select "Go to source" or "Go to destination" in order to jump to one end of the connection or another. This can be very useful in large, complex dataflows, where the connection lines may be long and span large areas of the graph. Finally, all components provide the ability to jump forward or backward within the flow. Right-click any component (e.g., a processor, process group, port, etc.) and select either "Upstream connections" or "Downstream connections". A dialog window will open, showing the available upstream or downstream connections that the user may jump to. This can be especially useful when trying to follow a dataflow in a backward direction. It is typically easy to follow the path of a dataflow from start to finish, drilling down into nested process groups; however, it can be more difficult to follow the dataflow in the other direction.
@ -1046,9 +1081,6 @@ will remove the selection.
[[templates]] [[templates]]
== Templates == Templates
DataFlow Managers have the ability to build very large and complex DataFlows using NiFi. This is achieved DataFlow Managers have the ability to build very large and complex DataFlows using NiFi. This is achieved
@ -1240,3 +1272,15 @@ Once "Expand" is selected, the graph is re-drawn to show the children and their
image:expanded-events.png["Expanded Events", width=300] image:expanded-events.png["Expanded Events", width=300]
[[other_management_features]]
Other Management Features
-------------------------
In addition to the Summary Page, Data Provenance Page, Template Management Page, and Bulletin Board Page, there are other tools in the Management Toolbar (See <<User_Interface>>) that are useful to the Dataflow Manager. The Flow Configuration History, which is available by clicking on the clock icon ( image:iconFlowHistory.png["Flow History", width=28] ) in the Management Toolbar, shows all the changes that have been made to the dataflow graph. The history can aid in troubleshooting if a recent change to the dataflow has caused a problem and needs to be fixed. While NiFi does not have an "undo" feature, the DataFlow Manager can make new changes to the dataflow that will fix the problem.
Two other tools in the Management Toolbar are used primarily by Administrators. These are the Flow Settings page ( image:iconSettings.png["Flow Settings", width=28] ) and the Users page ( image:iconUsers.png["Users", width=28] ). The Flow Settings page provides the ability to change the name of the NiFi instance, add comments describing the NiFi instance, set the maximum number of threads that are available to the application, and create a back-up copy of the dataflow(s) currently on the graph. The Users page is used to manage user access, which is described in the Admin Guide.