mirror of https://github.com/apache/nifi.git
NIFI-2824: - Updating replication logic to account for the potential replication target and then invoking the corresponding action.
Signed-off-by: Yolanda M. Davis <ymdavis@apache.org> This closes #1068
This commit is contained in:
parent
508b218b59
commit
dd9ecc2907
|
@ -49,6 +49,7 @@ import org.apache.nifi.controller.reporting.ReportingTaskProvider;
|
||||||
import org.apache.nifi.controller.service.ControllerServiceProvider;
|
import org.apache.nifi.controller.service.ControllerServiceProvider;
|
||||||
import org.apache.nifi.registry.VariableRegistry;
|
import org.apache.nifi.registry.VariableRegistry;
|
||||||
import org.apache.nifi.util.NiFiProperties;
|
import org.apache.nifi.util.NiFiProperties;
|
||||||
|
import org.apache.nifi.web.api.ApplicationResource.ReplicationTarget;
|
||||||
import org.apache.nifi.web.api.dto.AllowableValueDTO;
|
import org.apache.nifi.web.api.dto.AllowableValueDTO;
|
||||||
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
|
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
|
||||||
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
|
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
|
||||||
|
@ -72,14 +73,12 @@ import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements the NiFiWebConfigurationContext interface to support a context in both standalone and clustered environments.
|
* Implements the NiFiWebConfigurationContext interface to support a context in both standalone and clustered environments.
|
||||||
|
@ -292,16 +291,24 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
|
||||||
return componentFacade.updateComponent(requestContext, annotationData, properties);
|
return componentFacade.updateComponent(requestContext, annotationData, properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ReplicationTarget getReplicationTarget() {
|
||||||
|
return clusterCoordinator.isActiveClusterCoordinator() ? ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR;
|
||||||
|
}
|
||||||
|
|
||||||
private NodeResponse replicate(final String method, final URI uri, final Object entity, final Map<String, String> headers) throws InterruptedException {
|
private NodeResponse replicate(final String method, final URI uri, final Object entity, final Map<String, String> headers) throws InterruptedException {
|
||||||
final NodeIdentifier coordinatorNode = clusterCoordinator.getElectedActiveCoordinatorNode();
|
final NodeIdentifier coordinatorNode = clusterCoordinator.getElectedActiveCoordinatorNode();
|
||||||
if (coordinatorNode == null) {
|
if (coordinatorNode == null) {
|
||||||
throw new NoClusterCoordinatorException();
|
throw new NoClusterCoordinatorException();
|
||||||
}
|
}
|
||||||
|
|
||||||
final Set<NodeIdentifier> coordinatorNodes = Collections.singleton(coordinatorNode);
|
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
|
||||||
return requestReplicator.replicate(coordinatorNodes, method, uri, entity, headers, false, true).awaitMergedResponse();
|
// to the cluster nodes themselves.
|
||||||
|
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||||
|
return requestReplicator.replicate(method, uri, entity, headers).awaitMergedResponse();
|
||||||
|
} else {
|
||||||
|
return requestReplicator.forwardToCoordinator(coordinatorNode, method, uri, entity, headers).awaitMergedResponse();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Facade over accessing different types of NiFi components.
|
* Facade over accessing different types of NiFi components.
|
||||||
|
|
Loading…
Reference in New Issue