NIFI-2253 flexibly cleaning zookeeper connect string

This closes #704

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
joewitt 2016-07-21 22:43:15 -04:00 committed by jpercivall
parent 7e2740160a
commit 393a3925dd
2 changed files with 127 additions and 3 deletions

View File

@ -14,11 +14,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.controller.cluster; package org.apache.nifi.controller.cluster;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
@ -27,7 +30,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class ZooKeeperClientConfig { public class ZooKeeperClientConfig {
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperClientConfig.class); private static final Logger logger = LoggerFactory.getLogger(ZooKeeperClientConfig.class);
private static final Pattern PORT_PATTERN = Pattern.compile("[0-9]{1,5}");
private final String connectString; private final String connectString;
private final int sessionTimeoutMillis; private final int sessionTimeoutMillis;
@ -70,7 +75,10 @@ public class ZooKeeperClientConfig {
if (connectString == null || connectString.trim().isEmpty()) { if (connectString == null || connectString.trim().isEmpty()) {
throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property is not set in nifi.properties"); throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property is not set in nifi.properties");
} }
final String cleanedConnectString = cleanConnectString(connectString);
if (cleanedConnectString.isEmpty()) {
throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property is set in nifi.properties but needs to be in pairs of host:port separated by commas");
}
final long sessionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT); final long sessionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
final long connectionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT); final long connectionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT);
final String rootPath = properties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE); final String rootPath = properties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE);
@ -81,7 +89,7 @@ public class ZooKeeperClientConfig {
throw new IllegalArgumentException("The '" + NiFiProperties.ZOOKEEPER_ROOT_NODE + "' property in nifi.properties is set to an illegal value: " + rootPath); throw new IllegalArgumentException("The '" + NiFiProperties.ZOOKEEPER_ROOT_NODE + "' property in nifi.properties is set to an illegal value: " + rootPath);
} }
return new ZooKeeperClientConfig(connectString, (int) sessionTimeoutMs, (int) connectionTimeoutMs, rootPath); return new ZooKeeperClientConfig(cleanedConnectString, (int) sessionTimeoutMs, (int) connectionTimeoutMs, rootPath);
} }
private static int getTimePeriod(final Properties properties, final String propertyName, final String defaultValue) { private static int getTimePeriod(final Properties properties, final String propertyName, final String defaultValue) {
@ -93,4 +101,46 @@ public class ZooKeeperClientConfig {
return (int) FormatUtils.getTimeDuration(defaultValue, TimeUnit.MILLISECONDS); return (int) FormatUtils.getTimeDuration(defaultValue, TimeUnit.MILLISECONDS);
} }
} }
/**
* Takes a given connect string and splits it by ',' character. For each
* split result trims whitespace then splits by ':' character. For each
* secondary split if a single value is returned it is trimmed and then the
* default zookeeper 2181 is append by adding ":2181". If two values are
* returned then the second value is evaluated to ensure it contains only
* digits and if not then the entry is in error and exception is raised.
* If more than two values are
* returned the entry is in error and an exception is raised.
* Each entry is trimmed and if empty the
* entry is skipped. After all splits are cleaned then they are all appended
* back together demarcated by "," and the full string is returned.
*
* @param connectString the string to clean
* @return cleaned connect string guaranteed to be non null but could be
* empty
* @throws IllegalStateException if any portions could not be cleaned/parsed
*/
public static String cleanConnectString(final String connectString) {
final String nospaces = StringUtils.deleteWhitespace(connectString);
final String hostPortPairs[] = StringUtils.split(nospaces, ",", 100);
final List<String> cleanedEntries = new ArrayList<>(hostPortPairs.length);
for (final String pair : hostPortPairs) {
final String pairSplits[] = StringUtils.split(pair, ":", 3);
if (pairSplits.length > 2 || pairSplits[0].isEmpty()) {
throw new IllegalStateException("Invalid host:port pair entry '" +
pair + "' in nifi.properties " + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property");
}
if (pairSplits.length == 1) {
cleanedEntries.add(pairSplits[0] + ":2181");
}else{
if(PORT_PATTERN.matcher(pairSplits[1]).matches()){
cleanedEntries.add(pairSplits[0] + ":" + pairSplits[1]);
}else{
throw new IllegalStateException("The port specified in this pair must be 1 to 5 digits only but was '" +
pair + "' in nifi.properties " + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property");
}
}
}
return StringUtils.join(cleanedEntries, ",");
}
} }

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
public class ZooKeeperClientConfigTest {
@Test
public void testEasyCase(){
final String input = "local:1234";
final String cleanedInput = ZooKeeperClientConfig.cleanConnectString(input);
assertEquals(input, cleanedInput);
}
@Test
public void testValidFunkyInput(){
final String input = "local: 1234 ";
final String cleanedInput = ZooKeeperClientConfig.cleanConnectString(input);
assertEquals("local:1234", cleanedInput);
}
@Test(expected = IllegalStateException.class)
public void testInvalidSingleEntry(){
ZooKeeperClientConfig.cleanConnectString("local: 1a34 ");
}
@Test
public void testSingleEntryNoPort(){
final String input = "local";
final String cleanedInput = ZooKeeperClientConfig.cleanConnectString(input);
assertEquals("local:2181", cleanedInput);
}
@Test
public void testMultiValidEntry(){
final String input = "local:1234,local:1235,local:1235,local:14952";
final String cleanedInput = ZooKeeperClientConfig.cleanConnectString(input);
assertEquals(input, cleanedInput);
}
@Test(expected = IllegalStateException.class)
public void testMultiValidEntrySkipOne(){
ZooKeeperClientConfig.cleanConnectString("local:1234,local:1235,local:12a5,local:14952");
}
@Test
public void testMultiValidEntrySpacesForDays(){
final String input = " local : 1234 , local: 1235,local :1295,local:14952 ";
final String cleanedInput = ZooKeeperClientConfig.cleanConnectString(input);
assertEquals("local:1234,local:1235,local:1295,local:14952", cleanedInput);
}
@Test(expected = IllegalStateException.class)
public void testMultiValidOneNonsense(){
ZooKeeperClientConfig.cleanConnectString(" local : 1234 , local: 1235:wack,local :1295,local:14952 ");
}
}