From 78a0e1e18b9d40a10bac4612a9a0ebdc4b7d13bb Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Thu, 5 Jan 2017 23:08:55 +0100 Subject: [PATCH] NIFI-3290 Reporting task to send bulletins with S2S This closes #1401 --- .../SiteToSiteBulletinReportingTask.java | 223 ++++++++++++++++++ .../SiteToSiteProvenanceReportingTask.java | 2 +- .../org.apache.nifi.reporting.ReportingTask | 3 +- .../TestSiteToSiteBulletinReportingTask.java | 199 ++++++++++++++++ 4 files changed, 425 insertions(+), 2 deletions(-) create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java new file mode 100644 index 0000000000..9d9b1b7d67 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.reporting; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonArrayBuilder; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; + +import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.configuration.DefaultSchedule; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.scheduling.SchedulingStrategy; + +/** Reporting task that queries the bulletin repository for bulletins newer than the last id it sent, serializes them as one JSON array, sends that array in a single Site-to-Site transaction and stores the highest sent id in local state. */ @Tags({"bulletin", "site", "site to site", "restricted"}) +@CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to " + "10 bulletins at controller level for a duration of up to 5 minutes. 
If this reporting task is not scheduled frequently enough some bulletins " + "may not be sent.") +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last bulletin ID so that on restart the task knows where it left off.") +@Restricted("Provides operator the ability to send sensitive details contained in bulletin events to any external system.") +@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") +public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask { + + static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + static final String LAST_EVENT_ID_KEY = "last_event_id"; + + static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder() + .name("Platform") + .description("The value to use for the platform field in each bulletin.") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("nifi") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private volatile long lastSentBulletinId = -1L; /* highest bulletin id already sent; negative means "not yet loaded from state" */ + + static List<PropertyDescriptor> descriptors = new ArrayList<>(); + + static { + descriptors.add(DESTINATION_URL); + descriptors.add(PORT_NAME); + descriptors.add(SSL_CONTEXT); + descriptors.add(COMPRESS); + descriptors.add(TIMEOUT); + descriptors.add(PLATFORM); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + /** Sends every bulletin newer than the last sent id as one JSON array and then records the highest id sent, both in local state and in memory. Returns without sending when the cluster node id is not yet available, the stored state cannot be read, the repository returns no bulletins, the current max id equals the last sent id, or no transaction can be created. @throws ProcessException if the transfer fails with an IOException */ @Override + public void onTrigger(final ReportingContext context) { + + final boolean isClustered = context.isClustered(); + final String nodeId = context.getClusterNodeIdentifier(); + if (nodeId == null && isClustered) { + getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. 
" + "Will wait for Node Identifier to be established."); + return; + } + + if (lastSentBulletinId < 0) { + Map<String, String> state; + try { + state = context.getStateManager().getState(Scope.LOCAL).toMap(); + } catch (IOException e) { + getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e); + return; + } + if (state.containsKey(LAST_EVENT_ID_KEY)) { + lastSentBulletinId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)); + } + } + + final BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(lastSentBulletinId).build(); + final List<Bulletin> bulletins = context.getBulletinRepository().findBulletins(bulletinQuery); + + if(bulletins == null || bulletins.isEmpty()) { + getLogger().debug("No events to send because no events are stored in the repository."); + return; + } + + final OptionalLong opMaxId = bulletins.stream().mapToLong(t -> t.getId()).max(); + final Long currMaxId = opMaxId.isPresent() ? opMaxId.getAsLong() : -1; + + if(currMaxId < lastSentBulletinId){ + getLogger().warn("Current bulletin max id is {} which is less than what was stored in state as the last queried event, which was {}. " + "This means the bulletins repository restarted its ids. 
Restarting querying from the beginning.", new Object[]{currMaxId, lastSentBulletinId}); + lastSentBulletinId = -1; + } + + if (currMaxId == lastSentBulletinId) { + getLogger().debug("No events to send due to the current max id being equal to the last id that was sent."); + return; + } + + final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue(); + + final Map<String, Object> config = Collections.emptyMap(); + final JsonBuilderFactory factory = Json.createBuilderFactory(config); + final JsonObjectBuilder builder = factory.createObjectBuilder(); + + final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT); + df.setTimeZone(TimeZone.getTimeZone("UTC")); /* TIMESTAMP_FORMAT appends a literal 'Z', so timestamps must be rendered in UTC; use an explicit zone ID */ + + final long start = System.nanoTime(); + + // Create a JSON array of all the events in the current batch + final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); + for (final Bulletin bulletin : bulletins) { + if(bulletin.getId() > lastSentBulletinId) { + arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId)); + } + } + final JsonArray jsonArray = arrayBuilder.build(); + + // Send the JSON document for the current batch + try { + final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); + if (transaction == null) { + getLogger().info("All destination nodes are penalized; will attempt to send data later"); + return; + } + + final String transactionId = UUID.randomUUID().toString(); + final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8); + transaction.send(data, Collections.singletonMap("reporting.task.transaction.id", transactionId)); + transaction.confirm(); + transaction.complete(); + + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + getLogger().info("Successfully sent {} Bulletins to destination in {} ms; Transaction ID = {}; First Event ID = {}", + new Object[]{bulletins.size(), transferMillis, transactionId, bulletins.get(0).getId()}); + } catch (final IOException 
e) { + throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), e); + } + + // Store the id of the last event so we know where we left off + try { + context.getStateManager().setState(Collections.singletonMap(LAST_EVENT_ID_KEY, String.valueOf(currMaxId)), Scope.LOCAL); + } catch (final IOException ioe) { + getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart.", + new Object[]{currMaxId, ioe.getMessage()}, ioe); + } + + lastSentBulletinId = currMaxId; + } + + /** Builds the JSON object for one bulletin: a random objectId, the platform, the bulletin's own fields, the given node identifier and the timestamp formatted with df. Fields whose value is null are omitted. The factory parameter is not used by this method. NOTE(review): the caller passes the same builder for every bulletin and null fields are skipped; confirm the JSON-P implementation clears the builder on build(), otherwise a skipped field could keep the previous bulletin's value. */ static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df, + final String platform, final String nodeIdentifier) { + + addField(builder, "objectId", UUID.randomUUID().toString()); + addField(builder, "platform", platform); + addField(builder, "bulletinId", bulletin.getId()); + addField(builder, "bulletinCategory", bulletin.getCategory()); + addField(builder, "bulletinGroupId", bulletin.getGroupId()); + addField(builder, "bulletinLevel", bulletin.getLevel()); + addField(builder, "bulletinMessage", bulletin.getMessage()); + addField(builder, "bulletinNodeAddress", bulletin.getNodeAddress()); + addField(builder, "bulletinNodeId", nodeIdentifier); + addField(builder, "bulletinSourceId", bulletin.getSourceId()); + addField(builder, "bulletinSourceName", bulletin.getSourceName()); + addField(builder, "bulletinSourceType", bulletin.getSourceType() == null ? 
null : bulletin.getSourceType().name()); + addField(builder, "bulletinTimestamp", df.format(bulletin.getTimestamp())); + + return builder.build(); + } + + /** Adds a numeric field to the builder unless the value is null. */ private static void addField(final JsonObjectBuilder builder, final String key, final Long value) { + if (value != null) { + builder.add(key, value.longValue()); + } + } + + /** Adds a string field to the builder unless the value is null. */ private static void addField(final JsonObjectBuilder builder, final String key, final String value) { + if (value == null) { + return; + } + builder.add(key, value); + } + +} diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index 81a4ea4d95..2123e31f1c 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -139,7 +139,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti try { state = context.getStateManager().getState(Scope.LOCAL).toMap(); } catch (IOException e) { - getLogger().error("Failed to get state at start up due to {}:"+e.getMessage(), e); + getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e); return; } if (state.containsKey(LAST_EVENT_ID_KEY)) { diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask index 
be9f654634..bdf61cce09 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask \ No newline at end of file +org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask +org.apache.nifi.reporting.SiteToSiteBulletinReportingTask \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java new file mode 100644 index 0000000000..d5bce1b851 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.reporting; + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import javax.json.Json; +import javax.json.JsonObject; +import javax.json.JsonReader; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.state.MockStateManager; +import org.apache.nifi.util.MockPropertyValue; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** Tests SiteToSiteBulletinReportingTask against a mocked ReportingContext, using a subclass whose Site-to-Site client records the bytes passed to Transaction.send instead of transmitting them. */ public class TestSiteToSiteBulletinReportingTask { + + /** One bulletin in the repository must produce exactly one sent payload whose first JSON element carries the bulletin message. */ @Test + public void testSerializedForm() throws IOException, InitializationException { + // creating the list of bulletins + final List<Bulletin> bulletins = new ArrayList<>(); + bulletins.add(BulletinFactory.createBulletin("category", "severity", "message")); + + // mock the access to the list of bulletins + final ReportingContext context = Mockito.mock(ReportingContext.class); + final BulletinRepository repository = Mockito.mock(BulletinRepository.class); + 
Mockito.when(context.getBulletinRepository()).thenReturn(repository); + Mockito.when(repository.findBulletins(Mockito.any(BulletinQuery.class))).thenReturn(bulletins); + + // creating reporting task + final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask(); + Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(task)); + + // settings properties and mocking access to properties + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000"); + properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi"); + + Mockito.doAnswer(new Answer<PropertyValue>() { + @Override + public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { + final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class); + return new MockPropertyValue(properties.get(descriptor)); + } + }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); + + // setup the mock initialization context + final ComponentLog logger = Mockito.mock(ComponentLog.class); + final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); + Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); + Mockito.when(initContext.getLogger()).thenReturn(logger); + + task.initialize(initContext); + task.onTrigger(context); + + // test checking + assertEquals(1, task.dataSent.size()); + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8))); + JsonObject bulletinJson = jsonReader.readArray().getJsonObject(0); + assertEquals("message", bulletinJson.getString("bulletinMessage")); + } + + /** When the id stored in state equals the highest bulletin id in the repository, triggering the task must send nothing. */ @Test + public void 
testWhenProvenanceMaxIdEqualToLastEventIdInStateManager() throws IOException, InitializationException { + // creating the list of bulletins + final List<Bulletin> bulletins = new ArrayList<>(); + bulletins.add(BulletinFactory.createBulletin("category", "severity", "message")); + bulletins.add(BulletinFactory.createBulletin("category", "severity", "message")); + bulletins.add(BulletinFactory.createBulletin("category", "severity", "message")); + bulletins.add(BulletinFactory.createBulletin("category", "severity", "message")); + + // mock the access to the list of bulletins + final ReportingContext context = Mockito.mock(ReportingContext.class); + final BulletinRepository repository = Mockito.mock(BulletinRepository.class); + Mockito.when(context.getBulletinRepository()).thenReturn(repository); + Mockito.when(repository.findBulletins(Mockito.any(BulletinQuery.class))).thenReturn(bulletins); + + final long maxEventId = getMaxBulletinId(bulletins); + + // create the mock reporting task and mock state manager + final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask(); + final MockStateManager stateManager = new MockStateManager(task); + + // settings properties and mocking access to properties + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000"); + properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi"); + + Mockito.doAnswer(new Answer<PropertyValue>() { + @Override + public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { + final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class); + return new MockPropertyValue(properties.get(descriptor)); + } + }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); + + // create the state map and set the last id to the same value as 
maxEventId + final Map<String, String> state = new HashMap<>(); + state.put(SiteToSiteBulletinReportingTask.LAST_EVENT_ID_KEY, String.valueOf(maxEventId)); + stateManager.setState(state, Scope.LOCAL); + + // setup the mock reporting context to return the mock state manager + Mockito.when(context.getStateManager()).thenReturn(stateManager); + + // setup the mock initialization context + final ComponentLog logger = Mockito.mock(ComponentLog.class); + final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); + Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); + Mockito.when(initContext.getLogger()).thenReturn(logger); + + task.initialize(initContext); + + // execute the reporting task and should not produce any data b/c max id same as previous id + task.onTrigger(context); + assertEquals(0, task.dataSent.size()); + } + + /** Task subclass whose client is a Mockito mock; each payload passed to Transaction.send is appended to dataSent. */ private static final class MockSiteToSiteBulletinReportingTask extends SiteToSiteBulletinReportingTask { + + final List<byte[]> dataSent = new ArrayList<>(); + + @Override + protected SiteToSiteClient getClient() { + final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class); + final Transaction transaction = Mockito.mock(Transaction.class); + + try { + Mockito.doAnswer(new Answer<Object>() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + final byte[] data = invocation.getArgumentAt(0, byte[].class); + dataSent.add(data); + return null; + } + }).when(transaction).send(Mockito.any(byte[].class), Mockito.anyMapOf(String.class, String.class)); + + Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction); + } catch (final Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + + return client; + } + + } + + /** Returns the largest id among the given bulletins, or -1 when the list is empty. */ private Long getMaxBulletinId(List<Bulletin> bulletins) { + long result = -1L; + for (Bulletin bulletin : bulletins) { + if(bulletin.getId() > result) { + result = bulletin.getId(); + } + } + return 
result; + } + +}