NIFI-2718: Show HTTP S2S Auth error on bulletin

This commit fixes following two issues, that happens when a Root Group Port
policy for S2S data transfer is removed at a remote NiFi, after a client NiFi has
connected to that port:

1. At client side, Remote Process Group should show that authorization
is failing on its bulletin, but the Exception is caught and
ignored. Nothing is shown on the UI with HTTP transport protocol.
RAW S2S shows error on RPG bulletin. This commit fixes HTTP S2S to
behave the same.

2. At server side, corresponding input-port or output-port should show
that it is accessed by an unauthorized client on its bulletin, but it's
not shown with HTTP transport protocol.
RAW S2S shows warning messages for this. This commit fixes HTTP S2S to
behave the same.

In order to fix the 2nd issue above, request authorization at
DataTransferResource is changed from using DataTransferAuthorizable
directly, to call RootGroupPort.checkUserAuthorization().

Because the blettin is tied to the Port instance and it's
difficult to produce blettin message from this resource.

Since RootGroupPort.checkUserAuthorization uses
DataTransferAuthorizable inside, the check logic stays the same as
before.

Adding a RootGroupPortAuthorizable to provide access to necessary components for performing the authorization.

This closes #996
This commit is contained in:
Koji Kawamura 2016-09-08 14:37:30 +09:00 committed by Matt Gilman
parent bc7c42efa5
commit ae251c1a6f
10 changed files with 218 additions and 35 deletions

View File

