This commit is contained in:
joewitt 2015-04-27 14:26:14 -04:00
parent 10860944d1
commit 21209b2341
50 changed files with 318 additions and 589 deletions

View File

@ -20,8 +20,7 @@ import java.util.Date;
import org.apache.commons.lang3.StringUtils;
/**
* Events describe the occurrence of something noteworthy. They record the
* event's source, a timestamp, a description, and a category.
* Events describe the occurrence of something noteworthy. They record the event's source, a timestamp, a description, and a category.
*
* @author unattributed
*
@ -45,8 +44,7 @@ public class Event {
private final String message;
/**
* Creates an event with the current time as the timestamp and a category of
* "INFO".
* Creates an event with the current time as the timestamp and a category of "INFO".
*
* @param source the source
* @param message the description

View File

@ -19,9 +19,8 @@ package org.apache.nifi.cluster.event;
import java.util.List;
/**
* Manages an ordered list of events. The event history size dictates the total
* number of events to manage for a given source at a given time. When the size
* is exceeded, the oldest event for that source is evicted.
* Manages an ordered list of events. The event history size dictates the total number of events to manage for a given source at a given time. When the size is exceeded, the oldest event for that
* source is evicted.
*
* @author unattributed
*/
@ -35,8 +34,7 @@ public interface EventManager {
void addEvent(Event event);
/**
* Returns a list of events for a given source sorted by the event's
* timestamp where the most recent event is first in the list.
* Returns a list of events for a given source sorted by the event's timestamp where the most recent event is first in the list.
*
* @param eventSource the source
*

View File

@ -36,8 +36,7 @@ import org.apache.nifi.cluster.event.EventManager;
public class EventManagerImpl implements EventManager {
/**
* associates the source ID with an ordered queue of events, ordered by most
* recent event
* associates the source ID with an ordered queue of events, ordered by most recent event
*/
private final Map<String, Queue<Event>> eventsMap = new HashMap<>();
@ -49,8 +48,7 @@ public class EventManagerImpl implements EventManager {
/**
* Creates an instance.
*
* @param eventHistorySize the number of events to manage for a given
* source. Value must be positive.
* @param eventHistorySize the number of events to manage for a given source. Value must be positive.
*/
public EventManagerImpl(final int eventHistorySize) {
if (eventHistorySize <= 0) {

View File

@ -17,14 +17,12 @@
package org.apache.nifi.cluster.firewall;
/**
* Defines the interface for restricting external client connections to a set of
* hosts or IPs.
* Defines the interface for restricting external client connections to a set of hosts or IPs.
*/
public interface ClusterNodeFirewall {
/**
* Returns true if the given host or IP is permissible through the firewall;
* false otherwise.
* Returns true if the given host or IP is permissible through the firewall; false otherwise.
*
* If an IP is given, then it must be formatted in dotted decimal notation.
*

View File

@ -32,11 +32,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A file-based implementation of the ClusterFirewall interface. The class is
* configured with a file. If the file is empty, then everything is permissible.
* Otherwise, the file should contain hostnames or IPs formatted as dotted
* decimals with an optional CIDR suffix. Each entry must be separated by a
* newline. An example configuration is given below:
* A file-based implementation of the ClusterFirewall interface. The class is configured with a file. If the file is empty, then everything is permissible. Otherwise, the file should contain hostnames
* or IPs formatted as dotted decimals with an optional CIDR suffix. Each entry must be separated by a newline. An example configuration is given below:
*
* <code>
* # hash character is a comment delimiter
@ -46,12 +43,9 @@ import org.slf4j.LoggerFactory;
* 9.10.11.12/13 # a smaller range of CIDR IPs
* </code>
*
* This class allows for synchronization with an optionally configured restore
* directory. If configured, then at startup, if the either the config file or
* the restore directory's copy is missing, then the configuration file will be
* copied to the appropriate location. If both restore directory contains a copy
* that is different in content to configuration file, then an exception is
* thrown at construction time.
* This class allows for synchronization with an optionally configured restore directory. If configured, then at startup, if the either the config file or the restore directory's copy is missing, then
* the configuration file will be copied to the appropriate location. If both restore directory contains a copy that is different in content to configuration file, then an exception is thrown at
* construction time.
*/
public class FileBasedClusterNodeFirewall implements ClusterNodeFirewall {

View File

@ -42,8 +42,7 @@ public interface DataFlowDao {
void saveDataFlow(ClusterDataFlow dataFlow) throws DaoException;
/**
* Sets the state of the dataflow. If the dataflow does not exist, then an
* exception is thrown.
* Sets the state of the dataflow. If the dataflow does not exist, then an exception is thrown.
*
* @param flowState the state of the dataflow
*

View File

@ -21,13 +21,9 @@ import java.util.Set;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
/**
* A service for managing the cluster's flow. The service will attempt to keep
* the cluster's dataflow current while respecting the value of the configured
* retrieval delay.
* A service for managing the cluster's flow. The service will attempt to keep the cluster's dataflow current while respecting the value of the configured retrieval delay.
*
* The eligible retrieval time is reset with the configured delay every time the
* flow state is set to STALE. If the state is set to UNKNOWN or CURRENT, then
* the flow will not be retrieved.
* The eligible retrieval time is reset with the configured delay every time the flow state is set to STALE. If the state is set to UNKNOWN or CURRENT, then the flow will not be retrieved.
*
* Clients must call start() and stop() to initialize and stop the instance.
*
@ -35,8 +31,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
public interface DataFlowManagementService {
/**
* Starts the instance. Start may only be called if the instance is not
* running.
* Starts the instance. Start may only be called if the instance is not running.
*/
void start();
@ -67,8 +62,7 @@ public interface DataFlowManagementService {
void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException;
/**
* Updates the dataflow with the given serialized form of the Controller
* Services that are to exist on the NCM.
* Updates the dataflow with the given serialized form of the Controller Services that are to exist on the NCM.
*
* @param serializedControllerServices services
* @throws DaoException ex
@ -76,8 +70,7 @@ public interface DataFlowManagementService {
void updateControllerServices(byte[] serializedControllerServices) throws DaoException;
/**
* Updates the dataflow with the given serialized form of Reporting Tasks
* that are to exist on the NCM.
* Updates the dataflow with the given serialized form of Reporting Tasks that are to exist on the NCM.
*
* @param serializedReportingTasks tasks
* @throws DaoException ex
@ -111,11 +104,9 @@ public interface DataFlowManagementService {
void setNodeIds(Set<NodeIdentifier> nodeIds);
/**
* Returns the set of node identifiers the service is using to retrieve the
* flow.
* Returns the set of node identifiers the service is using to retrieve the flow.
*
* @return the set of node identifiers the service is using to retrieve the
* flow.
* @return the set of node identifiers the service is using to retrieve the flow.
*/
Set<NodeIdentifier> getNodeIds();

View File

@ -17,8 +17,7 @@
package org.apache.nifi.cluster.flow;
/**
* Represents the exceptional case when a caller is requesting the current flow,
* but a current flow is not available.
* Represents the exceptional case when a caller is requesting the current flow, but a current flow is not available.
*
* @author unattributed
*/

View File

@ -67,34 +67,22 @@ import org.w3c.dom.Document;
import org.w3c.dom.Element;
/**
* Implements the FlowDao interface. The implementation tracks the state of the
* dataflow by annotating the filename of the flow state file. Specifically, the
* implementation correlates PersistedFlowState states to filename extensions.
* The correlation is as follows:
* Implements the FlowDao interface. The implementation tracks the state of the dataflow by annotating the filename of the flow state file. Specifically, the implementation correlates
* PersistedFlowState states to filename extensions. The correlation is as follows:
* <ul>
* <li> CURRENT maps to flow.xml </li>
* <li> STALE maps to flow.xml.stale </li>
* <li> UNKNOWN maps to flow.xml.unknown </li>
* </ul>
* Whenever the flow state changes, the flow state file's name is updated to
* denote its state.
* Whenever the flow state changes, the flow state file's name is updated to denote its state.
*
* The implementation also provides for a restore directory that may be
* configured for higher availability. At instance creation, if the primary or
* restore directories have multiple flow state files, an exception is thrown.
* If the primary directory has a current flow state file, but the restore
* directory does not, then the primary flow state file is copied to the restore
* directory. If the restore directory has a current flow state file, but the
* primary directory does not, then the restore flow state file is copied to the
* primary directory. If both the primary and restore directories have a current
* flow state file and the files are different, then an exception is thrown.
* The implementation also provides for a restore directory that may be configured for higher availability. At instance creation, if the primary or restore directories have multiple flow state files,
* an exception is thrown. If the primary directory has a current flow state file, but the restore directory does not, then the primary flow state file is copied to the restore directory. If the
* restore directory has a current flow state file, but the primary directory does not, then the restore flow state file is copied to the primary directory. If both the primary and restore directories
* have a current flow state file and the files are different, then an exception is thrown.
*
* When the flow state file is saved, it is always saved first to the restore
* directory followed by a save to the primary directory. When the flow state
* file is loaded, a check is made to verify that the primary and restore flow
* state files are both current. If either is not current, then an exception is
* thrown. The primary flow state file is always read when the load method is
* called.
* When the flow state file is saved, it is always saved first to the restore directory followed by a save to the primary directory. When the flow state file is loaded, a check is made to verify that
* the primary and restore flow state files are both current. If either is not current, then an exception is thrown. The primary flow state file is always read when the load method is called.
*
* @author unattributed
*/

View File

@ -45,19 +45,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements FlowManagementService interface. The service tries to keep the
* cluster's flow current with regards to the available nodes.
* Implements FlowManagementService interface. The service tries to keep the cluster's flow current with regards to the available nodes.
*
* The instance may be configured with a retrieval delay, which will reduce the
* number of retrievals performed by the service at the expense of increasing
* the chances that the service will not be able to provide a current flow to
* the caller.
* The instance may be configured with a retrieval delay, which will reduce the number of retrievals performed by the service at the expense of increasing the chances that the service will not be able
* to provide a current flow to the caller.
*
* By default, the service will try to update the flow as quickly as possible.
* Configuring a delay enables a less aggressive retrieval strategy.
* Specifically, the eligible retrieval time is reset every time the flow state
* is set to STALE. If the state is set to UNKNOWN or CURRENT, then the flow
* will not be retrieved.
* By default, the service will try to update the flow as quickly as possible. Configuring a delay enables a less aggressive retrieval strategy. Specifically, the eligible retrieval time is reset
* every time the flow state is set to STALE. If the state is set to UNKNOWN or CURRENT, then the flow will not be retrieved.
*
* @author unattributed
*/
@ -298,8 +292,7 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
}
/**
* A timer task for issuing FlowRequestMessage messages to nodes to retrieve
* an updated flow.
* A timer task for issuing FlowRequestMessage messages to nodes to retrieve an updated flow.
*/
private class FlowRetrieverTimerTask extends TimerTask {

View File

@ -39,24 +39,15 @@ import org.apache.nifi.remote.cluster.NodeInformant;
import org.apache.nifi.reporting.BulletinRepository;
/**
* Defines the interface for a ClusterManager. The cluster manager is a
* threadsafe centralized manager for a cluster. Members of a cluster are nodes.
* A member becomes a node by issuing a connection request to the manager. The
* manager maintains the set of nodes. Nodes may be disconnected, reconnected,
* and deleted.
* Defines the interface for a ClusterManager. The cluster manager is a threadsafe centralized manager for a cluster. Members of a cluster are nodes. A member becomes a node by issuing a connection
* request to the manager. The manager maintains the set of nodes. Nodes may be disconnected, reconnected, and deleted.
*
* Nodes are responsible for sending heartbeats to the manager to indicate their
* liveliness. A manager may disconnect a node if it does not receive a
* heartbeat within a configurable time period. A cluster manager instance may
* be configured with how often to monitor received heartbeats
* (getHeartbeatMonitoringIntervalSeconds()) and the maximum time that may
* elapse between node heartbeats before disconnecting the node
* (getMaxHeartbeatGapSeconds()).
* Nodes are responsible for sending heartbeats to the manager to indicate their liveliness. A manager may disconnect a node if it does not receive a heartbeat within a configurable time period. A
* cluster manager instance may be configured with how often to monitor received heartbeats (getHeartbeatMonitoringIntervalSeconds()) and the maximum time that may elapse between node heartbeats
* before disconnecting the node (getMaxHeartbeatGapSeconds()).
*
* Since only a single node may execute isolated processors, the cluster manager
* maintains the notion of a primary node. The primary node is chosen at cluster
* startup and retains the role until a user requests a different node to be the
* primary node.
* Since only a single node may execute isolated processors, the cluster manager maintains the notion of a primary node. The primary node is chosen at cluster startup and retains the role until a user
* requests a different node to be the primary node.
*
* @author unattributed
*/
@ -78,8 +69,7 @@ public interface ClusterManager extends NodeInformant {
/**
* @param nodeId node identifier
* @return returns the node with the given identifier or null if node does
* not exist
* @return returns the node with the given identifier or null if node does not exist
*/
Node getNode(String nodeId);
@ -90,17 +80,13 @@ public interface ClusterManager extends NodeInformant {
Set<NodeIdentifier> getNodeIds(Status... statuses);
/**
* Deletes the node with the given node identifier. If the given node is the
* primary node, then a subsequent request may be made to the manager to set
* a new primary node.
* Deletes the node with the given node identifier. If the given node is the primary node, then a subsequent request may be made to the manager to set a new primary node.
*
* @param nodeId the node identifier
* @param userDn the Distinguished Name of the user requesting the node be
* deleted from the cluster
* @param userDn the Distinguished Name of the user requesting the node be deleted from the cluster
*
* @throws UnknownNodeException if the node does not exist
* @throws IllegalNodeDeletionException if the node is not in a disconnected
* state
* @throws IllegalNodeDeletionException if the node is not in a disconnected state
*/
void deleteNode(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeDeletionException;
@ -114,14 +100,11 @@ public interface ClusterManager extends NodeInformant {
ConnectionResponse requestConnection(ConnectionRequest request);
/**
* Services reconnection requests for a given node. If the node indicates
* reconnection failure, then the node will be set to disconnected.
* Otherwise, a reconnection request will be sent to the node, initiating
* the connection handshake.
* Services reconnection requests for a given node. If the node indicates reconnection failure, then the node will be set to disconnected. Otherwise, a reconnection request will be sent to the
* node, initiating the connection handshake.
*
* @param nodeId a node identifier
* @param userDn the Distinguished Name of the user requesting the
* reconnection
* @param userDn the Distinguished Name of the user requesting the reconnection
*
* @throws UnknownNodeException if the node does not exist
* @throws IllegalNodeReconnectionException if the node is not disconnected
@ -132,13 +115,10 @@ public interface ClusterManager extends NodeInformant {
* Requests the node with the given identifier be disconnected.
*
* @param nodeId the node identifier
* @param userDn the Distinguished Name of the user requesting the
* disconnection
* @param userDn the Distinguished Name of the user requesting the disconnection
*
* @throws UnknownNodeException if the node does not exist
* @throws IllegalNodeDisconnectionException if the node cannot be
* disconnected due to the cluster's state (e.g., node is last connected
* node or node is primary)
* @throws IllegalNodeDisconnectionException if the node cannot be disconnected due to the cluster's state (e.g., node is last connected node or node is primary)
* @throws UnknownNodeException if the node does not exist
* @throws IllegalNodeDisconnectionException if the node is not disconnected
* @throws NodeDisconnectionException if the disconnection failed
@ -146,50 +126,37 @@ public interface ClusterManager extends NodeInformant {
void requestDisconnection(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeDisconnectionException, NodeDisconnectionException;
/**
* @return the time in seconds to wait between successive executions of
* heartbeat monitoring
* @return the time in seconds to wait between successive executions of heartbeat monitoring
*/
int getHeartbeatMonitoringIntervalSeconds();
/**
* @return the maximum time in seconds that is allowed between successive
* heartbeats of a node before disconnecting the node
* @return the maximum time in seconds that is allowed between successive heartbeats of a node before disconnecting the node
*/
int getMaxHeartbeatGapSeconds();
/**
* Returns a list of node events for the node with the given identifier. The
* events will be returned in order of most recent to least recent according
* to the creation date of the event.
* Returns a list of node events for the node with the given identifier. The events will be returned in order of most recent to least recent according to the creation date of the event.
*
* @param nodeId the node identifier
*
* @return the list of events or an empty list if no node exists with the
* given identifier
* @return the list of events or an empty list if no node exists with the given identifier
*/
List<Event> getNodeEvents(final String nodeId);
/**
* Revokes the primary role from the current primary node and assigns the
* primary role to given given node ID.
* Revokes the primary role from the current primary node and assigns the primary role to given given node ID.
*
* If role revocation fails, then the current primary node is set to
* disconnected while retaining the primary role and no role assignment is
* performed.
* If role revocation fails, then the current primary node is set to disconnected while retaining the primary role and no role assignment is performed.
*
* If role assignment fails, then the given node is set to disconnected and
* is given the primary role.
* If role assignment fails, then the given node is set to disconnected and is given the primary role.
*
* @param nodeId the node identifier
* @param userDn the Distinguished Name of the user requesting that the
* Primary Node be assigned
* @param userDn the Distinguished Name of the user requesting that the Primary Node be assigned
*
* @throws UnknownNodeException if the node with the given identifier does
* not exist
* @throws IneligiblePrimaryNodeException if the node with the given
* identifier is not eligible to be the primary node
* @throws PrimaryRoleAssignmentException if the cluster was unable to
* change the primary role to the requested node
* @throws UnknownNodeException if the node with the given identifier does not exist
* @throws IneligiblePrimaryNodeException if the node with the given identifier is not eligible to be the primary node
* @throws PrimaryRoleAssignmentException if the cluster was unable to change the primary role to the requested node
*/
void setPrimaryNode(String nodeId, String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException;
@ -204,20 +171,13 @@ public interface ClusterManager extends NodeInformant {
BulletinRepository getBulletinRepository();
/**
* Returns a {@link ProcessGroupStatus} that represents the status of all
* nodes with the given {@link Status}es for the given ProcessGroup id, or
* null if no nodes exist with the given statuses
*
* @param groupId
* @return
* @param groupId groupId
* @return a {@link ProcessGroupStatus} that represents the status of all nodes with the given {@link Status}es for the given ProcessGroup id, or null if no nodes exist with the given statuses
*/
ProcessGroupStatus getProcessGroupStatus(String groupId);
/**
* Returns a merged representation of the System Diagnostics for all nodes
* in the cluster
*
* @return
* @return a merged representation of the System Diagnostics for all nodes in the cluster
*/
SystemDiagnostics getSystemDiagnostics();
}

View File

@ -30,73 +30,53 @@ import java.util.Map;
import java.util.Set;
/**
* Extends the ClusterManager interface to define how requests issued to the
* cluster manager are federated to the nodes. Specifically, the HTTP protocol
* is used for communicating requests to the cluster manager and to the nodes.
* Extends the ClusterManager interface to define how requests issued to the cluster manager are federated to the nodes. Specifically, the HTTP protocol is used for communicating requests to the
* cluster manager and to the nodes.
*
* @author unattributed
*/
public interface HttpClusterManager extends ClusterManager {
/**
* Federates the HTTP request to all connected nodes in the cluster. The
* given URI's host and port will not be used and instead will be adjusted
* for each node's host and port. The node URIs are guaranteed to be
* constructed before issuing any requests, so if a UriConstructionException
* is thrown, then it is guaranteed that no request was issued.
* Federates the HTTP request to all connected nodes in the cluster. The given URI's host and port will not be used and instead will be adjusted for each node's host and port. The node URIs are
* guaranteed to be constructed before issuing any requests, so if a UriConstructionException is thrown, then it is guaranteed that no request was issued.
*
* @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
* @param uri the base request URI (up to, but not including, the query
* string)
* @param uri the base request URI (up to, but not including, the query string)
* @param parameters the request parameters
* @param headers the request headers
*
* @return the client response
*
* @throws NoConnectedNodesException if no nodes are connected as results of
* the request
* @throws NoConnectedNodesException if no nodes are connected as results of the request
* @throws NoResponseFromNodesException if no response could be obtained
* @throws UriConstructionException if there was an issue constructing the
* URIs tailored for each individual node
* @throws ConnectingNodeMutableRequestException if the request was a PUT,
* POST, DELETE and a node is connecting to the cluster
* @throws DisconnectedNodeMutableRequestException if the request was a PUT,
* POST, DELETE and a node is disconnected from the cluster
* @throws SafeModeMutableRequestException if the request was a PUT, POST,
* DELETE and a the cluster is in safe mode
* @throws UriConstructionException if there was an issue constructing the URIs tailored for each individual node
* @throws ConnectingNodeMutableRequestException if the request was a PUT, POST, DELETE and a node is connecting to the cluster
* @throws DisconnectedNodeMutableRequestException if the request was a PUT, POST, DELETE and a node is disconnected from the cluster
* @throws SafeModeMutableRequestException if the request was a PUT, POST, DELETE and a the cluster is in safe mode
*/
NodeResponse applyRequest(String method, URI uri, Map<String, List<String>> parameters, Map<String, String> headers)
throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException,
DisconnectedNodeMutableRequestException, SafeModeMutableRequestException;
/**
* Federates the HTTP request to the nodes specified. The given URI's host
* and port will not be used and instead will be adjusted for each node's
* host and port. The node URIs are guaranteed to be constructed before
* issuing any requests, so if a UriConstructionException is thrown, then it
* is guaranteed that no request was issued.
* Federates the HTTP request to the nodes specified. The given URI's host and port will not be used and instead will be adjusted for each node's host and port. The node URIs are guaranteed to be
* constructed before issuing any requests, so if a UriConstructionException is thrown, then it is guaranteed that no request was issued.
*
* @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
* @param uri the base request URI (up to, but not including, the query
* string)
* @param uri the base request URI (up to, but not including, the query string)
* @param parameters the request parameters
* @param headers the request headers
* @param nodeIdentifiers the NodeIdentifier for each node that the request
* should be replaced to
* @param nodeIdentifiers the NodeIdentifier for each node that the request should be replaced to
*
* @return the client response
*
* @throws NoConnectedNodesException if no nodes are connected as results of
* the request
* @throws NoConnectedNodesException if no nodes are connected as results of the request
* @throws NoResponseFromNodesException if no response could be obtained
* @throws UriConstructionException if there was an issue constructing the
* URIs tailored for each individual node
* @throws ConnectingNodeMutableRequestException if the request was a PUT,
* POST, DELETE and a node is connecting to the cluster
* @throws DisconnectedNodeMutableRequestException if the request was a PUT,
* POST, DELETE and a node is disconnected from the cluster
* @throws SafeModeMutableRequestException if the request was a PUT, POST,
* DELETE and a the cluster is in safe mode
* @throws UriConstructionException if there was an issue constructing the URIs tailored for each individual node
* @throws ConnectingNodeMutableRequestException if the request was a PUT, POST, DELETE and a node is connecting to the cluster
* @throws DisconnectedNodeMutableRequestException if the request was a PUT, POST, DELETE and a node is disconnected from the cluster
* @throws SafeModeMutableRequestException if the request was a PUT, POST, DELETE and a the cluster is in safe mode
*/
NodeResponse applyRequest(String method, URI uri, Map<String, List<String>> parameters, Map<String, String> headers,
Set<NodeIdentifier> nodeIdentifiers)
@ -104,64 +84,45 @@ public interface HttpClusterManager extends ClusterManager {
DisconnectedNodeMutableRequestException, SafeModeMutableRequestException;
/**
* Federates the HTTP request to all connected nodes in the cluster. The
* given URI's host and port will not be used and instead will be adjusted
* for each node's host and port. The node URIs are guaranteed to be
* constructed before issuing any requests, so if a UriConstructionException
* is thrown, then it is guaranteed that no request was issued.
* Federates the HTTP request to all connected nodes in the cluster. The given URI's host and port will not be used and instead will be adjusted for each node's host and port. The node URIs are
* guaranteed to be constructed before issuing any requests, so if a UriConstructionException is thrown, then it is guaranteed that no request was issued.
*
* @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
* @param uri the base request URI (up to, but not including, the query
* string)
* @param uri the base request URI (up to, but not including, the query string)
* @param entity the HTTP request entity
* @param headers the request headers
*
* @return the client response
*
* @throws NoConnectedNodesException if no nodes are connected as results of
* the request
* @throws NoConnectedNodesException if no nodes are connected as results of the request
* @throws NoResponseFromNodesException if no response could be obtained
* @throws UriConstructionException if there was an issue constructing the
* URIs tailored for each individual node
* @throws ConnectingNodeMutableRequestException if the request was a PUT,
* POST, DELETE and a node is connecting to the cluster
* @throws DisconnectedNodeMutableRequestException if the request was a PUT,
* POST, DELETE and a node is disconnected from the cluster
* @throws SafeModeMutableRequestException if the request was a PUT, POST,
* DELETE and a the cluster is in safe mode
* @throws UriConstructionException if there was an issue constructing the URIs tailored for each individual node
* @throws ConnectingNodeMutableRequestException if the request was a PUT, POST, DELETE and a node is connecting to the cluster
* @throws DisconnectedNodeMutableRequestException if the request was a PUT, POST, DELETE and a node is disconnected from the cluster
* @throws SafeModeMutableRequestException if the request was a PUT, POST, DELETE and a the cluster is in safe mode
*/
NodeResponse applyRequest(String method, URI uri, Object entity, Map<String, String> headers)
throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException,
DisconnectedNodeMutableRequestException, SafeModeMutableRequestException;
/**
* Federates the HTTP request to the nodes specified. The given URI's host
* and port will not be used and instead will be adjusted for each node's
* host and port. The node URIs are guaranteed to be constructed before
* issuing any requests, so if a UriConstructionException is thrown, then it
* is guaranteed that no request was issued.
* Federates the HTTP request to the nodes specified. The given URI's host and port will not be used and instead will be adjusted for each node's host and port. The node URIs are guaranteed to be
* constructed before issuing any requests, so if a UriConstructionException is thrown, then it is guaranteed that no request was issued.
*
* @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
* @param uri the base request URI (up to, but not including, the query
* string)
* @param uri the base request URI (up to, but not including, the query string)
* @param entity the HTTP request entity
* @param headers the request headers
* @param nodeIdentifiers the NodeIdentifier for each node that the request
* should be replaced to
* @param nodeIdentifiers the NodeIdentifier for each node that the request should be replaced to
*
* @return the client response
*
* @throws NoConnectedNodesException if no nodes are connected as results of
* the request
* @throws NoConnectedNodesException if no nodes are connected as results of the request
* @throws NoResponseFromNodesException if no response could be obtained
* @throws UriConstructionException if there was an issue constructing the
* URIs tailored for each individual node
* @throws ConnectingNodeMutableRequestException if the request was a PUT,
* POST, DELETE and a node is connecting to the cluster
* @throws DisconnectedNodeMutableRequestException if the request was a PUT,
* POST, DELETE and a node is disconnected from the cluster
* @throws SafeModeMutableRequestException if the request was a PUT, POST,
* DELETE and a the cluster is in safe mode
* @throws UriConstructionException if there was an issue constructing the URIs tailored for each individual node
* @throws ConnectingNodeMutableRequestException if the request was a PUT, POST, DELETE and a node is connecting to the cluster
* @throws DisconnectedNodeMutableRequestException if the request was a PUT, POST, DELETE and a node is disconnected from the cluster
* @throws SafeModeMutableRequestException if the request was a PUT, POST, DELETE and a the cluster is in safe mode
*/
NodeResponse applyRequest(String method, URI uri, Object entity, Map<String, String> headers, Set<NodeIdentifier> nodeIdentifiers)
throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException,

View File

@ -24,25 +24,21 @@ import java.util.Set;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
/**
* A service for managing the replication of requests to nodes. It is up to the
* implementing class to decide if requests are sent concurrently or serially.
* A service for managing the replication of requests to nodes. It is up to the implementing class to decide if requests are sent concurrently or serially.
*
* Clients must call start() and stop() to initialize and shutdown the instance.
* The instance must be started before issuing any replication requests.
* Clients must call start() and stop() to initialize and shutdown the instance. The instance must be started before issuing any replication requests.
*
* @author unattributed
*/
public interface HttpRequestReplicator {
/**
* Starts the instance for replicating requests. Start may only be called if
* the instance is not running.
* Starts the instance for replicating requests. Start may only be called if the instance is not running.
*/
void start();
/**
* Stops the instance from replicating requests. Stop may only be called if
* the instance is running.
* Stops the instance from replicating requests. Stop may only be called if the instance is running.
*/
void stop();
@ -52,47 +48,36 @@ public interface HttpRequestReplicator {
boolean isRunning();
/**
* Requests are sent to each node in the cluster. If the request results in
* an exception, then the NodeResourceResponse will contain the exception.
* Requests are sent to each node in the cluster. If the request results in an exception, then the NodeResourceResponse will contain the exception.
*
* HTTP DELETE and OPTIONS methods must supply an empty parameters map or
* else and IllegalArgumentException is thrown.
* HTTP DELETE and OPTIONS methods must supply an empty parameters map or else and IllegalArgumentException is thrown.
*
* @param nodeIds the node identifiers
* @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD,
* OPTIONS)
* @param uri the base request URI (up to, but not including, the query
* string)
* @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD, OPTIONS)
* @param uri the base request URI (up to, but not including, the query string)
* @param parameters any request parameters
* @param headers any HTTP headers
*
* @return the set of node responses
*
* @throws UriConstructionException if a request for a node failed to be
* constructed from the given prototype URI. If thrown, it is guaranteed
* that no request was sent.
* @throws UriConstructionException if a request for a node failed to be constructed from the given prototype URI. If thrown, it is guaranteed that no request was sent.
*/
Set<NodeResponse> replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Map<String, List<String>> parameters, Map<String, String> headers) throws UriConstructionException;
/**
* Requests are sent to each node in the cluster. If the request results in
* an exception, then the NodeResourceResponse will contain the exception.
* Requests are sent to each node in the cluster. If the request results in an exception, then the NodeResourceResponse will contain the exception.
*
* HTTP DELETE, GET, HEAD, and OPTIONS methods will throw an
* IllegalArgumentException if used.
* HTTP DELETE, GET, HEAD, and OPTIONS methods will throw an IllegalArgumentException if used.
*
* @param nodeIds the node identifiers
* @param method the HTTP method (e.g., POST, PUT)
* @param uri the base request URI (up to, but not including, the query
* string)
* @param uri the base request URI (up to, but not including, the query string)
* @param entity an entity
* @param headers any HTTP headers
*
* @return the set of node responses
*
* @throws UriConstructionException if a request for a node failed to be
* constructed from the given prototype URI. If thrown, it is guaranteed
* that no request was sent.
* @throws UriConstructionException if a request for a node failed to be constructed from the given prototype URI. If thrown, it is guaranteed that no request was sent.
*/
Set<NodeResponse> replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers) throws UriConstructionException;

View File

@ -29,8 +29,7 @@ import org.apache.nifi.cluster.node.Node.Status;
public interface HttpResponseMapper {
/**
* Maps a HTTP response to a node response and the corresponding node
* status.
* Maps a HTTP response to a node response and the corresponding node status.
*
* @param requestURI the original request URI
* @param nodeResponses a set of node resource responses

View File

@ -40,23 +40,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Encapsulates a node's response in regards to receiving a external API
* request.
* Encapsulates a node's response in regards to receiving a external API request.
*
* Both the ClientResponse and (server) Response may be obtained from this
* instance. The ClientResponse is stored as it is received from the node. This
* includes the entity input stream. The Response is constructed on demand when
* mapping a ClientResponse to the Response. The ClientResponse to Response
* mapping includes copying the ClientResponse's input stream to the Response.
* Therefore, the getResponse() method should not be called more than once.
* Furthermore, the method should not be called if the caller has already read
* the ClientResponse's input stream.
* Both the ClientResponse and (server) Response may be obtained from this instance. The ClientResponse is stored as it is received from the node. This includes the entity input stream. The Response
* is constructed on demand when mapping a ClientResponse to the Response. The ClientResponse to Response mapping includes copying the ClientResponse's input stream to the Response. Therefore, the
* getResponse() method should not be called more than once. Furthermore, the method should not be called if the caller has already read the ClientResponse's input stream.
*
* If a ClientResponse was unable to be created, then a NodeResponse will store
* the Throwable, which may be obtained by calling getThrowable().
* If a ClientResponse was unable to be created, then a NodeResponse will store the Throwable, which may be obtained by calling getThrowable().
*
* This class overrides hashCode and equals and considers two instances to be
* equal if they have the equal NodeIdentifiers.
* This class overrides hashCode and equals and considers two instances to be equal if they have the equal NodeIdentifiers.
*
* @author unattributed
*/
@ -145,14 +137,14 @@ public class NodeResponse {
public int getStatus() {
if (hasThrowable()) {
/*
* since there is a throwable, there is no client input stream to
* since there is a throwable, there is no client input stream to
* worry about maintaining, so we can call getResponse() method
*/
return getResponse().getStatus();
} else {
/*
* use client response's status instead of calling getResponse().getStatus()
* so that we don't read the client's input stream as part of creating
* so that we don't read the client's input stream as part of creating
* the response in the getResponse() method
*/
return clientResponse.getStatus();
@ -160,9 +152,7 @@ public class NodeResponse {
}
/**
* Returns true if the response status is 2xx, false otherwise.
*
* @return
* @return true if the response status is 2xx, false otherwise.
*/
public boolean is2xx() {
final int statusCode = getStatus();
@ -170,9 +160,7 @@ public class NodeResponse {
}
/**
* Returns true if the response status is 5xx, false otherwise.
*
* @return
* @return true if the response status is 5xx, false otherwise.
*/
public boolean is5xx() {
final int statusCode = getStatus();
@ -180,8 +168,7 @@ public class NodeResponse {
}
/**
* Returns null if hasThrowable() is true; otherwise the client's response
* is returned.
* Returns null if hasThrowable() is true; otherwise the client's response is returned.
*
* The ClientResponse's input stream can only be read once.
*
@ -192,24 +179,18 @@ public class NodeResponse {
}
/**
* If this node response has been merged returns the updated entity,
* otherwise null. Also returns null if hasThrowable() is true. The intent
* of this method is to support getting the response entity when it was
* already consumed during the merge operation. In this case the client
* response rom getClientResponse() will not support a getEntity(...) or
* getEntityInputStream() call.
* If this node response has been merged returns the updated entity, otherwise null. Also returns null if hasThrowable() is true. The intent of this method is to support getting the response
* entity when it was already consumed during the merge operation. In this case the client response rom getClientResponse() will not support a getEntity(...) or getEntityInputStream() call.
*
* @return
* @return If this node response has been merged returns the updated entity, otherwise null. Also returns null if hasThrowable() is true
*/
public Entity getUpdatedEntity() {
return updatedEntity;
}
/**
* Creates a Response by mapping the ClientResponse values to it. Since the
* ClientResponse's input stream can only be read once, this method should
* only be called once. Furthermore, the caller should not have already read
* the ClientResponse's input stream.
* Creates a Response by mapping the ClientResponse values to it. Since the ClientResponse's input stream can only be read once, this method should only be called once. Furthermore, the caller
* should not have already read the ClientResponse's input stream.
*
* @return the response
*/
@ -232,11 +213,9 @@ public class NodeResponse {
}
/**
* Returns true if a throwable was thrown and a response was not able to be
* created; false otherwise.
* Returns true if a throwable was thrown and a response was not able to be created; false otherwise.
*
* @return true if a throwable was thrown and a response was not able to be
* created; false otherwise
* @return true if a throwable was thrown and a response was not able to be created; false otherwise
*/
public boolean hasThrowable() {
return getThrowable() != null;

View File

@ -17,8 +17,7 @@
package org.apache.nifi.cluster.manager.exception;
/**
* Represents the exceptional case when a HTTP request that may change a node's
* dataflow is to be replicated while a node is connecting to the cluster.
* Represents the exceptional case when a HTTP request that may change a node's dataflow is to be replicated while a node is connecting to the cluster.
*
* @author unattributed
*/

View File

@ -17,8 +17,7 @@
package org.apache.nifi.cluster.manager.exception;
/**
* Represents the exceptional case when a HTTP request that may change a node's
* dataflow is to be replicated while one or more nodes are disconnected.
* Represents the exceptional case when a HTTP request that may change a node's dataflow is to be replicated while one or more nodes are disconnected.
*
* @author unattributed
*/

View File

@ -17,8 +17,7 @@
package org.apache.nifi.cluster.manager.exception;
/**
* Signals that an operation to be performed on a cluster has been invoked at an
* illegal or inappropriate time.
* Signals that an operation to be performed on a cluster has been invoked at an illegal or inappropriate time.
*
* @author unattributed
*/

View File

@ -17,8 +17,7 @@
package org.apache.nifi.cluster.manager.exception;
/**
* Represents the exceptional case when a deletion request is issued to a node
* that cannot be deleted (e.g., the node is not disconnected).
* Represents the exceptional case when a deletion request is issued to a node that cannot be deleted (e.g., the node is not disconnected).
*
* @author unattributed
*/

View File

@ -17,9 +17,7 @@
package org.apache.nifi.cluster.manager.exception;
/**
* Represents the exceptional case when a disconnection request is issued to a
* node that cannot be disconnected (e.g., last node in cluster, node is primary
* node).
* Represents the exceptional case when a disconnection request is issued to a node that cannot be disconnected (e.g., last node in cluster, node is primary node).
*
* @author unattributed
*/

View File

@ -17,8 +17,7 @@
package org.apache.nifi.cluster.manager.exception;
/**
* Represents the exceptional case when a reconnection request is issued to a
* node that cannot be reconnected (e.g., the node is not disconnected).
* Represents the exceptional case when a reconnection request is issued to a node that cannot be reconnected (e.g., the node is not disconnected).
*
* @author unattributed
*/

View File

@ -17,8 +17,7 @@
package org.apache.nifi.cluster.manager.exception;
/**
* Represents the exceptional case when the primary role cannot be assigned to a
* node because the node is ineligible for the role.
* Represents the exceptional case when the primary role cannot be assigned to a node because the node is ineligible for the role.
*
* @author unattributed
*/

View File

@ -17,9 +17,8 @@
package org.apache.nifi.cluster.manager.exception;
/**
* Represents the exceptional case when a HTTP request that may change a node's
* state is to be replicated while the cluster or connected nodes are unable to
* change their state (e.g., a new node is connecting to the cluster).
* Represents the exceptional case when a HTTP request that may change a node's state is to be replicated while the cluster or connected nodes are unable to change their state (e.g., a new node is
* connecting to the cluster).
*
* @author unattributed
*/

View File

@ -17,8 +17,7 @@
package org.apache.nifi.cluster.manager.exception;
/**
* Represents the exceptional case when the cluster is unable to service a
* request because no nodes are connected.
* Represents the exceptional case when the cluster is unable to service a request because no nodes are connected.
*
* @author unattributed
*/

View File

@ -17,9 +17,8 @@
package org.apache.nifi.cluster.manager.exception;
/**
* Represents the exceptional case when the cluster is unable to service a
* request because no nodes returned a response. When the given request is not
* mutable the nodes are left in their previous state.
* Represents the exceptional case when the cluster is unable to service a request because no nodes returned a response. When the given request is not mutable the nodes are left in their previous
* state.
*
* @author unattributed
*/

View File

@ -17,8 +17,7 @@
package org.apache.nifi.cluster.manager.exception;
/**
* Represents the exceptional case when a disconnection request to a node
* failed.
* Represents the exceptional case when a disconnection request to a node failed.
*
* @author unattributed
*/

View File

@ -17,8 +17,7 @@
package org.apache.nifi.cluster.manager.exception;
/**
* Represents the exceptional case when the cluster is unable to update the
* primary role of a node.
* Represents the exceptional case when the cluster is unable to update the primary role of a node.
*
* @author unattributed
*/

View File

@ -17,8 +17,7 @@
package org.apache.nifi.cluster.manager.exception;
/**
* Represents the exceptional case when a HTTP request that may change a node's
* dataflow is to be replicated while the cluster is in safe mode.
* Represents the exceptional case when a HTTP request that may change a node's dataflow is to be replicated while the cluster is in safe mode.
*
* @author unattributed
*/

View File

@ -17,8 +17,7 @@
package org.apache.nifi.cluster.manager.exception;
/**
* Represents the exceptional case when a request is made for a node that does
* not exist.
* Represents the exceptional case when a request is made for a node that does not exist.
*
* @author unattributed
*/

View File

@ -17,9 +17,7 @@
package org.apache.nifi.cluster.manager.exception;
/**
* Represents the exceptional case when a URI cannot be constructed from the
* given information. This exception is similar to Java's URISyntaxException
* except that it extends RuntimeException.
* Represents the exceptional case when a URI cannot be constructed from the given information. This exception is similar to Java's URISyntaxException except that it extends RuntimeException.
*
* @author unattributed
*/

View File

@ -59,21 +59,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An implementation of the <code>HttpRequestReplicator</code> interface. This
* implementation parallelizes the node HTTP requests using the given
* <code>ExecutorService</code> instance. Individual requests may have
* connection and read timeouts set, which may be set during instance
* construction. Otherwise, the default is not to timeout.
* An implementation of the <code>HttpRequestReplicator</code> interface. This implementation parallelizes the node HTTP requests using the given <code>ExecutorService</code> instance. Individual
* requests may have connection and read timeouts set, which may be set during instance construction. Otherwise, the default is not to timeout.
*
* If a node protocol scheme is provided during construction, then all requests
* will be replicated using the given scheme. If null is provided as the scheme
* (the default), then the requests will be replicated using the scheme of the
* original URI.
* If a node protocol scheme is provided during construction, then all requests will be replicated using the given scheme. If null is provided as the scheme (the default), then the requests will be
* replicated using the scheme of the original URI.
*
* Clients must call start() and stop() to initialize and shutdown the instance.
* The instance must be started before issuing any replication requests.
* Clients must call start() and stop() to initialize and shutdown the instance. The instance must be started before issuing any replication requests.
*
* @author unattributed
*/
public class HttpRequestReplicatorImpl implements HttpRequestReplicator {
@ -97,11 +90,9 @@ public class HttpRequestReplicatorImpl implements HttpRequestReplicator {
private String nodeProtocolScheme = null;
/**
* Creates an instance. The connection timeout and read timeout will be
* infinite.
* Creates an instance. The connection timeout and read timeout will be infinite.
*
* @param numThreads the number of threads to use when parallelizing
* requests
* @param numThreads the number of threads to use when parallelizing requests
* @param client a client for making requests
*/
public HttpRequestReplicatorImpl(final int numThreads, final Client client) {
@ -111,12 +102,10 @@ public class HttpRequestReplicatorImpl implements HttpRequestReplicator {
/**
* Creates an instance.
*
* @param numThreads the number of threads to use when parallelizing
* requests
* @param numThreads the number of threads to use when parallelizing requests
* @param client a client for making requests
* @param connectionTimeoutMs the connection timeout specified in
* milliseconds
* @param readTimeoutMs the read timeout specified in milliseconds
* @param connectionTimeout the connection timeout specified in milliseconds
* @param readTimeout the read timeout specified in milliseconds
*/
public HttpRequestReplicatorImpl(final int numThreads, final Client client, final String connectionTimeout, final String readTimeout) {
@ -178,9 +167,7 @@ public class HttpRequestReplicatorImpl implements HttpRequestReplicator {
/**
* Sets the protocol scheme to use when issuing requests to nodes.
*
* @param nodeProtocolScheme the scheme. Valid values are "http", "https",
* or null. If null is specified, then the scheme of the originating request
* is used when replicating that request.
* @param nodeProtocolScheme the scheme. Valid values are "http", "https", or null. If null is specified, then the scheme of the originating request is used when replicating that request.
*/
public synchronized void setNodeProtocolScheme(final String nodeProtocolScheme) {
if (StringUtils.isNotBlank(nodeProtocolScheme)) {
@ -368,9 +355,7 @@ public class HttpRequestReplicatorImpl implements HttpRequestReplicator {
}
/**
* Wraps a future node response with info from originating request. This
* coupling allows for futures that encountered exceptions to be linked back
* to the failing node and better reported.
* Wraps a future node response with info from originating request. This coupling allows for futures that encountered exceptions to be linked back to the failing node and better reported.
*/
private class NodeHttpRequestFutureWrapper {
@ -417,8 +402,7 @@ public class HttpRequestReplicatorImpl implements HttpRequestReplicator {
}
/**
* A Callable for making an HTTP request to a single node and returning its
* response.
* A Callable for making an HTTP request to a single node and returning its response.
*/
private class NodeHttpRequestCallable implements Callable<NodeResponse> {

View File

@ -33,12 +33,9 @@ import org.slf4j.LoggerFactory;
*
* The algorithm is as follows.
*
* If any HTTP responses were 2XX, then disconnect non-2XX responses. This is
* because 2XX may have changed a node's flow.
* If any HTTP responses were 2XX, then disconnect non-2XX responses. This is because 2XX may have changed a node's flow.
*
* If no 2XX responses were received, then the node's flow has not changed.
* Instead of disconnecting everything, we only disconnect the nodes with
* internal errors, i.e., 5XX responses.
* If no 2XX responses were received, then the node's flow has not changed. Instead of disconnecting everything, we only disconnect the nodes with internal errors, i.e., 5XX responses.
*
* @author unattributed
*/
@ -60,7 +57,7 @@ public class HttpResponseMapperImpl implements HttpResponseMapper {
}
}
// determine the status of each node
// determine the status of each node
for (final NodeResponse nodeResponse : nodeResponses) {
final Node.Status status;

View File

@ -232,20 +232,14 @@ import org.apache.nifi.web.api.entity.ReportingTaskEntity;
import org.apache.nifi.web.api.entity.ReportingTasksEntity;
/**
* Provides a cluster manager implementation. The manager federates incoming
* HTTP client requests to the nodes' external API using the HTTP protocol. The
* manager also communicates with nodes using the nodes' internal socket
* protocol.
* Provides a cluster manager implementation. The manager federates incoming HTTP client requests to the nodes' external API using the HTTP protocol. The manager also communicates with nodes using the
* nodes' internal socket protocol.
*
* The manager's socket address may broadcasted using multicast if a
* MulticastServiceBroadcaster instance is set on this instance. The manager
* instance must be started after setting the broadcaster.
* The manager's socket address may broadcasted using multicast if a MulticastServiceBroadcaster instance is set on this instance. The manager instance must be started after setting the broadcaster.
*
* The manager may be configured with an EventManager for recording noteworthy
* lifecycle events (e.g., first heartbeat received, node status change).
* The manager may be configured with an EventManager for recording noteworthy lifecycle events (e.g., first heartbeat received, node status change).
*
* The start() and stop() methods must be called to initialize and stop the
* instance.
* The start() and stop() methods must be called to initialize and stop the instance.
*
* @author unattributed
*/
@ -258,47 +252,38 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private static final Logger heartbeatLogger = new NiFiLog(LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat"));
/**
* The HTTP header to store a cluster context. An example of what may be
* stored in the context is a node's auditable actions in response to a
* cluster request. The cluster context is serialized using Java's
* serialization mechanism and hex encoded.
* The HTTP header to store a cluster context. An example of what may be stored in the context is a node's auditable actions in response to a cluster request. The cluster context is serialized
* using Java's serialization mechanism and hex encoded.
*/
public static final String CLUSTER_CONTEXT_HTTP_HEADER = "X-ClusterContext";
/**
* HTTP Header that stores a unique ID for each request that is replicated
* to the nodes. This is used for logging purposes so that request
* information, such as timing, can be correlated between the NCM and the
* nodes
* HTTP Header that stores a unique ID for each request that is replicated to the nodes. This is used for logging purposes so that request information, such as timing, can be correlated between
* the NCM and the nodes
*/
public static final String REQUEST_ID_HEADER = "X-RequestID";
/**
* The HTTP header that the NCM specifies to ask a node if they are able to
* process a given request. The value is always 150-NodeContinue. The node
* will respond with 150 CONTINUE if it is able to process the request, 417
* EXPECTATION_FAILED otherwise.
* The HTTP header that the NCM specifies to ask a node if they are able to process a given request. The value is always 150-NodeContinue. The node will respond with 150 CONTINUE if it is able to
* process the request, 417 EXPECTATION_FAILED otherwise.
*/
public static final String NCM_EXPECTS_HTTP_HEADER = "X-NcmExpects";
public static final int NODE_CONTINUE_STATUS_CODE = 150;
/**
* The HTTP header that the NCM specifies to indicate that a node should
* invalidate the specified user group. This is done to ensure that user
* cache is not stale when an administrator modifies a group through the UI.
* The HTTP header that the NCM specifies to indicate that a node should invalidate the specified user group. This is done to ensure that user cache is not stale when an administrator modifies a
* group through the UI.
*/
public static final String CLUSTER_INVALIDATE_USER_GROUP_HEADER = "X-ClusterInvalidateUserGroup";
/**
* The HTTP header that the NCM specifies to indicate that a node should
* invalidate the specified user. This is done to ensure that user cache is
* not stale when an administrator modifies a user through the UI.
* The HTTP header that the NCM specifies to indicate that a node should invalidate the specified user. This is done to ensure that user cache is not stale when an administrator modifies a user
* through the UI.
*/
public static final String CLUSTER_INVALIDATE_USER_HEADER = "X-ClusterInvalidateUser";
/**
* The default number of seconds to respond to a connecting node if the
* manager cannot provide it with a current data flow.
* The default number of seconds to respond to a connecting node if the manager cannot provide it with a current data flow.
*/
private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5;
@ -398,7 +383,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS);
}
componentStatusSnapshotMillis = snapshotMillis;
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
@ -411,7 +396,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
statusRepository = createComponentStatusRepository();
componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository);
}
// ensure this node has a payload
if (node.getHeartbeat() != null && node.getHeartbeatPayload() != null) {
// if nothing has been captured or the current heartbeat is newer, capture it - comparing the heatbeat created timestamp
@ -422,7 +407,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
}
} catch(final Throwable t) {
} catch (final Throwable t) {
logger.warn("Unable to capture component metrics from Node heartbeats: " + t);
if (logger.isDebugEnabled()) {
logger.warn("", t);
@ -667,14 +652,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
* Services connection requests. If the data flow management service is
* unable to provide a current copy of the data flow, then the returned
* connection response will indicate the node should try later. Otherwise,
* the connection response will contain the the flow and the node
* identifier.
* Services connection requests. If the data flow management service is unable to provide a current copy of the data flow, then the returned connection response will indicate the node should try
* later. Otherwise, the connection response will contain the the flow and the node identifier.
*
* If this instance is configured with a firewall and the request is
* blocked, then the response will not contain a node identifier.
* If this instance is configured with a firewall and the request is blocked, then the response will not contain a node identifier.
*
* @param request a connection request
*
@ -754,14 +735,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/*
* The manager does not have a current copy of the data flow,
* so it will instruct the node to try connecting at a later
* time. Meanwhile, the flow will be locked down from user
* The manager does not have a current copy of the data flow,
* so it will instruct the node to try connecting at a later
* time. Meanwhile, the flow will be locked down from user
* changes because the node is marked as connecting.
*/
/*
* Create try-later response based on flow retrieval delay to give
* Create try-later response based on flow retrieval delay to give
* the flow management service a chance to retrieve a curren flow
*/
final int tryAgainSeconds;
@ -783,20 +764,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
* Services reconnection requests for a given node. If the node indicates
* reconnection failure, then the node will be set to disconnected and if
* the node has primary role, then the role will be revoked. Otherwise, a
* reconnection request will be sent to the node, initiating the connection
* handshake.
* Services reconnection requests for a given node. If the node indicates reconnection failure, then the node will be set to disconnected and if the node has primary role, then the role will be
* revoked. Otherwise, a reconnection request will be sent to the node, initiating the connection handshake.
*
* @param nodeId a node identifier
*
* @throws UnknownNodeException if the node does not exist
* @throws IllegalNodeReconnectionException if the node cannot be
* reconnected because the node is not disconnected
* @throws NodeReconnectionException if the reconnection message failed to
* be sent or the cluster could not provide a current data flow for the
* reconnection request
* @throws IllegalNodeReconnectionException if the node cannot be reconnected because the node is not disconnected
* @throws NodeReconnectionException if the reconnection message failed to be sent or the cluster could not provide a current data flow for the reconnection request
*/
@Override
public void requestReconnection(final String nodeId, final String userDn) throws UnknownNodeException, IllegalNodeReconnectionException {
@ -1163,11 +1138,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
* @param userDn the DN of the user requesting the disconnection
*
* @throws UnknownNodeException if the node does not exist
* @throws IllegalNodeDisconnectionException if the node cannot be
* disconnected due to the cluster's state (e.g., node is last connected
* node or node is primary)
* @throws NodeDisconnectionException if the disconnection message fails to
* be sent.
* @throws IllegalNodeDisconnectionException if the node cannot be disconnected due to the cluster's state (e.g., node is last connected node or node is primary)
* @throws NodeDisconnectionException if the disconnection message fails to be sent.
*/
@Override
public void requestDisconnection(final String nodeId, final String userDn) throws UnknownNodeException, IllegalNodeDisconnectionException, NodeDisconnectionException {
@ -1185,8 +1157,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
* Requests a disconnection to the node with the given node ID, but any
* exception thrown is suppressed.
* Requests a disconnection to the node with the given node ID, but any exception thrown is suppressed.
*
* @param nodeId the node ID
*/
@ -1197,28 +1168,19 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
* Issues a disconnection message to the node identified by the given node
* ID. If the node is not known, then a UnknownNodeException is thrown. If
* the node cannot be disconnected due to the cluster's state and
* ignoreLastNodeCheck is false, then a IllegalNodeDisconnectionException is
* thrown. Otherwise, a disconnection message is issued to the node.
* Issues a disconnection message to the node identified by the given node ID. If the node is not known, then a UnknownNodeException is thrown. If the node cannot be disconnected due to the
* cluster's state and ignoreLastNodeCheck is false, then a IllegalNodeDisconnectionException is thrown. Otherwise, a disconnection message is issued to the node.
*
* Whether the disconnection message is successfully sent to the node, the
* node is marked as disconnected and if the node is the primary node, then
* the primary role is revoked.
* Whether the disconnection message is successfully sent to the node, the node is marked as disconnected and if the node is the primary node, then the primary role is revoked.
*
* @param nodeId the ID of the node
* @param ignoreNodeChecks if false, checks will be made to ensure the
* cluster supports the node's disconnection (e.g., the node is not the last
* connected node in the cluster; the node is not the primary); otherwise,
* the request is made regardless of the cluster state
* @param ignoreNodeChecks if false, checks will be made to ensure the cluster supports the node's disconnection (e.g., the node is not the last connected node in the cluster; the node is not the
* primary); otherwise, the request is made regardless of the cluster state
* @param explanation
*
* @throws IllegalNodeDisconnectionException if the node cannot be
* disconnected due to the cluster's state (e.g., node is last connected
* node or node is primary). Not thrown if ignoreNodeChecks is true.
* @throws NodeDisconnectionException if the disconnection message fails to
* be sent.
* @throws IllegalNodeDisconnectionException if the node cannot be disconnected due to the cluster's state (e.g., node is last connected node or node is primary). Not thrown if ignoreNodeChecks is
* true.
* @throws NodeDisconnectionException if the disconnection message fails to be sent.
*/
private void requestDisconnection(final NodeIdentifier nodeId, final boolean ignoreNodeChecks, final String explanation)
throws IllegalNodeDisconnectionException, NodeDisconnectionException {
@ -1276,8 +1238,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
* Messages the node to have the primary role. If the messaging fails, then
* the node is marked as disconnected.
* Messages the node to have the primary role. If the messaging fails, then the node is marked as disconnected.
*
* @param nodeId the node ID to assign primary role
*
@ -1292,7 +1253,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
msg.setPrimary(true);
logger.info("Attempting to assign primary role to node: " + nodeId);
// message
// message
senderListener.assignPrimaryRole(msg);
logger.info("Assigned primary role to node: " + nodeId);
@ -1321,11 +1282,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
* Messages the node with the given node ID to no longer have the primary
* role. If the messaging fails, then the node is marked as disconnected.
* Messages the node with the given node ID to no longer have the primary role. If the messaging fails, then the node is marked as disconnected.
*
* @return true if the primary role was revoked from the node; false
* otherwise
* @return true if the primary role was revoked from the node; false otherwise
*/
private boolean revokePrimaryRole(final NodeIdentifier nodeId) {
writeLock.lock();
@ -1382,8 +1341,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final Node node = getRawNode(msg.getNodeId().getId());
if (node != null) {
node.setStatus(Status.DISCONNECTED);
addEvent(msg.getNodeId(), "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage());
addBulletin(node, Severity.ERROR, "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage());
addEvent(msg.getNodeId(), "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node reported "
+ "the following error: " + msg.getExceptionMessage());
addBulletin(node, Severity.ERROR, "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node "
+ "reported the following error: " + msg.getExceptionMessage());
}
} finally {
writeLock.unlock("handleControllerStartupFailure");
@ -1405,14 +1366,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
/**
* Adds an instance of a specified controller service.
*
* @param type
* @param id
* @param properties
* @return
*/
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
@ -1666,7 +1619,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
/**
* Handle a bulletins message.
*
* @param bulletins
* @param bulletins bulletins
*/
public void handleBulletins(final NodeBulletins bulletins) {
final NodeIdentifier nodeIdentifier = bulletins.getNodeIdentifier();
@ -1681,15 +1634,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
* Handles a node's heartbeat. If this heartbeat is a node's first heartbeat
* since its connection request, then the manager will mark the node as
* connected. If the node was previously disconnected due to a lack of
* heartbeat, then a reconnection request is issued. If the node was
* disconnected for other reasons, then a disconnection request is issued.
* If this instance is configured with a firewall and the heartbeat is
* blocked, then a disconnection request is issued.
*
* @param heartbeat
* Handles a node's heartbeat. If this heartbeat is a node's first heartbeat since its connection request, then the manager will mark the node as connected. If the node was previously disconnected
* due to a lack of heartbeat, then a reconnection request is issued. If the node was disconnected for other reasons, then a disconnection request is issued. If this instance is configured with a
* firewall and the heartbeat is blocked, then a disconnection request is issued.
*/
@Override
public void handleHeartbeat(final Heartbeat heartbeat) {
@ -1703,9 +1650,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
/*
* Processing a heartbeat requires a write lock, which may take a while
* to obtain. Only the last heartbeat is necessary to process per node.
* Futhermore, since many could pile up, heartbeats are processed in
* Futhermore, since many could pile up, heartbeats are processed in
* bulk.
*
* The below queue stores the pending heartbeats.
*/
pendingHeartbeats.add(heartbeat);
@ -1782,7 +1728,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
} else if (heartbeatIndicatesNotYetConnected) {
if (Status.CONNECTED == node.getStatus()) {
// record event
addEvent(node.getNodeId(), "Received heartbeat from node that thinks it is not yet part of the cluster, though the Manager thought it was. Marking as Disconnected and issuing reconnection request.");
addEvent(node.getNodeId(), "Received heartbeat from node that thinks it is not yet part of the cluster, though the Manager thought it "
+ "was. Marking as Disconnected and issuing reconnection request.");
// record heartbeat
node.setHeartbeat(null);
@ -1843,7 +1790,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
node.setHeartbeat(mostRecentHeartbeat);
}
} catch (final Exception e) {
logger.error("Failed to process heartbeat from {}:{} due to {}", mostRecentHeartbeat.getNodeIdentifier().getApiAddress(), mostRecentHeartbeat.getNodeIdentifier().getApiPort(), e.toString());
logger.error("Failed to process heartbeat from {}:{} due to {}",
mostRecentHeartbeat.getNodeIdentifier().getApiAddress(), mostRecentHeartbeat.getNodeIdentifier().getApiPort(), e.toString());
if (logger.isDebugEnabled()) {
logger.error("", e);
}
@ -2043,13 +1991,15 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
@Override
public NodeResponse applyRequest(final String method, final URI uri, final Map<String, List<String>> parameters, final Map<String, String> headers)
throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException,
ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
return applyRequest(method, uri, parameters, headers, getNodeIds(Status.CONNECTED));
}
@Override
public NodeResponse applyRequest(final String method, final URI uri, final Map<String, List<String>> parameters, final Map<String, String> headers, final Set<NodeIdentifier> nodeIdentifiers)
throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException,
ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
final boolean mutableRequest = canChangeNodeState(method, uri);
final ClusterManagerLock lock = mutableRequest ? writeLock : readLock;
@ -2085,13 +2035,15 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
@Override
public NodeResponse applyRequest(final String method, final URI uri, final Object entity, final Map<String, String> headers)
throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException,
ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
return applyRequest(method, uri, entity, headers, getNodeIds(Status.CONNECTED));
}
@Override
public NodeResponse applyRequest(final String method, final URI uri, final Object entity, final Map<String, String> headers, final Set<NodeIdentifier> nodeIdentifiers)
throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException,
ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
final boolean mutableRequest = canChangeNodeState(method, uri);
final ClusterManagerLock lock = mutableRequest ? writeLock : readLock;
@ -2270,7 +2222,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
// requires write lock to already be acquired unless method cannot change node state
private NodeResponse federateRequest(final String method, final URI uri, final Map<String, List<String>> parameters, final Object entity, final Map<String, String> headers, final Set<NodeIdentifier> nodeIds) throws UriConstructionException {
private NodeResponse federateRequest(
final String method, final URI uri, final Map<String, List<String>> parameters, final Object entity, final Map<String, String> headers, final Set<NodeIdentifier> nodeIds)
throws UriConstructionException {
// ensure some nodes are connected
if (nodeIds.isEmpty()) {
throw new NoConnectedNodesException("Cannot apply " + method + " request to " + uri + " because there are currently no connected Nodes");
@ -2399,7 +2353,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
private static boolean isProcessorEndpoint(final URI uri, final String method) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches())) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method))
&& (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches())) {
return true;
} else if ("POST".equalsIgnoreCase(method) && PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
@ -2674,7 +2629,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
private void mergeControllerServiceReferences(final Set<ControllerServiceReferencingComponentDTO> referencingComponents, final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> referencingComponentMap) {
private void mergeControllerServiceReferences(
final Set<ControllerServiceReferencingComponentDTO> referencingComponents, final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> referencingComponentMap) {
final Map<String, Integer> activeThreadCounts = new HashMap<>();
final Map<String, String> states = new HashMap<>();
for (final Map.Entry<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeEntry : referencingComponentMap.entrySet()) {
@ -2782,12 +2738,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
* Merges the validation errors into the specified map, recording the
* corresponding node identifier.
* Merges the validation errors into the specified map, recording the corresponding node identifier.
*
* @param validationErrorMap
* @param nodeId
* @param nodeValidationErrors
* @param validationErrorMap map
* @param nodeId id
* @param nodeValidationErrors errors
*/
public void mergeValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, final NodeIdentifier nodeId, final Collection<String> nodeValidationErrors) {
if (nodeValidationErrors != null) {
@ -2803,12 +2758,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
* Normalizes the validation errors by prepending the corresponding nodes
* when the error does not exist across all nodes.
* Normalizes the validation errors by prepending the corresponding nodes when the error does not exist across all nodes.
*
* @param validationErrorMap
* @param totalNodes
* @return
* @param validationErrorMap map
* @param totalNodes total
* @return normalized errors
*/
public Set<String> normalizedMergedValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, int totalNodes) {
final Set<String> normalizedValidationErrors = new HashSet<>();
@ -2862,7 +2816,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final boolean hasClientResponse = clientResponse != null;
final boolean hasSuccessfulClientResponse = hasClientResponse && clientResponse.is2xx();
// drain the responses from the socket for those responses not being sent to the client
// drain the responses from the socket for those responses not being sent to the client
final Set<NodeResponse> nodeResponsesToDrain = new HashSet<>(updatedNodesMap.values());
nodeResponsesToDrain.remove(clientResponse);
@ -3177,7 +3131,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
final ControllerServiceReferencingComponentsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
final ControllerServiceReferencingComponentsEntity nodeResponseEntity =
(nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeResponseEntity.getControllerServiceReferencingComponents();
resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents);
@ -3243,9 +3198,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/*
* Nodes that encountered issues handling the request are marked as
* disconnected for mutable requests (e.g., post, put, delete). For
* other requests (e.g., get, head), the nodes remain in their current
* Nodes that encountered issues handling the request are marked as
* disconnected for mutable requests (e.g., post, put, delete). For
* other requests (e.g., get, head), the nodes remain in their current
* state even if they had problems handling the request.
*/
if (mutableRequest) {
@ -3260,7 +3215,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// mark flow as stale since this request could have changed the flow
notifyDataFlowManagmentServiceOfFlowStateChange(PersistedFlowState.STALE);
// disconnect problematic nodes
// disconnect problematic nodes
if (!problematicNodeResponses.isEmpty()) {
if (problematicNodeResponses.size() < nodeResponses.size()) {
logger.warn(String.format("One or more nodes failed to process URI '%s'. Requesting each node to disconnect from cluster.", uri));
@ -3275,8 +3230,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
* Drains the node responses off of the socket to ensure that the socket is
* appropriately cleaned-up.
* Drains the node responses off of the socket to ensure that the socket is appropriately cleaned-up.
*
* @param nodeResponses the collection of node responses
*/
@ -3313,11 +3267,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
* A helper method to disconnect nodes that returned unsuccessful HTTP
* responses because of a replicated request. Disconnection requests are
* sent concurrently.
* A helper method to disconnect nodes that returned unsuccessful HTTP responses because of a replicated request. Disconnection requests are sent concurrently.
*
* @param nodeResponses
*/
private void disconnectNodes(final Set<NodeResponse> nodeResponses, final String explanation) {
// return fast if nothing to do
@ -3363,14 +3314,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
* Returns false if an internal protocol message was received by a node
* listed in the firewall. If no firewall is configured, then false is
* always returned.
* Returns false if an internal protocol message was received by a node listed in the firewall. If no firewall is configured, then false is always returned.
*
* @param ip the IP of the remote machine
*
* @return false if the IP is listed in the firewall or if the firewall is
* not configured; true otherwise
* @return false if the IP is listed in the firewall or if the firewall is not configured; true otherwise
*/
private boolean isBlockedByFirewall(final String ip) {
if (isFirewallConfigured()) {
@ -3417,10 +3365,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
* Resolves a proposed node identifier to a node identifier that the manager
* approves. If the proposed node identifier conflicts with an existing node
* identifier, then an approved node identifier is generated and returned to
* the caller.
* Resolves a proposed node identifier to a node identifier that the manager approves. If the proposed node identifier conflicts with an existing node identifier, then an approved node identifier
* is generated and returned to the caller.
*
* @param proposedNodeId a proposed identifier
*
@ -3579,11 +3525,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
* This timer task simply processes any pending heartbeats. This timer task
* is not strictly needed, as HeartbeatMonitoringTimerTask will do this.
* However, this task is scheduled much more frequently and by processing
* the heartbeats more frequently, the stats that we report have less of a
* delay.
* This timer task simply processes any pending heartbeats. This timer task is not strictly needed, as HeartbeatMonitoringTimerTask will do this. However, this task is scheduled much more
* frequently and by processing the heartbeats more frequently, the stats that we report have less of a delay.
*/
private class ProcessPendingHeartbeatsTask extends TimerTask {
@ -3599,13 +3542,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
* A timer task to detect nodes that have not sent a heartbeat in a while.
* The "problem" nodes are marked as disconnected due to lack of heartbeat
* by the task. No disconnection request is sent to the node. This is
* because either the node is not functioning in which case sending the
* request is futile or the node is running a bit slow. In the latter case,
* we'll wait for the next heartbeat and send a reconnection request when we
* process the heartbeat in the heartbeatHandler() method.
* A timer task to detect nodes that have not sent a heartbeat in a while. The "problem" nodes are marked as disconnected due to lack of heartbeat by the task. No disconnection request is sent to
* the node. This is because either the node is not functioning in which case sending the request is futile or the node is running a bit slow. In the latter case, we'll wait for the next heartbeat
* and send a reconnection request when we process the heartbeat in the heartbeatHandler() method.
*/
private class HeartbeatMonitoringTimerTask extends TimerTask {
@ -3899,7 +3838,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
for (final Map.Entry<Date, List<StatusSnapshot>> entry : snapshotsToAggregate.entrySet()) {
final List<StatusSnapshot> snapshots = entry.getValue();
final StatusSnapshot reducedSnapshot = snapshots.get(0).getValueReducer().reduce(snapshots);
final StatusSnapshotDTO dto = new StatusSnapshotDTO();
dto.setTimestamp(reducedSnapshot.getTimestamp());
dto.setStatusMetrics(StatusHistoryUtil.createStatusSnapshotDto(reducedSnapshot).getStatusMetrics());

View File

@ -27,13 +27,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents a connected flow controller. Nodes always have an immutable
* identifier and a status. The status may be changed, but never null.
* Represents a connected flow controller. Nodes always have an immutable identifier and a status. The status may be changed, but never null.
*
* A Node may be cloned, but the cloning is a shallow copy of the instance.
*
* This class overrides hashCode and equals and considers two instances to be
* equal if they have the equal NodeIdentifiers.
* This class overrides hashCode and equals and considers two instances to be equal if they have the equal NodeIdentifiers.
*
* @author unattributed
*/
@ -44,19 +42,12 @@ public class Node implements Cloneable, Comparable<Node> {
/**
* The semantics of a Node status are as follows:
* <ul>
* <li>CONNECTED -- a flow controller that is connected to the cluster. A
* connecting node transitions to connected after the cluster receives the
* flow controller's first heartbeat. A connected node can transition to
* disconnecting.</li>
* <li>CONNECTING -- a flow controller has issued a connection request to
* the cluster, but has not yet sent a heartbeat. A connecting node can
* transition to disconnecting or connected. The cluster will not accept any
* external requests to change the flow while any node is connecting.</li>
* <li>DISCONNECTED -- a flow controller that is not connected to the
* cluster. A disconnected node can transition to connecting.</li>
* <li>DISCONNECTING -- a flow controller that is in the process of
* disconnecting from the cluster. A disconnecting node will always
* transition to disconnected.</li>
* <li>CONNECTED -- a flow controller that is connected to the cluster. A connecting node transitions to connected after the cluster receives the flow controller's first heartbeat. A connected
* node can transition to disconnecting.</li>
* <li>CONNECTING -- a flow controller has issued a connection request to the cluster, but has not yet sent a heartbeat. A connecting node can transition to disconnecting or connected. The cluster
* will not accept any external requests to change the flow while any node is connecting.</li>
* <li>DISCONNECTED -- a flow controller that is not connected to the cluster. A disconnected node can transition to connecting.</li>
* <li>DISCONNECTING -- a flow controller that is in the process of disconnecting from the cluster. A disconnecting node will always transition to disconnected.</li>
* </ul>
*/
public static enum Status {
@ -93,8 +84,7 @@ public class Node implements Cloneable, Comparable<Node> {
private AtomicLong connectionRequestedTimestamp = new AtomicLong(0L);
/**
* a flag to indicate this node was disconnected because of a lack of
* heartbeat
* a flag to indicate this node was disconnected because of a lack of heartbeat
*/
private boolean heartbeatDisconnection;
@ -156,8 +146,7 @@ public class Node implements Cloneable, Comparable<Node> {
}
/**
* Sets the time when the connection request for this node was last
* received.
* Sets the time when the connection request for this node was last received.
*
* This method is thread-safe and may be called without obtaining any lock.
*
@ -168,19 +157,16 @@ public class Node implements Cloneable, Comparable<Node> {
}
/**
* Returns true if the node was disconnected due to lack of heartbeat; false
* otherwise.
* Returns true if the node was disconnected due to lack of heartbeat; false otherwise.
*
* @return true if the node was disconnected due to lack of heartbeat; false
* otherwise.
* @return true if the node was disconnected due to lack of heartbeat; false otherwise.
*/
public boolean isHeartbeatDisconnection() {
return heartbeatDisconnection;
}
/**
* Sets the status to disconnected and flags the node as being disconnected
* by lack of heartbeat.
* Sets the status to disconnected and flags the node as being disconnected by lack of heartbeat.
*/
public void setHeartbeatDisconnection() {
setStatus(Status.DISCONNECTED);

View File

@ -33,12 +33,10 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
/**
* Factory bean for creating a singleton ClusterManagerProtocolServiceLocator
* instance. If the application is configured to act as the cluster manager,
* then null is always returned as the created instance.
* Factory bean for creating a singleton ClusterManagerProtocolServiceLocator instance. If the application is configured to act as the cluster manager, then null is always returned as the created
* instance.
*
* The cluster manager protocol service represents the socket endpoint for
* sending internal socket messages to the cluster manager.
* The cluster manager protocol service represents the socket endpoint for sending internal socket messages to the cluster manager.
*/
public class ClusterManagerProtocolServiceLocatorFactoryBean implements FactoryBean, ApplicationContextAware, DisposableBean {

View File

@ -36,9 +36,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
/**
* Factory bean for creating a singleton WebClusterManager instance. If the
* application is not configured to act as the cluster manager, then null is
* always returned as the created instance.
* Factory bean for creating a singleton WebClusterManager instance. If the application is not configured to act as the cluster manager, then null is always returned as the created instance.
*/
public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationContextAware {

View File

@ -16,15 +16,16 @@
*/
package org.apache.nifi.cluster.event.impl;
import org.apache.nifi.cluster.event.impl.EventManagerImpl;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.cluster.event.Event;
import org.apache.nifi.cluster.event.Event.Category;
import org.apache.nifi.cluster.event.EventManager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* @author unattributed

View File

@ -20,8 +20,10 @@ import java.io.File;
import java.io.IOException;
import org.apache.nifi.util.file.FileUtils;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import static org.junit.Assert.*;
import org.junit.Test;
public class FileBasedClusterNodeFirewallTest {

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.cluster.manager.impl;
import org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl;
import javax.ws.rs.core.Response;
import javax.xml.bind.annotation.XmlRootElement;
import javax.ws.rs.HttpMethod;
@ -43,7 +42,9 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.nifi.cluster.manager.testutils.HttpResponseAction;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* @author unattributed

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.cluster.manager.impl;
import org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import java.io.ByteArrayInputStream;
@ -29,10 +28,11 @@ import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.cluster.node.Node.Status;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* @author unattributed

View File

@ -30,8 +30,7 @@ import javax.ws.rs.core.MediaType;
import org.apache.commons.lang3.StringUtils;
/**
* Encapsulates an HTTP request. The toString method returns the
* specification-compliant request.
* Encapsulates an HTTP request. The toString method returns the specification-compliant request.
*
* @author unattributed
*/
@ -97,9 +96,7 @@ public class HttpRequest {
}
/**
* A builder for constructing basic HTTP requests. It handles only enough of
* the HTTP specification to support basic unit testing, and it should not
* be used otherwise.
* A builder for constructing basic HTTP requests. It handles only enough of the HTTP specification to support basic unit testing, and it should not be used otherwise.
*/
public static class HttpRequestBuilder {

View File

@ -22,8 +22,7 @@ import java.util.Map;
import javax.ws.rs.core.Response.Status;
/**
* Encapsulates an HTTP response. The toString method returns the
* specification-compliant response.
* Encapsulates an HTTP response. The toString method returns the specification-compliant response.
*
* @author unattributed
*/

View File

@ -17,9 +17,7 @@
package org.apache.nifi.cluster.manager.testutils;
/**
* Wraps a HttpResponse with a time-delay. When the action is applied, the
* currently executing thread sleeps for the given delay before returning the
* response to the caller.
* Wraps a HttpResponse with a time-delay. When the action is applied, the currently executing thread sleeps for the given delay before returning the response to the caller.
*
* This class is good for simulating network latency.
*

View File

@ -37,8 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A simple HTTP web server that allows clients to register canned-responses to
* respond to received requests.
* A simple HTTP web server that allows clients to register canned-responses to respond to received requests.
*
* @author unattributed
*/

View File

@ -31,11 +31,13 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.io.socket.ServerSocketConfiguration;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.junit.After;
import static org.junit.Assert.*;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

View File

@ -20,10 +20,12 @@ import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.mockito.stubbing.OngoingStubbing;
public class ClusterServiceLocatorTest {

View File

@ -28,7 +28,8 @@ import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
import org.junit.After;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

View File

@ -35,7 +35,7 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
import org.apache.nifi.io.socket.multicast.MulticastUtils;
import org.junit.After;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

View File

@ -111,7 +111,8 @@ public class NodeProtocolSenderImplTest {
when(mockServiceLocator.getService()).thenReturn(service);
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
ConnectionResponseMessage mockMessage = new ConnectionResponseMessage();
mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier, new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString()));
mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier,
new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString()));
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage);
ConnectionRequestMessage request = new ConnectionRequestMessage();

View File

@ -33,7 +33,8 @@ import org.apache.nifi.io.socket.ServerSocketConfiguration;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils;
import org.junit.After;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;