NIFI-4436:

- Updating buckets permissions based on new model.
- Adding check to ensure that flow name is non null before checking the length.
- Adding versioned flow state to the Process Group tab in the Summary table.
- Fixing issue with navigating to Controller Services from the local changes dialog.
This commit is contained in:
Matt Gilman 2017-12-12 16:39:05 -05:00 committed by Bryan Bende
parent 181d6809c1
commit f48808b1f4
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
15 changed files with 152 additions and 70 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.controller.status;
import org.apache.nifi.registry.flow.VersionedFlowState;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@ -27,6 +29,7 @@ public class ProcessGroupStatus implements Cloneable {
private String id;
private String name;
private VersionedFlowState versionedFlowState;
private Integer inputCount;
private Long inputContentSize;
private Integer outputCount;
@ -66,6 +69,14 @@ public class ProcessGroupStatus implements Cloneable {
this.name = name;
}
public VersionedFlowState getVersionedFlowState() {
return versionedFlowState;
}
public void setVersionedFlowState(VersionedFlowState versionedFlowState) {
this.versionedFlowState = versionedFlowState;
}
public Integer getInputCount() {
return inputCount;
}
@ -399,6 +410,11 @@ public class ProcessGroupStatus implements Cloneable {
target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent());
target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent());
// if the versioned flow state to merge is sync failure allow it to take precedence.
if (VersionedFlowState.SYNC_FAILURE.equals(toMerge.getVersionedFlowState())) {
target.setVersionedFlowState(VersionedFlowState.SYNC_FAILURE);
}
// connection status
// sort by id
final Map<String, ConnectionStatus> mergedConnectionMap = new HashMap<>();

View File

