mirror of https://github.com/apache/nifi.git
NIFI-2204:
- Move bulletins out of the controller status endpoint. NIFI-2238: - Ensuring the controller bulletins are rendered on screen. NIFI-2246: - Ensuring the correct number of bulletins are returned when clustered.
This commit is contained in:
parent
d403254b49
commit
6e5e4cf52b
|
@ -26,6 +26,7 @@ import java.util.List;
|
|||
public interface BulletinRepository {
|
||||
|
||||
public static final int MAX_BULLETINS_PER_COMPONENT = 5;
|
||||
public static final int MAX_BULLETINS_FOR_CONTROLLER = 10;
|
||||
|
||||
/**
|
||||
* Adds a Bulletin to the repository.
|
||||
|
|
|
@ -18,10 +18,7 @@ package org.apache.nifi.web.api.dto.status;
|
|||
|
||||
import com.wordnik.swagger.annotations.ApiModelProperty;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import javax.xml.bind.annotation.XmlType;
|
||||
import org.apache.nifi.web.api.dto.BulletinDTO;
|
||||
|
||||
/**
|
||||
* The status of this NiFi controller.
|
||||
|
@ -45,10 +42,6 @@ public class ControllerStatusDTO implements Cloneable {
|
|||
private Integer activeRemotePortCount = 0;
|
||||
private Integer inactiveRemotePortCount = 0;
|
||||
|
||||
private List<BulletinDTO> bulletins;
|
||||
private List<BulletinDTO> controllerServiceBulletins;
|
||||
private List<BulletinDTO> reportingTaskBulletins;
|
||||
|
||||
/**
|
||||
* The active thread count.
|
||||
*
|
||||
|
@ -88,42 +81,6 @@ public class ControllerStatusDTO implements Cloneable {
|
|||
this.connectedNodes = connectedNodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return System bulletins to be reported to the user
|
||||
*/
|
||||
@ApiModelProperty("System level bulletins to be reported to the user.")
|
||||
public List<BulletinDTO> getBulletins() {
|
||||
return bulletins;
|
||||
}
|
||||
|
||||
public void setBulletins(List<BulletinDTO> bulletins) {
|
||||
this.bulletins = bulletins;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Controller service bulletins to be reported to the user
|
||||
*/
|
||||
@ApiModelProperty("Controller service bulletins to be reported to the user.")
|
||||
public List<BulletinDTO> getControllerServiceBulletins() {
|
||||
return controllerServiceBulletins;
|
||||
}
|
||||
|
||||
public void setControllerServiceBulletins(List<BulletinDTO> controllerServiceBulletins) {
|
||||
this.controllerServiceBulletins = controllerServiceBulletins;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Reporting task bulletins to be reported to the user
|
||||
*/
|
||||
@ApiModelProperty("Reporting task bulletins to be reported to the user.")
|
||||
public List<BulletinDTO> getReportingTaskBulletins() {
|
||||
return reportingTaskBulletins;
|
||||
}
|
||||
|
||||
public void setReportingTaskBulletins(List<BulletinDTO> reportingTaskBulletins) {
|
||||
this.reportingTaskBulletins = reportingTaskBulletins;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return number of running components in this controller
|
||||
*/
|
||||
|
@ -248,9 +205,6 @@ public class ControllerStatusDTO implements Cloneable {
|
|||
other.setDisabledCount(getDisabledCount());
|
||||
other.setActiveRemotePortCount(getActiveRemotePortCount());
|
||||
other.setInactiveRemotePortCount(getInactiveRemotePortCount());
|
||||
other.setBulletins(getBulletins() == null ? null : new ArrayList<>(getBulletins()));
|
||||
other.setControllerServiceBulletins(getControllerServiceBulletins() == null ? null : new ArrayList<>(getControllerServiceBulletins()));
|
||||
other.setReportingTaskBulletins(getReportingTaskBulletins() == null ? null : new ArrayList<>(getReportingTaskBulletins()));
|
||||
return other;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.web.api.entity;
|
||||
|
||||
import com.wordnik.swagger.annotations.ApiModelProperty;
|
||||
import org.apache.nifi.web.api.dto.BulletinDTO;
|
||||
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ControllerConfigurationDTO.
|
||||
*/
|
||||
@XmlRootElement(name = "controllerConfigurationEntity")
|
||||
public class ControllerBulletinsEntity extends Entity {
|
||||
|
||||
private List<BulletinDTO> bulletins;
|
||||
private List<BulletinDTO> controllerServiceBulletins;
|
||||
private List<BulletinDTO> reportingTaskBulletins;
|
||||
|
||||
/**
|
||||
* @return System bulletins to be reported to the user
|
||||
*/
|
||||
@ApiModelProperty("System level bulletins to be reported to the user.")
|
||||
public List<BulletinDTO> getBulletins() {
|
||||
return bulletins;
|
||||
}
|
||||
|
||||
public void setBulletins(List<BulletinDTO> bulletins) {
|
||||
this.bulletins = bulletins;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Controller service bulletins to be reported to the user
|
||||
*/
|
||||
@ApiModelProperty("Controller service bulletins to be reported to the user.")
|
||||
public List<BulletinDTO> getControllerServiceBulletins() {
|
||||
return controllerServiceBulletins;
|
||||
}
|
||||
|
||||
public void setControllerServiceBulletins(List<BulletinDTO> controllerServiceBulletins) {
|
||||
this.controllerServiceBulletins = controllerServiceBulletins;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Reporting task bulletins to be reported to the user
|
||||
*/
|
||||
@ApiModelProperty("Reporting task bulletins to be reported to the user.")
|
||||
public List<BulletinDTO> getReportingTaskBulletins() {
|
||||
return reportingTaskBulletins;
|
||||
}
|
||||
|
||||
public void setReportingTaskBulletins(List<BulletinDTO> reportingTaskBulletins) {
|
||||
this.reportingTaskBulletins = reportingTaskBulletins;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ControllerBulletinsEntity clone() {
|
||||
final ControllerBulletinsEntity other = new ControllerBulletinsEntity();
|
||||
other.setBulletins(getBulletins() == null ? null : new ArrayList<>(getBulletins()));
|
||||
other.setControllerServiceBulletins(getControllerServiceBulletins() == null ? null : new ArrayList<>(getControllerServiceBulletins()));
|
||||
other.setReportingTaskBulletins(getReportingTaskBulletins() == null ? null : new ArrayList<>(getReportingTaskBulletins()));
|
||||
return other;
|
||||
}
|
||||
}
|
|
@ -22,6 +22,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ComponentStateEndpoin
|
|||
import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionEndpointMerger;
|
||||
import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionStatusEndpiontMerger;
|
||||
import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionsEndpointMerger;
|
||||
import org.apache.nifi.cluster.coordination.http.endpoints.ControllerBulletinsEndpointMerger;
|
||||
import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger;
|
||||
import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger;
|
||||
import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger;
|
||||
|
@ -70,6 +71,7 @@ public class StandardHttpResponseMerger implements HttpResponseMerger {
|
|||
private static final List<EndpointResponseMerger> endpointMergers = new ArrayList<>();
|
||||
static {
|
||||
endpointMergers.add(new ControllerStatusEndpointMerger());
|
||||
endpointMergers.add(new ControllerBulletinsEndpointMerger());
|
||||
endpointMergers.add(new GroupStatusEndpointMerger());
|
||||
endpointMergers.add(new ProcessorStatusEndpointMerger());
|
||||
endpointMergers.add(new ConnectionStatusEndpiontMerger());
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.cluster.coordination.http.endpoints;
|
||||
|
||||
import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
|
||||
import org.apache.nifi.cluster.manager.BulletinMerger;
|
||||
import org.apache.nifi.cluster.manager.NodeResponse;
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
import org.apache.nifi.web.api.dto.BulletinDTO;
|
||||
import org.apache.nifi.web.api.entity.ControllerBulletinsEntity;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static org.apache.nifi.cluster.manager.BulletinMerger.BULLETIN_COMPARATOR;
|
||||
import static org.apache.nifi.reporting.BulletinRepository.MAX_BULLETINS_FOR_CONTROLLER;
|
||||
import static org.apache.nifi.reporting.BulletinRepository.MAX_BULLETINS_PER_COMPONENT;
|
||||
|
||||
public class ControllerBulletinsEndpointMerger extends AbstractSingleEntityEndpoint<ControllerBulletinsEntity> implements EndpointResponseMerger {
|
||||
public static final Pattern CONTROLLER_BULLETINS_URI_PATTERN = Pattern.compile("/nifi-api/controller/bulletins");
|
||||
|
||||
@Override
|
||||
public boolean canHandle(URI uri, String method) {
|
||||
return "GET".equalsIgnoreCase(method) && CONTROLLER_BULLETINS_URI_PATTERN.matcher(uri.getPath()).matches();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<ControllerBulletinsEntity> getEntityClass() {
|
||||
return ControllerBulletinsEntity.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void mergeResponses(ControllerBulletinsEntity clientEntity, Map<NodeIdentifier, ControllerBulletinsEntity> entityMap,
|
||||
Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) {
|
||||
|
||||
final Map<NodeIdentifier, List<BulletinDTO>> bulletinDtos = new HashMap<>();
|
||||
final Map<NodeIdentifier, List<BulletinDTO>> controllerServiceBulletinDtos = new HashMap<>();
|
||||
final Map<NodeIdentifier, List<BulletinDTO>> reportingTaskBulletinDtos = new HashMap<>();
|
||||
for (final Map.Entry<NodeIdentifier, ControllerBulletinsEntity> entry : entityMap.entrySet()) {
|
||||
final NodeIdentifier nodeIdentifier = entry.getKey();
|
||||
final ControllerBulletinsEntity entity = entry.getValue();
|
||||
|
||||
// consider the bulletins if present and authorized
|
||||
if (entity.getBulletins() != null) {
|
||||
entity.getBulletins().forEach(bulletin -> {
|
||||
bulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new ArrayList<>()).add(bulletin);
|
||||
});
|
||||
}
|
||||
if (entity.getControllerServiceBulletins() != null) {
|
||||
entity.getControllerServiceBulletins().forEach(bulletin -> {
|
||||
controllerServiceBulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new ArrayList<>()).add(bulletin);
|
||||
});
|
||||
}
|
||||
if (entity.getReportingTaskBulletins() != null) {
|
||||
entity.getReportingTaskBulletins().forEach(bulletin -> {
|
||||
reportingTaskBulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new ArrayList<>()).add(bulletin);
|
||||
});
|
||||
}
|
||||
}
|
||||
clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos));
|
||||
clientEntity.setControllerServiceBulletins(BulletinMerger.mergeBulletins(controllerServiceBulletinDtos));
|
||||
clientEntity.setReportingTaskBulletins(BulletinMerger.mergeBulletins(reportingTaskBulletinDtos));
|
||||
|
||||
// sort the bulletins
|
||||
Collections.sort(clientEntity.getBulletins(), BULLETIN_COMPARATOR);
|
||||
Collections.sort(clientEntity.getControllerServiceBulletins(), BULLETIN_COMPARATOR);
|
||||
Collections.sort(clientEntity.getReportingTaskBulletins(), BULLETIN_COMPARATOR);
|
||||
|
||||
// prune the response to only include the max number of bulletins
|
||||
if (clientEntity.getBulletins().size() > MAX_BULLETINS_FOR_CONTROLLER) {
|
||||
clientEntity.setBulletins(clientEntity.getBulletins().subList(0, MAX_BULLETINS_FOR_CONTROLLER));
|
||||
}
|
||||
if (clientEntity.getControllerServiceBulletins().size() > MAX_BULLETINS_PER_COMPONENT) {
|
||||
clientEntity.setControllerServiceBulletins(clientEntity.getControllerServiceBulletins().subList(0, MAX_BULLETINS_PER_COMPONENT));
|
||||
}
|
||||
if (clientEntity.getReportingTaskBulletins().size() > MAX_BULLETINS_PER_COMPONENT) {
|
||||
clientEntity.setReportingTaskBulletins(clientEntity.getReportingTaskBulletins().subList(0, MAX_BULLETINS_PER_COMPONENT));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -17,18 +17,17 @@
|
|||
|
||||
package org.apache.nifi.cluster.coordination.http.endpoints;
|
||||
|
||||
import org.apache.nifi.cluster.manager.NodeResponse;
|
||||
import org.apache.nifi.cluster.manager.StatusMerger;
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
|
||||
import org.apache.nifi.web.api.entity.ControllerStatusEntity;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.nifi.cluster.manager.NodeResponse;
|
||||
import org.apache.nifi.cluster.manager.StatusMerger;
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
import org.apache.nifi.web.api.dto.BulletinDTO;
|
||||
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
|
||||
import org.apache.nifi.web.api.entity.ControllerStatusEntity;
|
||||
|
||||
public class ControllerStatusEndpointMerger extends AbstractSingleDTOEndpoint<ControllerStatusEntity, ControllerStatusDTO> {
|
||||
public static final Pattern CONTROLLER_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/status");
|
||||
|
||||
|
@ -54,17 +53,6 @@ public class ControllerStatusEndpointMerger extends AbstractSingleDTOEndpoint<Co
|
|||
final NodeIdentifier nodeId = entry.getKey();
|
||||
final ControllerStatusDTO nodeStatus = entry.getValue();
|
||||
|
||||
final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort();
|
||||
for (final BulletinDTO bulletin : nodeStatus.getBulletins()) {
|
||||
bulletin.setNodeAddress(nodeAddress);
|
||||
}
|
||||
for (final BulletinDTO bulletin : nodeStatus.getControllerServiceBulletins()) {
|
||||
bulletin.setNodeAddress(nodeAddress);
|
||||
}
|
||||
for (final BulletinDTO bulletin : nodeStatus.getReportingTaskBulletins()) {
|
||||
bulletin.setNodeAddress(nodeAddress);
|
||||
}
|
||||
|
||||
if (nodeStatus == mergedStatus) {
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.nifi.web.api.dto.BulletinDTO;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -30,6 +31,23 @@ public final class BulletinMerger {
|
|||
|
||||
private BulletinMerger() {}
|
||||
|
||||
public static Comparator<BulletinDTO> BULLETIN_COMPARATOR = new Comparator<BulletinDTO>() {
|
||||
@Override
|
||||
public int compare(BulletinDTO o1, BulletinDTO o2) {
|
||||
if (o1 == null && o2 == null) {
|
||||
return 0;
|
||||
}
|
||||
if (o1 == null) {
|
||||
return 1;
|
||||
}
|
||||
if (o2 == null) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return -Long.compare(o1.getId(), o2.getId());
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Merges the validation errors.
|
||||
*
|
||||
|
|
|
@ -21,10 +21,14 @@ import org.apache.nifi.web.api.dto.BulletinDTO;
|
|||
import org.apache.nifi.web.api.entity.ComponentEntity;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.nifi.cluster.manager.BulletinMerger.BULLETIN_COMPARATOR;
|
||||
import static org.apache.nifi.reporting.BulletinRepository.MAX_BULLETINS_PER_COMPONENT;
|
||||
|
||||
public class ComponentEntityMerger {
|
||||
|
||||
/**
|
||||
|
@ -47,5 +51,13 @@ public class ComponentEntityMerger {
|
|||
}
|
||||
}
|
||||
clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos));
|
||||
|
||||
// sort the results
|
||||
Collections.sort(clientEntity.getBulletins(), BULLETIN_COMPARATOR);
|
||||
|
||||
// prune the response to only include the max number of bulletins
|
||||
if (clientEntity.getBulletins().size() > MAX_BULLETINS_PER_COMPONENT) {
|
||||
clientEntity.setBulletins(clientEntity.getBulletins().subList(0, MAX_BULLETINS_PER_COMPONENT));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,19 +17,9 @@
|
|||
|
||||
package org.apache.nifi.cluster.manager;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.nifi.controller.status.RunStatus;
|
||||
import org.apache.nifi.controller.status.TransmissionStatus;
|
||||
import org.apache.nifi.util.FormatUtils;
|
||||
import org.apache.nifi.web.api.dto.BulletinDTO;
|
||||
import org.apache.nifi.web.api.dto.CounterDTO;
|
||||
import org.apache.nifi.web.api.dto.CountersDTO;
|
||||
import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
|
||||
|
@ -56,6 +46,15 @@ import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO;
|
|||
import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
|
||||
import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class StatusMerger {
|
||||
public static void merge(final ControllerStatusDTO target, final ControllerStatusDTO toMerge) {
|
||||
if (target == null || toMerge == null) {
|
||||
|
@ -66,10 +65,6 @@ public class StatusMerger {
|
|||
target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued());
|
||||
target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued());
|
||||
|
||||
target.setBulletins(mergeBulletins(target.getBulletins(), toMerge.getBulletins()));
|
||||
target.setControllerServiceBulletins(mergeBulletins(target.getControllerServiceBulletins(), toMerge.getControllerServiceBulletins()));
|
||||
target.setReportingTaskBulletins(mergeBulletins(target.getReportingTaskBulletins(), toMerge.getReportingTaskBulletins()));
|
||||
|
||||
updatePrettyPrintedFields(target);
|
||||
}
|
||||
|
||||
|
@ -78,20 +73,6 @@ public class StatusMerger {
|
|||
target.setConnectedNodes(formatCount(target.getConnectedNodeCount()) + " / " + formatCount(target.getTotalNodeCount()));
|
||||
}
|
||||
|
||||
public static List<BulletinDTO> mergeBulletins(final List<BulletinDTO> targetBulletins, final List<BulletinDTO> toMerge) {
|
||||
final List<BulletinDTO> bulletins = new ArrayList<>();
|
||||
if (targetBulletins != null) {
|
||||
bulletins.addAll(targetBulletins);
|
||||
}
|
||||
|
||||
if (toMerge != null) {
|
||||
bulletins.addAll(toMerge);
|
||||
}
|
||||
|
||||
return bulletins;
|
||||
}
|
||||
|
||||
|
||||
public static void merge(final ProcessGroupStatusDTO target, final ProcessGroupStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
|
||||
merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
|
||||
|
||||
|
|
|
@ -71,6 +71,7 @@ import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
|
|||
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
|
||||
import org.apache.nifi.web.api.entity.AccessPolicyEntity;
|
||||
import org.apache.nifi.web.api.entity.ConnectionEntity;
|
||||
import org.apache.nifi.web.api.entity.ControllerBulletinsEntity;
|
||||
import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
|
||||
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
||||
import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
|
||||
|
@ -259,6 +260,13 @@ public interface NiFiServiceFacade {
|
|||
*/
|
||||
ControllerConfigurationEntity getControllerConfiguration();
|
||||
|
||||
/**
|
||||
* Gets the controller level bulletins.
|
||||
*
|
||||
* @return Controller level bulletins
|
||||
*/
|
||||
ControllerBulletinsEntity getControllerBulletins();
|
||||
|
||||
/**
|
||||
* Gets the configuration for the flow.
|
||||
*
|
||||
|
|
|
@ -78,6 +78,7 @@ import org.apache.nifi.remote.RootGroupPort;
|
|||
import org.apache.nifi.reporting.Bulletin;
|
||||
import org.apache.nifi.reporting.BulletinQuery;
|
||||
import org.apache.nifi.reporting.BulletinRepository;
|
||||
import org.apache.nifi.reporting.ComponentType;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.apache.nifi.web.api.dto.AccessPolicyDTO;
|
||||
import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO;
|
||||
|
@ -143,6 +144,7 @@ import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
|
|||
import org.apache.nifi.web.api.entity.AccessPolicyEntity;
|
||||
import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity;
|
||||
import org.apache.nifi.web.api.entity.ConnectionEntity;
|
||||
import org.apache.nifi.web.api.entity.ControllerBulletinsEntity;
|
||||
import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
|
||||
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
||||
import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity;
|
||||
|
@ -2179,6 +2181,51 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
return entityFactory.createControllerConfigurationEntity(dto, revision, permissions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ControllerBulletinsEntity getControllerBulletins() {
|
||||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||
final ControllerBulletinsEntity controllerBulletinsEntity = new ControllerBulletinsEntity();
|
||||
|
||||
final Authorizable controllerAuthorizable = authorizableLookup.getController();
|
||||
if (controllerAuthorizable.isAuthorized(authorizer, RequestAction.READ, user)) {
|
||||
controllerBulletinsEntity.setBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController()));
|
||||
}
|
||||
|
||||
// get the controller service bulletins
|
||||
final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
|
||||
final List<Bulletin> allControllerServiceBulletins = bulletinRepository.findBulletins(controllerServiceQuery);
|
||||
final List<Bulletin> authorizedControllerServiceBulletins = new ArrayList<>();
|
||||
for (final Bulletin bulletin : allControllerServiceBulletins) {
|
||||
try {
|
||||
final Authorizable controllerServiceAuthorizable = authorizableLookup.getControllerService(bulletin.getSourceId());
|
||||
if (controllerServiceAuthorizable.isAuthorized(authorizer, RequestAction.READ, user)) {
|
||||
authorizedControllerServiceBulletins.add(bulletin);
|
||||
}
|
||||
} catch (final ResourceNotFoundException e) {
|
||||
// controller service missing.. skip
|
||||
}
|
||||
}
|
||||
controllerBulletinsEntity.setControllerServiceBulletins(dtoFactory.createBulletinDtos(authorizedControllerServiceBulletins));
|
||||
|
||||
// get the reporting task bulletins
|
||||
final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
|
||||
final List<Bulletin> allReportingTaskBulletins = bulletinRepository.findBulletins(reportingTaskQuery);
|
||||
final List<Bulletin> authorizedReportingTaskBulletins = new ArrayList<>();
|
||||
for (final Bulletin bulletin : allReportingTaskBulletins) {
|
||||
try {
|
||||
final Authorizable reportingTaskAuthorizable = authorizableLookup.getReportingTask(bulletin.getSourceId());
|
||||
if (reportingTaskAuthorizable.isAuthorized(authorizer, RequestAction.READ, user)) {
|
||||
authorizedReportingTaskBulletins.add(bulletin);
|
||||
}
|
||||
} catch (final ResourceNotFoundException e) {
|
||||
// reporting task missing.. skip
|
||||
}
|
||||
}
|
||||
controllerBulletinsEntity.setReportingTaskBulletins(dtoFactory.createBulletinDtos(authorizedReportingTaskBulletins));
|
||||
|
||||
return controllerBulletinsEntity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FlowConfigurationEntity getFlowConfiguration() {
|
||||
final FlowConfigurationDTO dto = dtoFactory.createFlowConfigurationDto(properties.getAutoRefreshInterval());
|
||||
|
|
|
@ -40,7 +40,6 @@ import org.apache.nifi.web.NiFiServiceFacade;
|
|||
import org.apache.nifi.web.Revision;
|
||||
import org.apache.nifi.web.api.dto.ClusterDTO;
|
||||
import org.apache.nifi.web.api.dto.NodeDTO;
|
||||
import org.apache.nifi.web.api.dto.RevisionDTO;
|
||||
import org.apache.nifi.web.api.entity.ClusterEntity;
|
||||
import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
|
||||
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
||||
|
@ -48,13 +47,11 @@ import org.apache.nifi.web.api.entity.HistoryEntity;
|
|||
import org.apache.nifi.web.api.entity.NodeEntity;
|
||||
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
|
||||
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
|
||||
import org.apache.nifi.web.api.request.ClientIdParameter;
|
||||
import org.apache.nifi.web.api.request.DateTimeParameter;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DELETE;
|
||||
import javax.ws.rs.DefaultValue;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.HttpMethod;
|
||||
import javax.ws.rs.POST;
|
||||
|
@ -673,9 +670,6 @@ public class ControllerResource extends ApplicationResource {
|
|||
/**
|
||||
* Deletes flow history from the specified end date.
|
||||
*
|
||||
* @param clientId Optional client id. If the client id is not specified, a
|
||||
* new one will be generated. This value (whether specified or generated) is
|
||||
* included in the response.
|
||||
* @param endDate The end date for the purge action.
|
||||
* @return A historyEntity
|
||||
*/
|
||||
|
@ -701,11 +695,6 @@ public class ControllerResource extends ApplicationResource {
|
|||
)
|
||||
public Response deleteHistory(
|
||||
@Context final HttpServletRequest httpServletRequest,
|
||||
@ApiParam(
|
||||
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
|
||||
required = false
|
||||
)
|
||||
@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
|
||||
@ApiParam(
|
||||
value = "Purge actions before this date/time.",
|
||||
required = true
|
||||
|
@ -736,10 +725,6 @@ public class ControllerResource extends ApplicationResource {
|
|||
// purge the actions
|
||||
serviceFacade.deleteActions(endDate.getDateTime());
|
||||
|
||||
// create the revision
|
||||
final RevisionDTO revision = new RevisionDTO();
|
||||
revision.setClientId(clientId.getClientId());
|
||||
|
||||
// create the response entity
|
||||
final HistoryEntity entity = new HistoryEntity();
|
||||
|
||||
|
|
|
@ -75,6 +75,7 @@ import org.apache.nifi.web.api.entity.BulletinBoardEntity;
|
|||
import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity;
|
||||
import org.apache.nifi.web.api.entity.ComponentHistoryEntity;
|
||||
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
|
||||
import org.apache.nifi.web.api.entity.ControllerBulletinsEntity;
|
||||
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
||||
import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity;
|
||||
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
|
||||
|
@ -810,6 +811,46 @@ public class FlowResource extends ApplicationResource {
|
|||
return clusterContext(generateOkResponse(entity)).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the controller level bulletins.
|
||||
*
|
||||
* @return A controllerBulletinsEntity.
|
||||
*/
|
||||
@GET
|
||||
@Consumes(MediaType.WILDCARD)
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
@Path("controller/bulletins")
|
||||
// TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
|
||||
@ApiOperation(
|
||||
value = "Retrieves Controller level bulletins",
|
||||
response = ControllerBulletinsEntity.class,
|
||||
authorizations = {
|
||||
@Authorization(value = "Read Only", type = "ROLE_MONITOR"),
|
||||
@Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
|
||||
@Authorization(value = "Administrator", type = "ROLE_ADMIN")
|
||||
}
|
||||
)
|
||||
@ApiResponses(
|
||||
value = {
|
||||
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
|
||||
@ApiResponse(code = 401, message = "Client could not be authenticated."),
|
||||
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
|
||||
@ApiResponse(code = 404, message = "The specified resource could not be found."),
|
||||
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
|
||||
}
|
||||
)
|
||||
public Response getBulletins() {
|
||||
|
||||
authorizeFlow();
|
||||
|
||||
if (isReplicateRequest()) {
|
||||
return replicate(HttpMethod.GET);
|
||||
}
|
||||
|
||||
final ControllerBulletinsEntity entity = serviceFacade.getControllerBulletins();
|
||||
return clusterContext(generateOkResponse(entity)).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the banners for this NiFi.
|
||||
*
|
||||
|
|
|
@ -80,9 +80,7 @@ import org.apache.nifi.provenance.search.SearchTerm;
|
|||
import org.apache.nifi.provenance.search.SearchTerms;
|
||||
import org.apache.nifi.provenance.search.SearchableField;
|
||||
import org.apache.nifi.remote.RootGroupPort;
|
||||
import org.apache.nifi.reporting.BulletinQuery;
|
||||
import org.apache.nifi.reporting.BulletinRepository;
|
||||
import org.apache.nifi.reporting.ComponentType;
|
||||
import org.apache.nifi.reporting.ReportingTask;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.search.SearchContext;
|
||||
|
@ -520,16 +518,6 @@ public class ControllerFacade implements Authorizable {
|
|||
controllerStatus.setConnectedNodes(connectedNodeCount + " / " + totalNodeCount);
|
||||
}
|
||||
|
||||
controllerStatus.setBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController()));
|
||||
|
||||
// get the controller service bulletins
|
||||
final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
|
||||
controllerStatus.setControllerServiceBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(controllerServiceQuery)));
|
||||
|
||||
// get the reporting task bulletins
|
||||
final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
|
||||
controllerStatus.setReportingTaskBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(reportingTaskQuery)));
|
||||
|
||||
final ProcessGroupCounts counts = rootGroup.getCounts();
|
||||
controllerStatus.setRunningCount(counts.getRunningCount());
|
||||
controllerStatus.setStoppedCount(counts.getStoppedCount());
|
||||
|
|
|
@ -257,20 +257,20 @@ nf.ng.Canvas.FlowStatusCtrl = function (serviceProvider, $sanitize) {
|
|||
/**
|
||||
* Update the bulletins.
|
||||
*
|
||||
* @param status The controller status returned from the `../nifi-api/flow/status` endpoint.
|
||||
* @param response The controller bulletins returned from the `../nifi-api/controller/bulletins` endpoint.
|
||||
*/
|
||||
update: function (status) {
|
||||
update: function (response) {
|
||||
|
||||
// icon for system bulletins
|
||||
var bulletinIcon = $('#bulletin-button');
|
||||
var currentBulletins = bulletinIcon.data('bulletins');
|
||||
|
||||
// update the bulletins if necessary
|
||||
if (nf.Common.doBulletinsDiffer(currentBulletins, status.bulletins)) {
|
||||
bulletinIcon.data('bulletins', status.bulletins);
|
||||
if (nf.Common.doBulletinsDiffer(currentBulletins, response.bulletins)) {
|
||||
bulletinIcon.data('bulletins', response.bulletins);
|
||||
|
||||
// get the formatted the bulletins
|
||||
var bulletins = nf.Common.getFormattedBulletins(status.bulletins);
|
||||
var bulletins = nf.Common.getFormattedBulletins(response.bulletins);
|
||||
|
||||
// bulletins for this processor are now gone
|
||||
if (bulletins.length === 0) {
|
||||
|
@ -286,22 +286,24 @@ nf.ng.Canvas.FlowStatusCtrl = function (serviceProvider, $sanitize) {
|
|||
} else {
|
||||
// no bulletins before, show icon and tips
|
||||
bulletinIcon.addClass('has-bulletins').qtip($.extend({},
|
||||
nf.CanvasUtils.config.systemTooltipConfig, {
|
||||
position: {
|
||||
nf.CanvasUtils.config.systemTooltipConfig,
|
||||
{
|
||||
content: newBulletins,
|
||||
at: 'bottom left',
|
||||
my: 'top right',
|
||||
adjust: {
|
||||
x: 4
|
||||
position: {
|
||||
at: 'bottom left',
|
||||
my: 'top right',
|
||||
adjust: {
|
||||
x: 4
|
||||
}
|
||||
}
|
||||
}
|
||||
}));
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// update controller service and reporting task bulletins
|
||||
nf.Settings.setBulletins(status.controllerServiceBulletins, status.reportingTaskBulletins);
|
||||
nf.Settings.setBulletins(response.controllerServiceBulletins, response.reportingTaskBulletins);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -381,7 +383,15 @@ nf.ng.Canvas.FlowStatusCtrl = function (serviceProvider, $sanitize) {
|
|||
this.connectedNodesCount =
|
||||
nf.Common.isDefinedAndNotNull(status.connectedNodes) ? $sanitize(status.connectedNodes) : '-';
|
||||
|
||||
this.bulletins.update(status);
|
||||
},
|
||||
|
||||
/**
|
||||
* Updates the controller level bulletins
|
||||
*
|
||||
* @param response
|
||||
*/
|
||||
updateBulletins: function (response) {
|
||||
this.bulletins.update(response);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -127,6 +127,7 @@ nf.Canvas = (function () {
|
|||
urls: {
|
||||
api: '../nifi-api',
|
||||
currentUser: '../nifi-api/flow/current-user',
|
||||
controllerBulletins: '../nifi-api/flow/controller/bulletins',
|
||||
kerberos: '../nifi-api/access/kerberos',
|
||||
revision: '../nifi-api/flow/revision',
|
||||
banners: '../nifi-api/flow/banners',
|
||||
|
@ -721,14 +722,26 @@ nf.Canvas = (function () {
|
|||
var processGroupXhr = reloadProcessGroup(nf.Canvas.getGroupId(), options);
|
||||
var statusXhr = nf.ng.Bridge.injector.get('flowStatusCtrl').reloadFlowStatus();
|
||||
var currentUserXhr = loadCurrentUser();
|
||||
$.when(processGroupXhr, statusXhr, currentUserXhr).done(function (processGroupResult) {
|
||||
var controllerBulletins = $.ajax({
|
||||
type: 'GET',
|
||||
url: config.urls.controllerBulletins,
|
||||
dataType: 'json'
|
||||
}).done(function (response) {
|
||||
nf.ng.Bridge.injector.get('flowStatusCtrl').updateBulletins(response);
|
||||
deferred.resolve();
|
||||
}).fail(function (xhr, status, error) {
|
||||
deferred.reject(xhr, status, error);
|
||||
});
|
||||
|
||||
// wait for all requests to complete
|
||||
$.when(processGroupXhr, statusXhr, currentUserXhr, controllerBulletins).done(function (processGroupResult) {
|
||||
// inform Angular app values have changed
|
||||
nf.ng.Bridge.digest();
|
||||
|
||||
// resolve the deferred
|
||||
deferred.resolve(processGroupResult);
|
||||
}).fail(function () {
|
||||
deferred.reject();
|
||||
}).fail(function (xhr, status, error) {
|
||||
deferred.reject(xhr, status, error);
|
||||
});
|
||||
}).promise();
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue