mirror of https://github.com/apache/nifi.git
NIFI-1420 Fixing minor bugs in GetSplunk
- Adding a Time Zone property so the Managed time ranges use the provided time zone when formatting the date strings - Adding a Time Field Strategy property to choose between searching event time or index time - Making the next iteration use previousLastTime + 1 ms to avoid overlap - Fixing bug where GetSplunk incorrectly cleared state on a restart of NiFi - This closes #299
This commit is contained in:
parent
bbbc77707f
commit
423b333b71
|
@ -54,6 +54,7 @@ import org.apache.nifi.stream.io.BufferedOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.text.ParseException;
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -64,6 +65,7 @@ import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.TimeZone;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
@TriggerSerially
|
@TriggerSerially
|
||||||
|
@ -113,6 +115,19 @@ public class GetSplunk extends AbstractProcessor {
|
||||||
.required(true)
|
.required(true)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
public static final AllowableValue EVENT_TIME_VALUE = new AllowableValue("Event Time", "Event Time",
|
||||||
|
"Search based on the time of the event which may be different than when the event was indexed.");
|
||||||
|
public static final AllowableValue INDEX_TIME_VALUE = new AllowableValue("Index Time", "Index Time",
|
||||||
|
"Search based on the time the event was indexed in Splunk.");
|
||||||
|
|
||||||
|
public static final PropertyDescriptor TIME_FIELD_STRATEGY = new PropertyDescriptor.Builder()
|
||||||
|
.name("Time Field Strategy")
|
||||||
|
.description("Indicates whether to search by the time attached to the event, or by the time the event was indexed in Splunk.")
|
||||||
|
.allowableValues(EVENT_TIME_VALUE, INDEX_TIME_VALUE)
|
||||||
|
.defaultValue(EVENT_TIME_VALUE.getValue())
|
||||||
|
.required(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
public static final AllowableValue MANAGED_BEGINNING_VALUE = new AllowableValue("Managed from Beginning", "Managed from Beginning",
|
public static final AllowableValue MANAGED_BEGINNING_VALUE = new AllowableValue("Managed from Beginning", "Managed from Beginning",
|
||||||
"The processor will manage the date ranges of the query starting from the beginning of time.");
|
"The processor will manage the date ranges of the query starting from the beginning of time.");
|
||||||
public static final AllowableValue MANAGED_CURRENT_VALUE = new AllowableValue("Managed from Current", "Managed from Current",
|
public static final AllowableValue MANAGED_CURRENT_VALUE = new AllowableValue("Managed from Current", "Managed from Current",
|
||||||
|
@ -147,6 +162,13 @@ public class GetSplunk extends AbstractProcessor {
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.required(false)
|
.required(false)
|
||||||
.build();
|
.build();
|
||||||
|
public static final PropertyDescriptor TIME_ZONE = new PropertyDescriptor.Builder()
|
||||||
|
.name("Time Zone")
|
||||||
|
.description("The Time Zone to use for formatting dates when performing a search. Only used with Managed time strategies.")
|
||||||
|
.allowableValues(TimeZone.getAvailableIDs())
|
||||||
|
.defaultValue("UTC")
|
||||||
|
.required(true)
|
||||||
|
.build();
|
||||||
public static final PropertyDescriptor APP = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor APP = new PropertyDescriptor.Builder()
|
||||||
.name("Application")
|
.name("Application")
|
||||||
.description("The Splunk Application to query.")
|
.description("The Splunk Application to query.")
|
||||||
|
@ -213,7 +235,7 @@ public class GetSplunk extends AbstractProcessor {
|
||||||
.description("Results retrieved from Splunk are sent out this relationship.")
|
.description("Results retrieved from Splunk are sent out this relationship.")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
|
public static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
|
||||||
public static final String EARLIEST_TIME_KEY = "earliestTime";
|
public static final String EARLIEST_TIME_KEY = "earliestTime";
|
||||||
public static final String LATEST_TIME_KEY = "latestTime";
|
public static final String LATEST_TIME_KEY = "latestTime";
|
||||||
|
|
||||||
|
@ -236,9 +258,11 @@ public class GetSplunk extends AbstractProcessor {
|
||||||
descriptors.add(HOSTNAME);
|
descriptors.add(HOSTNAME);
|
||||||
descriptors.add(PORT);
|
descriptors.add(PORT);
|
||||||
descriptors.add(QUERY);
|
descriptors.add(QUERY);
|
||||||
|
descriptors.add(TIME_FIELD_STRATEGY);
|
||||||
descriptors.add(TIME_RANGE_STRATEGY);
|
descriptors.add(TIME_RANGE_STRATEGY);
|
||||||
descriptors.add(EARLIEST_TIME);
|
descriptors.add(EARLIEST_TIME);
|
||||||
descriptors.add(LATEST_TIME);
|
descriptors.add(LATEST_TIME);
|
||||||
|
descriptors.add(TIME_ZONE);
|
||||||
descriptors.add(APP);
|
descriptors.add(APP);
|
||||||
descriptors.add(OWNER);
|
descriptors.add(OWNER);
|
||||||
descriptors.add(TOKEN);
|
descriptors.add(TOKEN);
|
||||||
|
@ -290,13 +314,16 @@ public class GetSplunk extends AbstractProcessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
|
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
|
||||||
if ( ((oldValue != null && !oldValue.equals(newValue)) || (oldValue == null && newValue != null))
|
if ( ((oldValue != null && !oldValue.equals(newValue)))
|
||||||
&& (descriptor.equals(QUERY)
|
&& (descriptor.equals(QUERY)
|
||||||
|
|| descriptor.equals(TIME_FIELD_STRATEGY)
|
||||||
|| descriptor.equals(TIME_RANGE_STRATEGY)
|
|| descriptor.equals(TIME_RANGE_STRATEGY)
|
||||||
|| descriptor.equals(EARLIEST_TIME)
|
|| descriptor.equals(EARLIEST_TIME)
|
||||||
|| descriptor.equals(LATEST_TIME)
|
|| descriptor.equals(LATEST_TIME)
|
||||||
|| descriptor.equals(HOSTNAME))
|
|| descriptor.equals(HOSTNAME))
|
||||||
) {
|
) {
|
||||||
|
getLogger().debug("A property that require resetting state was modified - {} oldValue {} newValue {}",
|
||||||
|
new Object[] {descriptor.getDisplayName(), oldValue, newValue});
|
||||||
resetState = true;
|
resetState = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -311,6 +338,7 @@ public class GetSplunk extends AbstractProcessor {
|
||||||
// if properties changed since last execution then remove any previous state
|
// if properties changed since last execution then remove any previous state
|
||||||
if (resetState) {
|
if (resetState) {
|
||||||
try {
|
try {
|
||||||
|
getLogger().debug("Clearing state based on property modifications");
|
||||||
context.getStateManager().clear(Scope.CLUSTER);
|
context.getStateManager().clear(Scope.CLUSTER);
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
getLogger().warn("Failed to clear state", ioe);
|
getLogger().warn("Failed to clear state", ioe);
|
||||||
|
@ -351,6 +379,8 @@ public class GetSplunk extends AbstractProcessor {
|
||||||
final String query = context.getProperty(QUERY).getValue();
|
final String query = context.getProperty(QUERY).getValue();
|
||||||
final String outputMode = context.getProperty(OUTPUT_MODE).getValue();
|
final String outputMode = context.getProperty(OUTPUT_MODE).getValue();
|
||||||
final String timeRangeStrategy = context.getProperty(TIME_RANGE_STRATEGY).getValue();
|
final String timeRangeStrategy = context.getProperty(TIME_RANGE_STRATEGY).getValue();
|
||||||
|
final String timeZone = context.getProperty(TIME_ZONE).getValue();
|
||||||
|
final String timeFieldStrategy = context.getProperty(TIME_FIELD_STRATEGY).getValue();
|
||||||
|
|
||||||
final JobExportArgs exportArgs = new JobExportArgs();
|
final JobExportArgs exportArgs = new JobExportArgs();
|
||||||
exportArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL);
|
exportArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL);
|
||||||
|
@ -368,6 +398,7 @@ public class GetSplunk extends AbstractProcessor {
|
||||||
// not provided so we need to check the previous state
|
// not provided so we need to check the previous state
|
||||||
final TimeRange previousRange = loadState(context.getStateManager());
|
final TimeRange previousRange = loadState(context.getStateManager());
|
||||||
final SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_TIME_FORMAT);
|
final SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_TIME_FORMAT);
|
||||||
|
dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
|
||||||
|
|
||||||
if (previousRange == null) {
|
if (previousRange == null) {
|
||||||
// no previous state so set the earliest time based on the strategy
|
// no previous state so set the earliest time based on the strategy
|
||||||
|
@ -386,9 +417,16 @@ public class GetSplunk extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// we have previous state so set earliestTime to latestTime of last range
|
// we have previous state so set earliestTime to (latestTime + 1) of last range
|
||||||
earliestTime = previousRange.getLatestTime();
|
try {
|
||||||
latestTime = dateFormat.format(new Date(currentTime));
|
final String previousLastTime = previousRange.getLatestTime();
|
||||||
|
final Date previousLastDate = dateFormat.parse(previousLastTime);
|
||||||
|
|
||||||
|
earliestTime = dateFormat.format(new Date(previousLastDate.getTime() + 1));
|
||||||
|
latestTime = dateFormat.format(new Date(currentTime));
|
||||||
|
} catch (ParseException e) {
|
||||||
|
throw new ProcessException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -399,14 +437,26 @@ public class GetSplunk extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!StringUtils.isBlank(earliestTime)) {
|
if (!StringUtils.isBlank(earliestTime)) {
|
||||||
exportArgs.setEarliestTime(earliestTime);
|
if (EVENT_TIME_VALUE.getValue().equalsIgnoreCase(timeFieldStrategy)) {
|
||||||
|
exportArgs.setEarliestTime(earliestTime);
|
||||||
|
} else {
|
||||||
|
exportArgs.setIndexEarliest(earliestTime);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!StringUtils.isBlank(latestTime)) {
|
if (!StringUtils.isBlank(latestTime)) {
|
||||||
exportArgs.setLatestTime(latestTime);
|
if (EVENT_TIME_VALUE.getValue().equalsIgnoreCase(timeFieldStrategy)) {
|
||||||
|
exportArgs.setLatestTime(latestTime);
|
||||||
|
} else {
|
||||||
|
exportArgs.setIndexLatest(latestTime);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
getLogger().debug("Using earliestTime of {} and latestTime of {}", new Object[] {earliestTime, latestTime});
|
if (EVENT_TIME_VALUE.getValue().equalsIgnoreCase(timeFieldStrategy)) {
|
||||||
|
getLogger().debug("Using earliest_time of {} and latest_time of {}", new Object[]{earliestTime, latestTime});
|
||||||
|
} else {
|
||||||
|
getLogger().debug("Using index_earliest of {} and index_latest of {}", new Object[]{earliestTime, latestTime});
|
||||||
|
}
|
||||||
|
|
||||||
final InputStream exportSearch = splunkService.export(query, exportArgs);
|
final InputStream exportSearch = splunkService.export(query, exportArgs);
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,11 @@ import org.mockito.Mockito;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.text.ParseException;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.TimeZone;
|
||||||
|
|
||||||
import static org.mockito.Mockito.any;
|
import static org.mockito.Mockito.any;
|
||||||
import static org.mockito.Mockito.argThat;
|
import static org.mockito.Mockito.argThat;
|
||||||
|
@ -149,7 +153,7 @@ public class TestGetSplunk {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetWithManagedFromBeginning() {
|
public void testGetWithManagedFromBeginning() throws ParseException {
|
||||||
final String query = "search tcp:7879";
|
final String query = "search tcp:7879";
|
||||||
final String outputMode = GetSplunk.ATOM_VALUE.getValue();
|
final String outputMode = GetSplunk.ATOM_VALUE.getValue();
|
||||||
|
|
||||||
|
@ -176,7 +180,13 @@ public class TestGetSplunk {
|
||||||
Assert.assertNotNull(actualArgs1.get("latest_time"));
|
Assert.assertNotNull(actualArgs1.get("latest_time"));
|
||||||
|
|
||||||
// save the latest time from the first run which should be earliest time of next run
|
// save the latest time from the first run which should be earliest time of next run
|
||||||
final String expectedLatest = (String) actualArgs1.get("latest_time");
|
final String lastLatest = (String) actualArgs1.get("latest_time");
|
||||||
|
|
||||||
|
final SimpleDateFormat format = new SimpleDateFormat(GetSplunk.DATE_TIME_FORMAT);
|
||||||
|
format.setTimeZone(TimeZone.getTimeZone("UTC"));
|
||||||
|
|
||||||
|
final Date lastLatestDate = format.parse(lastLatest);
|
||||||
|
final String expectedLatest = format.format(new Date(lastLatestDate.getTime() + 1));
|
||||||
|
|
||||||
// run again
|
// run again
|
||||||
runner.run(1, false);
|
runner.run(1, false);
|
||||||
|
@ -193,7 +203,109 @@ public class TestGetSplunk {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetWithManagedFromCurrent() throws IOException {
|
public void testGetWithManagedFromBeginningWithDifferentTimeZone() throws ParseException {
|
||||||
|
final String query = "search tcp:7879";
|
||||||
|
final String outputMode = GetSplunk.ATOM_VALUE.getValue();
|
||||||
|
final TimeZone timeZone = TimeZone.getTimeZone("PST");
|
||||||
|
|
||||||
|
runner.setProperty(GetSplunk.QUERY, query);
|
||||||
|
runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
|
||||||
|
runner.setProperty(GetSplunk.TIME_RANGE_STRATEGY, GetSplunk.MANAGED_BEGINNING_VALUE.getValue());
|
||||||
|
runner.setProperty(GetSplunk.TIME_ZONE, timeZone.getID());
|
||||||
|
|
||||||
|
final String resultContent = "fake results";
|
||||||
|
final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8));
|
||||||
|
when(service.export(eq(query), any(JobExportArgs.class))).thenReturn(input);
|
||||||
|
|
||||||
|
// run once and don't shut down
|
||||||
|
runner.run(1, false);
|
||||||
|
runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
// capture what the args were on last run
|
||||||
|
final ArgumentCaptor<JobExportArgs> capture1 = ArgumentCaptor.forClass(JobExportArgs.class);
|
||||||
|
verify(service, times(1)).export(eq(query), capture1.capture());
|
||||||
|
|
||||||
|
// first execution with no previous state and "managed from beginning" should have a latest time and no earliest time
|
||||||
|
final JobExportArgs actualArgs1 = capture1.getValue();
|
||||||
|
Assert.assertNotNull(actualArgs1);
|
||||||
|
Assert.assertNull(actualArgs1.get("earliest_time"));
|
||||||
|
Assert.assertNotNull(actualArgs1.get("latest_time"));
|
||||||
|
|
||||||
|
// save the latest time from the first run which should be earliest time of next run
|
||||||
|
final String lastLatest = (String) actualArgs1.get("latest_time");
|
||||||
|
|
||||||
|
final SimpleDateFormat format = new SimpleDateFormat(GetSplunk.DATE_TIME_FORMAT);
|
||||||
|
format.setTimeZone(timeZone);
|
||||||
|
|
||||||
|
final Date lastLatestDate = format.parse(lastLatest);
|
||||||
|
final String expectedLatest = format.format(new Date(lastLatestDate.getTime() + 1));
|
||||||
|
|
||||||
|
// run again
|
||||||
|
runner.run(1, false);
|
||||||
|
runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 2);
|
||||||
|
|
||||||
|
final ArgumentCaptor<JobExportArgs> capture2 = ArgumentCaptor.forClass(JobExportArgs.class);
|
||||||
|
verify(service, times(2)).export(eq(query), capture2.capture());
|
||||||
|
|
||||||
|
// second execution the earliest time should be the previous latest_time
|
||||||
|
final JobExportArgs actualArgs2 = capture2.getValue();
|
||||||
|
Assert.assertNotNull(actualArgs2);
|
||||||
|
Assert.assertEquals(expectedLatest, actualArgs2.get("earliest_time"));
|
||||||
|
Assert.assertNotNull(actualArgs2.get("latest_time"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetWithManagedFromBeginningWithShutdown() throws ParseException {
|
||||||
|
final String query = "search tcp:7879";
|
||||||
|
final String outputMode = GetSplunk.ATOM_VALUE.getValue();
|
||||||
|
|
||||||
|
runner.setProperty(GetSplunk.QUERY, query);
|
||||||
|
runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
|
||||||
|
runner.setProperty(GetSplunk.TIME_RANGE_STRATEGY, GetSplunk.MANAGED_BEGINNING_VALUE.getValue());
|
||||||
|
|
||||||
|
final String resultContent = "fake results";
|
||||||
|
final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8));
|
||||||
|
when(service.export(eq(query), any(JobExportArgs.class))).thenReturn(input);
|
||||||
|
|
||||||
|
// run once and shut down
|
||||||
|
runner.run(1, true);
|
||||||
|
runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
// capture what the args were on last run
|
||||||
|
final ArgumentCaptor<JobExportArgs> capture1 = ArgumentCaptor.forClass(JobExportArgs.class);
|
||||||
|
verify(service, times(1)).export(eq(query), capture1.capture());
|
||||||
|
|
||||||
|
// first execution with no previous state and "managed from beginning" should have a latest time and no earliest time
|
||||||
|
final JobExportArgs actualArgs1 = capture1.getValue();
|
||||||
|
Assert.assertNotNull(actualArgs1);
|
||||||
|
Assert.assertNull(actualArgs1.get("earliest_time"));
|
||||||
|
Assert.assertNotNull(actualArgs1.get("latest_time"));
|
||||||
|
|
||||||
|
// save the latest time from the first run which should be earliest time of next run
|
||||||
|
final String lastLatest = (String) actualArgs1.get("latest_time");
|
||||||
|
|
||||||
|
final SimpleDateFormat format = new SimpleDateFormat(GetSplunk.DATE_TIME_FORMAT);
|
||||||
|
format.setTimeZone(TimeZone.getTimeZone("UTC"));
|
||||||
|
|
||||||
|
final Date lastLatestDate = format.parse(lastLatest);
|
||||||
|
final String expectedLatest = format.format(new Date(lastLatestDate.getTime() + 1));
|
||||||
|
|
||||||
|
// run again
|
||||||
|
runner.run(1, true);
|
||||||
|
runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 2);
|
||||||
|
|
||||||
|
final ArgumentCaptor<JobExportArgs> capture2 = ArgumentCaptor.forClass(JobExportArgs.class);
|
||||||
|
verify(service, times(2)).export(eq(query), capture2.capture());
|
||||||
|
|
||||||
|
// second execution the earliest time should be the previous latest_time
|
||||||
|
final JobExportArgs actualArgs2 = capture2.getValue();
|
||||||
|
Assert.assertNotNull(actualArgs2);
|
||||||
|
Assert.assertEquals(expectedLatest, actualArgs2.get("earliest_time"));
|
||||||
|
Assert.assertNotNull(actualArgs2.get("latest_time"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetWithManagedFromCurrentUsingEventTime() throws IOException, ParseException {
|
||||||
final String query = "search tcp:7879";
|
final String query = "search tcp:7879";
|
||||||
final String outputMode = GetSplunk.ATOM_VALUE.getValue();
|
final String outputMode = GetSplunk.ATOM_VALUE.getValue();
|
||||||
|
|
||||||
|
@ -217,7 +329,13 @@ public class TestGetSplunk {
|
||||||
Assert.assertTrue(state.getVersion() > 0);
|
Assert.assertTrue(state.getVersion() > 0);
|
||||||
|
|
||||||
// save the latest time from the first run which should be earliest time of next run
|
// save the latest time from the first run which should be earliest time of next run
|
||||||
final String expectedLatest = state.get(GetSplunk.LATEST_TIME_KEY);
|
final String lastLatest = state.get(GetSplunk.LATEST_TIME_KEY);
|
||||||
|
|
||||||
|
final SimpleDateFormat format = new SimpleDateFormat(GetSplunk.DATE_TIME_FORMAT);
|
||||||
|
format.setTimeZone(TimeZone.getTimeZone("UTC"));
|
||||||
|
|
||||||
|
final Date lastLatestDate = format.parse(lastLatest);
|
||||||
|
final String expectedLatest = format.format(new Date(lastLatestDate.getTime() + 1));
|
||||||
|
|
||||||
// run again
|
// run again
|
||||||
runner.run(1, false);
|
runner.run(1, false);
|
||||||
|
@ -233,6 +351,55 @@ public class TestGetSplunk {
|
||||||
Assert.assertNotNull(actualArgs.get("latest_time"));
|
Assert.assertNotNull(actualArgs.get("latest_time"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetWithManagedFromCurrentUsingIndexTime() throws IOException, ParseException {
|
||||||
|
final String query = "search tcp:7879";
|
||||||
|
final String outputMode = GetSplunk.ATOM_VALUE.getValue();
|
||||||
|
|
||||||
|
runner.setProperty(GetSplunk.QUERY, query);
|
||||||
|
runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode);
|
||||||
|
runner.setProperty(GetSplunk.TIME_RANGE_STRATEGY, GetSplunk.MANAGED_CURRENT_VALUE.getValue());
|
||||||
|
runner.setProperty(GetSplunk.TIME_FIELD_STRATEGY, GetSplunk.INDEX_TIME_VALUE.getValue());
|
||||||
|
|
||||||
|
final String resultContent = "fake results";
|
||||||
|
final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8));
|
||||||
|
when(service.export(eq(query), any(JobExportArgs.class))).thenReturn(input);
|
||||||
|
|
||||||
|
// run once and don't shut down, shouldn't produce any results first time
|
||||||
|
runner.run(1, false);
|
||||||
|
runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 0);
|
||||||
|
|
||||||
|
// capture what the args were on last run
|
||||||
|
verify(service, times(0)).export(eq(query), any(JobExportArgs.class));
|
||||||
|
|
||||||
|
final StateMap state = runner.getStateManager().getState(Scope.CLUSTER);
|
||||||
|
Assert.assertNotNull(state);
|
||||||
|
Assert.assertTrue(state.getVersion() > 0);
|
||||||
|
|
||||||
|
// save the latest time from the first run which should be earliest time of next run
|
||||||
|
final String lastLatest = state.get(GetSplunk.LATEST_TIME_KEY);
|
||||||
|
|
||||||
|
final SimpleDateFormat format = new SimpleDateFormat(GetSplunk.DATE_TIME_FORMAT);
|
||||||
|
format.setTimeZone(TimeZone.getTimeZone("UTC"));
|
||||||
|
|
||||||
|
final Date lastLatestDate = format.parse(lastLatest);
|
||||||
|
final String expectedLatest = format.format(new Date(lastLatestDate.getTime() + 1));
|
||||||
|
|
||||||
|
// run again
|
||||||
|
runner.run(1, false);
|
||||||
|
runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
final ArgumentCaptor<JobExportArgs> capture = ArgumentCaptor.forClass(JobExportArgs.class);
|
||||||
|
verify(service, times(1)).export(eq(query), capture.capture());
|
||||||
|
|
||||||
|
// second execution the earliest time should be the previous latest_time
|
||||||
|
final JobExportArgs actualArgs = capture.getValue();
|
||||||
|
Assert.assertNotNull(actualArgs);
|
||||||
|
|
||||||
|
Assert.assertEquals(expectedLatest, actualArgs.get("index_earliest"));
|
||||||
|
Assert.assertNotNull(actualArgs.get("index_latest"));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Testable implementation of GetSplunk to return a Mock Splunk Service.
|
* Testable implementation of GetSplunk to return a Mock Splunk Service.
|
||||||
|
|
Loading…
Reference in New Issue