mirror of https://github.com/apache/nifi.git
NIFI-271 checkpoint
parent 888254b2a1
commit 9dda16c995

@@ -22,38 +22,44 @@ import org.apache.nifi.action.Action;
import org.apache.nifi.web.Revision;
/**
* Contains contextual information about clustering that may be serialized
* between manager and node when communicating over HTTP.
*/
public interface ClusterContext extends Serializable {
/**
* Returns a list of auditable actions. The list is modifiable
* and will never be null.
* Returns a list of auditable actions. The list is modifiable and will
* never be null.
*
* @return a collection of actions
*/
List<Action> getActions();
Revision getRevision();
void setRevision(Revision revision);
/**
* @return true if the request was sent by the cluster manager; false otherwise
* @return true if the request was sent by the cluster manager; false
* otherwise
*/
boolean isRequestSentByClusterManager();
/**
* Sets the flag to indicate if a request was sent by the cluster manager.
* @param flag true if the request was sent by the cluster manager; false otherwise
*
* @param flag true if the request was sent by the cluster manager; false
* otherwise
*/
void setRequestSentByClusterManager(boolean flag);
/**
* Gets an id generation seed. This is used to ensure that nodes are able to generate the
* same id across the cluster. This is usually handled by the cluster manager creating the
* id, however for some actions (snippets, templates, etc) this is not possible.
* @return
* Gets an id generation seed. This is used to ensure that nodes are able to
* generate the same id across the cluster. This is usually handled by the
* cluster manager creating the id, however for some actions (snippets,
* templates, etc) this is not possible.
*
* @return generated id seed
*/
String getIdGenerationSeed();
}

@@ -29,13 +29,13 @@ import org.apache.nifi.web.Revision;
public class ClusterContextImpl implements ClusterContext, Serializable {
private final List<Action> actions = new ArrayList<>();
private Revision revision;
private boolean requestSentByClusterManager;
private final String idGenerationSeed = UUID.randomUUID().toString();
@Override
public List<Action> getActions() {
return actions;

@@ -55,7 +55,7 @@ public class ClusterContextImpl implements ClusterContext, Serializable {
public boolean isRequestSentByClusterManager() {
return requestSentByClusterManager;
}
@Override
public void setRequestSentByClusterManager(boolean requestSentByClusterManager) {
this.requestSentByClusterManager = requestSentByClusterManager;

@@ -20,23 +20,23 @@ package org.apache.nifi.cluster.context;
* Manages a cluster context on a threadlocal.
*/
public class ClusterContextThreadLocal {
private static final ThreadLocal<ClusterContext> contextHolder = new ThreadLocal<>();
public static void removeContext() {
contextHolder.remove();
}
public static ClusterContext createEmptyContext() {
return new ClusterContextImpl();
}
public static ClusterContext getContext() {
return contextHolder.get();
}
public static void setContext(final ClusterContext context) {
contextHolder.set(context);
}
}
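
As an illustration of how ClusterContextThreadLocal is intended to be used, the short sketch below (not part of this commit; the request-handling method is hypothetical) scopes a ClusterContext to the current thread and clears it afterwards so it cannot leak across requests.

import java.util.List;

import org.apache.nifi.action.Action;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;

public class ClusterContextUsageSketch {

    // hypothetical request hook, for illustration only
    public void handleRequest() {
        final ClusterContext context = ClusterContextThreadLocal.createEmptyContext();
        ClusterContextThreadLocal.setContext(context);
        try {
            // components servicing the request can record auditable actions;
            // per the ClusterContext contract this list is modifiable and never null
            final List<Action> actions = ClusterContextThreadLocal.getContext().getActions();
        } finally {
            // always remove the thread-local so the context does not bleed into the next request
            ClusterContextThreadLocal.removeContext();
        }
    }
}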
@@ -27,8 +27,9 @@ public interface ClusterNodeFirewall {
* false otherwise.
*
* If an IP is given, then it must be formatted in dotted decimal notation.
* @param hostOrIp
* @return
*
* @param hostOrIp host
* @return true if permissible
*/
boolean isPermissible(String hostOrIp);
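
ClusterNodeFirewall.isPermissible(...) is the single check applied to connecting hosts. The sketch below is a minimal, hypothetical CIDR-based version of that check (the class and constructor are illustrative, not the project's file-based implementation), using the same commons-net SubnetUtils utility imported in the next hunk.

import java.util.ArrayList;
import java.util.Collection;

import org.apache.commons.net.util.SubnetUtils;

public class CidrFirewallSketch {

    private final Collection<SubnetUtils.SubnetInfo> allowedSubnets = new ArrayList<>();

    // cidrs: e.g. "10.0.0.0/8", "192.168.1.0/24"
    public CidrFirewallSketch(final Collection<String> cidrs) {
        for (final String cidr : cidrs) {
            allowedSubnets.add(new SubnetUtils(cidr).getInfo());
        }
    }

    // returns true if the dotted-decimal IP falls within any configured subnet
    public boolean isPermissible(final String ip) {
        for (final SubnetUtils.SubnetInfo subnet : allowedSubnets) {
            if (subnet.isInRange(ip)) {
                return true;
            }
        }
        return false;
    }
}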
@@ -16,10 +16,14 @@
*/
package org.apache.nifi.cluster.firewall.impl;
import java.io.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.commons.net.util.SubnetUtils;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.util.file.FileUtils;

@@ -39,13 +39,13 @@ public class ClusterDataFlow {
}
public byte[] getControllerServices() {
return controllerServices;
}
public byte[] getReportingTasks() {
return reportingTasks;
}
public NodeIdentifier getPrimaryNodeId() {
return primaryNodeId;
}

@@ -36,7 +36,7 @@ public interface DataFlowDao {
* Saves the cluster's dataflow.
*
* @param dataFlow
* @param dataFlow flow
* @throws DaoException if the dataflow was unable to be saved
*/
void saveDataFlow(ClusterDataFlow dataFlow) throws DaoException;

@@ -31,7 +31,6 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
*
* Clients must call start() and stop() to initialize and stop the instance.
*
* @author unattributed
*/
public interface DataFlowManagementService {

@@ -68,21 +67,23 @@ public interface DataFlowManagementService {
void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException;
/**
* Updates the dataflow with the given serialized form of the Controller Services that are to exist on the NCM.
*
* @param serializedControllerServices
* @throws DaoException
* Updates the dataflow with the given serialized form of the Controller
* Services that are to exist on the NCM.
*
* @param serializedControllerServices services
* @throws DaoException ex
*/
void updateControllerServices(byte[] serializedControllerServices) throws DaoException;
/**
* Updates the dataflow with the given serialized form of Reporting Tasks that are to exist on the NCM.
*
* @param serviceNodes
* @throws DaoException
* Updates the dataflow with the given serialized form of Reporting Tasks
* that are to exist on the NCM.
*
* @param serializedReportingTasks tasks
* @throws DaoException ex
*/
void updateReportingTasks(byte[] serializedReportingTasks) throws DaoException;
/**
* Sets the state of the flow.
*

@@ -187,36 +187,35 @@ public class DataFlowDaoImpl implements DataFlowDao {
throw new DaoException(ex);
}
}
private void syncWithRestore(final File primaryFile, final File restoreFile) throws IOException {
try (final FileInputStream primaryFis = new FileInputStream(primaryFile);
final TarArchiveInputStream primaryIn = new TarArchiveInputStream(primaryFis);
final FileInputStream restoreFis = new FileInputStream(restoreFile);
final TarArchiveInputStream restoreIn = new TarArchiveInputStream(restoreFis)) {
final ArchiveEntry primaryEntry = primaryIn.getNextEntry();
final ArchiveEntry restoreEntry = restoreIn.getNextEntry();
if ( primaryEntry == null && restoreEntry == null ) {
if (primaryEntry == null && restoreEntry == null) {
return;
}
if ( (primaryEntry == null && restoreEntry != null) || (primaryEntry != null && restoreEntry == null) ) {
if ((primaryEntry == null && restoreEntry != null) || (primaryEntry != null && restoreEntry == null)) {
throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'",
primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath()));
}
final byte[] primaryMd5 = calculateMd5(primaryIn);
final byte[] restoreMd5 = calculateMd5(restoreIn);
if ( !Arrays.equals(primaryMd5, restoreMd5) ) {
if (!Arrays.equals(primaryMd5, restoreMd5)) {
throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'",
primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath()));
}
}
}
private byte[] calculateMd5(final InputStream in) throws IOException {
final MessageDigest digest;
try {

@@ -224,7 +223,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
} catch (final NoSuchAlgorithmException nsae) {
throw new IOException(nsae);
}
int len;
final byte[] buffer = new byte[8192];
while ((len = in.read(buffer)) > -1) {
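
The two hunks above show DataFlowDaoImpl comparing the primary and restore flow archives by MD5. The self-contained sketch below (class and method names are illustrative, not from the commit) mirrors the streaming-digest idiom that calculateMd5 uses with its 8 KB buffer.

import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class Md5Sketch {

    static byte[] md5(final InputStream in) throws IOException {
        final MessageDigest digest;
        try {
            digest = MessageDigest.getInstance("MD5");
        } catch (final NoSuchAlgorithmException nsae) {
            throw new IOException(nsae);
        }
        final byte[] buffer = new byte[8192];
        int len;
        while ((len = in.read(buffer)) > -1) {
            digest.update(buffer, 0, len); // hash only the bytes actually read
        }
        return digest.digest();
    }
}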
@@ -257,12 +256,14 @@ public class DataFlowDaoImpl implements DataFlowDao {
if (primaryStateFile == null) {
writeDataFlow(createNewFlowStateFile(restoreDirectory), dataFlow);
} else {
throw new DaoException(String.format("Unable to save dataflow because dataflow state file in primary directory '%s' exists, but it does not exist in the restore directory '%s'",
throw new DaoException(String.format("Unable to save dataflow because dataflow state file in primary directory "
+ "'%s' exists, but it does not exist in the restore directory '%s'",
primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
}
} else {
if (primaryStateFile == null) {
throw new DaoException(String.format("Unable to save dataflow because dataflow state file in restore directory '%s' exists, but it does not exist in the primary directory '%s'",
throw new DaoException(String.format("Unable to save dataflow because dataflow state file in restore directory "
+ "'%s' exists, but it does not exist in the primary directory '%s'",
restoreDirectory.getAbsolutePath(), primaryDirectory.getAbsolutePath()));
} else {
final PersistedFlowState primaryFlowState = getPersistedFlowState(primaryStateFile);

@@ -270,14 +271,15 @@ public class DataFlowDaoImpl implements DataFlowDao {
if (primaryFlowState == restoreFlowState) {
writeDataFlow(restoreStateFile, dataFlow);
} else {
throw new DaoException(String.format("Unable to save dataflow because state file in primary directory '%s' has state '%s', but the state file in the restore directory '%s' has state '%s'",
throw new DaoException(String.format("Unable to save dataflow because state file in primary directory "
+ "'%s' has state '%s', but the state file in the restore directory '%s' has state '%s'",
primaryDirectory.getAbsolutePath(), primaryFlowState, restoreDirectory.getAbsolutePath(), restoreFlowState));
}
}
}
}
// write dataflow to primary
if (primaryStateFile == null) {
writeDataFlow(createNewFlowStateFile(primaryDirectory), dataFlow);
} else {

@@ -477,7 +479,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
byte[] clusterInfoBytes = new byte[0];
byte[] controllerServiceBytes = new byte[0];
byte[] reportingTaskBytes = new byte[0];
try (final InputStream inStream = new FileInputStream(file);
final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inStream))) {
TarArchiveEntry tarEntry;

@@ -500,13 +502,13 @@ public class DataFlowDaoImpl implements DataFlowDao {
StreamUtils.fillBuffer(tarIn, clusterInfoBytes, true);
break;
case CONTROLLER_SERVICES_FILENAME:
controllerServiceBytes = new byte[(int) tarEntry.getSize()];
StreamUtils.fillBuffer(tarIn, controllerServiceBytes, true);
break;
case REPORTING_TASKS_FILENAME:
reportingTaskBytes = new byte[(int) tarEntry.getSize()];
StreamUtils.fillBuffer(tarIn, reportingTaskBytes, true);
break;
default:
throw new DaoException("Found Unexpected file in dataflow configuration: " + tarEntry.getName());
}

@@ -559,7 +561,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
final TarArchiveOutputStream tarOut = new TarArchiveOutputStream(new BufferedOutputStream(fos))) {
final DataFlow dataFlow = clusterDataFlow.getDataFlow();
if ( dataFlow == null ) {
if (dataFlow == null) {
writeTarEntry(tarOut, FLOW_XML_FILENAME, getEmptyFlowBytes());
writeTarEntry(tarOut, TEMPLATES_FILENAME, new byte[0]);
writeTarEntry(tarOut, SNIPPETS_FILENAME, new byte[0]);

@@ -64,12 +64,11 @@ import org.slf4j.LoggerFactory;
public class DataFlowManagementServiceImpl implements DataFlowManagementService {
/*
* Developer Note:
*
* This class maintains an ExecutorService and a Runnable.
* Although the class is not externally threadsafe, its internals are protected to
* accommodate multithread access between the ExecutorServer and the Runnable.
*
*/
private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowManagementServiceImpl.class));

@@ -170,13 +169,12 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
resourceLock.unlock("updatePrimaryNode");
}
}
@Override
public void updateControllerServices(final byte[] controllerServiceBytes) throws DaoException {
resourceLock.lock();
try {
final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
final StandardDataFlow dataFlow;
final byte[] reportingTaskBytes;

@@ -192,16 +190,16 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
}
flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
} finally {
resourceLock.unlock("updateControllerServices");
}
}
@Override
public void updateReportingTasks(final byte[] reportingTaskBytes) throws DaoException {
resourceLock.lock();
try {
final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
final StandardDataFlow dataFlow;
final byte[] controllerServiceBytes;

@@ -217,9 +215,9 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
}
flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
} finally {
resourceLock.unlock("updateControllerServices");
}
}
@Override

@@ -361,8 +359,8 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
if (existingClusterDataFlow == null) {
currentClusterDataFlow = new ClusterDataFlow(dataFlow, null, new byte[0], new byte[0]);
} else {
currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId(),
existingClusterDataFlow.getControllerServices(), existingClusterDataFlow.getReportingTasks());
}
flowDao.saveDataFlow(currentClusterDataFlow);
flowDao.setPersistedFlowState(PersistedFlowState.CURRENT);
@@ -77,14 +77,14 @@ public interface ClusterManager extends NodeInformant {
Set<Node> getNodes(Status... statuses);
/**
* @param nodeId
* @param nodeId node identifier
* @return returns the node with the given identifier or null if node does
* not exist
*/
Node getNode(String nodeId);
/**
* @param statuses
* @param statuses statuses
* @return the set of node identifiers with the given node status
*/
Set<NodeIdentifier> getNodeIds(Status... statuses);

@@ -199,9 +199,7 @@ public interface ClusterManager extends NodeInformant {
Node getPrimaryNode();
/**
* Returns the bulletin repository.
*
* @return
* @return the bulletin repository
*/
BulletinRepository getBulletinRepository();

@@ -192,19 +192,19 @@ public class NodeResponse {
}
/**
* If this node response has been merged returns the updated entity,
* otherwise null. Also returns null if hasThrowable() is true. The
* intent of this method is to support getting the response entity
* when it was already consumed during the merge operation. In this
* case the client response rom getClientResponse() will not support
* a getEntity(...) or getEntityInputStream() call.
*
* @return
* If this node response has been merged returns the updated entity,
* otherwise null. Also returns null if hasThrowable() is true. The intent
* of this method is to support getting the response entity when it was
* already consumed during the merge operation. In this case the client
* response rom getClientResponse() will not support a getEntity(...) or
* getEntityInputStream() call.
*
* @return
*/
public Entity getUpdatedEntity() {
return updatedEntity;
}
/**
* Creates a Response by mapping the ClientResponse values to it. Since the
* ClientResponse's input stream can only be read once, this method should

@@ -318,13 +318,13 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public static final String PROVENANCE_URI = "/nifi-api/controller/provenance";
public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}");
public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+");
public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references");
public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
private final NiFiProperties properties;
private final HttpRequestReplicator httpRequestReplicator;
private final HttpResponseMapper httpResponseMapper;

@@ -427,14 +427,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public void heartbeat() {
}
}, this, encryptor);
// When we construct the scheduling agents, we can pass null for a lot of the arguments because we are only
// going to be scheduling Reporting Tasks. Otherwise, it would not be okay.
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, new TimerDrivenSchedulingAgent(null, reportingTaskEngine, null, encryptor));
processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor));
processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10);
processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository);
}

@@ -479,10 +479,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
final byte[] serializedServices = clusterDataFlow.getControllerServices();
if ( serializedServices != null && serializedServices.length > 0 ) {
ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices), encryptor, bulletinRepository, properties.getAutoResumeState());
if (serializedServices != null && serializedServices.length > 0) {
ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices), encryptor, bulletinRepository, properties.getAutoResumeState());
}
// start multicast broadcasting service, if configured
if (servicesBroadcaster != null) {
servicesBroadcaster.start();

@@ -493,8 +493,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// Load and start running Reporting Tasks
final byte[] serializedReportingTasks = clusterDataFlow.getReportingTasks();
if ( serializedReportingTasks != null && serializedReportingTasks.length > 0 ) {
loadReportingTasks(serializedReportingTasks);
if (serializedReportingTasks != null && serializedReportingTasks.length > 0) {
loadReportingTasks(serializedReportingTasks);
}
} catch (final IOException ioe) {
logger.warn("Failed to initialize cluster services due to: " + ioe, ioe);

@@ -558,10 +558,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
servicesBroadcaster.stop();
}
if ( processScheduler != null ) {
if (processScheduler != null) {
processScheduler.shutdown();
}
if (encounteredException) {
throw new IOException("Failed to shutdown Cluster Manager because one or more cluster services failed to shutdown. Check the logs for details.");
}

@@ -946,7 +946,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final String scheduleStateValue = DomUtils.getChild(taskElement, "scheduledState").getTextContent().trim();
final ScheduledState scheduledState = ScheduledState.valueOf(scheduleStateValue);
// Reporting Task Properties
for (final Element property : DomUtils.getChildElementsByTagName(taskElement, "property")) {
final String name = DomUtils.getChildText(property, "name");
@@ -969,21 +969,21 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final ReportingTask reportingTask = reportingTaskNode.getReportingTask();
final ComponentLog componentLog = new SimpleProcessLogger(taskId, reportingTask);
final ReportingInitializationContext config = new StandardReportingInitializationContext(taskId, taskName,
schedulingStrategy, taskSchedulingPeriod, componentLog, this);
reportingTask.initialize(config);
final String annotationData = DomUtils.getChildText(taskElement, "annotationData");
if ( annotationData != null ) {
if (annotationData != null) {
reportingTaskNode.setAnnotationData(annotationData.trim());
}
final Map<PropertyDescriptor, String> resolvedProps;
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
resolvedProps = new HashMap<>();
for (final Map.Entry<String, String> entry : properties.entrySet()) {
final PropertyDescriptor descriptor = reportingTask.getPropertyDescriptor(entry.getKey());
if ( entry.getValue() == null ) {
if (entry.getValue() == null) {
resolvedProps.put(descriptor, descriptor.getDefaultValue());
} else {
resolvedProps.put(descriptor, entry.getValue());

@@ -992,24 +992,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
for (final Map.Entry<PropertyDescriptor, String> entry : resolvedProps.entrySet()) {
if ( entry.getValue() != null ) {
if (entry.getValue() != null) {
reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue());
}
}
final String comments = DomUtils.getChildText(taskElement, "comment");
if ( comments != null ) {
if (comments != null) {
reportingTaskNode.setComments(comments);
}
reportingTaskNode.setScheduledState(scheduledState);
if ( ScheduledState.RUNNING.equals(scheduledState) ) {
if ( reportingTaskNode.isValid() ) {
if (ScheduledState.RUNNING.equals(scheduledState)) {
if (reportingTaskNode.isValid()) {
try {
processScheduler.schedule(reportingTaskNode);
} catch (final Exception e) {
logger.error("Failed to start {} due to {}", reportingTaskNode, e);
if ( logger.isDebugEnabled() ) {
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}

@@ -1017,8 +1017,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
logger.error("Failed to start {} because it is invalid due to {}", reportingTaskNode, reportingTaskNode.getValidationErrors());
}
}
tasks.put(reportingTaskNode.getIdentifier(), reportingTaskNode);
}
} catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) {

@@ -1031,7 +1030,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return tasks;
}
@Override
public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
if (type == null) {

@@ -1064,16 +1062,16 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final ReportingTaskNode taskNode = new ClusteredReportingTaskNode(task, id, processScheduler,
new ClusteredEventAccess(this), bulletinRepository, controllerServiceProvider, validationContextFactory);
taskNode.setName(task.getClass().getSimpleName());
reportingTasks.put(id, taskNode);
if ( firstTimeAdded ) {
if (firstTimeAdded) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e);
}
}
return taskNode;
}

@@ -1372,7 +1370,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
writeLock.unlock("handleControllerStartupFailure");
}
}
/**
* Adds an instance of a specified controller service.
*

@@ -1383,7 +1381,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
*/
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
}
@Override

@@ -1410,82 +1408,80 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public boolean isControllerServiceEnabling(final String serviceIdentifier) {
return controllerServiceProvider.isControllerServiceEnabling(serviceIdentifier);
}
@Override
public String getControllerServiceName(final String serviceIdentifier) {
return controllerServiceProvider.getControllerServiceName(serviceIdentifier);
}
@Override
public void removeControllerService(final ControllerServiceNode serviceNode) {
controllerServiceProvider.removeControllerService(serviceNode);
}
@Override
public void enableControllerService(final ControllerServiceNode serviceNode) {
controllerServiceProvider.enableControllerService(serviceNode);
}
@Override
public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
controllerServiceProvider.enableControllerServices(serviceNodes);
}
@Override
public void disableControllerService(final ControllerServiceNode serviceNode) {
controllerServiceProvider.disableControllerService(serviceNode);
}
@Override
public Set<ControllerServiceNode> getAllControllerServices() {
return controllerServiceProvider.getAllControllerServices();
}
@Override
public void disableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.disableReferencingServices(serviceNode);
}
@Override
public void enableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.enableReferencingServices(serviceNode);
}
@Override
public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.scheduleReferencingComponents(serviceNode);
}
@Override
public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
}
@Override
public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode);
}
@Override
public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode);
}
@Override
public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanDisableReferencingServices(serviceNode);
}
@Override
public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanStopReferencingComponents(serviceNode);
}
private byte[] serialize(final Document doc) throws TransformerException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DOMSource domSource = new DOMSource(doc);
final StreamResult streamResult = new StreamResult(baos);
// configure the transformer and convert the DOM
@@ -1498,91 +1494,89 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
transformer.transform(domSource, streamResult);
return baos.toByteArray();
}
private byte[] serializeControllerServices() throws ParserConfigurationException, TransformerException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document document = docBuilder.newDocument();
final Element rootElement = document.createElement("controllerServices");
document.appendChild(rootElement);
for ( final ControllerServiceNode serviceNode : getAllControllerServices() ) {
StandardFlowSerializer.addControllerService(rootElement, serviceNode, encryptor);
}
return serialize(document);
final Element rootElement = document.createElement("controllerServices");
document.appendChild(rootElement);
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
StandardFlowSerializer.addControllerService(rootElement, serviceNode, encryptor);
}
return serialize(document);
}
private byte[] serializeReportingTasks() throws ParserConfigurationException, TransformerException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document document = docBuilder.newDocument();
final Element rootElement = document.createElement("reportingTasks");
document.appendChild(rootElement);
for ( final ReportingTaskNode taskNode : getAllReportingTasks() ) {
StandardFlowSerializer.addReportingTask(rootElement, taskNode, encryptor);
}
return serialize(document);
final Element rootElement = document.createElement("reportingTasks");
document.appendChild(rootElement);
for (final ReportingTaskNode taskNode : getAllReportingTasks()) {
StandardFlowSerializer.addReportingTask(rootElement, taskNode, encryptor);
}
return serialize(document);
}
public void saveControllerServices() {
try {
dataFlowManagementService.updateControllerServices(serializeControllerServices());
} catch (final Exception e) {
logger.error("Failed to save changes to NCM's Controller Services; changes may be lost on restart due to " + e);
if ( logger.isDebugEnabled() ) {
logger.error("", e);
}
getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Controller Services", Severity.ERROR.name(),
"Failed to save changes to NCM's Controller Services; changes may be lost on restart. See logs for more details."));
}
try {
dataFlowManagementService.updateControllerServices(serializeControllerServices());
} catch (final Exception e) {
logger.error("Failed to save changes to NCM's Controller Services; changes may be lost on restart due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Controller Services", Severity.ERROR.name(),
"Failed to save changes to NCM's Controller Services; changes may be lost on restart. See logs for more details."));
}
}
public void saveReportingTasks() {
try {
dataFlowManagementService.updateReportingTasks(serializeReportingTasks());
} catch (final Exception e) {
logger.error("Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart due to " + e);
if ( logger.isDebugEnabled() ) {
logger.error("", e);
}
getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Reporting Tasks", Severity.ERROR.name(),
"Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart. See logs for more details."));
}
try {
dataFlowManagementService.updateReportingTasks(serializeReportingTasks());
} catch (final Exception e) {
logger.error("Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Reporting Tasks", Severity.ERROR.name(),
"Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart. See logs for more details."));
}
}
@Override
public Set<ReportingTaskNode> getAllReportingTasks() {
readLock.lock();
try {
return new HashSet<>(reportingTasks.values());
} finally {
readLock.unlock("getReportingTasks");
}
}
@Override
public ReportingTaskNode getReportingTaskNode(final String taskId) {
readLock.lock();
try {
return reportingTasks.get(taskId);
} finally {
readLock.unlock("getReportingTaskNode");
}
}
@Override
public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
reportingTaskNode.verifyCanStart();
processScheduler.schedule(reportingTaskNode);
}
@Override
public void stopReportingTask(final ReportingTaskNode reportingTaskNode) {
reportingTaskNode.verifyCanStop();

@@ -1591,52 +1585,50 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
@Override
public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
writeLock.lock();
try {
final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
if ( existing == null || existing != reportingTaskNode ) {
throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
}
reportingTaskNode.verifyCanDelete();
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
}
for ( final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet() ) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.getControllerServiceDefinition() != null ) {
final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
if ( value != null ) {
final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value);
if ( serviceNode != null ) {
serviceNode.removeReference(reportingTaskNode);
}
}
}
}
reportingTasks.remove(reportingTaskNode.getIdentifier());
} finally {
writeLock.unlock("removeReportingTask");
}
writeLock.lock();
try {
final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
if (existing == null || existing != reportingTaskNode) {
throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
}
reportingTaskNode.verifyCanDelete();
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
}
for (final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.getControllerServiceDefinition() != null) {
final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
if (value != null) {
final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value);
if (serviceNode != null) {
serviceNode.removeReference(reportingTaskNode);
}
}
}
}
reportingTasks.remove(reportingTaskNode.getIdentifier());
} finally {
writeLock.unlock("removeReportingTask");
}
}
@Override
public void disableReportingTask(final ReportingTaskNode reportingTask) {
reportingTask.verifyCanDisable();
processScheduler.disableReportingTask(reportingTask);
}
@Override
public void enableReportingTask(final ReportingTaskNode reportingTask) {
reportingTask.verifyCanEnable();
processScheduler.enableReportingTask(reportingTask);
}
/**
* Handle a bulletins message.
*
@@ -2336,7 +2328,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// merge the response
final NodeResponse clientResponse = mergeResponses(uri, method, nodeResponses, mutableRequest);
holder.set(clientResponse);
// if we have a response get the updated cluster context for auditing and revision updating
Revision updatedRevision = null;
if (mutableRequest && clientResponse != null) {

@@ -2367,18 +2359,18 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
logger.warn("Classpath issue detected because failed to deserialize cluster context from node response due to: " + cnfe, cnfe);
}
}
return updatedRevision;
}
};
// federate the request and lock on the revision
if (mutableRequest) {
optimisticLockingManager.setRevision(federateRequest);
} else {
federateRequest.execute(optimisticLockingManager.getLastModification().getRevision());
}
return holder.get();
}

@@ -2387,7 +2379,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
private static boolean isProcessorEndpoint(final URI uri, final String method) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches()) ) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches())) {
return true;
} else if ("POST".equalsIgnoreCase(method) && PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;

@@ -2434,11 +2426,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private static boolean isProvenanceEventEndpoint(final URI uri, final String method) {
return "GET".equalsIgnoreCase(method) && PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches();
}
private static boolean isControllerServicesEndpoint(final URI uri, final String method) {
return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath());
}
private static boolean isControllerServiceEndpoint(final URI uri, final String method) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;

@@ -2448,19 +2440,19 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return false;
}
private static boolean isControllerServiceReferenceEndpoint(final URI uri, final String method) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
}
return false;
}
private static boolean isReportingTasksEndpoint(final URI uri, final String method) {
return "GET".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath());
}
private static boolean isReportingTaskEndpoint(final URI uri, final String method) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REPORTING_TASK_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;

@@ -2661,7 +2653,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
remoteProcessGroup.setAuthorizationIssues(mergedAuthorizationIssues);
}
}
private void mergeControllerServiceReferences(final Set<ControllerServiceReferencingComponentDTO> referencingComponents, final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> referencingComponentMap) {
final Map<String, Integer> activeThreadCounts = new HashMap<>();
final Map<String, String> states = new HashMap<>();

@@ -2669,7 +2661,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeEntry.getValue();
// go through all the nodes referencing components
if ( nodeReferencingComponents != null ) {
if (nodeReferencingComponents != null) {
for (final ControllerServiceReferencingComponentDTO nodeReferencingComponent : nodeReferencingComponents) {
// handle active thread counts
if (nodeReferencingComponent.getActiveThreadCount() != null && nodeReferencingComponent.getActiveThreadCount() > 0) {

@@ -2680,7 +2672,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount() + current);
}
}
// handle controller service state
final String state = states.get(nodeReferencingComponent.getId());
if (state == null) {

@@ -2692,7 +2684,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
}
}
}
// go through each referencing components
for (final ControllerServiceReferencingComponentDTO referencingComponent : referencingComponents) {

@@ -2700,24 +2692,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
if (activeThreadCount != null) {
referencingComponent.setActiveThreadCount(activeThreadCount);
}
final String state = states.get(referencingComponent.getId());
if (state != null) {
referencingComponent.setState(state);
}
}
}
private void mergeControllerService(final ControllerServiceDTO controllerService, final Map<NodeIdentifier, ControllerServiceDTO> controllerServiceMap) {
final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
final Set<ControllerServiceReferencingComponentDTO> referencingComponents = controllerService.getReferencingComponents();
final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeReferencingComponentsMap = new HashMap<>();
String state = null;
for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : controllerServiceMap.entrySet()) {
final NodeIdentifier nodeId = nodeEntry.getKey();
final ControllerServiceDTO nodeControllerService = nodeEntry.getValue();
if (state == null) {
if (ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState())) {
state = ControllerServiceState.DISABLING.name();

@@ -2725,27 +2717,27 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
state = ControllerServiceState.ENABLING.name();
}
}
for (final ControllerServiceReferencingComponentDTO nodeReferencingComponents : nodeControllerService.getReferencingComponents()) {
nodeReferencingComponentsMap.put(nodeId, nodeReferencingComponents.getReferencingComponents());
}
// merge the validation errors
mergeValidationErrors(validationErrorMap, nodeId, nodeControllerService.getValidationErrors());
}
// merge the referencing components
mergeControllerServiceReferences(referencingComponents, nodeReferencingComponentsMap);
// store the 'transition' state is applicable
if (state != null) {
controllerService.setState(state);
}
// set the merged the validation errors
controllerService.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, controllerServiceMap.size()));
}
private void mergeReportingTask(final ReportingTaskDTO reportingTask, final Map<NodeIdentifier, ReportingTaskDTO> reportingTaskMap) {
final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();

@@ -2757,24 +2749,25 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
if (nodeReportingTask.getActiveThreadCount() != null) {
activeThreadCount += nodeReportingTask.getActiveThreadCount();
}
// merge the validation errors
mergeValidationErrors(validationErrorMap, nodeId, nodeReportingTask.getValidationErrors());
}
// set the merged active thread counts
reportingTask.setActiveThreadCount(activeThreadCount);
// set the merged the validation errors
reportingTask.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, reportingTaskMap.size()));
}
/**
* Merges the validation errors into the specified map, recording the corresponding node identifier.
*
* Merges the validation errors into the specified map, recording the
* corresponding node identifier.
*
* @param validationErrorMap
* @param nodeId
* @param nodeValidationErrors
*/
public void mergeValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, final NodeIdentifier nodeId, final Collection<String> nodeValidationErrors) {
if (nodeValidationErrors != null) {

@@ -2788,13 +2781,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
}
/**
* Normalizes the validation errors by prepending the corresponding nodes when the error does not exist across all nodes.
*
* Normalizes the validation errors by prepending the corresponding nodes
* when the error does not exist across all nodes.
*
* @param validationErrorMap
* @param totalNodes
* @return
*/
public Set<String> normalizedMergedValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, int totalNodes) {
final Set<String> normalizedValidationErrors = new HashSet<>();
@ -2812,7 +2806,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
|||
}
|
||||
return normalizedValidationErrors;
|
||||
}
|
||||
|
||||
|
||||
// requires write lock to be already acquired unless request is not mutable
|
||||
private NodeResponse mergeResponses(final URI uri, final String method, final Set<NodeResponse> nodeResponses, final boolean mutableRequest) {
|
||||
// holds the one response of all the node responses to return to the client
|
||||
|
@ -3105,7 +3099,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
|||
} else if (hasSuccessfulClientResponse && isControllerServiceEndpoint(uri, method)) {
|
||||
final ControllerServiceEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
|
||||
final ControllerServiceDTO controllerService = responseEntity.getControllerService();
|
||||
|
||||
|
||||
final Map<NodeIdentifier, ControllerServiceDTO> resultsMap = new HashMap<>();
|
||||
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
|
||||
if (problematicNodeResponses.contains(nodeResponse)) {
|
||||
|
@ -3118,12 +3112,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
|||
resultsMap.put(nodeResponse.getNodeId(), nodeControllerService);
|
||||
}
|
||||
mergeControllerService(controllerService, resultsMap);
|
||||
|
||||
|
||||
clientResponse = new NodeResponse(clientResponse, responseEntity);
|
||||
} else if (hasSuccessfulClientResponse && isControllerServicesEndpoint(uri, method)) {
|
||||
final ControllerServicesEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
|
||||
final Set<ControllerServiceDTO> controllerServices = responseEntity.getControllerServices();
|
||||
|
||||
|
||||
final Map<String, Map<NodeIdentifier, ControllerServiceDTO>> controllerServiceMap = new HashMap<>();
|
||||
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
|
||||
if (problematicNodeResponses.contains(nodeResponse)) {
|
||||
|
@ -3156,7 +3150,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
|||
} else if (hasSuccessfulClientResponse && isControllerServiceReferenceEndpoint(uri, method)) {
|
||||
final ControllerServiceReferencingComponentsEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
|
||||
final Set<ControllerServiceReferencingComponentDTO> referencingComponents = responseEntity.getControllerServiceReferencingComponents();
|
||||
|
||||
|
||||
final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> resultsMap = new HashMap<>();
|
||||
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
|
||||
if (problematicNodeResponses.contains(nodeResponse)) {
|
||||
|
@ -3169,12 +3163,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
|||
resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents);
|
||||
}
|
||||
mergeControllerServiceReferences(referencingComponents, resultsMap);
|
||||
|
||||
|
||||
clientResponse = new NodeResponse(clientResponse, responseEntity);
|
||||
} else if (hasSuccessfulClientResponse && isReportingTaskEndpoint(uri, method)) {
|
||||
final ReportingTaskEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
|
||||
final ReportingTaskDTO reportingTask = responseEntity.getReportingTask();
|
||||
|
||||
|
||||
final Map<NodeIdentifier, ReportingTaskDTO> resultsMap = new HashMap<>();
|
||||
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
|
||||
if (problematicNodeResponses.contains(nodeResponse)) {
|
||||
|
@ -3187,12 +3181,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
|||
resultsMap.put(nodeResponse.getNodeId(), nodeReportingTask);
|
||||
}
|
||||
mergeReportingTask(reportingTask, resultsMap);
|
||||
|
||||
|
||||
clientResponse = new NodeResponse(clientResponse, responseEntity);
|
||||
} else if (hasSuccessfulClientResponse && isReportingTasksEndpoint(uri, method)) {
|
||||
final ReportingTasksEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
|
||||
final Set<ReportingTaskDTO> reportingTaskSet = responseEntity.getReportingTasks();
|
||||
|
||||
|
||||
final Map<String, Map<NodeIdentifier, ReportingTaskDTO>> reportingTaskMap = new HashMap<>();
|
||||
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
|
||||
if (problematicNodeResponses.contains(nodeResponse)) {
|
||||
|
|
|
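The controller-service and reporting-task branches above all follow the same shape: collect one DTO per reachable node, skip nodes whose responses were problematic, then fold the per-node results into the single response returned to the client. A minimal, self-contained sketch of that pattern follows (Java 16+; NodeResult and ServiceDto are hypothetical stand-ins, not the NiFi NodeResponse and ControllerServiceDTO types):

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the per-node merge pattern: one DTO per node, failed nodes skipped,
// per-node results folded into the DTO handed back to the client.
public class MergeSketch {

    record NodeResult(String nodeId, boolean problematic, ServiceDto dto) { }

    record ServiceDto(Set<String> validationErrors) { }

    static ServiceDto merge(List<NodeResult> nodeResults) {
        final Map<String, ServiceDto> resultsByNode = new HashMap<>();
        for (final NodeResult result : nodeResults) {
            if (result.problematic()) {
                continue; // a failed node contributes nothing to the merged view
            }
            resultsByNode.put(result.nodeId(), result.dto());
        }

        // union the validation errors reported by the individual nodes
        final Set<String> mergedErrors = new HashSet<>();
        for (final ServiceDto dto : resultsByNode.values()) {
            mergedErrors.addAll(dto.validationErrors());
        }
        return new ServiceDto(mergedErrors);
    }

    public static void main(String[] args) {
        final ServiceDto merged = merge(List.of(
                new NodeResult("node-1", false, new ServiceDto(Set.of("missing property"))),
                new NodeResult("node-2", true, new ServiceDto(Set.of("ignored"))),
                new NodeResult("node-3", false, new ServiceDto(Set.of("missing property", "bad value")))));
        System.out.println(merged.validationErrors()); // prints the union of errors from the reachable nodes
    }
}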
@@ -161,7 +161,7 @@ public class Node implements Cloneable, Comparable<Node> {
*
* This method is thread-safe and may be called without obtaining any lock.
*
* @param connectionRequestedTimestamp
* @param connectionRequestedTimestamp timestamp
*/
public void setConnectionRequestedTimestamp(long connectionRequestedTimestamp) {
this.connectionRequestedTimestamp.set(connectionRequestedTimestamp);
@@ -51,7 +51,7 @@ public class ClusterManagerProtocolServiceLocatorFactoryBean implements FactoryB
@Override
public Object getObject() throws Exception {
/*
* If configured for the cluster manager, then the service locator is never used.
* If configured for the cluster manager, then the service locator is never used.
*/
if (properties.isClusterManager()) {
return null;
@@ -49,7 +49,7 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
private NiFiProperties properties;

private StringEncryptor encryptor;

private OptimisticLockingManager optimisticLockingManager;

@Override

@@ -58,8 +58,8 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
throw new IllegalStateException("Application may be configured as a cluster manager or a node, but not both.");
} else if (!properties.isClusterManager()) {
/*
* If not configured for the cluster manager, then the cluster manager is never used.
* null is returned so that we don't instantiate a thread pool or other resources.
* If not configured for the cluster manager, then the cluster manager is never used.
* null is returned so that we don't instantiate a thread pool or other resources.
*/
return null;
} else if (clusterManager == null) {

@@ -127,7 +127,7 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
public void setEncryptor(final StringEncryptor encryptor) {
this.encryptor = encryptor;
}

public void setOptimisticLockingManager(OptimisticLockingManager optimisticLockingManager) {
this.optimisticLockingManager = optimisticLockingManager;
}
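The factory beans above guard against building cluster-manager resources on instances that will never use them. A hedged sketch of that guard, assuming Spring's FactoryBean interface is on the classpath (ClusterSettings and ClusterManager are hypothetical placeholders for the NiFi types):

import org.springframework.beans.factory.FactoryBean;

// Only builds the (expensive) target when this instance is actually configured as the
// cluster manager; otherwise returns null so no thread pools or sockets are created.
public class ConditionalClusterManagerFactoryBean implements FactoryBean<Object> {

    static class ClusterSettings {
        boolean clusterManager;
        boolean node;
    }

    static class ClusterManager { }

    private final ClusterSettings settings;
    private ClusterManager clusterManager;

    ConditionalClusterManagerFactoryBean(ClusterSettings settings) {
        this.settings = settings;
    }

    @Override
    public Object getObject() {
        if (settings.clusterManager && settings.node) {
            throw new IllegalStateException("Application may be configured as a cluster manager or a node, but not both.");
        } else if (!settings.clusterManager) {
            return null; // not the manager: never instantiate the heavyweight bean
        } else if (clusterManager == null) {
            clusterManager = new ClusterManager(); // build lazily, exactly once
        }
        return clusterManager;
    }

    @Override
    public Class<?> getObjectType() {
        return ClusterManager.class;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }
}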
@@ -20,14 +20,15 @@ package org.apache.nifi.web;
* Represents a request to configure. The implementations execute method will
* perform the configuration action. It will return type T which will be
* encapsulated in a ConfigurationSnapshot.
*
* @param <T>
*
* @param <T> type of request
*/
public interface ConfigurationRequest<T> {

/**
* Executes a configuration action and returns the updated resulting configuration.
*
* Executes a configuration action and returns the updated resulting
* configuration.
*
* @return The resulting configuration
*/
T execute();
@@ -18,7 +18,8 @@ package org.apache.nifi.web;

/**
* Response object that captures some configuration for a given revision.
* @param <T>
*
* @param <T> type of snapshot
*/
public class ConfigurationSnapshot<T> {
@@ -27,9 +27,9 @@ public class FlowModification {

/**
* Creates a new FlowModification.
*
* @param revision
* @param lastModifier
*
* @param revision revision
* @param lastModifier modifier
*/
public FlowModification(Revision revision, String lastModifier) {
this.revision = revision;

@@ -38,8 +38,8 @@ public class FlowModification {

/**
* Get the revision.
*
* @return
*
* @return the revision
*/
public Revision getRevision() {
return revision;

@@ -47,11 +47,11 @@ public class FlowModification {

/**
* Get the last modifier.
*
* @return
*
* @return the modifier
*/
public String getLastModifier() {
return lastModifier;
}

}
@@ -26,26 +26,27 @@ package org.apache.nifi.web;
public interface OptimisticLockingManager {

/**
* Attempts to execute the specified configuration request using the specified revision within a lock.
*
* @param <T>
* @param revision
* @param configurationRequest
* @return
* Attempts to execute the specified configuration request using the
* specified revision within a lock.
*
* @param <T> type of snapshot
* @param revision revision
* @param configurationRequest request
* @return snapshot
*/
<T> ConfigurationSnapshot<T> configureFlow(Revision revision, ConfigurationRequest<T> configurationRequest);

/**
* Updates the revision using the specified revision within a lock.
*
* @param updateRevision
*
* @param updateRevision new revision
*/
void setRevision(UpdateRevision updateRevision);

/**
* Returns the last flow modification. This is a combination of the revision and the user
* who performed the modification.
*
* Returns the last flow modification. This is a combination of the revision
* and the user who performed the modification.
*
* @return the last modification
*/
FlowModification getLastModification();
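A compact sketch of how the optimistic-locking API above is intended to be used: the caller supplies the revision it last saw plus a ConfigurationRequest, and the manager executes the request under a lock only if that revision is still current, then advances it. The types here are simplified stand-ins for the NiFi interfaces (Java 16+), not the real implementation, and the version-only comparison is an assumption made for brevity:

import java.util.concurrent.locks.ReentrantLock;

public class OptimisticLockingSketch {

    record Revision(Long version, String clientId) { }

    interface ConfigurationRequest<T> {
        T execute(); // performs the configuration action and returns the updated result
    }

    record ConfigurationSnapshot<T>(Long version, T configuration) { }

    static class SimpleLockingManager {
        private final ReentrantLock lock = new ReentrantLock();
        private Revision current = new Revision(0L, "");

        <T> ConfigurationSnapshot<T> configureFlow(Revision revision, ConfigurationRequest<T> request) {
            lock.lock();
            try {
                // the sketch only compares version numbers; the real manager also weighs the client id
                if (!current.version().equals(revision.version())) {
                    throw new IllegalStateException(
                            String.format("Given revision %s does not match current revision %s.", revision, current));
                }
                final T result = request.execute();
                current = new Revision(current.version() + 1, revision.clientId()); // advance the revision
                return new ConfigurationSnapshot<>(current.version(), result);
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        final SimpleLockingManager manager = new SimpleLockingManager();
        final ConfigurationSnapshot<String> snapshot =
                manager.configureFlow(new Revision(0L, "client-1"), () -> "renamed processor");
        System.out.println(snapshot.version() + " -> " + snapshot.configuration()); // 1 -> renamed processor
    }
}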
@@ -32,31 +32,31 @@ import org.slf4j.LoggerFactory;
public class StandardOptimisticLockingManager implements OptimisticLockingManager {

private static final Logger logger = LoggerFactory.getLogger(StandardOptimisticLockingManager.class);

private static final String INVALID_REVISION_ERROR = "Given revision %s does not match current revision %s.";
private static final String SYNC_ERROR = "This NiFi instance has been updated by '%s'. Please refresh to synchronize the view.";

private Revision currentRevision = new Revision(0L, "");
private String lastModifier = "unknown";
private final Lock lock = new ReentrantLock();

private void lock() {
lock.lock();
}

private void unlock() {
lock.unlock();
}

private void checkRevision(final Revision revision) {
final FlowModification lastMod = getLastModification();

// with lock, verify revision
boolean approved = lastMod.getRevision().equals(revision);

if (!approved) {
logger.debug("Revision check failed because current revision is " + lastMod.getRevision() + " but supplied revision is " + revision);

if (lastMod.getRevision().getClientId() == null || lastMod.getRevision().getClientId().trim().isEmpty() || lastMod.getRevision().getVersion() == null) {
throw new InvalidRevisionException(String.format(INVALID_REVISION_ERROR, revision, lastMod.getRevision()));
} else {

@@ -64,11 +64,11 @@ public class StandardOptimisticLockingManager implements OptimisticLockingManage
}
}
}

private Revision updateRevision(final Revision updatedRevision) {
// record the current modification
setLastModification(new FlowModification(updatedRevision, NiFiUserUtils.getNiFiUserName()));

// return the revision
return updatedRevision;
}

@@ -98,7 +98,7 @@ public class StandardOptimisticLockingManager implements OptimisticLockingManage
lock();
try {
final Revision updatedRevision = updateRevision.execute(getLastModification().getRevision());

// update the revision
if (updatedRevision != null) {
updateRevision(updatedRevision);

@@ -107,7 +107,7 @@ public class StandardOptimisticLockingManager implements OptimisticLockingManage
unlock();
}
}

@Override
public FlowModification getLastModification() {
lock();

@@ -119,19 +119,19 @@ public class StandardOptimisticLockingManager implements OptimisticLockingManage
} else {
revision = ctx.getRevision();
}

return new FlowModification(revision, lastModifier);
} finally {
unlock();
}
}

private void setLastModification(final FlowModification lastModification) {
lock();
try {
// record the last modifier
lastModifier = lastModification.getLastModifier();

// record the updated revision in the cluster context if possible
final ClusterContext ctx = ClusterContextThreadLocal.getContext();
if (ctx != null) {

@@ -143,10 +143,10 @@ public class StandardOptimisticLockingManager implements OptimisticLockingManage
unlock();
}
}

private Revision incrementRevision(String clientId) {
final Revision current = getLastModification().getRevision();

final long incrementedVersion;
if (current.getVersion() == null) {
incrementedVersion = 0;

@@ -155,5 +155,5 @@ public class StandardOptimisticLockingManager implements OptimisticLockingManage
}
return new Revision(incrementedVersion, clientId);
}

}
@@ -23,9 +23,9 @@ public interface UpdateRevision {

/**
* Executes the action that will result in an updated revision
*
* @param currentRevision The current revision
* @return The updated revision
*
* @param currentRevision The current revision
* @return The updated revision
*/
Revision execute(Revision currentRevision);
}
@@ -34,10 +34,8 @@ public class DnUtils {
private static final Pattern proxyChainPattern = Pattern.compile("<(.*?)>");

/**
* Gets the X-ProxiedEntitiesChain from the specified request.
*
* @param request
* @return
* @param request http request
* @return the X-ProxiedEntitiesChain from the specified request
*/
public static String getXProxiedEntitiesChain(final HttpServletRequest request) {
String xProxiedEntitiesChain = request.getHeader("X-ProxiedEntitiesChain");

@@ -60,8 +58,8 @@ public class DnUtils {
* Formats the specified DN to be set as a HTTP header using well known
* conventions.
*
* @param dn
* @return
* @param dn raw dn
* @return the dn formatted as an HTTP header
*/
public static String formatProxyDn(String dn) {
return "<" + dn + ">";

@@ -70,8 +68,8 @@ public class DnUtils {
/**
* Tokenizes the specified proxy chain.
*
* @param rawProxyChain
* @return
* @param rawProxyChain raw chain
* @return tokenized proxy chain
*/
public static Deque<String> tokenizeProxyChain(String rawProxyChain) {
final Deque<String> dnList = new ArrayDeque<>();
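The helpers above encode and decode the X-ProxiedEntitiesChain header by wrapping each DN in angle brackets and tokenizing with the <(.*?)> pattern. A small runnable sketch of those two operations (the real getXProxiedEntitiesChain also folds in the requester's own certificate DN, which is omitted here):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ProxyChainTokenSketch {

    private static final Pattern PROXY_CHAIN_PATTERN = Pattern.compile("<(.*?)>");

    // wraps a DN in angle brackets, the convention used for the X-ProxiedEntitiesChain header
    static String formatProxyDn(String dn) {
        return "<" + dn + ">";
    }

    // splits a raw chain such as "<cn=user><cn=proxy>" back into its individual DNs, in order
    static Deque<String> tokenizeProxyChain(String rawProxyChain) {
        final Deque<String> dnList = new ArrayDeque<>();
        final Matcher matcher = PROXY_CHAIN_PATTERN.matcher(rawProxyChain);
        while (matcher.find()) {
            dnList.addLast(matcher.group(1));
        }
        return dnList;
    }

    public static void main(String[] args) {
        final String chain = formatProxyDn("cn=alice") + formatProxyDn("cn=gateway");
        System.out.println(tokenizeProxyChain(chain)); // [cn=alice, cn=gateway]
    }
}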
@@ -76,8 +76,8 @@ public class NiFiAnonymousUserFilter extends AnonymousAuthenticationFilter {
/**
* Only supports anonymous users for non-secure requests or one way ssl.
*
* @param request
* @return
* @param request request
* @return true if allowed
*/
@Override
protected boolean applyAnonymousForThisRequest(HttpServletRequest request) {
@@ -38,11 +38,12 @@ public class NiFiAuthenticationEntryPoint implements AuthenticationEntryPoint {

/**
* Always returns a 403 error code to the client.
* @param request
* @param response
* @param ae
* @throws java.io.IOException
* @throws javax.servlet.ServletException
*
* @param request request
* @param response response
* @param ae ae
* @throws java.io.IOException ex
* @throws javax.servlet.ServletException ex
*/
@Override
public void commence(HttpServletRequest request, HttpServletResponse response, AuthenticationException ae) throws IOException, ServletException {
@@ -51,17 +51,17 @@ public class NiFiAuthorizationService implements UserDetailsService {
private NiFiProperties properties;

/**
* Loads the user details for the specified dn.
*
* Synchronizing because we want each request to be authorized atomically since
* each may contain any number of DNs. We wanted an access decision made
* for each individual request as a whole (without other request potentially
* impacting it).
* Loads the user details for the specified dn.
*
* @param rawProxyChain
* @return
* @throws UsernameNotFoundException
* @throws org.springframework.dao.DataAccessException
* Synchronizing because we want each request to be authorized atomically
* since each may contain any number of DNs. We wanted an access decision
* made for each individual request as a whole (without other request
* potentially impacting it).
*
* @param rawProxyChain proxy chain
* @return user details
* @throws UsernameNotFoundException ex
* @throws org.springframework.dao.DataAccessException ex
*/
@Override
public synchronized UserDetails loadUserByUsername(String rawProxyChain) throws UsernameNotFoundException, DataAccessException {

@@ -75,7 +75,7 @@ public class NiFiAuthorizationService implements UserDetailsService {
}

NiFiUser proxy = null;

// process each part of the proxy chain
for (final Iterator<String> dnIter = dnList.iterator(); dnIter.hasNext();) {
final String dn = dnIter.next();

@@ -92,12 +92,12 @@ public class NiFiAuthorizationService implements UserDetailsService {
logger.warn(String.format("Proxy '%s' must have '%s' authority. Current authorities: %s", dn, Authority.ROLE_PROXY.toString(), StringUtils.join(user.getAuthorities(), ", ")));
throw new UntrustedProxyException(String.format("Untrusted proxy '%s' must be authorized with '%s'.", dn, Authority.ROLE_PROXY.toString()));
}

// if we've already encountered a proxy, update the chain
if (proxy != null) {
user.setChain(proxy);
}

// record this user as the proxy for the next user in the chain
proxy = user;
} catch (UsernameNotFoundException unfe) {

@@ -118,7 +118,8 @@ public class NiFiAuthorizationService implements UserDetailsService {
// attempting to auto create the user account request
final String message = String.format("Account request was already submitted for '%s'", dn);
logger.warn(message);
throw new AccountStatusException(message) {};
throw new AccountStatusException(message) {
};
}
} else {
logger.warn(String.format("Untrusted proxy '%s' must be authorized with '%s' authority: %s", dn, Authority.ROLE_PROXY.toString(), unfe.getMessage()));

@@ -130,7 +131,7 @@ public class NiFiAuthorizationService implements UserDetailsService {
}
} else {
userDetails = getNiFiUserDetails(dn);

// if we've already encountered a proxy, update the chain
if (proxy != null) {
final NiFiUser user = userDetails.getNiFiUser();

@@ -145,8 +146,8 @@ public class NiFiAuthorizationService implements UserDetailsService {
/**
* Loads the user details for the specified dn.
*
* @param dn
* @return
* @param dn user dn
* @return user detail
*/
private NiFiUserDetails getNiFiUserDetails(String dn) {
try {

@@ -155,7 +156,8 @@ public class NiFiAuthorizationService implements UserDetailsService {
} catch (AdministrationException ase) {
throw new AuthenticationServiceException(String.format("An error occurred while accessing the user credentials for '%s': %s", dn, ase.getMessage()), ase);
} catch (AccountDisabledException | AccountPendingException e) {
throw new AccountStatusException(e.getMessage(), e) {};
throw new AccountStatusException(e.getMessage(), e) {
};
} catch (AccountNotFoundException anfe) {
throw new UsernameNotFoundException(anfe.getMessage());
}
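The loop above walks the proxied-entities chain from the end user outward, requires each intermediate identity to hold ROLE_PROXY, and links every identity to the proxy that forwarded it. A simplified, self-contained sketch of that chain building (SimpleUser and authorizeChain are hypothetical stand-ins; the account-pending and account-not-found handling of the real service is omitted):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class ProxyChainAuthorizationSketch {

    static class SimpleUser {
        final String dn;
        final Set<String> authorities;
        SimpleUser chain; // the identity that proxied this one, if any

        SimpleUser(String dn, Set<String> authorities) {
            this.dn = dn;
            this.authorities = authorities;
        }
    }

    // first token is the end user; every later token is a proxy and must hold ROLE_PROXY
    static SimpleUser authorizeChain(Deque<String> dnTokens, Function<String, SimpleUser> lookup) {
        SimpleUser endUser = null;
        SimpleUser previous = null;
        for (final String dn : dnTokens) {
            final SimpleUser current = lookup.apply(dn);
            if (endUser == null) {
                endUser = current;
            } else if (!current.authorities.contains("ROLE_PROXY")) {
                throw new IllegalStateException(
                        String.format("Untrusted proxy '%s' must be authorized with 'ROLE_PROXY'.", dn));
            }
            if (previous != null) {
                previous.chain = current; // link each identity to the proxy that forwarded it
            }
            previous = current;
        }
        return endUser;
    }

    public static void main(String[] args) {
        final Map<String, SimpleUser> accounts = Map.of(
                "cn=alice", new SimpleUser("cn=alice", Set.of("ROLE_MONITOR")),
                "cn=gateway", new SimpleUser("cn=gateway", Set.of("ROLE_PROXY")));
        final Deque<String> tokens = new ArrayDeque<>(List.of("cn=alice", "cn=gateway"));
        final SimpleUser user = authorizeChain(tokens, accounts::get);
        System.out.println(user.dn + " proxied by " + user.chain.dn); // cn=alice proxied by cn=gateway
    }
}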
@@ -36,7 +36,7 @@ public class NiFiUserDetails implements UserDetails {
/**
* Creates a new NiFiUserDetails.
*
* @param user
* @param user user
*/
public NiFiUserDetails(NiFiUser user) {
this.user = user;

@@ -45,7 +45,7 @@ public class NiFiUserDetails implements UserDetails {
/**
* Get the user for this UserDetails.
*
* @return
* @return user
*/
public NiFiUser getNiFiUser() {
return user;

@@ -54,7 +54,7 @@ public class NiFiUserDetails implements UserDetails {
/**
* Returns the authorities that this NiFi user has.
*
* @return
* @return authorities
*/
@Override
public Collection<? extends GrantedAuthority> getAuthorities() {

@@ -66,21 +66,11 @@ public class NiFiUserDetails implements UserDetails {
return grantedAuthorities;
}

/**
* Not used.
*
* @return
*/
@Override
public String getPassword() {
return StringUtils.EMPTY;
}

/**
* Gets the user name.
*
* @return
*/
@Override
public String getUsername() {
return user.getDn();
@@ -35,7 +35,7 @@ public final class NiFiUserUtils {
/**
* Return the authorities for the current user.
*
* @return
* @return authorities
*/
public static Set<String> getAuthorities() {
Set<GrantedAuthority> grantedAuthorities = new HashSet<>();

@@ -62,7 +62,7 @@ public final class NiFiUserUtils {
* Returns the current NiFiUser or null if the current user is not a
* NiFiUser.
*
* @return
* @return user
*/
public static NiFiUser getNiFiUser() {
NiFiUser user = null;

@@ -79,7 +79,7 @@ public final class NiFiUserUtils {

return user;
}

public static String getNiFiUserName() {
// get the nifi user to extract the username
NiFiUser user = NiFiUserUtils.getNiFiUser();
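These utilities resolve the current user from Spring Security's thread-bound context. A hedged sketch of that lookup, assuming spring-security-core is on the classpath (UserDetailsLike is a hypothetical placeholder for NiFiUserDetails):

import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;

public final class CurrentUserSketch {

    interface UserDetailsLike {
        String getDn();
    }

    // pulls the Authentication from the thread-bound security context and, when the
    // principal is the expected details type, unwraps the application-level user
    static String currentUserDn() {
        final Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
        if (authentication == null) {
            return null; // no authenticated request on this thread
        }
        final Object principal = authentication.getPrincipal();
        if (principal instanceof UserDetailsLike) {
            return ((UserDetailsLike) principal).getDn();
        }
        return null;
    }

    private CurrentUserSketch() {
    }
}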
@@ -24,12 +24,6 @@ import org.springframework.security.web.authentication.preauth.x509.X509Principa
*/
public class SubjectDnX509PrincipalExtractor implements X509PrincipalExtractor {

/**
* Extracts the principal from the specified client certificate.
*
* @param cert
* @return
*/
@Override
public Object extractPrincipal(X509Certificate cert) {
return cert.getSubjectDN().getName().trim();
@@ -58,16 +58,6 @@ public class X509AuthenticationFilter extends AbstractPreAuthenticatedProcessing
private NiFiProperties properties;
private UserService userService;

/**
* Override doFilter in order to properly handle when users could not be
* authenticated.
*
* @param request
* @param response
* @param chain
* @throws IOException
* @throws ServletException
*/
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
final HttpServletResponse httpResponse = (HttpServletResponse) response;

@@ -194,13 +184,6 @@ public class X509AuthenticationFilter extends AbstractPreAuthenticatedProcessing
return certificateExtractor.extractClientCertificate(request);
}

/**
* Sets the response headers for successful proxied requests.
*
* @param request
* @param response
* @param authResult
*/
@Override
protected void successfulAuthentication(HttpServletRequest request, HttpServletResponse response, Authentication authResult) {
if (StringUtils.isNotBlank(request.getHeader(PROXY_ENTITIES_CHAIN))) {

@@ -209,13 +192,6 @@ public class X509AuthenticationFilter extends AbstractPreAuthenticatedProcessing
super.successfulAuthentication(request, response, authResult);
}

/**
* Sets the response headers for unsuccessful proxied requests.
*
* @param request
* @param response
* @param failed
*/
@Override
protected void unsuccessfulAuthentication(HttpServletRequest request, HttpServletResponse response, AuthenticationException failed) {
if (StringUtils.isNotBlank(request.getHeader(PROXY_ENTITIES_CHAIN))) {

@@ -228,8 +204,8 @@ public class X509AuthenticationFilter extends AbstractPreAuthenticatedProcessing
* Determines if the specified request is attempting to register a new user
* account.
*
* @param request
* @return
* @param request http request
* @return true if new user
*/
private boolean isNewAccountRequest(HttpServletRequest request) {
if ("POST".equalsIgnoreCase(request.getMethod())) {

@@ -246,10 +222,10 @@ public class X509AuthenticationFilter extends AbstractPreAuthenticatedProcessing
/**
* Handles requests that were unable to be authorized.
*
* @param request
* @param response
* @param ae
* @throws IOException
* @param request request
* @param response response
* @param ae ex
* @throws IOException ex
*/
private void handleUnsuccessfulAuthentication(HttpServletRequest request, HttpServletResponse response, AuthenticationException ae) throws IOException {
// set the response status

@@ -292,14 +268,6 @@ public class X509AuthenticationFilter extends AbstractPreAuthenticatedProcessing
}
}

/**
* Handles requests that failed because of a user service error.
*
* @param request
* @param response
* @param e
* @throws IOException
*/
private void handleUserServiceError(HttpServletRequest request, HttpServletResponse response, int responseCode, String message) throws IOException {
// set the response status
response.setContentType("text/plain");

@@ -316,9 +284,9 @@ public class X509AuthenticationFilter extends AbstractPreAuthenticatedProcessing
/**
* Handles requests that failed because they were bad input.
*
* @param request
* @param response
* @throws IOException
* @param request request
* @param response response
* @throws IOException ioe
*/
private void handleMissingCertificate(HttpServletRequest request, HttpServletResponse response) throws IOException {
// set the response status
@@ -32,8 +32,8 @@ public class X509CertificateExtractor {
* Extract the client certificate from the specified HttpServletRequest or
* null if none is specified.
*
* @param request
* @return
* @param request http request
* @return cert
*/
public X509Certificate extractClientCertificate(HttpServletRequest request) {
X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
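The extractor reads the client certificate chain that the servlet container exposes under the standard javax.servlet.request.X509Certificate attribute. A minimal sketch of the same lookup (assumes the javax.servlet API; ClientCertSketch is illustrative only):

import java.security.cert.X509Certificate;
import javax.servlet.http.HttpServletRequest;

public final class ClientCertSketch {

    // the container populates this attribute only for requests made over two-way TLS;
    // the first entry of the chain is the certificate presented by the client
    static X509Certificate extract(HttpServletRequest request) {
        final X509Certificate[] certs =
                (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
        if (certs != null && certs.length > 0) {
            return certs[0];
        }
        return null; // request did not arrive with a client certificate
    }

    private ClientCertSketch() {
    }
}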
@@ -63,9 +63,6 @@ import org.bouncycastle.ocsp.SingleResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
*/
public class OcspCertificateValidator {

private static final Logger logger = LoggerFactory.getLogger(OcspCertificateValidator.class);

@@ -141,8 +138,8 @@ public class OcspCertificateValidator {
/**
* Loads the ocsp certificate if specified. Null otherwise.
*
* @param properties
* @return
* @param properties nifi properties
* @return certificate
*/
private X509Certificate getOcspCertificate(final NiFiProperties properties) {
X509Certificate validationAuthorityCertificate = null;

@@ -164,8 +161,8 @@ public class OcspCertificateValidator {
* Loads the trusted certificate authorities according to the specified
* properties.
*
* @param properties
* @return
* @param properties properties
* @return map of certificate authorities
*/
private Map<String, X509Certificate> getTrustedCAs(final NiFiProperties properties) {
final Map<String, X509Certificate> certificateAuthorities = new HashMap<>();

@@ -211,8 +208,8 @@ public class OcspCertificateValidator {
/**
* Validates the specified certificate using OCSP if configured.
*
* @param request
* @throws CertificateStatusException
* @param request http request
* @throws CertificateStatusException ex
*/
public void validate(final HttpServletRequest request) throws CertificateStatusException {
final X509Certificate[] certificates = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");

@@ -235,7 +232,8 @@ public class OcspCertificateValidator {

// we only disallow when we have a verified response that states the certificate is revoked
if (VerificationStatus.Verified.equals(ocspStatus.getVerificationStatus()) && ValidationStatus.Revoked.equals(ocspStatus.getValidationStatus())) {
throw new CertificateStatusException(String.format("Client certificate for <%s> is revoked according to the certificate authority.", subjectCertificate.getSubjectX500Principal().getName()));
throw new CertificateStatusException(String.format("Client certificate for <%s> is revoked according to the certificate authority.",
subjectCertificate.getSubjectX500Principal().getName()));
}
} catch (final UncheckedExecutionException uee) {
logger.warn(String.format("Unable to validate client certificate via OCSP: <%s>", subjectCertificate.getSubjectX500Principal().getName()), uee.getCause());

@@ -246,8 +244,8 @@ public class OcspCertificateValidator {
/**
* Gets the subject certificate.
*
* @param certificates
* @return
* @param certificates certs
* @return subject cert
*/
private X509Certificate getSubjectCertificate(final X509Certificate[] certificates) {
return certificates[0];

@@ -256,8 +254,8 @@ public class OcspCertificateValidator {
/**
* Gets the issuer certificate.
*
* @param certificates
* @return
* @param certificates certs
* @return issuer cert
*/
private X509Certificate getIssuerCertificate(final X509Certificate[] certificates) {
if (certificates.length > 1) {

@@ -274,9 +272,8 @@ public class OcspCertificateValidator {
/**
* Gets the OCSP status for the specified subject and issuer certificates.
*
* @param subjectCertificate
* @param issuerCertificate
* @return
* @param ocspStatusKey status key
* @return ocsp status
*/
private OcspStatus getOcspStatus(final OcspRequest ocspStatusKey) {
final X509Certificate subjectCertificate = ocspStatusKey.getSubjectCertificate();

@@ -406,9 +403,9 @@ public class OcspCertificateValidator {
* that issued the subject certificate. Other various checks may be required
* (this portion is currently not implemented).
*
* @param responderCertificate
* @param issuerCertificate
* @return
* @param responderCertificate cert
* @param issuerCertificate cert
* @return cert
*/
private X509Certificate getTrustedResponderCertificate(final X509Certificate responderCertificate, final X509Certificate issuerCertificate) {
// look for the responder's certificate specifically

@@ -425,13 +422,13 @@ public class OcspCertificateValidator {
// if (keyUsage == null || !keyUsage.contains(KP_OCSP_SIGNING_OID)) {
// return null;
// }
//
//
// // ensure the certificate is valid
// responderCertificate.checkValidity();
//
//
// // verify the signature
// responderCertificate.verify(issuerCertificate.getPublicKey());
//
//
// return responderCertificate;
// } catch (final CertificateException | NoSuchAlgorithmException | InvalidKeyException | NoSuchProviderException | SignatureException e) {
// return null;
@@ -37,12 +37,11 @@ import org.springframework.security.authentication.AccountStatusException;
import org.springframework.security.authentication.AuthenticationServiceException;
import org.springframework.security.core.userdetails.UsernameNotFoundException;

/**
* Test case for NiFiAuthorizationService.
*/
public class NiFiAuthorizationServiceTest {

private static final String USER = "user";
private static final String PROXY = "proxy";
private static final String PROXY_PROXY = "proxy-proxy";

@@ -51,16 +50,16 @@ public class NiFiAuthorizationServiceTest {
private static final String USER_PENDING = "user-pending";
private static final String USER_ADMIN_EXCEPTION = "user-admin-exception";
private static final String PROXY_NOT_FOUND = "proxy-not-found";

private NiFiAuthorizationService authorizationService;
private UserService userService;

@Before
public void setup() throws Exception {
// mock the web security properties
final NiFiProperties properties = Mockito.mock(NiFiProperties.class);
Mockito.when(properties.getSupportNewAccountRequests()).thenReturn(Boolean.TRUE);

userService = Mockito.mock(UserService.class);
Mockito.doReturn(null).when(userService).createPendingUserAccount(Mockito.anyString(), Mockito.anyString());
Mockito.doAnswer(new Answer() {

@@ -68,7 +67,7 @@ public class NiFiAuthorizationServiceTest {
public Object answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
String dn = (String) args[0];

if (null != dn) {
switch (dn) {
case USER_NOT_FOUND:

@@ -93,97 +92,99 @@ public class NiFiAuthorizationServiceTest {
return proxy;
}
}

return null;
}
}).when(userService).checkAuthorization(Mockito.anyString());

// create the authorization service
authorizationService = new NiFiAuthorizationService();
authorizationService.setProperties(properties);
authorizationService.setUserService(userService);
}

/**
* Ensures the authorization service correctly handles users invalid dn chain.
* Ensures the authorization service correctly handles users invalid dn
* chain.
*
* @throws Exception
* @throws Exception ex
*/
@Test(expected = UntrustedProxyException.class)
public void testInvalidDnChain() throws Exception {
authorizationService.loadUserByUsername(USER);
}

/**
* Ensures the authorization service correctly handles account not found.
*
* @throws Exception
* @throws Exception ex
*/
@Test(expected = UsernameNotFoundException.class)
public void testAccountNotFound() throws Exception {
authorizationService.loadUserByUsername(DnUtils.formatProxyDn(USER_NOT_FOUND));
}

/**
* Ensures the authorization service correctly handles account disabled.
*
* @throws Exception
* @throws Exception ex
*/
@Test(expected = AccountStatusException.class)
public void testAccountDisabled() throws Exception {
authorizationService.loadUserByUsername(DnUtils.formatProxyDn(USER_DISABLED));
}

/**
* Ensures the authorization service correctly handles account pending.
*
* @throws Exception
* @throws Exception ex
*/
@Test(expected = AccountStatusException.class)
public void testAccountPending() throws Exception {
authorizationService.loadUserByUsername(DnUtils.formatProxyDn(USER_PENDING));
}

/**
* Ensures the authorization service correctly handles account administration exception.
* Ensures the authorization service correctly handles account
* administration exception.
*
* @throws Exception
* @throws Exception ex
*/
@Test(expected = AuthenticationServiceException.class)
public void testAccountAdminException() throws Exception {
authorizationService.loadUserByUsername(DnUtils.formatProxyDn(USER_ADMIN_EXCEPTION));
}

/**
* Tests the case when there is no proxy.
*
* @throws Exception
*
* @throws Exception ex
*/
@Test
public void testNoProxy() throws Exception {
final NiFiUserDetails details = (NiFiUserDetails) authorizationService.loadUserByUsername(DnUtils.formatProxyDn(USER));
final NiFiUser user = details.getNiFiUser();

Assert.assertEquals(USER, user.getDn());
Assert.assertNull(user.getChain());
}

/**
* Tests the case when the proxy does not have ROLE_PROXY.
*
* @throws Exception
*
* @throws Exception ex
*/
@Test(expected = UntrustedProxyException.class)
public void testInvalidProxy() throws Exception {
final String dnChain = DnUtils.formatProxyDn(USER) + DnUtils.formatProxyDn(USER);
authorizationService.loadUserByUsername(dnChain);
}

/**
* Ensures the authorization service correctly handles proxy not found by attempting
* to create an account request for the proxy.
* Ensures the authorization service correctly handles proxy not found by
* attempting to create an account request for the proxy.
*
* @throws Exception
* @throws Exception ex
*/
@Test(expected = UsernameNotFoundException.class)
public void testProxyNotFound() throws Exception {

@@ -194,55 +195,55 @@ public class NiFiAuthorizationServiceTest {
Mockito.verify(userService).createPendingUserAccount(Mockito.eq(PROXY_NOT_FOUND), Mockito.anyString());
}
}

/**
* Tests the case when there is a proxy.
*
* @throws Exception
*
* @throws Exception ex
*/
@Test
public void testProxy() throws Exception {
final String dnChain = DnUtils.formatProxyDn(USER) + DnUtils.formatProxyDn(PROXY);
final NiFiUserDetails details = (NiFiUserDetails) authorizationService.loadUserByUsername(dnChain);
final NiFiUser user = details.getNiFiUser();

// verify the user
Assert.assertEquals(USER, user.getDn());
Assert.assertNotNull(user.getChain());

// get the proxy
final NiFiUser proxy = user.getChain();

// verify the proxy
Assert.assertEquals(PROXY, proxy.getDn());
Assert.assertNull(proxy.getChain());
}

/**
* Tests the case when there is are multiple proxies.
*
* @throws Exception
*
* @throws Exception ex
*/
@Test
public void testProxyProxy() throws Exception {
final String dnChain = DnUtils.formatProxyDn(USER) + DnUtils.formatProxyDn(PROXY) + DnUtils.formatProxyDn(PROXY_PROXY);
final NiFiUserDetails details = (NiFiUserDetails) authorizationService.loadUserByUsername(dnChain);
final NiFiUser user = details.getNiFiUser();

// verify the user
Assert.assertEquals(USER, user.getDn());
Assert.assertNotNull(user.getChain());

// get the proxy
NiFiUser proxy = user.getChain();

// verify the proxy
Assert.assertEquals(PROXY, proxy.getDn());
Assert.assertNotNull(proxy.getChain());

// get the proxies proxy
proxy = proxy.getChain();

// verify the proxies proxy
Assert.assertEquals(PROXY_PROXY, proxy.getDn());
Assert.assertNull(proxy.getChain());