NIFI-6886 - Bugfix

Attribute peerPersistence can be null generating Bulletin WARNs "Unable to refresh Remote Group's peers due to null".

Rollback

The fix is inside site-to-site-reporting-task-bundle

Modify getClient()

Get ConfigurationContext and ReportingContext to provide a StateManager.

Modify OnScheduled setup()

The OnSchedule setup() now save the ConfigurationContext to lazily create a SiteToSiteClient with ReportingContext through an overloaded setup().

Modify OnTrigger

Lazily creates SiteToSiteClient to provide a StateManager

Modify OnTrigger

Lazily create SiteToSiteClient to provide a StateManager

Modify OnTrigger

Lazily create SiteToSiteClient to provide a StateManager

Modify OnTrigger

Lazily create SiteToSiteClient to provide a StateManager

Retry compile

Fix maven-checkstyle-plugin

Fix maven-checkstyle-plugin

Fix maven-checkstyle-plugin

Fix maven-checkstyle-plugin

Update AbstractSiteToSiteReportingTask.java

Removed the OnSchedule setup(ConfigContext) because it is not needed.

Update SiteToSiteUtils.java

Removed ConfigContext from getClient parameters because ReportContext share the same properties.
This commit is contained in:
Eduardo Fontes 2019-11-20 16:33:27 -03:00 committed by Pierre Villard
parent 65ca8e6c8d
commit 1ee6dba00a
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
6 changed files with 33 additions and 21 deletions

View File

@ -34,10 +34,8 @@ import javax.json.JsonArray;
import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.remote.Transaction;
@ -118,9 +116,10 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
return properties;
}
@OnScheduled
public void setup(final ConfigurationContext context) throws IOException {
siteToSiteClient = SiteToSiteUtils.getClient(context, getLogger());
public void setup(final ReportingContext reportContext) throws IOException {
if (siteToSiteClient != null) {
siteToSiteClient = SiteToSiteUtils.getClient(reportContext, getLogger());
}
}
@OnStopped

View File

@ -138,11 +138,14 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
// Send the JSON document for the current batch
try {
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
getLogger().info("All destination nodes are penalized; will attempt to send data later");
return;
}
// Lazily create SiteToSiteClient to provide a StateManager
setup(context);
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
getLogger().info("All destination nodes are penalized; will attempt to send data later");
return;
}
final Map<String, String> attributes = new HashMap<>();
final String transactionId = UUID.randomUUID().toString();

View File

@ -192,6 +192,9 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT
}
try {
// Lazily create SiteToSiteClient to provide a StateManager
setup(context);
long start = System.nanoTime();
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {

View File

@ -305,6 +305,9 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
// Send the JSON document for the current batch
try {
// Lazily create SiteToSiteClient to provide a StateManager
setup(context);
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
// Throw an exception to avoid provenance event id will not proceed so that those can be consumed again.

View File

@ -158,6 +158,9 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
while(!jsonBatch.isEmpty()) {
// Send the JSON document for the current batch
try {
// Lazily create SiteToSiteClient to provide a StateManager
setup(context);
long start = System.nanoTime();
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {

View File

@ -20,7 +20,6 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
@ -29,6 +28,7 @@ import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils;
@ -143,8 +143,8 @@ public class SiteToSiteUtils {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static SiteToSiteClient getClient(ConfigurationContext context, ComponentLog logger) {
final SSLContextService sslContextService = context.getProperty(SiteToSiteUtils.SSL_CONTEXT).asControllerService(SSLContextService.class);
public static SiteToSiteClient getClient(ReportingContext reportContext, ComponentLog logger) {
final SSLContextService sslContextService = reportContext.getProperty(SiteToSiteUtils.SSL_CONTEXT).asControllerService(SSLContextService.class);
final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
final EventReporter eventReporter = (EventReporter) (severity, category, message) -> {
switch (severity) {
@ -158,22 +158,23 @@ public class SiteToSiteUtils {
break;
}
};
final String destinationUrl = context.getProperty(SiteToSiteUtils.DESTINATION_URL).evaluateAttributeExpressions().getValue();
final String destinationUrl = reportContext.getProperty(SiteToSiteUtils.DESTINATION_URL).evaluateAttributeExpressions().getValue();
final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(context.getProperty(SiteToSiteUtils.TRANSPORT_PROTOCOL).getValue());
final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(context.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue()) ? null
: new HttpProxy(context.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue(), context.getProperty(SiteToSiteUtils.HTTP_PROXY_PORT).asInteger(),
context.getProperty(SiteToSiteUtils.HTTP_PROXY_USERNAME).getValue(), context.getProperty(SiteToSiteUtils.HTTP_PROXY_PASSWORD).getValue());
final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(reportContext.getProperty(SiteToSiteUtils.TRANSPORT_PROTOCOL).getValue());
final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue()) ? null
: new HttpProxy(reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue(), reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_PORT).asInteger(),
reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_USERNAME).getValue(), reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_PASSWORD).getValue());
return new SiteToSiteClient.Builder()
.urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl))
.portName(context.getProperty(SiteToSiteUtils.PORT_NAME).getValue())
.useCompression(context.getProperty(SiteToSiteUtils.COMPRESS).asBoolean())
.portName(reportContext.getProperty(SiteToSiteUtils.PORT_NAME).getValue())
.useCompression(reportContext.getProperty(SiteToSiteUtils.COMPRESS).asBoolean())
.eventReporter(eventReporter)
.sslContext(sslContext)
.timeout(context.getProperty(SiteToSiteUtils.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
.timeout(reportContext.getProperty(SiteToSiteUtils.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
.transportProtocol(mode)
.httpProxy(httpProxy)
.stateManager(reportContext.getStateManager())
.build();
}