mirror of https://github.com/apache/nifi.git
NIFI-2511: - Ensuring Process Group bulletins are bubbling up as expected.
This closes #822. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
45c31c8305
commit
fbb705e461
|
@ -203,6 +203,8 @@ import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -1606,7 +1608,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
|
|
||||||
final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
|
final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
|
||||||
final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId);
|
final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId);
|
||||||
return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager);
|
return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager, this::getProcessGroupBulletins);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -2591,10 +2593,57 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(group.getIdentifier()));
|
final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(group.getIdentifier()));
|
||||||
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(group);
|
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(group);
|
||||||
final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(group.getIdentifier()));
|
final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(group.getIdentifier()));
|
||||||
final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(group.getIdentifier()));
|
final List<BulletinDTO> bulletins = getProcessGroupBulletins(group);
|
||||||
return entityFactory.createProcessGroupEntity(dtoFactory.createProcessGroupDto(group), revision, permissions, status, bulletins);
|
return entityFactory.createProcessGroupEntity(dtoFactory.createProcessGroupDto(group), revision, permissions, status, bulletins);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<BulletinDTO> getProcessGroupBulletins(final ProcessGroup group) {
|
||||||
|
final List<Bulletin> bulletins = new ArrayList<>(bulletinRepository.findBulletinsForGroupBySource(group.getIdentifier()));
|
||||||
|
|
||||||
|
for (final ProcessGroup descendantGroup : group.findAllProcessGroups()) {
|
||||||
|
bulletins.addAll(bulletinRepository.findBulletinsForGroupBySource(descendantGroup.getIdentifier()));
|
||||||
|
}
|
||||||
|
|
||||||
|
List<BulletinDTO> dtos = new ArrayList<>();
|
||||||
|
for (final Bulletin bulletin : bulletins) {
|
||||||
|
if (authorizeBulletin(bulletin)) {
|
||||||
|
dtos.add(dtoFactory.createBulletinDto(bulletin));
|
||||||
|
} else {
|
||||||
|
final BulletinDTO bulletinDTO = new BulletinDTO();
|
||||||
|
bulletinDTO.setTimestamp(bulletin.getTimestamp());
|
||||||
|
bulletinDTO.setId(bulletin.getId());
|
||||||
|
bulletinDTO.setSourceId(bulletin.getSourceId());
|
||||||
|
bulletinDTO.setGroupId(bulletin.getGroupId());
|
||||||
|
dtos.add(bulletinDTO);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// sort the bulletins
|
||||||
|
Collections.sort(dtos, new Comparator<BulletinDTO>() {
|
||||||
|
@Override
|
||||||
|
public int compare(BulletinDTO o1, BulletinDTO o2) {
|
||||||
|
if (o1 == null && o2 == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if (o1 == null) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (o2 == null) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return -Long.compare(o1.getId(), o2.getId());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// prune the response to only include the max number of bulletins
|
||||||
|
if (dtos.size() > BulletinRepository.MAX_BULLETINS_PER_COMPONENT) {
|
||||||
|
dtos = dtos.subList(0, BulletinRepository.MAX_BULLETINS_PER_COMPONENT);
|
||||||
|
}
|
||||||
|
|
||||||
|
return dtos;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<ProcessGroupEntity> getProcessGroups(final String parentGroupId) {
|
public Set<ProcessGroupEntity> getProcessGroups(final String parentGroupId) {
|
||||||
final Set<ProcessGroup> groups = processGroupDAO.getProcessGroups(parentGroupId);
|
final Set<ProcessGroup> groups = processGroupDAO.getProcessGroups(parentGroupId);
|
||||||
|
@ -2706,7 +2755,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
// read lock on every component being accessed in the dto conversion
|
// read lock on every component being accessed in the dto conversion
|
||||||
final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId);
|
final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId);
|
||||||
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
|
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
|
||||||
return entityFactory.createProcessGroupFlowEntity(dtoFactory.createProcessGroupFlowDto(processGroup, groupStatus, revisionManager), permissions);
|
return entityFactory.createProcessGroupFlowEntity(dtoFactory.createProcessGroupFlowDto(processGroup, groupStatus, revisionManager, this::getProcessGroupBulletins), permissions);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1603,12 +1603,14 @@ public final class DtoFactory {
|
||||||
return createProcessGroupDto(group, false);
|
return createProcessGroupDto(group, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ProcessGroupFlowDTO createProcessGroupFlowDto(final ProcessGroup group, final ProcessGroupStatus groupStatus, final RevisionManager revisionManager) {
|
public ProcessGroupFlowDTO createProcessGroupFlowDto(final ProcessGroup group, final ProcessGroupStatus groupStatus, final RevisionManager revisionManager,
|
||||||
|
final Function<ProcessGroup, List<BulletinDTO>> getProcessGroupBulletins) {
|
||||||
|
|
||||||
final ProcessGroupFlowDTO dto = new ProcessGroupFlowDTO();
|
final ProcessGroupFlowDTO dto = new ProcessGroupFlowDTO();
|
||||||
dto.setId(group.getIdentifier());
|
dto.setId(group.getIdentifier());
|
||||||
dto.setLastRefreshed(new Date());
|
dto.setLastRefreshed(new Date());
|
||||||
dto.setBreadcrumb(createBreadcrumbEntity(group));
|
dto.setBreadcrumb(createBreadcrumbEntity(group));
|
||||||
dto.setFlow(createFlowDto(group, groupStatus, revisionManager));
|
dto.setFlow(createFlowDto(group, groupStatus, revisionManager, getProcessGroupBulletins));
|
||||||
|
|
||||||
final ProcessGroup parent = group.getParent();
|
final ProcessGroup parent = group.getParent();
|
||||||
if (parent != null) {
|
if (parent != null) {
|
||||||
|
@ -1618,7 +1620,8 @@ public final class DtoFactory {
|
||||||
return dto;
|
return dto;
|
||||||
}
|
}
|
||||||
|
|
||||||
public FlowDTO createFlowDto(final ProcessGroup group, final ProcessGroupStatus groupStatus, final FlowSnippetDTO snippet, final RevisionManager revisionManager) {
|
public FlowDTO createFlowDto(final ProcessGroup group, final ProcessGroupStatus groupStatus, final FlowSnippetDTO snippet, final RevisionManager revisionManager,
|
||||||
|
final Function<ProcessGroup, List<BulletinDTO>> getProcessGroupBulletins) {
|
||||||
if (snippet == null) {
|
if (snippet == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -1700,7 +1703,7 @@ public final class DtoFactory {
|
||||||
() -> groupStatus.getProcessGroupStatus().stream().filter(processGroupStatus -> processGroup.getIdentifier().equals(processGroupStatus.getId())).findFirst().orElse(null),
|
() -> groupStatus.getProcessGroupStatus().stream().filter(processGroupStatus -> processGroup.getIdentifier().equals(processGroupStatus.getId())).findFirst().orElse(null),
|
||||||
processGroupStatus -> createConciseProcessGroupStatusDto(processGroupStatus)
|
processGroupStatus -> createConciseProcessGroupStatusDto(processGroupStatus)
|
||||||
);
|
);
|
||||||
final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier()));
|
final List<BulletinDTO> bulletins = getProcessGroupBulletins.apply(processGroup);
|
||||||
flow.getProcessGroups().add(entityFactory.createProcessGroupEntity(dto, revision, accessPolicy, status, bulletins));
|
flow.getProcessGroups().add(entityFactory.createProcessGroupEntity(dto, revision, accessPolicy, status, bulletins));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1748,7 +1751,8 @@ public final class DtoFactory {
|
||||||
return statusDTO;
|
return statusDTO;
|
||||||
}
|
}
|
||||||
|
|
||||||
public FlowDTO createFlowDto(final ProcessGroup group, final ProcessGroupStatus groupStatus, final RevisionManager revisionManager) {
|
public FlowDTO createFlowDto(final ProcessGroup group, final ProcessGroupStatus groupStatus, final RevisionManager revisionManager,
|
||||||
|
final Function<ProcessGroup, List<BulletinDTO>> getProcessGroupBulletins) {
|
||||||
final FlowDTO dto = new FlowDTO();
|
final FlowDTO dto = new FlowDTO();
|
||||||
|
|
||||||
for (final ProcessorNode procNode : group.getProcessors()) {
|
for (final ProcessorNode procNode : group.getProcessors()) {
|
||||||
|
@ -1791,7 +1795,7 @@ public final class DtoFactory {
|
||||||
() -> groupStatus.getProcessGroupStatus().stream().filter(processGroupStatus -> childGroup.getIdentifier().equals(processGroupStatus.getId())).findFirst().orElse(null),
|
() -> groupStatus.getProcessGroupStatus().stream().filter(processGroupStatus -> childGroup.getIdentifier().equals(processGroupStatus.getId())).findFirst().orElse(null),
|
||||||
processGroupStatus -> createConciseProcessGroupStatusDto(processGroupStatus)
|
processGroupStatus -> createConciseProcessGroupStatusDto(processGroupStatus)
|
||||||
);
|
);
|
||||||
final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(childGroup.getIdentifier()));
|
final List<BulletinDTO> bulletins = getProcessGroupBulletins.apply(childGroup);
|
||||||
dto.getProcessGroups().add(entityFactory.createProcessGroupEntity(createProcessGroupDto(childGroup), revision, permissions, status, bulletins));
|
dto.getProcessGroups().add(entityFactory.createProcessGroupEntity(createProcessGroupDto(childGroup), revision, permissions, status, bulletins));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -124,9 +124,9 @@ public final class EntityFactory {
|
||||||
entity.setDisabledCount(dto.getDisabledCount());
|
entity.setDisabledCount(dto.getDisabledCount());
|
||||||
entity.setActiveRemotePortCount(dto.getActiveRemotePortCount());
|
entity.setActiveRemotePortCount(dto.getActiveRemotePortCount());
|
||||||
entity.setInactiveRemotePortCount(dto.getInactiveRemotePortCount());
|
entity.setInactiveRemotePortCount(dto.getInactiveRemotePortCount());
|
||||||
|
entity.setBulletins(bulletins); // include bulletins as authorized descendant component bulletins should be available
|
||||||
if (permissions != null && permissions.getCanRead()) {
|
if (permissions != null && permissions.getCanRead()) {
|
||||||
entity.setComponent(dto);
|
entity.setComponent(dto);
|
||||||
entity.setBulletins(bulletins);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return entity;
|
return entity;
|
||||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.nifi.authorization.resource.Authorizable;
|
||||||
import org.apache.nifi.authorization.resource.ResourceFactory;
|
import org.apache.nifi.authorization.resource.ResourceFactory;
|
||||||
import org.apache.nifi.authorization.user.NiFiUser;
|
import org.apache.nifi.authorization.user.NiFiUser;
|
||||||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
|
||||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.connectable.Connectable;
|
import org.apache.nifi.connectable.Connectable;
|
||||||
|
@ -79,7 +78,6 @@ import org.apache.nifi.provenance.search.SearchTerms;
|
||||||
import org.apache.nifi.provenance.search.SearchableField;
|
import org.apache.nifi.provenance.search.SearchableField;
|
||||||
import org.apache.nifi.registry.VariableRegistry;
|
import org.apache.nifi.registry.VariableRegistry;
|
||||||
import org.apache.nifi.remote.RootGroupPort;
|
import org.apache.nifi.remote.RootGroupPort;
|
||||||
import org.apache.nifi.reporting.BulletinRepository;
|
|
||||||
import org.apache.nifi.reporting.ReportingTask;
|
import org.apache.nifi.reporting.ReportingTask;
|
||||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||||
import org.apache.nifi.search.SearchContext;
|
import org.apache.nifi.search.SearchContext;
|
||||||
|
@ -140,8 +138,6 @@ public class ControllerFacade implements Authorizable {
|
||||||
// nifi components
|
// nifi components
|
||||||
private FlowController flowController;
|
private FlowController flowController;
|
||||||
private FlowService flowService;
|
private FlowService flowService;
|
||||||
private ClusterCoordinator clusterCoordinator;
|
|
||||||
private BulletinRepository bulletinRepository;
|
|
||||||
private Authorizer authorizer;
|
private Authorizer authorizer;
|
||||||
|
|
||||||
// properties
|
// properties
|
||||||
|
@ -1808,14 +1804,6 @@ public class ControllerFacade implements Authorizable {
|
||||||
this.dtoFactory = dtoFactory;
|
this.dtoFactory = dtoFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setClusterCoordinator(ClusterCoordinator clusterCoordinator) {
|
|
||||||
this.clusterCoordinator = clusterCoordinator;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setBulletinRepository(BulletinRepository bulletinRepository) {
|
|
||||||
this.bulletinRepository = bulletinRepository;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setVariableRegistry(VariableRegistry variableRegistry) {
|
public void setVariableRegistry(VariableRegistry variableRegistry) {
|
||||||
this.variableRegistry = variableRegistry;
|
this.variableRegistry = variableRegistry;
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,10 +115,8 @@
|
||||||
<property name="properties" ref="nifiProperties"/>
|
<property name="properties" ref="nifiProperties"/>
|
||||||
<property name="flowController" ref="flowController"/>
|
<property name="flowController" ref="flowController"/>
|
||||||
<property name="flowService" ref="flowService"/>
|
<property name="flowService" ref="flowService"/>
|
||||||
<property name="clusterCoordinator" ref="clusterCoordinator" />
|
|
||||||
<property name="authorizer" ref="authorizer"/>
|
<property name="authorizer" ref="authorizer"/>
|
||||||
<property name="dtoFactory" ref="dtoFactory"/>
|
<property name="dtoFactory" ref="dtoFactory"/>
|
||||||
<property name="bulletinRepository" ref="bulletinRepository"/>
|
|
||||||
<property name="variableRegistry" ref="variableRegistry"/>
|
<property name="variableRegistry" ref="variableRegistry"/>
|
||||||
</bean>
|
</bean>
|
||||||
<bean id="authorizableLookup" class="org.apache.nifi.authorization.StandardAuthorizableLookup">
|
<bean id="authorizableLookup" class="org.apache.nifi.authorization.StandardAuthorizableLookup">
|
||||||
|
|
|
@ -482,8 +482,20 @@ nf.CanvasUtils = (function () {
|
||||||
tip.remove();
|
tip.remove();
|
||||||
}
|
}
|
||||||
|
|
||||||
// if there are bulletins show them, otherwise hide
|
var hasBulletins = false;
|
||||||
if (!nf.Common.isEmpty(d.bulletins)) {
|
if (!nf.Common.isEmpty(d.bulletins)) {
|
||||||
|
// format the bulletins
|
||||||
|
var bulletins = nf.Common.getFormattedBulletins(d.bulletins);
|
||||||
|
hasBulletins = bulletins.length > 0;
|
||||||
|
|
||||||
|
if (hasBulletins) {
|
||||||
|
// create the unordered list based off the formatted bulletins
|
||||||
|
var list = nf.Common.formatUnorderedList(bulletins);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if there are bulletins show them, otherwise hide
|
||||||
|
if (hasBulletins) {
|
||||||
// update the tooltip
|
// update the tooltip
|
||||||
selection.select('text.bulletin-icon')
|
selection.select('text.bulletin-icon')
|
||||||
.each(function () {
|
.each(function () {
|
||||||
|
@ -494,16 +506,7 @@ nf.CanvasUtils = (function () {
|
||||||
})
|
})
|
||||||
.attr('class', 'tooltip nifi-tooltip')
|
.attr('class', 'tooltip nifi-tooltip')
|
||||||
.html(function () {
|
.html(function () {
|
||||||
// format the bulletins
|
return $('<div></div>').append(list).html();
|
||||||
var bulletins = nf.Common.getFormattedBulletins(d.bulletins);
|
|
||||||
|
|
||||||
// create the unordered list based off the formatted bulletins
|
|
||||||
var list = nf.Common.formatUnorderedList(bulletins);
|
|
||||||
if (list === null || list.length === 0) {
|
|
||||||
return '';
|
|
||||||
} else {
|
|
||||||
return $('<div></div>').append(list).html();
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// add the tooltip
|
// add the tooltip
|
||||||
|
|
|
@ -1217,25 +1217,27 @@ nf.Common = (function () {
|
||||||
getFormattedBulletins: function (bulletins) {
|
getFormattedBulletins: function (bulletins) {
|
||||||
var formattedBulletins = [];
|
var formattedBulletins = [];
|
||||||
$.each(bulletins, function (j, bulletin) {
|
$.each(bulletins, function (j, bulletin) {
|
||||||
// format the node address
|
if (!nf.Common.isBlank(bulletin.level)) {
|
||||||
var nodeAddress = '';
|
// format the node address
|
||||||
if (nf.Common.isDefinedAndNotNull(bulletin.nodeAddress)) {
|
var nodeAddress = '';
|
||||||
nodeAddress = '- ' + nf.Common.escapeHtml(bulletin.nodeAddress) + ' - ';
|
if (nf.Common.isDefinedAndNotNull(bulletin.nodeAddress)) {
|
||||||
|
nodeAddress = '- ' + nf.Common.escapeHtml(bulletin.nodeAddress) + ' - ';
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the bulletin message (treat as text)
|
||||||
|
var bulletinMessage = $('<pre></pre>').css({
|
||||||
|
'white-space': 'pre-wrap'
|
||||||
|
}).text(bulletin.message);
|
||||||
|
|
||||||
|
// create the bulletin message
|
||||||
|
var formattedBulletin = $('<div>' +
|
||||||
|
nf.Common.escapeHtml(bulletin.timestamp) + ' ' +
|
||||||
|
nodeAddress + ' ' +
|
||||||
|
'<b>' + nf.Common.escapeHtml(bulletin.level) + '</b> ' +
|
||||||
|
'</div>').append(bulletinMessage);
|
||||||
|
|
||||||
|
formattedBulletins.push(formattedBulletin);
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the bulletin message (treat as text)
|
|
||||||
var bulletinMessage = $('<pre></pre>').css({
|
|
||||||
'white-space': 'pre-wrap'
|
|
||||||
}).text(bulletin.message);
|
|
||||||
|
|
||||||
// create the bulletin message
|
|
||||||
var formattedBulletin = $('<div>' +
|
|
||||||
nf.Common.escapeHtml(bulletin.timestamp) + ' ' +
|
|
||||||
nodeAddress + ' ' +
|
|
||||||
'<b>' + nf.Common.escapeHtml(bulletin.level) + '</b> ' +
|
|
||||||
'</div>').append(bulletinMessage);
|
|
||||||
|
|
||||||
formattedBulletins.push(formattedBulletin);
|
|
||||||
});
|
});
|
||||||
return formattedBulletins;
|
return formattedBulletins;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue