mirror of https://github.com/apache/nifi.git
NIFI-7895 - Fix NPE in ConsumeMQTT with truststore only SSL CS
This closes #4587. Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
parent
930e8d9e0e
commit
88de03aebb
|
@ -288,13 +288,27 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
|
|||
|
||||
public static Properties transformSSLContextService(SSLContextService sslContextService){
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("com.ibm.ssl.protocol", sslContextService.getSslAlgorithm());
|
||||
properties.setProperty("com.ibm.ssl.keyStore", sslContextService.getKeyStoreFile());
|
||||
properties.setProperty("com.ibm.ssl.keyStorePassword", sslContextService.getKeyStorePassword());
|
||||
properties.setProperty("com.ibm.ssl.keyStoreType", sslContextService.getKeyStoreType());
|
||||
properties.setProperty("com.ibm.ssl.trustStore", sslContextService.getTrustStoreFile());
|
||||
properties.setProperty("com.ibm.ssl.trustStorePassword", sslContextService.getTrustStorePassword());
|
||||
properties.setProperty("com.ibm.ssl.trustStoreType", sslContextService.getTrustStoreType());
|
||||
if (sslContextService.getSslAlgorithm() != null) {
|
||||
properties.setProperty("com.ibm.ssl.protocol", sslContextService.getSslAlgorithm());
|
||||
}
|
||||
if (sslContextService.getKeyStoreFile() != null) {
|
||||
properties.setProperty("com.ibm.ssl.keyStore", sslContextService.getKeyStoreFile());
|
||||
}
|
||||
if (sslContextService.getKeyStorePassword() != null) {
|
||||
properties.setProperty("com.ibm.ssl.keyStorePassword", sslContextService.getKeyStorePassword());
|
||||
}
|
||||
if (sslContextService.getKeyStoreType() != null) {
|
||||
properties.setProperty("com.ibm.ssl.keyStoreType", sslContextService.getKeyStoreType());
|
||||
}
|
||||
if (sslContextService.getTrustStoreFile() != null) {
|
||||
properties.setProperty("com.ibm.ssl.trustStore", sslContextService.getTrustStoreFile());
|
||||
}
|
||||
if (sslContextService.getTrustStorePassword() != null) {
|
||||
properties.setProperty("com.ibm.ssl.trustStorePassword", sslContextService.getTrustStorePassword());
|
||||
}
|
||||
if (sslContextService.getTrustStoreType() != null) {
|
||||
properties.setProperty("com.ibm.ssl.trustStoreType", sslContextService.getTrustStoreType());
|
||||
}
|
||||
return properties;
|
||||
}
|
||||
|
||||
|
|
|
@ -21,7 +21,11 @@ import io.moquette.proto.messages.PublishMessage;
|
|||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
|
||||
import org.apache.nifi.processors.mqtt.common.MqttTestClient;
|
||||
import org.apache.nifi.processors.mqtt.common.MqttTestUtils;
|
||||
import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.ssl.StandardSSLContextService;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.eclipse.paho.client.mqttv3.IMqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
|
@ -36,6 +40,7 @@ import java.io.FilenameFilter;
|
|||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -73,6 +78,30 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
|
|||
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSSLContextServiceTruststoreOnly() throws InitializationException {
|
||||
String brokerURI = "ssl://localhost:8883";
|
||||
TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
|
||||
runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, brokerURI);
|
||||
runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
|
||||
runner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
|
||||
runner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
|
||||
|
||||
final StandardSSLContextService sslService = new StandardSSLContextService();
|
||||
Map<String, String> sslProperties = MqttTestUtils.createSslPropertiesTruststoreOnly();
|
||||
runner.addControllerService("ssl-context", sslService, sslProperties);
|
||||
runner.enableControllerService(sslService);
|
||||
runner.setProperty(ConsumeMQTT.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
|
||||
|
||||
try {
|
||||
ConsumeMQTT processor = (ConsumeMQTT) runner.getProcessor();
|
||||
processor.onScheduled(runner.getProcessContext());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
fail("Unexpected error");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If the session.commit() fails, we should not remove the unprocessed message
|
||||
*/
|
||||
|
|
|
@ -23,8 +23,8 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
|
||||
public class MqttTestUtils {
|
||||
public static Map<String, String> createSslProperties() {
|
||||
|
||||
public static Map<String, String> createSslProperties() {
|
||||
final Map<String, String> map = new HashMap<>();
|
||||
map.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/keystore.jks");
|
||||
map.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "passwordpassword");
|
||||
|
@ -34,4 +34,13 @@ public class MqttTestUtils {
|
|||
map.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
|
||||
return map;
|
||||
}
|
||||
|
||||
public static Map<String, String> createSslPropertiesTruststoreOnly() {
|
||||
final Map<String, String> map = new HashMap<>();
|
||||
map.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/truststore.jks");
|
||||
map.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "passwordpassword");
|
||||
map.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
|
||||
return map;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue