mirror of https://github.com/apache/nifi.git
NIFI-10628: Added Default Schedule Time and Run Duration to processors
This closes #6512 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
3e9b7e27a5
commit
feb3148b68
|
@ -50,10 +50,12 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class TestRuntimeManifest {
|
||||
class TestRuntimeManifest {
|
||||
|
||||
public static final String LIST_HDFS_DEFAULT_SCHEDULE_TIME = "1 min";
|
||||
|
||||
@Test
|
||||
public void testRuntimeManifest() throws IOException {
|
||||
void testRuntimeManifest() throws IOException {
|
||||
final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
final RuntimeManifest runtimeManifest;
|
||||
|
@ -127,7 +129,7 @@ public class TestRuntimeManifest {
|
|||
final Map<String, String> listHdfsDefaultSchedulingPeriods = listHdfsDefinition.getDefaultSchedulingPeriodBySchedulingStrategy();
|
||||
assertNotNull(listHdfsDefaultSchedulingPeriods);
|
||||
assertEquals(2, listHdfsDefaultSchedulingPeriods.size());
|
||||
assertEquals(SchedulingStrategy.TIMER_DRIVEN.getDefaultSchedulingPeriod(), listHdfsDefaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name()));
|
||||
assertEquals("1 min", listHdfsDefaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name()));
|
||||
assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultSchedulingPeriod(), listHdfsDefaultSchedulingPeriods.get(SchedulingStrategy.CRON_DRIVEN.name()));
|
||||
|
||||
final List<Relationship> relationships = listHdfsDefinition.getSupportedRelationships();
|
||||
|
@ -197,7 +199,7 @@ public class TestRuntimeManifest {
|
|||
assertNotNull(ambariDefaultSchedulingPeriods);
|
||||
assertEquals(2, ambariDefaultSchedulingPeriods.size());
|
||||
// TIMER_DRIVEN period should come from the @DefaultSchedule annotation that overrides the default value
|
||||
assertEquals("1 min", ambariDefaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name()));
|
||||
assertEquals(LIST_HDFS_DEFAULT_SCHEDULE_TIME, ambariDefaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name()));
|
||||
assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultSchedulingPeriod(), ambariDefaultSchedulingPeriods.get(SchedulingStrategy.CRON_DRIVEN.name()));
|
||||
|
||||
// Verify JoltTransformRecord which has @EventDriven
|
||||
|
@ -223,7 +225,7 @@ public class TestRuntimeManifest {
|
|||
final Map<String, String> joltTransformDefaultSchedulingPeriods = listHdfsDefinition.getDefaultSchedulingPeriodBySchedulingStrategy();
|
||||
assertNotNull(joltTransformDefaultSchedulingPeriods);
|
||||
assertEquals(2, joltTransformDefaultSchedulingPeriods.size());
|
||||
assertEquals(SchedulingStrategy.TIMER_DRIVEN.getDefaultSchedulingPeriod(), joltTransformDefaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name()));
|
||||
assertEquals(LIST_HDFS_DEFAULT_SCHEDULE_TIME, joltTransformDefaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name()));
|
||||
assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultSchedulingPeriod(), joltTransformDefaultSchedulingPeriods.get(SchedulingStrategy.CRON_DRIVEN.name()));
|
||||
}
|
||||
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.nifi.accumulo.data.KeySchema;
|
|||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
|
@ -45,6 +46,7 @@ import org.apache.nifi.processor.Relationship;
|
|||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.StreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
|
@ -79,6 +81,7 @@ import java.util.concurrent.atomic.LongAdder;
|
|||
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
|
||||
@CapabilityDescription("Scan the given table and writes result in a flowfile. Value will be represented as UTF-8 Encoded String.")
|
||||
@Tags({"hadoop", "accumulo", "scan", "record"})
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
/**
|
||||
* Purpose and Design: Requires a connector be defined by way of an AccumuloService object. This class
|
||||
* simply extends BaseAccumuloProcessor to scan accumulo based on aspects and expression executed against
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
|
|||
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSettings;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -64,6 +65,7 @@ import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever;
|
|||
import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters;
|
||||
import org.apache.nifi.processors.airtable.service.AirtableRestService;
|
||||
import org.apache.nifi.processors.airtable.service.RateLimitExceededException;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
|
||||
|
||||
@PrimaryNodeOnly
|
||||
|
@ -92,6 +94,7 @@ import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
|
|||
+ "FlowFiles were produced"),
|
||||
})
|
||||
@DefaultSettings(yieldDuration = "15 sec")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class QueryAirtableTable extends AbstractProcessor {
|
||||
|
||||
static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
|
|||
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -69,6 +70,7 @@ import org.apache.nifi.processor.util.StandardValidators;
|
|||
import org.apache.nifi.processor.util.list.ListableEntityWrapper;
|
||||
import org.apache.nifi.processor.util.list.ListedEntity;
|
||||
import org.apache.nifi.processor.util.list.ListedEntityTracker;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
|
@ -102,6 +104,7 @@ import java.util.stream.Collectors;
|
|||
@TriggerWhenEmpty
|
||||
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
||||
@Tags({"Amazon", "S3", "AWS", "list"})
|
||||
@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
|
||||
@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents "
|
||||
+ "the object so that it can be fetched in conjunction with FetchS3Object. This Processor is designed to run on Primary Node only "
|
||||
+ "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
|
||||
|
@ -123,7 +126,7 @@ import java.util.stream.Collectors;
|
|||
"will be written as part of the flowfile attributes"),
|
||||
@WritesAttribute(attribute = "s3.user.metadata.___", description = "If 'Write User Metadata' is set to 'True', the user defined metadata associated to the S3 object that is being listed " +
|
||||
"will be written as part of the flowfile attributes")})
|
||||
@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
|
||||
|
||||
public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps",
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -61,6 +62,7 @@ import org.apache.nifi.processor.util.list.ListedEntityTracker;
|
|||
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
|
||||
import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
|
||||
import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
import java.util.Optional;
|
||||
|
@ -87,6 +89,7 @@ import java.util.Optional;
|
|||
"This allows the Processor to list only blobs that have been added or modified after this date the next time that the Processor is run. State is " +
|
||||
"stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up " +
|
||||
"where the previous node left off, without duplicating the data.")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class ListAzureBlobStorage extends AbstractListAzureProcessor<BlobInfo> {
|
||||
|
||||
private static final PropertyDescriptor PROP_PREFIX = new PropertyDescriptor.Builder()
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -45,6 +46,7 @@ import org.apache.nifi.processor.util.list.ListedEntityTracker;
|
|||
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
|
||||
import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
|
||||
import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -98,6 +100,7 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
|
|||
"(by default). This allows the Processor to list only blobs that have been added or modified after this date the next time that the Processor is run. State is " +
|
||||
"stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up " +
|
||||
"where the previous node left off, without duplicating the data.")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class ListAzureBlobStorage_v12 extends AbstractListAzureProcessor<BlobInfo> {
|
||||
|
||||
public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder()
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -44,6 +45,7 @@ import org.apache.nifi.processor.util.StandardValidators;
|
|||
import org.apache.nifi.processors.azure.storage.utils.ADLSFileInfo;
|
||||
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
|
||||
import org.apache.nifi.processors.azure.storage.utils.DataLakeServiceClientFactory;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
|
||||
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
|
||||
|
@ -103,6 +105,7 @@ import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR
|
|||
"This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run. State is " +
|
||||
"stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up " +
|
||||
"where the previous node left off, without duplicating the data.")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class ListAzureDataLakeStorage extends AbstractListAzureProcessor<ADLSFileInfo> {
|
||||
|
||||
public static final PropertyDescriptor RECURSE_SUBDIRECTORIES = new PropertyDescriptor.Builder()
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -40,6 +41,7 @@ import org.apache.nifi.processor.ProcessContext;
|
|||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processor.util.list.AbstractListProcessor;
|
||||
import org.apache.nifi.processor.util.list.ListedEntityTracker;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -71,6 +73,7 @@ import java.util.stream.Collectors;
|
|||
@WritesAttribute(attribute = BoxFileInfo.TIMESTAMP, description = "The last modified time of the file.")})
|
||||
@Stateful(scopes = {Scope.CLUSTER}, description = "The processor stores necessary data to be able to keep track what files have been listed already." +
|
||||
" What exactly needs to be stored depends on the 'Listing Strategy'.")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class ListBoxFile extends AbstractListProcessor<BoxFileInfo> {
|
||||
public static final PropertyDescriptor FOLDER_ID = new PropertyDescriptor.Builder()
|
||||
.name("box-folder-id")
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -56,6 +57,7 @@ import org.apache.nifi.processor.util.list.AbstractListProcessor;
|
|||
import org.apache.nifi.processor.util.list.ListedEntityTracker;
|
||||
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||
import org.apache.nifi.proxy.ProxySpec;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
@PrimaryNodeOnly
|
||||
|
@ -76,6 +78,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
|
|||
@Stateful(scopes = {Scope.CLUSTER}, description = "The processor stores necessary data to be able to keep track what files have been listed already. " +
|
||||
"What exactly needs to be stored depends on the 'Listing Strategy'.")
|
||||
@SeeAlso(FetchDropbox.class)
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class ListDropbox extends AbstractListProcessor<DropboxFileInfo> implements DropboxTrait {
|
||||
public static final PropertyDescriptor FOLDER = new PropertyDescriptor.Builder()
|
||||
.name("folder")
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -46,6 +47,7 @@ import org.apache.nifi.processor.util.list.ListedEntityTracker;
|
|||
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
|
||||
import org.apache.nifi.processors.gcp.util.GoogleUtils;
|
||||
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -86,6 +88,7 @@ import java.util.concurrent.TimeUnit;
|
|||
" What exactly needs to be stored depends on the 'Listing Strategy'." +
|
||||
" State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up" +
|
||||
" where the previous node left off, without duplicating the data.")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class ListGoogleDrive extends AbstractListProcessor<GoogleDriveFileInfo> implements GoogleDriveTrait {
|
||||
public static final PropertyDescriptor FOLDER_ID = new PropertyDescriptor.Builder()
|
||||
.name("folder-id")
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
|
|||
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -52,6 +53,7 @@ import org.apache.nifi.processor.util.StandardValidators;
|
|||
import org.apache.nifi.processor.util.list.ListableEntityWrapper;
|
||||
import org.apache.nifi.processor.util.list.ListedEntity;
|
||||
import org.apache.nifi.processor.util.list.ListedEntityTracker;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
|
@ -165,6 +167,7 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;
|
|||
@WritesAttribute(attribute = OWNER_TYPE_ATTR, description = OWNER_TYPE_DESC),
|
||||
@WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
|
||||
})
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class ListGCSBucket extends AbstractGCSProcessor {
|
||||
public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps",
|
||||
"This strategy tracks the latest timestamp of listed entity to determine new/updated entities." +
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
|
|||
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -52,6 +53,7 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
|
|||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
|
@ -85,6 +87,7 @@ import java.util.regex.Pattern;
|
|||
@TriggerWhenEmpty
|
||||
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
||||
@Tags({"hadoop", "HCFS", "HDFS", "get", "list", "ingest", "source", "filesystem"})
|
||||
@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
|
||||
@CapabilityDescription("Retrieves a listing of files from HDFS. Each time a listing is performed, the files with the latest timestamp will be excluded "
|
||||
+ "and picked up during the next execution of the processor. This is done to ensure that we do not miss any files, or produce duplicates, in the "
|
||||
+ "cases where files with the same timestamp are written immediately before and after a single execution of the processor. For each file that is "
|
||||
|
@ -109,7 +112,7 @@ import java.util.regex.Pattern;
|
|||
+ "this date the next time that the Processor is run, without having to store all of the actual filenames/paths which could lead to performance "
|
||||
+ "problems. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
|
||||
+ "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
|
||||
@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class ListHDFS extends AbstractHadoopProcessor {
|
||||
|
||||
private static final RecordSchema RECORD_SCHEMA;
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
|
|||
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnRemoved;
|
||||
|
@ -71,6 +72,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
|
||||
@TriggerWhenEmpty
|
||||
@TriggerSerially
|
||||
|
@ -87,6 +89,7 @@ import java.util.regex.Pattern;
|
|||
@Stateful(scopes = Scope.CLUSTER, description = "After performing a fetching from HBase, stores a timestamp of the last-modified cell that was found. In addition, it stores the ID of the row(s) "
|
||||
+ "and the value of each cell that has that timestamp as its modification date. This is stored across the cluster and allows the next fetch to avoid duplicating data, even if this Processor is "
|
||||
+ "run on Primary Node only and the Primary Node changes.")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class GetHBase extends AbstractProcessor implements VisibilityFetchSupport {
|
||||
|
||||
static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
|||
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
|
||||
import org.apache.nifi.annotation.behavior.Stateful;
|
||||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSettings;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -46,6 +47,7 @@ import org.apache.nifi.processor.Relationship;
|
|||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.web.client.api.HttpResponseEntity;
|
||||
import org.apache.nifi.web.client.api.HttpResponseStatus;
|
||||
import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
|
||||
|
@ -79,6 +81,7 @@ import java.util.stream.Collectors;
|
|||
" executing a request. Only the objects after the paging cursor will be retrieved. The maximum number of retrieved" +
|
||||
" objects can be set in the 'Result Limit' property.")
|
||||
@DefaultSettings(yieldDuration = "10 sec")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class GetHubSpot extends AbstractProcessor {
|
||||
|
||||
static final PropertyDescriptor OBJECT_TYPE = new PropertyDescriptor.Builder()
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.nifi.processor.exception.ProcessException;
|
|||
@EventDriven
|
||||
@SupportsBatching
|
||||
@Tags({ "Ignite", "get", "read", "cache", "key" })
|
||||
@SeeAlso({PutIgniteCache.class})
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@CapabilityDescription("Get the byte array from Ignite Cache and adds it as the content of a FlowFile." +
|
||||
"The processor uses the value of FlowFile attribute (Ignite cache entry key) as the cache key lookup. " +
|
||||
|
@ -53,7 +54,6 @@ import org.apache.nifi.processor.exception.ProcessException;
|
|||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, description = "The reason for getting entry from cache"),
|
||||
})
|
||||
@SeeAlso({PutIgniteCache.class})
|
||||
public class GetIgniteCache extends AbstractIgniteCacheProcessor {
|
||||
|
||||
/** Flow file attribute keys and messages */
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -46,6 +47,7 @@ import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
|
|||
import org.apache.nifi.processors.mqtt.common.MqttCallback;
|
||||
import org.apache.nifi.processors.mqtt.common.MqttException;
|
||||
import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.RecordReaderFactory;
|
||||
|
@ -104,7 +106,7 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
|
|||
"on the topic.")})
|
||||
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The 'Max Queue Size' specifies the maximum number of messages that can be hold in memory by NiFi by a single "
|
||||
+ "instance of this processor. A high value for this property could represent a lot of data being stored in memory.")
|
||||
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
|
||||
|
||||
public final static String RECORD_COUNT_KEY = "record.count";
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
|
|||
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
|
@ -47,6 +48,7 @@ import org.apache.nifi.processor.exception.ProcessException;
|
|||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
|
||||
import org.apache.nifi.processors.salesforce.util.SalesforceToRecordSchemaConverter;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
|
@ -96,6 +98,7 @@ import java.util.function.BiPredicate;
|
|||
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
|
||||
@WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile.")
|
||||
})
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class QuerySalesforceObject extends AbstractProcessor {
|
||||
|
||||
static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
|||
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
|
||||
import org.apache.nifi.annotation.behavior.Stateful;
|
||||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
|
@ -47,6 +48,7 @@ import org.apache.nifi.processors.shopify.model.ResourceType;
|
|||
import org.apache.nifi.processors.shopify.model.ShopifyResource;
|
||||
import org.apache.nifi.processors.shopify.rest.ShopifyRestService;
|
||||
import org.apache.nifi.processors.shopify.util.IncrementalTimers;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.web.client.api.HttpResponseEntity;
|
||||
import org.apache.nifi.web.client.api.HttpResponseStatus;
|
||||
import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
|
||||
|
@ -75,6 +77,7 @@ import java.util.stream.Collectors;
|
|||
@Stateful(scopes = Scope.CLUSTER, description = "For a few resources the processor supports incremental loading." +
|
||||
" The list of the resources with the supported parameters can be found in the additional details.")
|
||||
@CapabilityDescription("Retrieves objects from a custom Shopify store. The processor yield time must be set to the account's rate limit accordingly.")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class GetShopify extends AbstractProcessor {
|
||||
|
||||
static final PropertyDescriptor STORE_DOMAIN = new PropertyDescriptor.Builder()
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -71,6 +72,7 @@ import org.apache.nifi.processor.DataUnit;
|
|||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.list.AbstractListProcessor;
|
||||
import org.apache.nifi.processor.util.list.ListedEntityTracker;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.services.smb.SmbClientProviderService;
|
||||
import org.apache.nifi.services.smb.SmbClientService;
|
||||
|
@ -114,6 +116,7 @@ import org.apache.nifi.services.smb.SmbListableEntity;
|
|||
"After performing a listing of files, the state of the previous listing can be stored in order to list files "
|
||||
+ "continuously without duplication."
|
||||
)
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class ListSmb extends AbstractListProcessor<SmbListableEntity> {
|
||||
|
||||
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.nifi.processors.solr;
|
|||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.Stateful;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
|
@ -39,6 +40,7 @@ import org.apache.nifi.processor.Relationship;
|
|||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
|
@ -93,6 +95,7 @@ import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
|
|||
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
||||
@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer")
|
||||
@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class GetSolr extends SolrProcessor {
|
||||
|
||||
public static final String STATE_MANAGER_FILTER = "stateManager_filter";
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.nifi.annotation.behavior.DynamicProperty;
|
|||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
|
@ -41,6 +42,7 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
|
|||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
|
@ -111,6 +113,7 @@ import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER;
|
|||
@WritesAttribute(attribute = "querysolr.exeption.class", description = "The Java exception class raised when the processor fails"),
|
||||
@WritesAttribute(attribute = "querysolr.exeption.message", description = "The Java exception message raised when the processor fails")
|
||||
})
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class QuerySolr extends SolrProcessor {
|
||||
|
||||
public static final AllowableValue MODE_XML = new AllowableValue("XML");
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnRemoved;
|
||||
|
@ -67,6 +68,7 @@ import java.util.Set;
|
|||
import java.util.TimeZone;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
|
||||
@TriggerSerially
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
|
||||
|
@ -80,6 +82,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
@Stateful(scopes = Scope.CLUSTER, description = "If using one of the managed Time Range Strategies, this processor will " +
|
||||
"store the values of the latest and earliest times from the previous execution so that the next execution of the " +
|
||||
"can pick up where the last execution left off. The state will be cleared and start over if the query is changed.")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class GetSplunk extends AbstractProcessor {
|
||||
|
||||
public static final String HTTP_SCHEME = "http";
|
||||
|
|
|
@ -21,6 +21,7 @@ import com.splunk.ResponseMessage;
|
|||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.ReadsAttribute;
|
||||
import org.apache.nifi.annotation.behavior.ReadsAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -47,6 +48,7 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
@Tags({"splunk", "logs", "http", "acknowledgement"})
|
||||
|
@ -55,6 +57,7 @@ import java.util.concurrent.TimeUnit;
|
|||
@ReadsAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."),
|
||||
@ReadsAttribute(attribute = "splunk.responded.at", description = "The time of the response of put request for Splunk.")})
|
||||
@SeeAlso(PutSplunkHTTP.class)
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class QuerySplunkIndexingStatus extends SplunkAPICall {
|
||||
private static final String ENDPOINT = "/services/collector/ack";
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
|
@ -53,7 +54,7 @@ import org.apache.nifi.processor.exception.ProcessException;
|
|||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
@EventDriven
|
||||
@SupportsBatching
|
||||
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
|
||||
@Tags({"hash", "dupe", "duplicate", "dedupe"})
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@CapabilityDescription("Caches a value, computed from FlowFile attributes, for each incoming FlowFile and determines if the cached value has already been seen. "
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.DynamicRelationship;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
|
@ -61,7 +62,7 @@ import org.apache.nifi.processor.util.StandardValidators;
|
|||
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
@SupportsBatching
|
||||
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@TriggerWhenAnyDestinationAvailable
|
||||
@Tags({"distribute", "load balance", "route", "round robin", "weighted"})
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
|
@ -41,7 +42,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
@SupportsBatching
|
||||
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
|
||||
@SideEffectFree
|
||||
@SeeAlso(JoinEnrichment.class)
|
||||
@WritesAttributes({
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.nifi.annotation.behavior.DynamicProperty;
|
|||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
|
@ -52,6 +53,7 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
|
|||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
|
||||
@SupportsBatching
|
||||
@Tags({"test", "random", "generate", "load"})
|
||||
|
@ -62,6 +64,7 @@ import org.apache.nifi.processor.util.StandardValidators;
|
|||
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
|
||||
description = "Specifies an attribute on generated FlowFiles defined by the Dynamic Property's key and value." +
|
||||
" If Expression Language is used, evaluation will be performed only once per batch of generated FlowFiles.")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class GenerateFlowFile extends AbstractProcessor {
|
||||
|
||||
private final AtomicReference<byte[]> data = new AtomicReference<>();
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.regex.Matcher;
|
|||
import java.util.regex.Pattern;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
|
@ -98,7 +99,7 @@ import org.apache.nifi.processor.util.StandardValidators;
|
|||
*/
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
@SupportsBatching
|
||||
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
|
||||
@Tags({"attributes", "hash"})
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@CapabilityDescription("Hashes together the key/value pairs of several flowfile attributes and adds the hash as a new attribute. "
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
|
@ -37,6 +38,7 @@ import org.apache.nifi.processor.ProcessSession;
|
|||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
|
@ -92,6 +94,7 @@ import java.util.stream.Stream;
|
|||
+ "This allows the Processor to not re-list tables the next time that the Processor is run. Specifying the refresh interval in the processor properties will "
|
||||
+ "indicate that when the processor detects the interval has elapsed, the state will be reset and tables will be re-listed as a result. "
|
||||
+ "This processor is meant to be run on the primary node only.")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class ListDatabaseTables extends AbstractProcessor {
|
||||
|
||||
// Attribute names
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -40,6 +41,7 @@ import org.apache.nifi.processors.standard.util.FileTransfer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
|
||||
@PrimaryNodeOnly
|
||||
@TriggerSerially
|
||||
|
@ -65,6 +67,7 @@ import java.util.List;
|
|||
+ "This allows the Processor to list only files that have been added or modified after "
|
||||
+ "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if "
|
||||
+ "a new Primary Node is selected, the new node will not duplicate the data that was listed by the previous Primary Node.")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class ListFTP extends ListFileTransfer {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -44,6 +45,7 @@ import org.apache.nifi.processor.util.StandardValidators;
|
|||
import org.apache.nifi.processor.util.list.AbstractListProcessor;
|
||||
import org.apache.nifi.processor.util.list.ListedEntityTracker;
|
||||
import org.apache.nifi.processors.standard.util.FileInfo;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.util.Tuple;
|
||||
|
||||
|
@ -126,6 +128,7 @@ import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALI
|
|||
+ "This allows the Processor to list only files that have been added or modified after "
|
||||
+ "this date the next time that the Processor is run. Whether the state is stored with a Local or Cluster scope depends on the value of the "
|
||||
+ "<Input Directory Location> property.")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class ListFile extends AbstractListProcessor<FileInfo> {
|
||||
static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "Input Directory is located on a local disk. State will be stored locally on each node in the cluster.");
|
||||
static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", "Remote", "Input Directory is located on a remote system. State will be stored across the cluster so that "
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -48,6 +49,7 @@ import java.util.List;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
|
||||
@PrimaryNodeOnly
|
||||
@TriggerSerially
|
||||
|
@ -74,6 +76,7 @@ import java.util.stream.Collectors;
|
|||
+ "This allows the Processor to list only files that have been added or modified after "
|
||||
+ "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if "
|
||||
+ "a new Primary Node is selected, the new node will not duplicate the data that was listed by the previous Primary Node.")
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class ListSFTP extends ListFileTransfer {
|
||||
|
||||
private volatile Predicate<FileInfo> fileFilter;
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
|
|||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
|
@ -58,7 +59,7 @@ import java.util.stream.Collectors;
|
|||
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
@SupportsBatching
|
||||
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
|
||||
@Tags({"attributes", "logging"})
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@CapabilityDescription("Emits attributes of the FlowFile at the specified log level")
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
|
@ -45,7 +46,7 @@ import java.util.Set;
|
|||
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
@SupportsBatching
|
||||
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
|
||||
@Tags({"attributes", "logging"})
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@CapabilityDescription("Emits a log message at the specified log level")
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -35,6 +36,7 @@ import org.apache.nifi.processor.ProcessSession;
|
|||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
|
||||
import org.apache.nifi.processors.standard.sql.SqlWriter;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.util.db.JdbcCommon;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -85,6 +87,7 @@ import static org.apache.nifi.util.db.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
|
|||
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Specifies an initial max value for max value column(s). Properties should "
|
||||
+ "be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).")
|
||||
@PrimaryNodeOnly
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class QueryDatabaseTable extends AbstractQueryDatabaseTable {
|
||||
|
||||
public QueryDatabaseTable() {
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -35,6 +36,7 @@ import org.apache.nifi.processor.ProcessSession;
|
|||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
|
||||
import org.apache.nifi.processors.standard.sql.SqlWriter;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.util.db.JdbcCommon;
|
||||
|
||||
|
@ -88,6 +90,7 @@ import static org.apache.nifi.util.db.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
|
|||
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Specifies an initial max value for max value column(s). Properties should "
|
||||
+ "be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).")
|
||||
@PrimaryNodeOnly
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class QueryDatabaseTableRecord extends AbstractQueryDatabaseTable {
|
||||
|
||||
public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.DynamicRelationship;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
|
@ -62,7 +63,7 @@ import org.apache.nifi.processor.util.StandardValidators;
|
|||
*/
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
@SupportsBatching
|
||||
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@Tags({"attributes", "routing", "Attribute Expression Language", "regexp", "regex", "Regular Expression", "Expression Language", "find", "text", "string", "search", "filter", "detect"})
|
||||
@CapabilityDescription("Routes FlowFiles based on their Attributes using the Attribute Expression Language")
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.nifi.annotation.behavior.Restriction;
|
|||
import org.apache.nifi.annotation.behavior.Stateful;
|
||||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
|
@ -50,6 +51,7 @@ import org.apache.nifi.processor.exception.ProcessException;
|
|||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.standard.TailFile.TailFileState.StateKeys;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.stream.io.NullOutputStream;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
|
||||
|
@ -112,6 +114,7 @@ import static org.apache.nifi.processor.util.StandardValidators.REGULAR_EXPRESSI
|
|||
explanation = "Provides operator the ability to read from any file that NiFi has access to.")
|
||||
}
|
||||
)
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "30 sec")
|
||||
public class TailFile extends AbstractProcessor {
|
||||
|
||||
static final String MAP_PREFIX = "file.";
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.nifi.processors.attributes;
|
|||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
|
@ -78,7 +79,7 @@ import java.util.regex.Pattern;
|
|||
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
@SupportsBatching
|
||||
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@Tags({"attributes", "modification", "update", "delete", "Attribute Expression Language", "state"})
|
||||
@CapabilityDescription("Updates the Attributes for a FlowFile by using the Attribute Expression Language and/or deletes the attributes based on a regular expression")
|
||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
|
|||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSettings;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -73,6 +74,7 @@ import org.apache.nifi.processor.ProcessSession;
|
|||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.web.client.api.HttpResponseEntity;
|
||||
import org.apache.nifi.web.client.api.HttpUriBuilder;
|
||||
import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
|
||||
|
@ -86,6 +88,7 @@ import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
|
|||
@Stateful(scopes = CLUSTER, description = "Paging cursor for Zendesk API is stored. Cursor is updated after each successful request.")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = RECORD_COUNT_ATTRIBUTE_NAME, description = "The number of records fetched by the processor.")})
|
||||
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
|
||||
public class GetZendesk extends AbstractProcessor {
|
||||
|
||||
static final int HTTP_TOO_MANY_REQUESTS = 429;
|
||||
|
|
Loading…
Reference in New Issue