NIFI-3290 Reporting task to send bulletins with S2S

This closes #1401
This commit is contained in:
Pierre Villard 2017-01-05 23:08:55 +01:00 committed by Oleg Zhurakousky
parent 89f1bd3189
commit 78a0e1e18b
4 changed files with 425 additions and 2 deletions

View File

@ -0,0 +1,223 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.scheduling.SchedulingStrategy;
@Tags({"bulletin", "site", "site to site", "restricted"})
@CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to "
+ "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins "
+ "may not be sent.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last bulletin ID so that on restart the task knows where it left off.")
@Restricted("Provides operator the ability to send sensitive details contained in bulletin events to any external system.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
static final String LAST_EVENT_ID_KEY = "last_event_id";
static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
.name("Platform")
.description("The value to use for the platform field in each provenance event.")
.required(true)
.expressionLanguageSupported(true)
.defaultValue("nifi")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private volatile long lastSentBulletinId = -1L;
static List<PropertyDescriptor> descriptors = new ArrayList<>();
static {
descriptors.add(DESTINATION_URL);
descriptors.add(PORT_NAME);
descriptors.add(SSL_CONTEXT);
descriptors.add(COMPRESS);
descriptors.add(TIMEOUT);
descriptors.add(PLATFORM);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
public void onTrigger(final ReportingContext context) {
final boolean isClustered = context.isClustered();
final String nodeId = context.getClusterNodeIdentifier();
if (nodeId == null && isClustered) {
getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
+ "Will wait for Node Identifier to be established.");
return;
}
if (lastSentBulletinId < 0) {
Map<String, String> state;
try {
state = context.getStateManager().getState(Scope.LOCAL).toMap();
} catch (IOException e) {
getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
return;
}
if (state.containsKey(LAST_EVENT_ID_KEY)) {
lastSentBulletinId = Long.parseLong(state.get(LAST_EVENT_ID_KEY));
}
}
final BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(lastSentBulletinId).build();
final List<Bulletin> bulletins = context.getBulletinRepository().findBulletins(bulletinQuery);
if(bulletins == null || bulletins.isEmpty()) {
getLogger().debug("No events to send because no events are stored in the repository.");
return;
}
final OptionalLong opMaxId = bulletins.stream().mapToLong(t -> t.getId()).max();
final Long currMaxId = opMaxId.isPresent() ? opMaxId.getAsLong() : -1;
if(currMaxId < lastSentBulletinId){
getLogger().warn("Current bulletin max id is {} which is less than what was stored in state as the last queried event, which was {}. "
+ "This means the bulletins repository restarted its ids. Restarting querying from the beginning.", new Object[]{currMaxId, lastSentBulletinId});
lastSentBulletinId = -1;
}
if (currMaxId == lastSentBulletinId) {
getLogger().debug("No events to send due to the current max id being equal to the last id that was sent.");
return;
}
final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
final Map<String, ?> config = Collections.emptyMap();
final JsonBuilderFactory factory = Json.createBuilderFactory(config);
final JsonObjectBuilder builder = factory.createObjectBuilder();
final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
df.setTimeZone(TimeZone.getTimeZone("Z"));
final long start = System.nanoTime();
// Create a JSON array of all the events in the current batch
final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
for (final Bulletin bulletin : bulletins) {
if(bulletin.getId() > lastSentBulletinId) {
arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId));
}
}
final JsonArray jsonArray = arrayBuilder.build();
// Send the JSON document for the current batch
try {
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
getLogger().info("All destination nodes are penalized; will attempt to send data later");
return;
}
final String transactionId = UUID.randomUUID().toString();
final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8);
transaction.send(data, Collections.singletonMap("reporting.task.transaction.id", transactionId));
transaction.confirm();
transaction.complete();
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().info("Successfully sent {} Bulletins to destination in {} ms; Transaction ID = {}; First Event ID = {}",
new Object[]{bulletins.size(), transferMillis, transactionId, bulletins.get(0).getId()});
} catch (final IOException e) {
throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), e);
}
// Store the id of the last event so we know where we left off
try {
context.getStateManager().setState(Collections.singletonMap(LAST_EVENT_ID_KEY, String.valueOf(currMaxId)), Scope.LOCAL);
} catch (final IOException ioe) {
getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart.",
new Object[]{currMaxId, ioe.getMessage()}, ioe);
}
lastSentBulletinId = currMaxId;
}
static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
final String platform, final String nodeIdentifier) {
addField(builder, "objectId", UUID.randomUUID().toString());
addField(builder, "platform", platform);
addField(builder, "bulletinId", bulletin.getId());
addField(builder, "bulletinCategory", bulletin.getCategory());
addField(builder, "bulletinGroupId", bulletin.getGroupId());
addField(builder, "bulletinLevel", bulletin.getLevel());
addField(builder, "bulletinMessage", bulletin.getMessage());
addField(builder, "bulletinNodeAddress", bulletin.getNodeAddress());
addField(builder, "bulletinNodeId", nodeIdentifier);
addField(builder, "bulletinSourceId", bulletin.getSourceId());
addField(builder, "bulletinSourceName", bulletin.getSourceName());
addField(builder, "bulletinSourceType", bulletin.getSourceType() == null ? null : bulletin.getSourceType().name());
addField(builder, "bulletinTimestamp", df.format(bulletin.getTimestamp()));
return builder.build();
}
private static void addField(final JsonObjectBuilder builder, final String key, final Long value) {
if (value != null) {
builder.add(key, value.longValue());
}
}
private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
if (value == null) {
return;
}
builder.add(key, value);
}
}

