NIFI-8336: Change BULLETINS table bulletinTimestamp column to Long

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4909.
This commit is contained in:
Matthew Burgess 2021-03-17 13:27:27 -04:00 committed by Pierre Villard
parent 04cd418618
commit e16cc9df46
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
3 changed files with 75 additions and 4 deletions

View File

@ -104,8 +104,8 @@ public class BulletinEnumerator implements Enumerator<Object> {
nodeId,
bulletin.getSourceId(),
bulletin.getSourceName(),
bulletin.getSourceType().name(),
bulletin.getTimestamp()
bulletin.getSourceType() == null ? null : bulletin.getSourceType().name(),
bulletin.getTimestamp() == null ? null : bulletin.getTimestamp().getTime()
};
// If we want no fields just return null

View File

@ -38,7 +38,6 @@ import org.apache.nifi.reporting.ReportingContext;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -180,7 +179,7 @@ public class BulletinTable extends AbstractTable implements QueryableTable, Tran
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(Date.class)
typeFactory.createJavaType(long.class)
);
relDataType = typeFactory.createStructType(Pair.zip(names, types));

View File

@ -31,6 +31,10 @@ import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.record.sink.MockRecordSinkService;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinFactory;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
@ -38,6 +42,7 @@ import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
import org.apache.nifi.reporting.util.metrics.MetricNames;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockBulletinRepository;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.MockPropertyValue;
@ -51,6 +56,7 @@ import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -60,6 +66,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@ -292,6 +299,32 @@ public class TestQueryNiFiReportingTask {
assertEquals("DROP", row.get("eventType"));
}
@Test
public void testBulletinTable() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS order by bulletinTimestamp asc");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(3, rows.size());
// Validate the first row
Map<String, Object> row = rows.get(0);
assertEquals(13, row.size());
assertNotNull(row.get("bulletinId"));
assertEquals("controller", row.get("bulletinCategory"));
assertEquals("WARN", row.get("bulletinLevel"));
// Validate the second row
row = rows.get(1);
assertEquals("processor", row.get("bulletinCategory"));
assertEquals("INFO", row.get("bulletinLevel"));
// Validate the third row
row = rows.get(2);
assertEquals("controller service", row.get("bulletinCategory"));
assertEquals("ERROR", row.get("bulletinLevel"));
}
private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException {
final ComponentLog logger = mock(ComponentLog.class);
@ -374,9 +407,48 @@ public class TestQueryNiFiReportingTask {
}
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
MockBulletinRepository bulletinRepository = new MockQueryBulletinRepository();
bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller", "WARN", "test bulletin 2"));
bulletinRepository.addBulletin(BulletinFactory.createBulletin("processor", "INFO", "test bulletin 1"));
bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller service", "ERROR", "test bulletin 2"));
Mockito.when(context.getBulletinRepository()).thenReturn(bulletinRepository);
return reportingTask;
}
private static final class MockQueryNiFiReportingTask extends QueryNiFiReportingTask {
}
private static class MockQueryBulletinRepository extends MockBulletinRepository {
List<Bulletin> bulletinList;
public MockQueryBulletinRepository() {
bulletinList = new ArrayList<>();
}
@Override
public void addBulletin(Bulletin bulletin) {
bulletinList.add(bulletin);
}
@Override
public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
if (bulletinQuery.getSourceType().equals(ComponentType.PROCESSOR)) {
return Collections.singletonList(bulletinList.get(1));
} else if (bulletinQuery.getSourceType().equals(ComponentType.CONTROLLER_SERVICE)) {
return Collections.singletonList(bulletinList.get(2));
} else {
return Collections.emptyList();
}
}
@Override
public List<Bulletin> findBulletinsForController() {
return Collections.singletonList(bulletinList.get(0));
}
}
}