@ -43,6 +43,8 @@ public class ProcessGroupStatusSnapshotDTO implements Cloneable {
private Collection<PortStatusSnapshotEntity> inputPortStatusSnapshots;
private Collection<PortStatusSnapshotEntity> outputPortStatusSnapshots;
private String versionedFlowState;
private Integer flowFilesIn = 0;
private Long bytesIn = 0L;
private String input;
@ -102,6 +104,17 @@ public class ProcessGroupStatusSnapshotDTO implements Cloneable {
this.name = name;
}
@ApiModelProperty(readOnly = true,
value = "The current state of the Process Group, as it relates to the Versioned Flow",
allowableValues = "LOCALLY_MODIFIED_DESCENDANT, LOCALLY_MODIFIED, STALE, LOCALLY_MODIFIED_AND_STALE, UP_TO_DATE")
public String getVersionedFlowState() {
return versionedFlowState;
}
public void setVersionedFlowState(String versionedFlowState) {
this.versionedFlowState = versionedFlowState;
}
/**
* @return active thread count for this process group
*/
@ -477,6 +490,7 @@ public class ProcessGroupStatusSnapshotDTO implements Cloneable {
final ProcessGroupStatusSnapshotDTO other = new ProcessGroupStatusSnapshotDTO();
other.setId(getId());
other.setName(getName());
other.setVersionedFlowState(getVersionedFlowState());
other.setBytesIn(getBytesIn());
other.setFlowFilesIn(getFlowFilesIn());

View File

@ -30,7 +30,7 @@ public class FlowBreadcrumbEntity extends Entity {
private String id;
private PermissionsDTO permissions;
private String state;
private String versionedFlowState;
private FlowBreadcrumbDTO breadcrumb;
private FlowBreadcrumbEntity parentBreadcrumb;
@ -101,11 +101,11 @@ public class FlowBreadcrumbEntity extends Entity {
@ApiModelProperty(readOnly = true,
value = "The current state of the Process Group, as it relates to the Versioned Flow",
allowableValues = "LOCALLY_MODIFIED_DESCENDANT, LOCALLY_MODIFIED, STALE, LOCALLY_MODIFIED_AND_STALE, UP_TO_DATE")
public String getState() {
return state;
public String getVersionedFlowState() {
return versionedFlowState;
}
public void setState(String state) {
this.state = state;
public void setVersionedFlowState(String versionedFlowState) {
this.versionedFlowState = versionedFlowState;
}
}

View File

@ -40,7 +40,7 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P
private Integer activeRemotePortCount;
private Integer inactiveRemotePortCount;
private String state;
private String versionedFlowState;
private Integer upToDateCount;
private Integer locallyModifiedCount;
@ -204,12 +204,12 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P
@ApiModelProperty(readOnly = true,
value = "The current state of the Process Group, as it relates to the Versioned Flow",
allowableValues = "LOCALLY_MODIFIED_DESCENDANT, LOCALLY_MODIFIED, STALE, LOCALLY_MODIFIED_AND_STALE, UP_TO_DATE")
public String getState() {
return state;
public String getVersionedFlowState() {
return versionedFlowState;
}
public void setState(String state) {
this.state = state;
public void setVersionedFlowState(String versionedFlowState) {
this.versionedFlowState = versionedFlowState;
}
@ApiModelProperty("The number of up to date versioned process groups in the process group.")

View File

@ -19,6 +19,7 @@ package org.apache.nifi.cluster.manager;
import org.apache.nifi.controller.status.RunStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.api.dto.CounterDTO;
import org.apache.nifi.web.api.dto.CountersDTO;
@ -122,6 +123,11 @@ public class StatusMerger {
target.setName(toMerge.getName());
}
// if the versioned flow state to merge is sync failure allow it to take precedence
if (VersionedFlowState.SYNC_FAILURE.name().equals(toMerge.getVersionedFlowState())) {
target.setVersionedFlowState(VersionedFlowState.SYNC_FAILURE.name());
}
target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn());
target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn());

View File

@ -16,39 +16,6 @@
*/
package org.apache.nifi.controller;
import static java.util.Objects.requireNonNull;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
@ -258,6 +225,38 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider,
QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider, IdentifierLookup, ReloadComponent {
@ -3005,6 +3004,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
status.setFlowFilesTransferred(flowFilesTransferred);
status.setBytesTransferred(bytesTransferred);
final VersionControlInformation vci = group.getVersionControlInformation();
if (vci != null && vci.getStatus() != null && vci.getStatus().getState() != null) {
status.setVersionedFlowState(vci.getStatus().getState());
}
return status;
}

View File

@ -115,6 +115,7 @@ import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.registry.model.authorization.Permissions;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.Bulletin;
@ -2417,10 +2418,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
dto.setDescription(bucket.getDescription());
dto.setCreated(bucket.getCreatedTimestamp());
final Set<String> authorizedActions = bucket.getAuthorizedActions();
final Permissions regPermissions = bucket.getPermissions();
final PermissionsDTO permissions = new PermissionsDTO();
permissions.setCanRead(authorizedActions.contains("read"));
permissions.setCanWrite(authorizedActions.contains("write"));
permissions.setCanRead(regPermissions.getCanRead());
permissions.setCanWrite(regPermissions.getCanWrite());
return entityFactory.createBucketEntity(dto, permissions);
})

View File

@ -437,7 +437,7 @@ public class VersionsResource extends ApplicationResource {
if (StringUtils.isEmpty(versionedFlowDto.getFlowName()) && StringUtils.isEmpty(versionedFlowDto.getFlowId())) {
throw new IllegalArgumentException("The Flow Name or Flow ID must be supplied.");
}
if (versionedFlowDto.getFlowName().length() > 1000) {
if (versionedFlowDto.getFlowName() != null && versionedFlowDto.getFlowName().length() > 1000) {
throw new IllegalArgumentException("The Flow Name cannot exceed 1,000 characters");
}
if (StringUtils.isEmpty(versionedFlowDto.getRegistryId())) {

View File

@ -16,8 +16,6 @@
*/
package org.apache.nifi.web.api.dto;
import javax.ws.rs.WebApplicationException;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
@ -192,6 +190,7 @@ import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.revision.RevisionManager;
import javax.ws.rs.WebApplicationException;
import java.text.Collator;
import java.util.ArrayList;
import java.util.Arrays;
@ -955,6 +954,10 @@ public final class DtoFactory {
snapshot.setId(processGroupStatus.getId());
snapshot.setName(processGroupStatus.getName());
if (processGroupStatus.getVersionedFlowState() != null) {
snapshot.setVersionedFlowState(processGroupStatus.getVersionedFlowState().name());
}
snapshot.setFlowFilesQueued(processGroupStatus.getQueuedCount());
snapshot.setBytesQueued(processGroupStatus.getQueuedContentSize());
snapshot.setBytesRead(processGroupStatus.getBytesRead());
@ -2214,7 +2217,7 @@ public final class DtoFactory {
final ComponentDifferenceDTO dto = new ComponentDifferenceDTO();
dto.setComponentName(component.getName());
dto.setComponentType(component.getComponentType().name());
dto.setComponentType(component.getComponentType().toString());
if (component instanceof InstantiatedVersionedComponent) {
final InstantiatedVersionedComponent instantiatedComponent = (InstantiatedVersionedComponent) component;

View File

@ -241,7 +241,7 @@ public final class EntityFactory {
entity.setSyncFailureCount(dto.getSyncFailureCount());
if (dto.getVersionControlInformation() != null) {
entity.setState(dto.getVersionControlInformation().getState());
entity.setVersionedFlowState(dto.getVersionControlInformation().getState());
}
entity.setBulletins(bulletins); // include bulletins as authorized descendant component bulletins should be available
@ -513,7 +513,7 @@ public final class EntityFactory {
entity.setId(dto.getId());
if (dto.getVersionControlInformation() != null) {
entity.setState(dto.getVersionControlInformation().getState());
entity.setVersionedFlowState(dto.getVersionControlInformation().getState());
}
if (permissions != null && permissions.getCanRead()) {

View File

@ -103,7 +103,7 @@
* @returns {*}
*/
isTracking: function (breadcrumbEntity) {
return nfCommon.isDefinedAndNotNull(breadcrumbEntity.state);
return nfCommon.isDefinedAndNotNull(breadcrumbEntity.versionedFlowState);
},
/**
@ -113,8 +113,8 @@
* @returns {string}
*/
getVersionControlClass: function (breadcrumbEntity) {
if (nfCommon.isDefinedAndNotNull(breadcrumbEntity.state)) {
var vciState = breadcrumbEntity.state;
if (nfCommon.isDefinedAndNotNull(breadcrumbEntity.versionedFlowState)) {
var vciState = breadcrumbEntity.versionedFlowState;
if (vciState === 'SYNC_FAILURE') {
return 'breadcrumb-version-control-gray fa fa-question'
} else if (vciState === 'LOCALLY_MODIFIED_AND_STALE') {
@ -137,7 +137,7 @@
* @param breadcrumbEntity
*/
getVersionControlTooltip: function (breadcrumbEntity) {
if (nfCommon.isDefinedAndNotNull(breadcrumbEntity.state) && breadcrumbEntity.permissions.canRead) {
if (nfCommon.isDefinedAndNotNull(breadcrumbEntity.versionedFlowState) && breadcrumbEntity.permissions.canRead) {
return nfCommon.getVersionControlTooltip(breadcrumbEntity.breadcrumb.versionControlInformation);
} else {
return 'This Process Group is not under version control.'

View File

@ -1602,10 +1602,9 @@
var processGroupId = $('#save-flow-version-process-group-id').text();
saveFlowVersion().done(function (response) {
updateVersionControlInformation(processGroupId, response.versionControlInformation);
// only hide the dialog if the flow version was successfully saved
$(this).modal('hide');
});
$(this).modal('hide');
}
}
}, {

View File

@ -91,7 +91,7 @@
* @param d
*/
var isUnderVersionControl = function (d) {
return nfCommon.isDefinedAndNotNull(d.state);
return nfCommon.isDefinedAndNotNull(d.versionedFlowState);
};
/**
@ -1060,7 +1060,7 @@
'visibility': isUnderVersionControl(processGroupData) ? 'visible' : 'hidden',
'fill': function () {
if (isUnderVersionControl(processGroupData)) {
var vciState = processGroupData.state;
var vciState = processGroupData.versionedFlowState;
if (vciState === 'SYNC_FAILURE') {
return '#666666';
} else if (vciState === 'LOCALLY_MODIFIED_AND_STALE') {
@ -1079,7 +1079,7 @@
})
.text(function () {
if (isUnderVersionControl(processGroupData)) {
var vciState = processGroupData.state;
var vciState = processGroupData.versionedFlowState;
if (vciState === 'SYNC_FAILURE') {
return '\uf128'
} else if (vciState === 'LOCALLY_MODIFIED_AND_STALE') {

View File

@ -310,25 +310,25 @@
if (nfCommon.isDefinedAndNotNull(dataContext.activeThreadCount) && dataContext.activeThreadCount > 0) {
activeThreadCount = '(' + nfCommon.escapeHtml(dataContext.activeThreadCount) + ')';
}
var classes = nfCommon.escapeHtml(value.toLowerCase());
switch (nfCommon.escapeHtml(value.toLowerCase())) {
var classes;
switch (value.toLowerCase()) {
case 'running':
classes += ' fa fa-play running';
classes = 'fa fa-play running';
break;
case 'stopped':
classes += ' fa fa-stop stopped';
classes = 'fa fa-stop stopped';
break;
case 'enabled':
classes += ' fa fa-flash enabled';
classes = 'fa fa-flash enabled';
break;
case 'disabled':
classes += ' icon icon-enable-false disabled';
classes = 'icon icon-enable-false disabled';
break;
case 'invalid':
classes += ' fa fa-warning invalid';
classes = 'fa fa-warning invalid';
break;
default:
classes += '';
classes = '';
}
var formattedValue = '<div layout="row"><div class="' + classes + '"></div>';
return formattedValue + '<div class="status-text" style="margin-top: 4px;">' + nfCommon.escapeHtml(value) + '</div><div style="float: left; margin-left: 4px;">' + nfCommon.escapeHtml(activeThreadCount) + '</div></div>';
@ -1041,6 +1041,37 @@
formatter: nfCommon.genericValueFormatter
};
// define how the column is formatted
var versionStateFormatter = function (row, cell, value, columnDef, dataContext) {
var classes, label;
switch (value) {
case 'UP_TO_DATE':
classes = 'fa fa-check up-to-date';
label = 'Up to date';
break;
case 'LOCALLY_MODIFIED':
classes = 'fa fa-asterisk locally-modified';
label = 'Locally modified';
break;
case 'STALE':
classes = 'fa fa-arrow-circle-up stale';
label = 'Stale';
break;
case 'LOCALLY_MODIFIED_AND_STALE':
classes = 'fa fa-exclamation-circle locally-modified-and-stale';
label = 'Locally modified and stale';
break;
case 'SYNC_FAILURE':
classes = 'fa fa-question sync-failure';
label = 'Sync failure';
break;
default:
classes = '';
label = '';
}
return '<div layout="row"><div class="' + classes + '"></div><div class="status-text" style="margin-top: 4px;">' + label + '</div></div>';
};
// define the column model for the summary table
var processGroupsColumnModel = [
moreDetailsColumn,
@ -1052,6 +1083,14 @@
resizable: true,
formatter: valueFormatter
},
{
id: 'versionedFlowState',
field: 'versionedFlowState',
name: 'Version State',
sortable: true,
resizable: true,
formatter: versionStateFormatter
},
transferredColumn,
inputColumn,
ioColumn,