@ -166,10 +166,13 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
commSession.setUserDn(apiClient.getTrustedPeerDn()); commSession.setUserDn(apiClient.getTrustedPeerDn());
} catch (final Exception e) { } catch (final Exception e) {
apiClient.close(); apiClient.close();
logger.debug("Penalizing a peer due to {}", e.getMessage()); logger.warn("Penalizing a peer {} due to {}", peer, e.toString());
peerSelector.penalize(peer, penaltyMillis); peerSelector.penalize(peer, penaltyMillis);
if (e instanceof UnknownPortException || e instanceof PortNotRunningException) { // Following exceptions will be thrown even if we tried other peers, so throw it.
if (e instanceof UnknownPortException
|| e instanceof PortNotRunningException
|| e instanceof HandshakeException) {
throw e; throw e;
} }

View File

@ -59,6 +59,7 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator; import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException; import org.apache.nifi.remote.exception.UnknownPortException;
@ -137,6 +138,7 @@ public class SiteToSiteRestApiClient implements Closeable {
private static final int RESPONSE_CODE_CREATED = 201; private static final int RESPONSE_CODE_CREATED = 201;
private static final int RESPONSE_CODE_ACCEPTED = 202; private static final int RESPONSE_CODE_ACCEPTED = 202;
private static final int RESPONSE_CODE_BAD_REQUEST = 400; private static final int RESPONSE_CODE_BAD_REQUEST = 400;
private static final int RESPONSE_CODE_FORBIDDEN = 403;
private static final int RESPONSE_CODE_NOT_FOUND = 404; private static final int RESPONSE_CODE_NOT_FOUND = 404;
private static final Logger logger = LoggerFactory.getLogger(SiteToSiteRestApiClient.class); private static final Logger logger = LoggerFactory.getLogger(SiteToSiteRestApiClient.class);
@ -500,7 +502,7 @@ public class SiteToSiteRestApiClient implements Closeable {
@Override @Override
public void failed(Exception ex) { public void failed(Exception ex) {
final String msg = String.format("Failed to create transactino for %s", post.getURI()); final String msg = String.format("Failed to create transaction for %s", post.getURI());
logger.error(msg, ex); logger.error(msg, ex);
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, msg); eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, msg);
} }
@ -952,10 +954,15 @@ public class SiteToSiteRestApiClient implements Closeable {
return new UnknownPortException(errEntity.getMessage()); return new UnknownPortException(errEntity.getMessage());
case PORT_NOT_IN_VALID_STATE: case PORT_NOT_IN_VALID_STATE:
return new PortNotRunningException(errEntity.getMessage()); return new PortNotRunningException(errEntity.getMessage());
default:
switch (responseCode) {
case RESPONSE_CODE_FORBIDDEN :
return new HandshakeException(errEntity.getMessage());
default: default:
return new IOException("Unexpected response code: " + responseCode + " errCode:" + errCode + " errMessage:" + errEntity.getMessage()); return new IOException("Unexpected response code: " + responseCode + " errCode:" + errCode + " errMessage:" + errEntity.getMessage());
} }
} }
}
private TransactionResultEntity readResponse(final InputStream inputStream) throws IOException { private TransactionResultEntity readResponse(final InputStream inputStream) throws IOException {
final ByteArrayOutputStream bos = new ByteArrayOutputStream(); final ByteArrayOutputStream bos = new ByteArrayOutputStream();

View File

@ -23,6 +23,7 @@ import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.KeystoreType; import org.apache.nifi.remote.client.KeystoreType;
import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.codec.StandardFlowFileCodec; import org.apache.nifi.remote.codec.StandardFlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.io.CompressionInputStream; import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream; import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.DataPacket;
@ -182,6 +183,18 @@ public class TestHttpClient {
} }
public static class PortTransactionsAccessDeniedServlet extends HttpServlet {
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
respondWithText(resp, "Unable to perform the desired action" +
" due to insufficient permissions. Contact the system administrator.", 403);
}
}
public static class InputPortTransactionServlet extends HttpServlet { public static class InputPortTransactionServlet extends HttpServlet {
@Override @Override
@ -432,6 +445,7 @@ public class TestHttpClient {
servletHandler.addServletWithMapping(SiteInfoServlet.class, "/site-to-site"); servletHandler.addServletWithMapping(SiteInfoServlet.class, "/site-to-site");
servletHandler.addServletWithMapping(PeersServlet.class, "/site-to-site/peers"); servletHandler.addServletWithMapping(PeersServlet.class, "/site-to-site/peers");
servletHandler.addServletWithMapping(PortTransactionsAccessDeniedServlet.class, "/data-transfer/input-ports/input-access-denied-id/transactions");
servletHandler.addServletWithMapping(PortTransactionsServlet.class, "/data-transfer/input-ports/input-running-id/transactions"); servletHandler.addServletWithMapping(PortTransactionsServlet.class, "/data-transfer/input-ports/input-running-id/transactions");
servletHandler.addServletWithMapping(InputPortTransactionServlet.class, "/data-transfer/input-ports/input-running-id/transactions/transaction-id"); servletHandler.addServletWithMapping(InputPortTransactionServlet.class, "/data-transfer/input-ports/input-running-id/transactions/transaction-id");
servletHandler.addServletWithMapping(FlowFilesServlet.class, "/data-transfer/input-ports/input-running-id/transactions/transaction-id/flow-files"); servletHandler.addServletWithMapping(FlowFilesServlet.class, "/data-transfer/input-ports/input-running-id/transactions/transaction-id/flow-files");
@ -569,54 +583,55 @@ public class TestHttpClient {
inputPorts = new HashSet<>(); inputPorts = new HashSet<>();
final PortDTO runningInputPort = new PortDTO(); final PortDTO runningInputPort = new PortDTO();
runningInputPort.setId("running-input-port");
inputPorts.add(runningInputPort);
runningInputPort.setName("input-running"); runningInputPort.setName("input-running");
runningInputPort.setId("input-running-id"); runningInputPort.setId("input-running-id");
runningInputPort.setType("INPUT_PORT"); runningInputPort.setType("INPUT_PORT");
runningInputPort.setState(ScheduledState.RUNNING.name()); runningInputPort.setState(ScheduledState.RUNNING.name());
inputPorts.add(runningInputPort);
final PortDTO timeoutInputPort = new PortDTO(); final PortDTO timeoutInputPort = new PortDTO();
timeoutInputPort.setId("timeout-input-port");
inputPorts.add(timeoutInputPort);
timeoutInputPort.setName("input-timeout"); timeoutInputPort.setName("input-timeout");
timeoutInputPort.setId("input-timeout-id"); timeoutInputPort.setId("input-timeout-id");
timeoutInputPort.setType("INPUT_PORT"); timeoutInputPort.setType("INPUT_PORT");
timeoutInputPort.setState(ScheduledState.RUNNING.name()); timeoutInputPort.setState(ScheduledState.RUNNING.name());
inputPorts.add(timeoutInputPort);
final PortDTO timeoutDataExInputPort = new PortDTO(); final PortDTO timeoutDataExInputPort = new PortDTO();
timeoutDataExInputPort.setId("timeout-dataex-input-port");
inputPorts.add(timeoutDataExInputPort);
timeoutDataExInputPort.setName("input-timeout-data-ex"); timeoutDataExInputPort.setName("input-timeout-data-ex");
timeoutDataExInputPort.setId("input-timeout-data-ex-id"); timeoutDataExInputPort.setId("input-timeout-data-ex-id");
timeoutDataExInputPort.setType("INPUT_PORT"); timeoutDataExInputPort.setType("INPUT_PORT");
timeoutDataExInputPort.setState(ScheduledState.RUNNING.name()); timeoutDataExInputPort.setState(ScheduledState.RUNNING.name());
inputPorts.add(timeoutDataExInputPort);
final PortDTO accessDeniedInputPort = new PortDTO();
accessDeniedInputPort.setName("input-access-denied");
accessDeniedInputPort.setId("input-access-denied-id");
accessDeniedInputPort.setType("INPUT_PORT");
accessDeniedInputPort.setState(ScheduledState.RUNNING.name());
inputPorts.add(accessDeniedInputPort);
outputPorts = new HashSet<>(); outputPorts = new HashSet<>();
final PortDTO runningOutputPort = new PortDTO(); final PortDTO runningOutputPort = new PortDTO();
runningOutputPort.setId("running-output-port");
outputPorts.add(runningOutputPort);
runningOutputPort.setName("output-running"); runningOutputPort.setName("output-running");
runningOutputPort.setId("output-running-id"); runningOutputPort.setId("output-running-id");
runningOutputPort.setType("OUTPUT_PORT"); runningOutputPort.setType("OUTPUT_PORT");
runningOutputPort.setState(ScheduledState.RUNNING.name()); runningOutputPort.setState(ScheduledState.RUNNING.name());
outputPorts.add(runningOutputPort);
final PortDTO timeoutOutputPort = new PortDTO(); final PortDTO timeoutOutputPort = new PortDTO();
timeoutOutputPort.setId("timeout-output-port");
outputPorts.add(timeoutOutputPort);
timeoutOutputPort.setName("output-timeout"); timeoutOutputPort.setName("output-timeout");
timeoutOutputPort.setId("output-timeout-id"); timeoutOutputPort.setId("output-timeout-id");
timeoutOutputPort.setType("OUTPUT_PORT"); timeoutOutputPort.setType("OUTPUT_PORT");
timeoutOutputPort.setState(ScheduledState.RUNNING.name()); timeoutOutputPort.setState(ScheduledState.RUNNING.name());
outputPorts.add(timeoutOutputPort);
final PortDTO timeoutDataExOutputPort = new PortDTO(); final PortDTO timeoutDataExOutputPort = new PortDTO();
timeoutDataExOutputPort.setId("timeout-dataex-output-port");
outputPorts.add(timeoutDataExOutputPort);
timeoutDataExOutputPort.setName("output-timeout-data-ex"); timeoutDataExOutputPort.setName("output-timeout-data-ex");
timeoutDataExOutputPort.setId("output-timeout-data-ex-id"); timeoutDataExOutputPort.setId("output-timeout-data-ex-id");
timeoutDataExOutputPort.setType("OUTPUT_PORT"); timeoutDataExOutputPort.setType("OUTPUT_PORT");
timeoutDataExOutputPort.setState(ScheduledState.RUNNING.name()); timeoutDataExOutputPort.setState(ScheduledState.RUNNING.name());
outputPorts.add(timeoutDataExOutputPort);
} }
@ -787,6 +802,23 @@ public class TestHttpClient {
} }
@Test
public void testSendAccessDeniedHTTPS() throws Exception {
try (
final SiteToSiteClient client = getDefaultBuilderHTTPS()
.portName("input-access-denied")
.build()
) {
try {
client.createTransaction(TransferDirection.SEND);
fail("Handshake exception should be thrown.");
} catch (HandshakeException e) {
}
}
}
@Test @Test
public void testSendSuccessHTTPS() throws Exception { public void testSendSuccessHTTPS() throws Exception {

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.remote; package org.apache.nifi.remote;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Port;
import org.apache.nifi.remote.exception.BadRequestException; import org.apache.nifi.remote.exception.BadRequestException;
import org.apache.nifi.remote.exception.NotAuthorizedException; import org.apache.nifi.remote.exception.NotAuthorizedException;
@ -41,11 +42,24 @@ public interface RootGroupPort extends Port {
* and returns a {@link PortAuthorizationResult} indicating why the user is * and returns a {@link PortAuthorizationResult} indicating why the user is
* unauthorized if this assumption fails * unauthorized if this assumption fails
* *
* {@link #checkUserAuthorization(NiFiUser)} should be used if applicable
* because NiFiUser has additional context such as chained user.
*
* @param dn dn of user * @param dn dn of user
* @return result * @return result
*/ */
PortAuthorizationResult checkUserAuthorization(String dn); PortAuthorizationResult checkUserAuthorization(String dn);
/**
* Verifies that the specified user is authorized to interact with this port
* and returns a {@link PortAuthorizationResult} indicating why the user is
* unauthorized if this assumption fails
*
* @param user to authorize
* @return result
*/
PortAuthorizationResult checkUserAuthorization(NiFiUser user);
/** /**
* Receives data from the given stream * Receives data from the given stream
* *

View File

@ -22,6 +22,7 @@ import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.DataTransferAuthorizable; import org.apache.nifi.authorization.resource.DataTransferAuthorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.StandardNiFiUser; import org.apache.nifi.authorization.user.StandardNiFiUser;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.ConnectableType;
@ -345,10 +346,6 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
@Override @Override
public PortAuthorizationResult checkUserAuthorization(final String dn) { public PortAuthorizationResult checkUserAuthorization(final String dn) {
if (!secure) {
return new StandardPortAuthorizationResult(true, "Site-to-Site is not Secure");
}
if (dn == null) { if (dn == null) {
final String message = String.format("%s authorization failed for user %s because the DN is unknown", this, dn); final String message = String.format("%s authorization failed for user %s because the DN is unknown", this, dn);
logger.warn(message); logger.warn(message);
@ -356,12 +353,28 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
return new StandardPortAuthorizationResult(false, "User DN is not known"); return new StandardPortAuthorizationResult(false, "User DN is not known");
} }
return checkUserAuthorization(new StandardNiFiUser(dn));
}
@Override
public PortAuthorizationResult checkUserAuthorization(NiFiUser user) {
if (!secure) {
return new StandardPortAuthorizationResult(true, "Site-to-Site is not Secure");
}
if (user == null) {
final String message = String.format("%s authorization failed because the user is unknown", this, user);
logger.warn(message);
eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
return new StandardPortAuthorizationResult(false, "User is not known");
}
// perform the authorization // perform the authorization
final Authorizable dataTransferAuthorizable = new DataTransferAuthorizable(this); final Authorizable dataTransferAuthorizable = new DataTransferAuthorizable(this);
final AuthorizationResult result = dataTransferAuthorizable.checkAuthorization(authorizer, RequestAction.WRITE, new StandardNiFiUser(dn)); final AuthorizationResult result = dataTransferAuthorizable.checkAuthorization(authorizer, RequestAction.WRITE, user);
if (!Result.Approved.equals(result.getResult())) { if (!Result.Approved.equals(result.getResult())) {
final String message = String.format("%s authorization failed for user %s because %s", this, dn, result.getExplanation()); final String message = String.format("%s authorization failed for user %s because %s", this, user.getIdentity(), result.getExplanation());
logger.warn(message); logger.warn(message);
eventReporter.reportEvent(Severity.WARNING, CATEGORY, message); eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
return new StandardPortAuthorizationResult(false, message); return new StandardPortAuthorizationResult(false, message);

View File

@ -60,6 +60,22 @@ public interface AuthorizableLookup {
*/ */
Authorizable getCounters(); Authorizable getCounters();
/**
* Get the authorizable RootGroup InputPort.
*
* @param id input port id
* @return authorizable
*/
RootGroupPortAuthorizable getRootGroupInputPort(String id);
/**
* Get the authorizable RootGroup OutputPort.
*
* @param id output port id
* @return authorizable
*/
RootGroupPortAuthorizable getRootGroupOutputPort(String id);
/** /**
* Get the authorizable InputPort. * Get the authorizable InputPort.
* *

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.authorization;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
/**
* Authorizable for a RootGroupPort.
*/
public interface RootGroupPortAuthorizable {
/**
* Returns the authorizable for this RootGroupPort. Non null
*
* @return authorizable
*/
Authorizable getAuthorizable();
/**
* Checks the authorization for the specified user.
*
* @param user user
* @return authorization result
*/
AuthorizationResult checkAuthorization(NiFiUser user);
}

View File

@ -24,9 +24,11 @@ import org.apache.nifi.authorization.resource.DataTransferAuthorizable;
import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType; import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.authorization.resource.TenantAuthorizable; import org.apache.nifi.authorization.resource.TenantAuthorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ReportingTaskNode;
@ -35,6 +37,8 @@ import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceReference; import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.remote.PortAuthorizationResult;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.AccessPolicyDAO; import org.apache.nifi.web.dao.AccessPolicyDAO;
@ -163,6 +167,62 @@ class StandardAuthorizableLookup implements AuthorizableLookup {
} }
} }
@Override
public RootGroupPortAuthorizable getRootGroupInputPort(String id) {
final Port inputPort = inputPortDAO.getPort(id);
if (!(inputPort instanceof RootGroupPort)) {
throw new IllegalArgumentException(String.format("The specified id '%s' does not represent an input port in the root group.", id));
}
final DataTransferAuthorizable baseAuthorizable = new DataTransferAuthorizable(inputPort);
return new RootGroupPortAuthorizable() {
@Override
public Authorizable getAuthorizable() {
return baseAuthorizable;
}
@Override
public AuthorizationResult checkAuthorization(NiFiUser user) {
// perform the authorization of the user by using the underlying component, ensures consistent authorization with raw s2s
final PortAuthorizationResult authorizationResult = ((RootGroupPort) inputPort).checkUserAuthorization(user);
if (authorizationResult.isAuthorized()) {
return AuthorizationResult.approved();
} else {
return AuthorizationResult.denied(authorizationResult.getExplanation());
}
}
};
}
@Override
public RootGroupPortAuthorizable getRootGroupOutputPort(String id) {
final Port outputPort = outputPortDAO.getPort(id);
if (!(outputPort instanceof RootGroupPort)) {
throw new IllegalArgumentException(String.format("The specified id '%s' does not represent an output port in the root group.", id));
}
final DataTransferAuthorizable baseAuthorizable = new DataTransferAuthorizable(outputPort);
return new RootGroupPortAuthorizable() {
@Override
public Authorizable getAuthorizable() {
return baseAuthorizable;
}
@Override
public AuthorizationResult checkAuthorization(NiFiUser user) {
// perform the authorization of the user by using the underlying component, ensures consistent authorization with raw s2s
final PortAuthorizationResult authorizationResult = ((RootGroupPort) outputPort).checkUserAuthorization(user);
if (authorizationResult.isAuthorized()) {
return AuthorizationResult.approved();
} else {
return AuthorizationResult.denied(authorizationResult.getExplanation());
}
}
};
}
@Override @Override
public Authorizable getInputPort(final String id) { public Authorizable getInputPort(final String id) {
return inputPortDAO.getPort(id); return inputPortDAO.getPort(id);

View File

@ -23,10 +23,11 @@ import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization; import com.wordnik.swagger.annotations.Authorization;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizableLookup; import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.AuthorizationResult.Result;
import org.apache.nifi.authorization.resource.DataTransferAuthorizable; import org.apache.nifi.authorization.RootGroupPortAuthorizable;
import org.apache.nifi.authorization.resource.ResourceType; import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.authorization.user.NiFiUserUtils;
@ -107,7 +108,6 @@ public class DataTransferResource extends ApplicationResource {
private static final String PORT_TYPE_INPUT = "input-ports"; private static final String PORT_TYPE_INPUT = "input-ports";
private static final String PORT_TYPE_OUTPUT = "output-ports"; private static final String PORT_TYPE_OUTPUT = "output-ports";
private Authorizer authorizer;
private NiFiServiceFacade serviceFacade; private NiFiServiceFacade serviceFacade;
private final ResponseCreator responseCreator = new ResponseCreator(); private final ResponseCreator responseCreator = new ResponseCreator();
private final VersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1); private final VersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1);
@ -133,15 +133,18 @@ public class DataTransferResource extends ApplicationResource {
} }
// get the authorizable // get the authorizable
final DataTransferAuthorizable authorizable; final RootGroupPortAuthorizable authorizable;
if (ResourceType.InputPort.equals(resourceType)) { if (ResourceType.InputPort.equals(resourceType)) {
authorizable = new DataTransferAuthorizable(lookup.getInputPort(identifier)); authorizable = lookup.getRootGroupInputPort(identifier);
} else { } else {
authorizable = new DataTransferAuthorizable(lookup.getOutputPort(identifier)); authorizable = lookup.getRootGroupOutputPort(identifier);
} }
// perform the authorization // perform the authorization
authorizable.authorize(authorizer, RequestAction.WRITE, user); final AuthorizationResult authorizationResult = authorizable.checkAuthorization(user);
if (!Result.Approved.equals(authorizationResult.getResult())) {
throw new AccessDeniedException(authorizationResult.getExplanation());
}
} }
@POST @POST
@ -831,10 +834,6 @@ public class DataTransferResource extends ApplicationResource {
// setters // setters
public void setAuthorizer(Authorizer authorizer) {
this.authorizer = authorizer;
}
public void setServiceFacade(NiFiServiceFacade serviceFacade) { public void setServiceFacade(NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade; this.serviceFacade = serviceFacade;
} }

View File

@ -231,7 +231,6 @@
<property name="properties" ref="nifiProperties"/> <property name="properties" ref="nifiProperties"/>
<property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" /> <property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="serviceFacade" ref="serviceFacade"/> <property name="serviceFacade" ref="serviceFacade"/>
<property name="flowController" ref="flowController" /> <property name="flowController" ref="flowController" />
</bean> </bean>