From 423b333b7128db987bac86c3104546468beea852 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Tue, 22 Mar 2016 11:14:03 -0400 Subject: [PATCH] NIFI-1420 Fixing minor bugs in GetSplunk - Adding a Time Zone property so the Managed time ranges use the provided time zone when formatting the date strings - Adding a Time Field Strategy property to choose between searching event time or index time - Making the next iteration use previousLastTime + 1 ms to avoid overlap - Fixing bug where GetSplunk incorrectly cleared state on a restart of NiFi - This closes #299 --- .../nifi/processors/splunk/GetSplunk.java | 66 ++++++- .../nifi/processors/splunk/TestGetSplunk.java | 175 +++++++++++++++++- 2 files changed, 229 insertions(+), 12 deletions(-) diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java index b9d9e0b6c7..4919e61a2c 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java @@ -54,6 +54,7 @@ import org.apache.nifi.stream.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; @@ -64,6 +65,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TimeZone; import java.util.concurrent.atomic.AtomicBoolean; @TriggerSerially @@ -113,6 +115,19 @@ public class GetSplunk extends AbstractProcessor { .required(true) .build(); + public static final AllowableValue EVENT_TIME_VALUE = new AllowableValue("Event Time", "Event Time", + "Search based on the time of the event which may be different than when the event was indexed."); + public static final AllowableValue INDEX_TIME_VALUE = new AllowableValue("Index Time", "Index Time", + "Search based on the time the event was indexed in Splunk."); + + public static final PropertyDescriptor TIME_FIELD_STRATEGY = new PropertyDescriptor.Builder() + .name("Time Field Strategy") + .description("Indicates whether to search by the time attached to the event, or by the time the event was indexed in Splunk.") + .allowableValues(EVENT_TIME_VALUE, INDEX_TIME_VALUE) + .defaultValue(EVENT_TIME_VALUE.getValue()) + .required(true) + .build(); + public static final AllowableValue MANAGED_BEGINNING_VALUE = new AllowableValue("Managed from Beginning", "Managed from Beginning", "The processor will manage the date ranges of the query starting from the beginning of time."); public static final AllowableValue MANAGED_CURRENT_VALUE = new AllowableValue("Managed from Current", "Managed from Current", @@ -147,6 +162,13 @@ public class GetSplunk extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(false) .build(); + public static final PropertyDescriptor TIME_ZONE = new PropertyDescriptor.Builder() + .name("Time Zone") + .description("The Time Zone to use for formatting dates when performing a search. Only used with Managed time strategies.") + .allowableValues(TimeZone.getAvailableIDs()) + .defaultValue("UTC") + .required(true) + .build(); public static final PropertyDescriptor APP = new PropertyDescriptor.Builder() .name("Application") .description("The Splunk Application to query.") @@ -213,7 +235,7 @@ public class GetSplunk extends AbstractProcessor { .description("Results retrieved from Splunk are sent out this relationship.") .build(); - public static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + public static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; public static final String EARLIEST_TIME_KEY = "earliestTime"; public static final String LATEST_TIME_KEY = "latestTime"; @@ -236,9 +258,11 @@ public class GetSplunk extends AbstractProcessor { descriptors.add(HOSTNAME); descriptors.add(PORT); descriptors.add(QUERY); + descriptors.add(TIME_FIELD_STRATEGY); descriptors.add(TIME_RANGE_STRATEGY); descriptors.add(EARLIEST_TIME); descriptors.add(LATEST_TIME); + descriptors.add(TIME_ZONE); descriptors.add(APP); descriptors.add(OWNER); descriptors.add(TOKEN); @@ -290,13 +314,16 @@ public class GetSplunk extends AbstractProcessor { @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { - if ( ((oldValue != null && !oldValue.equals(newValue)) || (oldValue == null && newValue != null)) + if ( ((oldValue != null && !oldValue.equals(newValue))) && (descriptor.equals(QUERY) + || descriptor.equals(TIME_FIELD_STRATEGY) || descriptor.equals(TIME_RANGE_STRATEGY) || descriptor.equals(EARLIEST_TIME) || descriptor.equals(LATEST_TIME) || descriptor.equals(HOSTNAME)) ) { + getLogger().debug("A property that require resetting state was modified - {} oldValue {} newValue {}", + new Object[] {descriptor.getDisplayName(), oldValue, newValue}); resetState = true; } } @@ -311,6 +338,7 @@ public class GetSplunk extends AbstractProcessor { // if properties changed since last execution then remove any previous state if (resetState) { try { + getLogger().debug("Clearing state based on property modifications"); context.getStateManager().clear(Scope.CLUSTER); } catch (final IOException ioe) { getLogger().warn("Failed to clear state", ioe); @@ -351,6 +379,8 @@ public class GetSplunk extends AbstractProcessor { final String query = context.getProperty(QUERY).getValue(); final String outputMode = context.getProperty(OUTPUT_MODE).getValue(); final String timeRangeStrategy = context.getProperty(TIME_RANGE_STRATEGY).getValue(); + final String timeZone = context.getProperty(TIME_ZONE).getValue(); + final String timeFieldStrategy = context.getProperty(TIME_FIELD_STRATEGY).getValue(); final JobExportArgs exportArgs = new JobExportArgs(); exportArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL); @@ -368,6 +398,7 @@ public class GetSplunk extends AbstractProcessor { // not provided so we need to check the previous state final TimeRange previousRange = loadState(context.getStateManager()); final SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_TIME_FORMAT); + dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone)); if (previousRange == null) { // no previous state so set the earliest time based on the strategy @@ -386,9 +417,16 @@ public class GetSplunk extends AbstractProcessor { } } else { - // we have previous state so set earliestTime to latestTime of last range - earliestTime = previousRange.getLatestTime(); - latestTime = dateFormat.format(new Date(currentTime)); + // we have previous state so set earliestTime to (latestTime + 1) of last range + try { + final String previousLastTime = previousRange.getLatestTime(); + final Date previousLastDate = dateFormat.parse(previousLastTime); + + earliestTime = dateFormat.format(new Date(previousLastDate.getTime() + 1)); + latestTime = dateFormat.format(new Date(currentTime)); + } catch (ParseException e) { + throw new ProcessException(e); + } } } catch (IOException e) { @@ -399,14 +437,26 @@ public class GetSplunk extends AbstractProcessor { } if (!StringUtils.isBlank(earliestTime)) { - exportArgs.setEarliestTime(earliestTime); + if (EVENT_TIME_VALUE.getValue().equalsIgnoreCase(timeFieldStrategy)) { + exportArgs.setEarliestTime(earliestTime); + } else { + exportArgs.setIndexEarliest(earliestTime); + } } if (!StringUtils.isBlank(latestTime)) { - exportArgs.setLatestTime(latestTime); + if (EVENT_TIME_VALUE.getValue().equalsIgnoreCase(timeFieldStrategy)) { + exportArgs.setLatestTime(latestTime); + } else { + exportArgs.setIndexLatest(latestTime); + } } - getLogger().debug("Using earliestTime of {} and latestTime of {}", new Object[] {earliestTime, latestTime}); + if (EVENT_TIME_VALUE.getValue().equalsIgnoreCase(timeFieldStrategy)) { + getLogger().debug("Using earliest_time of {} and latest_time of {}", new Object[]{earliestTime, latestTime}); + } else { + getLogger().debug("Using index_earliest of {} and index_latest of {}", new Object[]{earliestTime, latestTime}); + } final InputStream exportSearch = splunkService.export(query, exportArgs); diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java index 42daab6b6b..5ad78813f9 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java @@ -36,7 +36,11 @@ import org.mockito.Mockito; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.List; +import java.util.TimeZone; import static org.mockito.Mockito.any; import static org.mockito.Mockito.argThat; @@ -149,7 +153,7 @@ public class TestGetSplunk { } @Test - public void testGetWithManagedFromBeginning() { + public void testGetWithManagedFromBeginning() throws ParseException { final String query = "search tcp:7879"; final String outputMode = GetSplunk.ATOM_VALUE.getValue(); @@ -176,7 +180,13 @@ public class TestGetSplunk { Assert.assertNotNull(actualArgs1.get("latest_time")); // save the latest time from the first run which should be earliest time of next run - final String expectedLatest = (String) actualArgs1.get("latest_time"); + final String lastLatest = (String) actualArgs1.get("latest_time"); + + final SimpleDateFormat format = new SimpleDateFormat(GetSplunk.DATE_TIME_FORMAT); + format.setTimeZone(TimeZone.getTimeZone("UTC")); + + final Date lastLatestDate = format.parse(lastLatest); + final String expectedLatest = format.format(new Date(lastLatestDate.getTime() + 1)); // run again runner.run(1, false); @@ -193,7 +203,109 @@ public class TestGetSplunk { } @Test - public void testGetWithManagedFromCurrent() throws IOException { + public void testGetWithManagedFromBeginningWithDifferentTimeZone() throws ParseException { + final String query = "search tcp:7879"; + final String outputMode = GetSplunk.ATOM_VALUE.getValue(); + final TimeZone timeZone = TimeZone.getTimeZone("PST"); + + runner.setProperty(GetSplunk.QUERY, query); + runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode); + runner.setProperty(GetSplunk.TIME_RANGE_STRATEGY, GetSplunk.MANAGED_BEGINNING_VALUE.getValue()); + runner.setProperty(GetSplunk.TIME_ZONE, timeZone.getID()); + + final String resultContent = "fake results"; + final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8)); + when(service.export(eq(query), any(JobExportArgs.class))).thenReturn(input); + + // run once and don't shut down + runner.run(1, false); + runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1); + + // capture what the args were on last run + final ArgumentCaptor capture1 = ArgumentCaptor.forClass(JobExportArgs.class); + verify(service, times(1)).export(eq(query), capture1.capture()); + + // first execution with no previous state and "managed from beginning" should have a latest time and no earliest time + final JobExportArgs actualArgs1 = capture1.getValue(); + Assert.assertNotNull(actualArgs1); + Assert.assertNull(actualArgs1.get("earliest_time")); + Assert.assertNotNull(actualArgs1.get("latest_time")); + + // save the latest time from the first run which should be earliest time of next run + final String lastLatest = (String) actualArgs1.get("latest_time"); + + final SimpleDateFormat format = new SimpleDateFormat(GetSplunk.DATE_TIME_FORMAT); + format.setTimeZone(timeZone); + + final Date lastLatestDate = format.parse(lastLatest); + final String expectedLatest = format.format(new Date(lastLatestDate.getTime() + 1)); + + // run again + runner.run(1, false); + runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 2); + + final ArgumentCaptor capture2 = ArgumentCaptor.forClass(JobExportArgs.class); + verify(service, times(2)).export(eq(query), capture2.capture()); + + // second execution the earliest time should be the previous latest_time + final JobExportArgs actualArgs2 = capture2.getValue(); + Assert.assertNotNull(actualArgs2); + Assert.assertEquals(expectedLatest, actualArgs2.get("earliest_time")); + Assert.assertNotNull(actualArgs2.get("latest_time")); + } + + @Test + public void testGetWithManagedFromBeginningWithShutdown() throws ParseException { + final String query = "search tcp:7879"; + final String outputMode = GetSplunk.ATOM_VALUE.getValue(); + + runner.setProperty(GetSplunk.QUERY, query); + runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode); + runner.setProperty(GetSplunk.TIME_RANGE_STRATEGY, GetSplunk.MANAGED_BEGINNING_VALUE.getValue()); + + final String resultContent = "fake results"; + final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8)); + when(service.export(eq(query), any(JobExportArgs.class))).thenReturn(input); + + // run once and shut down + runner.run(1, true); + runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1); + + // capture what the args were on last run + final ArgumentCaptor capture1 = ArgumentCaptor.forClass(JobExportArgs.class); + verify(service, times(1)).export(eq(query), capture1.capture()); + + // first execution with no previous state and "managed from beginning" should have a latest time and no earliest time + final JobExportArgs actualArgs1 = capture1.getValue(); + Assert.assertNotNull(actualArgs1); + Assert.assertNull(actualArgs1.get("earliest_time")); + Assert.assertNotNull(actualArgs1.get("latest_time")); + + // save the latest time from the first run which should be earliest time of next run + final String lastLatest = (String) actualArgs1.get("latest_time"); + + final SimpleDateFormat format = new SimpleDateFormat(GetSplunk.DATE_TIME_FORMAT); + format.setTimeZone(TimeZone.getTimeZone("UTC")); + + final Date lastLatestDate = format.parse(lastLatest); + final String expectedLatest = format.format(new Date(lastLatestDate.getTime() + 1)); + + // run again + runner.run(1, true); + runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 2); + + final ArgumentCaptor capture2 = ArgumentCaptor.forClass(JobExportArgs.class); + verify(service, times(2)).export(eq(query), capture2.capture()); + + // second execution the earliest time should be the previous latest_time + final JobExportArgs actualArgs2 = capture2.getValue(); + Assert.assertNotNull(actualArgs2); + Assert.assertEquals(expectedLatest, actualArgs2.get("earliest_time")); + Assert.assertNotNull(actualArgs2.get("latest_time")); + } + + @Test + public void testGetWithManagedFromCurrentUsingEventTime() throws IOException, ParseException { final String query = "search tcp:7879"; final String outputMode = GetSplunk.ATOM_VALUE.getValue(); @@ -217,7 +329,13 @@ public class TestGetSplunk { Assert.assertTrue(state.getVersion() > 0); // save the latest time from the first run which should be earliest time of next run - final String expectedLatest = state.get(GetSplunk.LATEST_TIME_KEY); + final String lastLatest = state.get(GetSplunk.LATEST_TIME_KEY); + + final SimpleDateFormat format = new SimpleDateFormat(GetSplunk.DATE_TIME_FORMAT); + format.setTimeZone(TimeZone.getTimeZone("UTC")); + + final Date lastLatestDate = format.parse(lastLatest); + final String expectedLatest = format.format(new Date(lastLatestDate.getTime() + 1)); // run again runner.run(1, false); @@ -233,6 +351,55 @@ public class TestGetSplunk { Assert.assertNotNull(actualArgs.get("latest_time")); } + @Test + public void testGetWithManagedFromCurrentUsingIndexTime() throws IOException, ParseException { + final String query = "search tcp:7879"; + final String outputMode = GetSplunk.ATOM_VALUE.getValue(); + + runner.setProperty(GetSplunk.QUERY, query); + runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode); + runner.setProperty(GetSplunk.TIME_RANGE_STRATEGY, GetSplunk.MANAGED_CURRENT_VALUE.getValue()); + runner.setProperty(GetSplunk.TIME_FIELD_STRATEGY, GetSplunk.INDEX_TIME_VALUE.getValue()); + + final String resultContent = "fake results"; + final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8)); + when(service.export(eq(query), any(JobExportArgs.class))).thenReturn(input); + + // run once and don't shut down, shouldn't produce any results first time + runner.run(1, false); + runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 0); + + // capture what the args were on last run + verify(service, times(0)).export(eq(query), any(JobExportArgs.class)); + + final StateMap state = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertNotNull(state); + Assert.assertTrue(state.getVersion() > 0); + + // save the latest time from the first run which should be earliest time of next run + final String lastLatest = state.get(GetSplunk.LATEST_TIME_KEY); + + final SimpleDateFormat format = new SimpleDateFormat(GetSplunk.DATE_TIME_FORMAT); + format.setTimeZone(TimeZone.getTimeZone("UTC")); + + final Date lastLatestDate = format.parse(lastLatest); + final String expectedLatest = format.format(new Date(lastLatestDate.getTime() + 1)); + + // run again + runner.run(1, false); + runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1); + + final ArgumentCaptor capture = ArgumentCaptor.forClass(JobExportArgs.class); + verify(service, times(1)).export(eq(query), capture.capture()); + + // second execution the earliest time should be the previous latest_time + final JobExportArgs actualArgs = capture.getValue(); + Assert.assertNotNull(actualArgs); + + Assert.assertEquals(expectedLatest, actualArgs.get("index_earliest")); + Assert.assertNotNull(actualArgs.get("index_latest")); + } + /** * Testable implementation of GetSplunk to return a Mock Splunk Service.