mirror of https://github.com/apache/nifi.git
NIFI-6223 Expose Cluster Node Type to Controller Services
NIFI-6223 Enhance Mock Controller Service to allow specifying node type This closes #3444. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
e8174fb99f
commit
2b83b7d9e8
|
@ -33,6 +33,7 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
|
||||||
private StateManager stateManager;
|
private StateManager stateManager;
|
||||||
private volatile ConfigurationContext configurationContext;
|
private volatile ConfigurationContext configurationContext;
|
||||||
private volatile boolean enabled = false;
|
private volatile boolean enabled = false;
|
||||||
|
private NodeTypeProvider nodeTypeProvider;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void initialize(final ControllerServiceInitializationContext context) throws InitializationException {
|
public final void initialize(final ControllerServiceInitializationContext context) throws InitializationException {
|
||||||
|
@ -40,6 +41,7 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
|
||||||
serviceLookup = context.getControllerServiceLookup();
|
serviceLookup = context.getControllerServiceLookup();
|
||||||
logger = context.getLogger();
|
logger = context.getLogger();
|
||||||
stateManager = context.getStateManager();
|
stateManager = context.getStateManager();
|
||||||
|
nodeTypeProvider = context.getNodeTypeProvider();
|
||||||
init(context);
|
init(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,6 +58,14 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
|
||||||
return serviceLookup;
|
return serviceLookup;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the {@link NodeTypeProvider} that was passed to the
|
||||||
|
* {@link #init(ControllerServiceInitializationContext)} method
|
||||||
|
*/
|
||||||
|
protected final NodeTypeProvider getNodeTypeProvider() {
|
||||||
|
return nodeTypeProvider;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides a mechanism by which subclasses can perform initialization of
|
* Provides a mechanism by which subclasses can perform initialization of
|
||||||
* the Controller Service before it is scheduled to be run
|
* the Controller Service before it is scheduled to be run
|
||||||
|
|
|
@ -44,4 +44,10 @@ public interface ControllerServiceInitializationContext extends KerberosContext
|
||||||
* @return the StateManager that can be used to store and retrieve state for this component
|
* @return the StateManager that can be used to store and retrieve state for this component
|
||||||
*/
|
*/
|
||||||
StateManager getStateManager();
|
StateManager getStateManager();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the {@link NodeTypeProvider} which can be used to detect the node
|
||||||
|
* type of this NiFi instance.
|
||||||
|
*/
|
||||||
|
NodeTypeProvider getNodeTypeProvider();
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.nifi.documentation.init;
|
||||||
import org.apache.nifi.components.state.StateManager;
|
import org.apache.nifi.components.state.StateManager;
|
||||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||||
import org.apache.nifi.controller.ControllerServiceLookup;
|
import org.apache.nifi.controller.ControllerServiceLookup;
|
||||||
|
import org.apache.nifi.controller.NodeTypeProvider;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -49,6 +50,11 @@ public class DocumentationControllerServiceInitializationContext implements Cont
|
||||||
return new NopStateManager();
|
return new NopStateManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NodeTypeProvider getNodeTypeProvider() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getKerberosServicePrincipal() {
|
public String getKerberosServicePrincipal() {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -21,14 +21,17 @@ import org.apache.nifi.components.state.StateManager;
|
||||||
import org.apache.nifi.controller.ControllerService;
|
import org.apache.nifi.controller.ControllerService;
|
||||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||||
import org.apache.nifi.controller.ControllerServiceLookup;
|
import org.apache.nifi.controller.ControllerServiceLookup;
|
||||||
|
import org.apache.nifi.controller.NodeTypeProvider;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.state.MockStateManager;
|
import org.apache.nifi.state.MockStateManager;
|
||||||
|
|
||||||
public class MockControllerServiceInitializationContext extends MockControllerServiceLookup implements ControllerServiceInitializationContext, ControllerServiceLookup {
|
public class MockControllerServiceInitializationContext extends MockControllerServiceLookup implements ControllerServiceInitializationContext, ControllerServiceLookup, NodeTypeProvider {
|
||||||
|
|
||||||
private final String identifier;
|
private final String identifier;
|
||||||
private final ComponentLog logger;
|
private final ComponentLog logger;
|
||||||
private final StateManager stateManager;
|
private final StateManager stateManager;
|
||||||
|
private volatile boolean isClustered;
|
||||||
|
private volatile boolean isPrimaryNode;
|
||||||
|
|
||||||
public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier) {
|
public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier) {
|
||||||
this(controllerService, identifier, new MockStateManager(controllerService));
|
this(controllerService, identifier, new MockStateManager(controllerService));
|
||||||
|
@ -70,6 +73,11 @@ public class MockControllerServiceInitializationContext extends MockControllerSe
|
||||||
return stateManager;
|
return stateManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NodeTypeProvider getNodeTypeProvider() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getKerberosServicePrincipal() {
|
public String getKerberosServicePrincipal() {
|
||||||
return null; //this needs to be wired in.
|
return null; //this needs to be wired in.
|
||||||
|
@ -84,4 +92,25 @@ public class MockControllerServiceInitializationContext extends MockControllerSe
|
||||||
public File getKerberosConfigurationFile() {
|
public File getKerberosConfigurationFile() {
|
||||||
return null; //this needs to be wired in.
|
return null; //this needs to be wired in.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isClustered() {
|
||||||
|
return isClustered;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isPrimary() {
|
||||||
|
return isPrimaryNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setClustered(boolean clustered) {
|
||||||
|
isClustered = clustered;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPrimaryNode(boolean primaryNode) {
|
||||||
|
if (!isClustered && primaryNode) {
|
||||||
|
throw new IllegalArgumentException("Primary node is only available in cluster. Use setClustered(true) first.");
|
||||||
|
}
|
||||||
|
isPrimaryNode = primaryNode;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -381,7 +381,7 @@ public class ExtensionBuilder {
|
||||||
|
|
||||||
final StateManager stateManager = stateManagerProvider.getStateManager(identifier);
|
final StateManager stateManager = stateManagerProvider.getStateManager(identifier);
|
||||||
final ControllerServiceInitializationContext initContext = new StandardControllerServiceInitializationContext(identifier, terminationAwareLogger,
|
final ControllerServiceInitializationContext initContext = new StandardControllerServiceInitializationContext(identifier, terminationAwareLogger,
|
||||||
serviceProvider, stateManager, kerberosConfig);
|
serviceProvider, stateManager, kerberosConfig, nodeTypeProvider);
|
||||||
serviceImpl.initialize(initContext);
|
serviceImpl.initialize(initContext);
|
||||||
|
|
||||||
final LoggableComponent<ControllerService> originalLoggableComponent = new LoggableComponent<>(serviceImpl, bundleCoordinate, terminationAwareLogger);
|
final LoggableComponent<ControllerService> originalLoggableComponent = new LoggableComponent<>(serviceImpl, bundleCoordinate, terminationAwareLogger);
|
||||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.nifi.components.state.StateManager;
|
||||||
import org.apache.nifi.controller.ControllerService;
|
import org.apache.nifi.controller.ControllerService;
|
||||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||||
import org.apache.nifi.controller.ControllerServiceLookup;
|
import org.apache.nifi.controller.ControllerServiceLookup;
|
||||||
|
import org.apache.nifi.controller.NodeTypeProvider;
|
||||||
import org.apache.nifi.controller.kerberos.KerberosConfig;
|
import org.apache.nifi.controller.kerberos.KerberosConfig;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
|
|
||||||
|
@ -33,16 +34,18 @@ public class StandardControllerServiceInitializationContext implements Controlle
|
||||||
private final ComponentLog logger;
|
private final ComponentLog logger;
|
||||||
private final StateManager stateManager;
|
private final StateManager stateManager;
|
||||||
private final KerberosConfig kerberosConfig;
|
private final KerberosConfig kerberosConfig;
|
||||||
|
private NodeTypeProvider nodeTypeProvider;
|
||||||
|
|
||||||
public StandardControllerServiceInitializationContext(
|
public StandardControllerServiceInitializationContext(
|
||||||
final String identifier, final ComponentLog logger,
|
final String identifier, final ComponentLog logger,
|
||||||
final ControllerServiceProvider serviceProvider, final StateManager stateManager,
|
final ControllerServiceProvider serviceProvider, final StateManager stateManager,
|
||||||
final KerberosConfig kerberosConfig) {
|
final KerberosConfig kerberosConfig, final NodeTypeProvider nodeTypeProvider) {
|
||||||
this.id = identifier;
|
this.id = identifier;
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.serviceProvider = serviceProvider;
|
this.serviceProvider = serviceProvider;
|
||||||
this.stateManager = stateManager;
|
this.stateManager = stateManager;
|
||||||
this.kerberosConfig = kerberosConfig;
|
this.kerberosConfig = kerberosConfig;
|
||||||
|
this.nodeTypeProvider = nodeTypeProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -95,6 +98,11 @@ public class StandardControllerServiceInitializationContext implements Controlle
|
||||||
return stateManager;
|
return stateManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NodeTypeProvider getNodeTypeProvider() {
|
||||||
|
return nodeTypeProvider;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getKerberosServicePrincipal() {
|
public String getKerberosServicePrincipal() {
|
||||||
return kerberosConfig.getPrincipal();
|
return kerberosConfig.getPrincipal();
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.nifi.mock;
|
||||||
import org.apache.nifi.components.state.StateManager;
|
import org.apache.nifi.components.state.StateManager;
|
||||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||||
import org.apache.nifi.controller.ControllerServiceLookup;
|
import org.apache.nifi.controller.ControllerServiceLookup;
|
||||||
|
import org.apache.nifi.controller.NodeTypeProvider;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -51,6 +52,11 @@ public class MockControllerServiceInitializationContext implements ControllerSer
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NodeTypeProvider getNodeTypeProvider() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getKerberosServicePrincipal() {
|
public String getKerberosServicePrincipal() {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.nifi.components.state.StateManager;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||||
import org.apache.nifi.controller.ControllerServiceLookup;
|
import org.apache.nifi.controller.ControllerServiceLookup;
|
||||||
|
import org.apache.nifi.controller.NodeTypeProvider;
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.lookup.LookupService;
|
import org.apache.nifi.lookup.LookupService;
|
||||||
|
@ -314,6 +315,11 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService
|
||||||
return BaseScriptedLookupService.this.getStateManager();
|
return BaseScriptedLookupService.this.getStateManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NodeTypeProvider getNodeTypeProvider() {
|
||||||
|
return BaseScriptedLookupService.this.getNodeTypeProvider();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ControllerServiceLookup getControllerServiceLookup() {
|
public ControllerServiceLookup getControllerServiceLookup() {
|
||||||
return BaseScriptedLookupService.super.getControllerServiceLookup();
|
return BaseScriptedLookupService.super.getControllerServiceLookup();
|
||||||
|
|
Loading…
Reference in New Issue