NIFI-5092 - Removed local state management for S2S Bulletins RT

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2643
This commit is contained in:
Pierre Villard 2018-04-18 12:21:04 +02:00 committed by Matthew Burgess
parent af2513adf8
commit 23937835f3
2 changed files with 0 additions and 101 deletions

View File

@ -19,13 +19,11 @@ package org.apache.nifi.reporting;
import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction; import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission; import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
@ -57,7 +55,6 @@ import java.util.concurrent.TimeUnit;
@CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to " @CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to "
+ "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins " + "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins "
+ "may not be sent.") + "may not be sent.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last bulletin ID so that on restart the task knows where it left off.")
@Restricted( @Restricted(
restrictions = { restrictions = {
@Restriction( @Restriction(
@ -98,19 +95,6 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
return; return;
} }
if (lastSentBulletinId < 0) {
Map<String, String> state;
try {
state = context.getStateManager().getState(Scope.LOCAL).toMap();
} catch (IOException e) {
getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
return;
}
if (state.containsKey(LAST_EVENT_ID_KEY)) {
lastSentBulletinId = Long.parseLong(state.get(LAST_EVENT_ID_KEY));
}
}
final BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(lastSentBulletinId).build(); final BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(lastSentBulletinId).build();
final List<Bulletin> bulletins = context.getBulletinRepository().findBulletins(bulletinQuery); final List<Bulletin> bulletins = context.getBulletinRepository().findBulletins(bulletinQuery);
@ -181,14 +165,6 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), e); throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), e);
} }
// Store the id of the last event so we know where we left off
try {
context.getStateManager().setState(Collections.singletonMap(LAST_EVENT_ID_KEY, String.valueOf(currMaxId)), Scope.LOCAL);
} catch (final IOException ioe) {
getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart.",
new Object[]{currMaxId, ioe.getMessage()}, ioe);
}
lastSentBulletinId = currMaxId; lastSentBulletinId = currMaxId;
} }

View File

@ -38,14 +38,11 @@ import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.reporting.AbstractSiteToSiteReportingTask.NiFiUrlValidator; import org.apache.nifi.reporting.AbstractSiteToSiteReportingTask.NiFiUrlValidator;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockPropertyValue; import org.apache.nifi.util.MockPropertyValue;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -91,7 +88,6 @@ public class TestSiteToSiteBulletinReportingTask {
// creating reporting task // creating reporting task
final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask(); final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask();
Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(task));
// settings properties and mocking access to properties // settings properties and mocking access to properties
final Map<PropertyDescriptor, String> properties = new HashMap<>(); final Map<PropertyDescriptor, String> properties = new HashMap<>();
@ -127,69 +123,6 @@ public class TestSiteToSiteBulletinReportingTask {
assertEquals("group-name", bulletinJson.getString("bulletinGroupName")); assertEquals("group-name", bulletinJson.getString("bulletinGroupName"));
} }
@Test
public void testWhenProvenanceMaxIdEqualToLastEventIdInStateManager() throws IOException, InitializationException {
// creating the list of bulletins
final List<Bulletin> bulletins = new ArrayList<Bulletin>();
bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
// mock the access to the list of bulletins
final ReportingContext context = Mockito.mock(ReportingContext.class);
final BulletinRepository repository = Mockito.mock(BulletinRepository.class);
Mockito.when(context.getBulletinRepository()).thenReturn(repository);
Mockito.when(repository.findBulletins(Mockito.any(BulletinQuery.class))).thenReturn(bulletins);
final long maxEventId = getMaxBulletinId(bulletins);;
// create the mock reporting task and mock state manager
final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask();
final MockStateManager stateManager = new MockStateManager(task);
// settings properties and mocking access to properties
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000");
properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi");
properties.put(SiteToSiteBulletinReportingTask.TRANSPORT_PROTOCOL, SiteToSiteTransportProtocol.HTTP.name());
properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_HOSTNAME, "localhost");
properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_PORT, "80");
properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_USERNAME, "username");
properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_PASSWORD, "password");
Mockito.doAnswer(new Answer<PropertyValue>() {
@Override
public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
// create the state map and set the last id to the same value as maxEventId
final Map<String,String> state = new HashMap<>();
state.put(SiteToSiteProvenanceReportingTask.LAST_EVENT_ID_KEY, String.valueOf(maxEventId));
stateManager.setState(state, Scope.LOCAL);
// setup the mock reporting context to return the mock state manager
Mockito.when(context.getStateManager()).thenReturn(stateManager);
// setup the mock initialization context
final ComponentLog logger = Mockito.mock(ComponentLog.class);
final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
task.initialize(initContext);
// execute the reporting task and should not produce any data b/c max id same as previous id
task.onTrigger(context);
assertEquals(0, task.dataSent.size());
}
private static final class MockSiteToSiteBulletinReportingTask extends SiteToSiteBulletinReportingTask { private static final class MockSiteToSiteBulletinReportingTask extends SiteToSiteBulletinReportingTask {
final List<byte[]> dataSent = new ArrayList<>(); final List<byte[]> dataSent = new ArrayList<>();
@ -220,14 +153,4 @@ public class TestSiteToSiteBulletinReportingTask {
} }
private Long getMaxBulletinId(List<Bulletin> bulletins) {
long result = -1L;
for (Bulletin bulletin : bulletins) {
if(bulletin.getId() > result) {
result = bulletin.getId();
}
}
return result;
}
} }