View File

@ -139,7 +139,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
try { try {
state = context.getStateManager().getState(Scope.LOCAL).toMap(); state = context.getStateManager().getState(Scope.LOCAL).toMap();
} catch (IOException e) { } catch (IOException e) {
getLogger().error("Failed to get state at start up due to {}:"+e.getMessage(), e); getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
return; return;
} }
if (state.containsKey(LAST_EVENT_ID_KEY)) { if (state.containsKey(LAST_EVENT_ID_KEY)) {

View File

@ -14,3 +14,4 @@
# limitations under the License. # limitations under the License.
org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
org.apache.nifi.reporting.SiteToSiteBulletinReportingTask

View File

@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestSiteToSiteBulletinReportingTask {
@Test
public void testSerializedForm() throws IOException, InitializationException {
// creating the list of bulletins
final List<Bulletin> bulletins = new ArrayList<Bulletin>();
bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
// mock the access to the list of bulletins
final ReportingContext context = Mockito.mock(ReportingContext.class);
final BulletinRepository repository = Mockito.mock(BulletinRepository.class);
Mockito.when(context.getBulletinRepository()).thenReturn(repository);
Mockito.when(repository.findBulletins(Mockito.any(BulletinQuery.class))).thenReturn(bulletins);
// creating reporting task
final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask();
Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(task));
// settings properties and mocking access to properties
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000");
properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi");
Mockito.doAnswer(new Answer<PropertyValue>() {
@Override
public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
// setup the mock initialization context
final ComponentLog logger = Mockito.mock(ComponentLog.class);
final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
task.initialize(initContext);
task.onTrigger(context);
// test checking
assertEquals(1, task.dataSent.size());
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject bulletinJson = jsonReader.readArray().getJsonObject(0);
assertEquals("message", bulletinJson.getString("bulletinMessage"));
}
@Test
public void testWhenProvenanceMaxIdEqualToLastEventIdInStateManager() throws IOException, InitializationException {
// creating the list of bulletins
final List<Bulletin> bulletins = new ArrayList<Bulletin>();
bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
bulletins.add(BulletinFactory.createBulletin("category", "severity", "message"));
// mock the access to the list of bulletins
final ReportingContext context = Mockito.mock(ReportingContext.class);
final BulletinRepository repository = Mockito.mock(BulletinRepository.class);
Mockito.when(context.getBulletinRepository()).thenReturn(repository);
Mockito.when(repository.findBulletins(Mockito.any(BulletinQuery.class))).thenReturn(bulletins);
final long maxEventId = getMaxBulletinId(bulletins);;
// create the mock reporting task and mock state manager
final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask();
final MockStateManager stateManager = new MockStateManager(task);
// settings properties and mocking access to properties
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000");
properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi");
Mockito.doAnswer(new Answer<PropertyValue>() {
@Override
public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
// create the state map and set the last id to the same value as maxEventId
final Map<String,String> state = new HashMap<>();
state.put(SiteToSiteProvenanceReportingTask.LAST_EVENT_ID_KEY, String.valueOf(maxEventId));
stateManager.setState(state, Scope.LOCAL);
// setup the mock reporting context to return the mock state manager
Mockito.when(context.getStateManager()).thenReturn(stateManager);
// setup the mock initialization context
final ComponentLog logger = Mockito.mock(ComponentLog.class);
final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
task.initialize(initContext);
// execute the reporting task and should not produce any data b/c max id same as previous id
task.onTrigger(context);
assertEquals(0, task.dataSent.size());
}
private static final class MockSiteToSiteBulletinReportingTask extends SiteToSiteBulletinReportingTask {
final List<byte[]> dataSent = new ArrayList<>();
@Override
protected SiteToSiteClient getClient() {
final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
final Transaction transaction = Mockito.mock(Transaction.class);
try {
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
final byte[] data = invocation.getArgumentAt(0, byte[].class);
dataSent.add(data);
return null;
}
}).when(transaction).send(Mockito.any(byte[].class), Mockito.anyMapOf(String.class, String.class));
Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
}
return client;
}
}
private Long getMaxBulletinId(List<Bulletin> bulletins) {
long result = -1L;
for (Bulletin bulletin : bulletins) {
if(bulletin.getId() > result) {
result = bulletin.getId();
}
}
return result;
}
}