diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java index b5ec133f1c..52cd7ec0a0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java @@ -14,11 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.controller.cluster; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; @@ -27,7 +30,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ZooKeeperClientConfig { + private static final Logger logger = LoggerFactory.getLogger(ZooKeeperClientConfig.class); + private static final Pattern PORT_PATTERN = Pattern.compile("[0-9]{1,5}"); private final String connectString; private final int sessionTimeoutMillis; @@ -70,7 +75,10 @@ public class ZooKeeperClientConfig { if (connectString == null || connectString.trim().isEmpty()) { throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property is not set in nifi.properties"); } - + final String cleanedConnectString = cleanConnectString(connectString); + if (cleanedConnectString.isEmpty()) { + throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property is set in nifi.properties but needs to be in pairs of host:port separated by commas"); + } final long sessionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT); final long connectionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT); final String rootPath = properties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE); @@ -81,7 +89,7 @@ public class ZooKeeperClientConfig { throw new IllegalArgumentException("The '" + NiFiProperties.ZOOKEEPER_ROOT_NODE + "' property in nifi.properties is set to an illegal value: " + rootPath); } - return new ZooKeeperClientConfig(connectString, (int) sessionTimeoutMs, (int) connectionTimeoutMs, rootPath); + return new ZooKeeperClientConfig(cleanedConnectString, (int) sessionTimeoutMs, (int) connectionTimeoutMs, rootPath); } private static int getTimePeriod(final Properties properties, final String propertyName, final String defaultValue) { @@ -93,4 +101,46 @@ public class ZooKeeperClientConfig { return (int) FormatUtils.getTimeDuration(defaultValue, TimeUnit.MILLISECONDS); } } + + /** + * Takes a given connect string and splits it by ',' character. For each + * split result trims whitespace then splits by ':' character. For each + * secondary split if a single value is returned it is trimmed and then the + * default zookeeper 2181 is append by adding ":2181". If two values are + * returned then the second value is evaluated to ensure it contains only + * digits and if not then the entry is in error and exception is raised. + * If more than two values are + * returned the entry is in error and an exception is raised. + * Each entry is trimmed and if empty the + * entry is skipped. After all splits are cleaned then they are all appended + * back together demarcated by "," and the full string is returned. + * + * @param connectString the string to clean + * @return cleaned connect string guaranteed to be non null but could be + * empty + * @throws IllegalStateException if any portions could not be cleaned/parsed + */ + public static String cleanConnectString(final String connectString) { + final String nospaces = StringUtils.deleteWhitespace(connectString); + final String hostPortPairs[] = StringUtils.split(nospaces, ",", 100); + final List cleanedEntries = new ArrayList<>(hostPortPairs.length); + for (final String pair : hostPortPairs) { + final String pairSplits[] = StringUtils.split(pair, ":", 3); + if (pairSplits.length > 2 || pairSplits[0].isEmpty()) { + throw new IllegalStateException("Invalid host:port pair entry '" + + pair + "' in nifi.properties " + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property"); + } + if (pairSplits.length == 1) { + cleanedEntries.add(pairSplits[0] + ":2181"); + }else{ + if(PORT_PATTERN.matcher(pairSplits[1]).matches()){ + cleanedEntries.add(pairSplits[0] + ":" + pairSplits[1]); + }else{ + throw new IllegalStateException("The port specified in this pair must be 1 to 5 digits only but was '" + + pair + "' in nifi.properties " + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property"); + } + } + } + return StringUtils.join(cleanedEntries, ","); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/ZooKeeperClientConfigTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/ZooKeeperClientConfigTest.java new file mode 100644 index 0000000000..0c278360d4 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/ZooKeeperClientConfigTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster; + +import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; +import static org.junit.Assert.assertEquals; +import org.junit.Test; + +public class ZooKeeperClientConfigTest { + + @Test + public void testEasyCase(){ + final String input = "local:1234"; + final String cleanedInput = ZooKeeperClientConfig.cleanConnectString(input); + assertEquals(input, cleanedInput); + } + + @Test + public void testValidFunkyInput(){ + final String input = "local: 1234 "; + final String cleanedInput = ZooKeeperClientConfig.cleanConnectString(input); + assertEquals("local:1234", cleanedInput); + } + + @Test(expected = IllegalStateException.class) + public void testInvalidSingleEntry(){ + ZooKeeperClientConfig.cleanConnectString("local: 1a34 "); + } + + @Test + public void testSingleEntryNoPort(){ + final String input = "local"; + final String cleanedInput = ZooKeeperClientConfig.cleanConnectString(input); + assertEquals("local:2181", cleanedInput); + } + + @Test + public void testMultiValidEntry(){ + final String input = "local:1234,local:1235,local:1235,local:14952"; + final String cleanedInput = ZooKeeperClientConfig.cleanConnectString(input); + assertEquals(input, cleanedInput); + } + + @Test(expected = IllegalStateException.class) + public void testMultiValidEntrySkipOne(){ + ZooKeeperClientConfig.cleanConnectString("local:1234,local:1235,local:12a5,local:14952"); + } + + @Test + public void testMultiValidEntrySpacesForDays(){ + final String input = " local : 1234 , local: 1235,local :1295,local:14952 "; + final String cleanedInput = ZooKeeperClientConfig.cleanConnectString(input); + assertEquals("local:1234,local:1235,local:1295,local:14952", cleanedInput); + } + + @Test(expected = IllegalStateException.class) + public void testMultiValidOneNonsense(){ + ZooKeeperClientConfig.cleanConnectString(" local : 1234 , local: 1235:wack,local :1295,local:14952 "); + } +}