mirror of https://github.com/apache/nifi.git
NIFI-5833 This closes #3180. Marked GetTwitter Consumer Key and Access Token processor properties as sensitive.
NIFI-5833 Added unit test to demonstrate arbitrary decryption of sensitive values regardless of processor property sensitive status. NIFI-5833 Updated GetTwitter documentation with note about 1.9.0+ marking Consumer Key and Access Token as sensitive. Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
parent
3c7012ffda
commit
f6b171d5f7
|
@ -17,12 +17,14 @@
|
|||
package org.apache.nifi.controller.serialization
|
||||
|
||||
import org.apache.commons.codec.binary.Hex
|
||||
import org.apache.nifi.controller.StandardFlowSynchronizer
|
||||
import org.apache.nifi.encrypt.EncryptionException
|
||||
import org.apache.nifi.encrypt.StringEncryptor
|
||||
import org.apache.nifi.properties.StandardNiFiProperties
|
||||
import org.apache.nifi.security.kms.CryptoUtils
|
||||
import org.apache.nifi.security.util.EncryptionMethod
|
||||
import org.apache.nifi.util.NiFiProperties
|
||||
import org.apache.nifi.web.api.dto.ProcessGroupDTO
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
|
@ -32,12 +34,15 @@ import org.junit.runner.RunWith
|
|||
import org.junit.runners.JUnit4
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.w3c.dom.Document
|
||||
import org.w3c.dom.Element
|
||||
|
||||
import javax.crypto.Cipher
|
||||
import javax.crypto.SecretKey
|
||||
import javax.crypto.SecretKeyFactory
|
||||
import javax.crypto.spec.PBEKeySpec
|
||||
import javax.crypto.spec.PBEParameterSpec
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.security.Security
|
||||
|
||||
import static groovy.test.GroovyAssert.shouldFail
|
||||
|
@ -135,6 +140,83 @@ class FlowFromDOMFactoryTest {
|
|||
assert msg.message =~ "Check that the ${KEY} value in nifi.properties matches the value used to encrypt the flow.xml.gz file"
|
||||
}
|
||||
|
||||
@Test
|
||||
void testShouldDecryptSensitiveFlowValueRegardlessOfPropertySensitiveStatus() throws Exception {
|
||||
// Arrange
|
||||
|
||||
// Create a mock Element object to be parsed
|
||||
|
||||
// TODO: Mock call to StandardFlowSynchronizer#readFlowFromDisk()
|
||||
final String FLOW_XML = """<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<flowController encoding-version="1.3">
|
||||
<maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
|
||||
<maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
|
||||
<registries/>
|
||||
<rootGroup>
|
||||
<id>32aeba59-0167-1000-fc76-847bf5d10d73</id>
|
||||
<name>NiFi Flow</name>
|
||||
<position x="0.0" y="0.0"/>
|
||||
<comment/>
|
||||
<processor>
|
||||
<id>32af5e4e-0167-1000-ad5f-c79ff57c851e</id>
|
||||
<name>Example Processor</name>
|
||||
<position x="461.0" y="80.0"/>
|
||||
<styles/>
|
||||
<comment/>
|
||||
<class>org.apache.nifi.processors.test.ExampleProcessor</class>
|
||||
<bundle>
|
||||
<group>org.apache.nifi</group>
|
||||
<artifact>nifi-test-nar</artifact>
|
||||
<version>1.9.0-SNAPSHOT</version>
|
||||
</bundle>
|
||||
<maxConcurrentTasks>1</maxConcurrentTasks>
|
||||
<schedulingPeriod>0 sec</schedulingPeriod>
|
||||
<penalizationPeriod>30 sec</penalizationPeriod>
|
||||
<yieldPeriod>1 sec</yieldPeriod>
|
||||
<bulletinLevel>WARN</bulletinLevel>
|
||||
<lossTolerant>false</lossTolerant>
|
||||
<scheduledState>STOPPED</scheduledState>
|
||||
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
|
||||
<executionNode>ALL</executionNode>
|
||||
<runDurationNanos>0</runDurationNanos>
|
||||
<property>
|
||||
<name>Plaintext Property</name>
|
||||
<value>plain value</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>Sensitive Property</name>
|
||||
<value>enc{29077eedc9af7515cc3e0d2d29a93a5cbb059164876948458fd0c890009c8661}</value>
|
||||
</property>
|
||||
</processor>
|
||||
</rootGroup>
|
||||
<controllerServices/>
|
||||
<reportingTasks/>
|
||||
</flowController>
|
||||
"""
|
||||
|
||||
// TODO: Mock call to StandardFlowSynchronizer#parseFlowBytes()
|
||||
Document flow = StandardFlowSynchronizer.parseFlowBytes(FLOW_XML.getBytes(StandardCharsets.UTF_8))
|
||||
|
||||
// Logic to extract root process group
|
||||
final Element rootElement = flow.getDocumentElement()
|
||||
|
||||
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0)
|
||||
final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootGroupElement)
|
||||
|
||||
StringEncryptor flowEncryptor = new StringEncryptor(EncryptionMethod.MD5_128AES.algorithm, EncryptionMethod.MD5_128AES.provider, DEFAULT_PASSWORD)
|
||||
|
||||
// Act
|
||||
ProcessGroupDTO decryptedProcessGroupDTO = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, flowEncryptor, encodingVersion)
|
||||
logger.info("PG DTO: ${decryptedProcessGroupDTO}")
|
||||
|
||||
// Assert
|
||||
def processorProperties = decryptedProcessGroupDTO.contents.processors.first().config.properties
|
||||
logger.info("Parsed processor properties: ${processorProperties}")
|
||||
|
||||
assert processorProperties.find { it.key == "Plaintext Property" }.value == "plain value"
|
||||
assert processorProperties.find { it.key == "Sensitive Property" }.value == "sensitive value"
|
||||
}
|
||||
|
||||
private
|
||||
static Cipher generateCipher(boolean encryptMode, String password = DEFAULT_PASSWORD, byte[] salt = DEFAULT_SALT, int iterationCount = DEFAULT_ITERATION_COUNT) {
|
||||
// Initialize secret key from password
|
||||
|
|
|
@ -16,9 +16,21 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.twitter;
|
||||
|
||||
import com.twitter.hbc.ClientBuilder;
|
||||
import com.twitter.hbc.core.Client;
|
||||
import com.twitter.hbc.core.Constants;
|
||||
import com.twitter.hbc.core.endpoint.Location;
|
||||
import com.twitter.hbc.core.endpoint.Location.Coordinate;
|
||||
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
|
||||
import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
|
||||
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
|
||||
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
|
||||
import com.twitter.hbc.core.event.Event;
|
||||
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
|
||||
import com.twitter.hbc.httpclient.auth.Authentication;
|
||||
import com.twitter.hbc.httpclient.auth.OAuth1;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.MalformedURLException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -31,7 +43,6 @@ import java.util.Set;
|
|||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
|
@ -56,24 +67,10 @@ import org.apache.nifi.processor.exception.ProcessException;
|
|||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import com.twitter.hbc.ClientBuilder;
|
||||
import com.twitter.hbc.core.Client;
|
||||
import com.twitter.hbc.core.Constants;
|
||||
import com.twitter.hbc.core.endpoint.Location ;
|
||||
import com.twitter.hbc.core.endpoint.Location.Coordinate ;
|
||||
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
|
||||
import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
|
||||
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
|
||||
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
|
||||
import com.twitter.hbc.core.event.Event;
|
||||
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
|
||||
import com.twitter.hbc.httpclient.auth.Authentication;
|
||||
import com.twitter.hbc.httpclient.auth.OAuth1;
|
||||
|
||||
@SupportsBatching
|
||||
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
||||
@Tags({"twitter", "tweets", "social media", "status", "json"})
|
||||
@CapabilityDescription("Pulls status changes from Twitter's streaming API")
|
||||
@CapabilityDescription("Pulls status changes from Twitter's streaming API. In versions starting with 1.9.0, the Consumer Key and Access Token are marked as sensitive according to https://developer.twitter.com/en/docs/basics/authentication/guides/securing-keys-and-tokens")
|
||||
@WritesAttribute(attribute = "mime.type", description = "Sets mime type to application/json")
|
||||
public class GetTwitter extends AbstractProcessor {
|
||||
|
||||
|
@ -92,6 +89,7 @@ public class GetTwitter extends AbstractProcessor {
|
|||
.name("Consumer Key")
|
||||
.description("The Consumer Key provided by Twitter")
|
||||
.required(true)
|
||||
.sensitive(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor CONSUMER_SECRET = new PropertyDescriptor.Builder()
|
||||
|
@ -105,6 +103,7 @@ public class GetTwitter extends AbstractProcessor {
|
|||
.name("Access Token")
|
||||
.description("The Access Token provided by Twitter")
|
||||
.required(true)
|
||||
.sensitive(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor ACCESS_TOKEN_SECRET = new PropertyDescriptor.Builder()
|
||||
|
@ -221,7 +220,7 @@ public class GetTwitter extends AbstractProcessor {
|
|||
}
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) throws MalformedURLException {
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
final String endpointName = context.getProperty(ENDPOINT).getValue();
|
||||
final Authentication oauth = new OAuth1(context.getProperty(CONSUMER_KEY).getValue(),
|
||||
context.getProperty(CONSUMER_SECRET).getValue(),
|
||||
|
@ -369,7 +368,7 @@ public class GetTwitter extends AbstractProcessor {
|
|||
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
session.getProvenanceReporter().receive(flowFile, Constants.STREAM_HOST + client.getEndpoint().getURI().toString());
|
||||
session.getProvenanceReporter().receive(flowFile, Constants.STREAM_HOST + client.getEndpoint().getURI());
|
||||
}
|
||||
|
||||
private static class FollowingValidator implements Validator {
|
||||
|
|
Loading…
Reference in New Issue