NIFI-4343 - allow multiple URLs in SiteToSite reporting tasks

using SiteToSiteClient.Builder().urls() instead of url()

Updated validator to use parseClusterUrls method

This closes #2121.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Pierre Villard 2017-09-01 12:14:51 +02:00 committed by Koji Kawamura
parent a73b5bda42
commit 448f03edae
2 changed files with 38 additions and 23 deletions

View File

@ -29,13 +29,13 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.StringUtils;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import java.io.IOException; import java.io.IOException;
import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -50,7 +50,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
.name("Destination URL") .name("Destination URL")
.displayName("Destination URL") .displayName("Destination URL")
.description("The URL of the destination NiFi instance to send data to, " + .description("The URL of the destination NiFi instance to send data to, " +
"should be in the format http(s)://host:port/nifi.") "should be a comma-separated list of address in the format of http(s)://host:port/nifi.")
.required(true) .required(true)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.addValidator(new NiFiUrlValidator()) .addValidator(new NiFiUrlValidator())
@ -190,7 +190,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue()); context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue());
siteToSiteClient = new SiteToSiteClient.Builder() siteToSiteClient = new SiteToSiteClient.Builder()
.url(destinationUrl) .urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl))
.portName(context.getProperty(PORT_NAME).getValue()) .portName(context.getProperty(PORT_NAME).getValue())
.useCompression(context.getProperty(COMPRESS).asBoolean()) .useCompression(context.getProperty(COMPRESS).asBoolean())
.eventReporter(eventReporter) .eventReporter(eventReporter)
@ -218,33 +218,21 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
@Override @Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) { public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final String value = context.newPropertyValue(input).evaluateAttributeExpressions().getValue(); final String value = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
URL url;
try { try {
url = new URL(value); SiteToSiteRestApiClient.parseClusterUrls(value);
} catch (final Exception e) {
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(false)
.explanation("Not a valid URL")
.build();
}
if (url != null && !url.getPath().equals(DESTINATION_URL_PATH)) {
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(false)
.explanation("URL path must be " + DESTINATION_URL_PATH)
.build();
}
return new ValidationResult.Builder() return new ValidationResult.Builder()
.input(input) .input(input)
.subject(subject) .subject(subject)
.valid(true) .valid(true)
.build(); .build();
} catch (IllegalArgumentException ex) {
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(false)
.explanation(ex.getLocalizedMessage())
.build();
}
} }
} }
} }

View File

@ -18,6 +18,8 @@
package org.apache.nifi.reporting; package org.apache.nifi.reporting;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
@ -32,14 +34,17 @@ import javax.json.Json;
import javax.json.JsonObject; import javax.json.JsonObject;
import javax.json.JsonReader; import javax.json.JsonReader;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.reporting.AbstractSiteToSiteReportingTask.NiFiUrlValidator;
import org.apache.nifi.state.MockStateManager; import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockPropertyValue; import org.apache.nifi.util.MockPropertyValue;
import org.junit.Assert; import org.junit.Assert;
@ -50,6 +55,28 @@ import org.mockito.stubbing.Answer;
public class TestSiteToSiteBulletinReportingTask { public class TestSiteToSiteBulletinReportingTask {
@Test
public void testUrls() throws IOException {
final ValidationContext context = Mockito.mock(ValidationContext.class);
Mockito.when(context.newPropertyValue(Mockito.anyString())).then(new Answer<PropertyValue>() {
@Override
public PropertyValue answer(InvocationOnMock invocation) throws Throwable {
String value = (String) invocation.getArguments()[0];
return new StandardPropertyValue(value, null);
}
});
assertTrue(new NiFiUrlValidator().validate("url", "http://localhost:8080/nifi", context).isValid());
assertTrue(new NiFiUrlValidator().validate("url", "http://localhost:8080", context).isValid());
assertFalse(new NiFiUrlValidator().validate("url", "", context).isValid());
assertTrue(new NiFiUrlValidator().validate("url", "https://localhost:8080/nifi", context).isValid());
assertTrue(new NiFiUrlValidator().validate("url", "https://localhost:8080/nifi,https://localhost:8080/nifi", context).isValid());
assertTrue(new NiFiUrlValidator().validate("url", "https://localhost:8080/nifi, https://localhost:8080/nifi", context).isValid());
assertFalse(new NiFiUrlValidator().validate("url", "http://localhost:8080/nifi, https://localhost:8080/nifi", context).isValid());
assertTrue(new NiFiUrlValidator().validate("url", "http://localhost:8080/nifi,http://localhost:8080/nifi", context).isValid());
assertTrue(new NiFiUrlValidator().validate("url", "http://localhost:8080/nifi,http://localhost:8080", context).isValid());
}
@Test @Test
public void testSerializedForm() throws IOException, InitializationException { public void testSerializedForm() throws IOException, InitializationException {
// creating the list of bulletins // creating the list of bulletins