mirror of https://github.com/apache/nifi.git
NIFI-926:
- Ensuring that nodes are not disconnected when the user attempts to reset a counter that does not exist on that node. This can happen when/if counters are adjusted conditionally.
This commit is contained in:
parent
85534ca860
commit
d1206fe074
|
@ -309,6 +309,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
||||||
public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}");
|
public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}");
|
||||||
public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+");
|
public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+");
|
||||||
|
|
||||||
|
public static final Pattern COUNTERS_URI = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}");
|
||||||
public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
|
public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
|
||||||
public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
|
public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
|
||||||
public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references");
|
public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references");
|
||||||
|
@ -2424,6 +2425,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
||||||
return "GET".equalsIgnoreCase(method) && PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches();
|
return "GET".equalsIgnoreCase(method) && PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean isCountersEndpoint(final URI uri) {
|
||||||
|
return COUNTERS_URI.matcher(uri.getPath()).matches();
|
||||||
|
}
|
||||||
|
|
||||||
private static boolean isControllerServicesEndpoint(final URI uri, final String method) {
|
private static boolean isControllerServicesEndpoint(final URI uri, final String method) {
|
||||||
return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath());
|
return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath());
|
||||||
}
|
}
|
||||||
|
@ -3227,6 +3232,19 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
||||||
*/
|
*/
|
||||||
if (mutableRequest) {
|
if (mutableRequest) {
|
||||||
|
|
||||||
|
// if some (not all) nodes had a problematic response because of a missing counter, ensure the are not disconnected
|
||||||
|
if (!problematicNodeResponses.isEmpty() && problematicNodeResponses.size() < nodeResponses.size() && isMissingCounter(problematicNodeResponses, uri)) {
|
||||||
|
for (final Map.Entry<Node, NodeResponse> updatedNodeEntry : updatedNodesMap.entrySet()) {
|
||||||
|
final NodeResponse nodeResponse = updatedNodeEntry.getValue();
|
||||||
|
final Node node = updatedNodeEntry.getKey();
|
||||||
|
|
||||||
|
if (problematicNodeResponses.contains(nodeResponse)) {
|
||||||
|
node.setStatus(Status.CONNECTED);
|
||||||
|
problematicNodeResponses.remove(nodeResponse);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// set the updated nodes
|
// set the updated nodes
|
||||||
nodes.removeAll(updatedNodesMap.keySet());
|
nodes.removeAll(updatedNodesMap.keySet());
|
||||||
nodes.addAll(updatedNodesMap.keySet());
|
nodes.addAll(updatedNodesMap.keySet());
|
||||||
|
@ -3251,6 +3269,28 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
||||||
return clientResponse;
|
return clientResponse;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determines if all problematic responses were due to 404 NOT_FOUND. Assumes that problematicNodeResponses is not empty and
|
||||||
|
* is not comprised of responses from all nodes in the cluster (at least one node contained the counter in question).
|
||||||
|
*
|
||||||
|
* @param problematicNodeResponses The problematic node responses
|
||||||
|
* @param uri The URI for the request
|
||||||
|
* @return Whether all problematic node responses were due to a missing counter
|
||||||
|
*/
|
||||||
|
private boolean isMissingCounter(final Set<NodeResponse> problematicNodeResponses, final URI uri) {
|
||||||
|
if (isCountersEndpoint(uri)) {
|
||||||
|
boolean notFound = true;
|
||||||
|
for (final NodeResponse problematicResponse : problematicNodeResponses) {
|
||||||
|
if (problematicResponse.getStatus() != 404) {
|
||||||
|
notFound = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return notFound;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Drains the node responses off of the socket to ensure that the socket is appropriately cleaned-up.
|
* Drains the node responses off of the socket to ensure that the socket is appropriately cleaned-up.
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in New Issue