NIFI-12933 Rearranged properties on GCP processors

Also used current API methods for relationships/properties collections

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8545.
This commit is contained in:
Peter Turcsanyi 2024-03-22 13:13:29 +01:00 committed by Pierre Villard
parent 635eb9ed2a
commit 8eb013a813
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
13 changed files with 146 additions and 157 deletions

View File

@ -85,15 +85,6 @@ public abstract class AbstractGCPProcessor<
return cloudService;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return List.of(PROJECT_ID,
GCP_CREDENTIALS_PROVIDER_SERVICE,
RETRY_COUNT,
PROXY_CONFIGURATION_SERVICE
);
}
@Override
public void migrateProperties(final PropertyConfiguration config) {

View File

@ -39,10 +39,8 @@ import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -63,8 +61,7 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
.description("FlowFiles are routed to this relationship if the Google BigQuery operation fails.")
.build();
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
public static final PropertyDescriptor DATASET = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.DATASET_ATTR)
@ -98,17 +95,7 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.addAll(super.getSupportedPropertyDescriptors());
descriptors.add(DATASET);
descriptors.add(TABLE_NAME);
descriptors.add(IGNORE_UNKNOWN);
return Collections.unmodifiableList(descriptors);
return RELATIONSHIPS;
}
@Override

View File

@ -62,7 +62,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.processors.gcp.bigquery.proto.ProtoUtils;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.serialization.RecordReader;
@ -80,7 +79,6 @@ import java.sql.Timestamp;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -88,10 +86,6 @@ import java.util.Optional;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
@TriggerSerially
@Tags({"google", "google cloud", "bq", "bigquery"})
@ -179,7 +173,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
.defaultValue("false")
.build();
private static final List<PropertyDescriptor> DESCRIPTORS = Stream.of(
private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
GCP_CREDENTIALS_PROVIDER_SERVICE,
PROJECT_ID,
BIGQUERY_API_ENDPOINT,
@ -190,8 +184,8 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
APPEND_RECORD_COUNT,
RETRY_COUNT,
SKIP_INVALID_ROWS,
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS)
).collect(collectingAndThen(toList(), Collections::unmodifiableList));
PROXY_CONFIGURATION_SERVICE
);
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {

View File

@ -24,21 +24,11 @@ import io.grpc.ProxyDetector;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.SocketAddress;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.proxy.ProxyConfiguration;
public abstract class AbstractGCPubSubWithProxyProcessor extends AbstractGCPubSubProcessor {
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return List.of(
PROJECT_ID,
PROXY_CONFIGURATION_SERVICE,
GCP_CREDENTIALS_PROVIDER_SERVICE
);
}
protected TransportChannelProvider getTransportChannelProvider(ProcessContext context) {
final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context);

View File

@ -95,11 +95,32 @@ public class ConsumeGCPubSub extends AbstractGCPubSubWithProxyProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
GCP_CREDENTIALS_PROVIDER_SERVICE,
PROJECT_ID,
SUBSCRIPTION,
BATCH_SIZE_THRESHOLD,
API_ENDPOINT,
PROXY_CONFIGURATION_SERVICE
);
public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
private SubscriberStub subscriber = null;
private PullRequest pullRequest;
private final AtomicReference<Exception> storedException = new AtomicReference<>();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@OnScheduled
public void onScheduled(ProcessContext context) {
final Integer batchSize = context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
@ -188,20 +209,6 @@ public class ConsumeGCPubSub extends AbstractGCPubSubWithProxyProcessor {
}
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
descriptors.add(SUBSCRIPTION);
descriptors.add(BATCH_SIZE_THRESHOLD);
descriptors.add(API_ENDPOINT);
return Collections.unmodifiableList(descriptors);
}
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
if (subscriber == null) {

View File

@ -75,10 +75,8 @@ import org.threeten.bp.Duration;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -173,22 +171,29 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
.description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails but attempting the operation again may succeed.")
.build();
private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
GCP_CREDENTIALS_PROVIDER_SERVICE,
PROJECT_ID,
TOPIC_NAME,
MESSAGE_DERIVATION_STRATEGY,
RECORD_READER,
RECORD_WRITER,
MAX_BATCH_SIZE,
MAX_MESSAGE_SIZE,
BATCH_SIZE_THRESHOLD,
BATCH_BYTES_THRESHOLD,
BATCH_DELAY_THRESHOLD,
API_ENDPOINT,
PROXY_CONFIGURATION_SERVICE
);
public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE, REL_RETRY);
protected Publisher publisher = null;
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
descriptors.add(MAX_BATCH_SIZE);
descriptors.add(MESSAGE_DERIVATION_STRATEGY);
descriptors.add(RECORD_READER);
descriptors.add(RECORD_WRITER);
descriptors.add(MAX_MESSAGE_SIZE);
descriptors.add(TOPIC_NAME);
descriptors.add(BATCH_SIZE_THRESHOLD);
descriptors.add(BATCH_BYTES_THRESHOLD);
descriptors.add(BATCH_DELAY_THRESHOLD);
descriptors.add(API_ENDPOINT);
return Collections.unmodifiableList(descriptors);
return DESCRIPTORS;
}
@Override
@ -205,9 +210,7 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
@Override
public Set<Relationship> getRelationships() {
return Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_RETRY))
);
return RELATIONSHIPS;
}
@Override

View File

@ -52,9 +52,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -116,9 +114,28 @@ public class ConsumeGCPubSubLite extends AbstractGCPubSubProcessor implements Ve
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
GCP_CREDENTIALS_PROVIDER_SERVICE,
SUBSCRIPTION,
BYTES_OUTSTANDING,
MESSAGES_OUTSTANDING
);
public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
private Subscriber subscriber = null;
private BlockingQueue<Message> messages = new LinkedBlockingQueue<>();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final Collection<ValidationResult> results = new ArrayList<ValidationResult>(1);
@ -168,19 +185,6 @@ public class ConsumeGCPubSubLite extends AbstractGCPubSubProcessor implements Ve
}
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.unmodifiableList(Arrays.asList(SUBSCRIPTION,
GCP_CREDENTIALS_PROVIDER_SERVICE,
BYTES_OUTSTANDING,
MESSAGES_OUTSTANDING));
}
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
if (subscriber == null) {

View File

@ -61,11 +61,8 @@ import org.threeten.bp.Duration;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -112,16 +109,22 @@ public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements Ve
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
GCP_CREDENTIALS_PROVIDER_SERVICE,
TOPIC_NAME,
ORDERING_KEY,
BATCH_SIZE_THRESHOLD,
BATCH_BYTES_THRESHOLD,
BATCH_DELAY_THRESHOLD
);
public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
private Publisher publisher = null;
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.unmodifiableList(Arrays.asList(TOPIC_NAME,
GCP_CREDENTIALS_PROVIDER_SERVICE,
ORDERING_KEY,
BATCH_SIZE_THRESHOLD,
BATCH_BYTES_THRESHOLD,
BATCH_DELAY_THRESHOLD));
return DESCRIPTORS;
}
@Override
@ -138,9 +141,7 @@ public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements Ve
@Override
public Set<Relationship> getRelationships() {
return Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))
);
return RELATIONSHIPS;
}
@Override

View File

@ -39,10 +39,7 @@ import org.apache.nifi.proxy.ProxyConfiguration;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -62,12 +59,11 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
.description("FlowFiles are routed to this relationship if the Google Cloud Storage operation fails.")
.build();
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
@Override
public Set<Relationship> getRelationships() {
return relationships;
return RELATIONSHIPS;
}
// https://cloud.google.com/storage/docs/request-endpoints#storage-set-client-endpoint-java
@ -81,13 +77,6 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
.required(false)
.build();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> propertyDescriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
propertyDescriptors.add(STORAGE_API_URL);
return Collections.unmodifiableList(propertyDescriptors);
}
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
final List<ConfigVerificationResult> results = new ArrayList<>(verifyCloudService(context, verificationLogger, attributes));

View File

@ -31,7 +31,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -77,14 +76,20 @@ public class DeleteGCSObject extends AbstractGCSProcessor {
.required(false)
.build();
private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
GCP_CREDENTIALS_PROVIDER_SERVICE,
PROJECT_ID,
BUCKET,
KEY,
GENERATION,
RETRY_COUNT,
STORAGE_API_URL,
PROXY_CONFIGURATION_SERVICE
);
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.addAll(super.getSupportedPropertyDescriptors());
descriptors.add(BUCKET);
descriptors.add(KEY);
descriptors.add(GENERATION);
return Collections.unmodifiableList(descriptors);
return DESCRIPTORS;
}
@Override

View File

@ -170,7 +170,7 @@ public class FetchGCSObject extends AbstractGCSProcessor {
public static final PropertyDescriptor KEY = new PropertyDescriptor
.Builder().name("gcs-key")
.displayName("Name")
.displayName("Key")
.description(KEY_DESC)
.required(true)
.defaultValue("${" + CoreAttributes.FILENAME.key() + "}")
@ -217,17 +217,23 @@ public class FetchGCSObject extends AbstractGCSProcessor {
.required(false)
.build();
private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
GCP_CREDENTIALS_PROVIDER_SERVICE,
PROJECT_ID,
BUCKET,
KEY,
GENERATION,
ENCRYPTION_KEY,
RANGE_START,
RANGE_LENGTH,
RETRY_COUNT,
STORAGE_API_URL,
PROXY_CONFIGURATION_SERVICE
);
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(BUCKET);
descriptors.add(KEY);
descriptors.addAll(super.getSupportedPropertyDescriptors());
descriptors.add(GENERATION);
descriptors.add(ENCRYPTION_KEY);
descriptors.add(RANGE_START);
descriptors.add(RANGE_LENGTH);
return Collections.unmodifiableList(descriptors);
return DESCRIPTORS;
}
@Override

View File

@ -250,27 +250,32 @@ public class ListGCSBucket extends AbstractGCSProcessor {
.identifiesControllerService(RecordSetWriterFactory.class)
.build();
private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
GCP_CREDENTIALS_PROVIDER_SERVICE,
PROJECT_ID,
BUCKET,
PREFIX,
LISTING_STRATEGY,
TRACKING_STATE_CACHE,
INITIAL_LISTING_TARGET,
TRACKING_TIME_WINDOW,
RECORD_WRITER,
USE_GENERATIONS,
RETRY_COUNT,
STORAGE_API_URL,
PROXY_CONFIGURATION_SERVICE
);
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(LISTING_STRATEGY);
descriptors.add(TRACKING_STATE_CACHE);
descriptors.add(INITIAL_LISTING_TARGET);
descriptors.add(TRACKING_TIME_WINDOW);
descriptors.add(BUCKET);
descriptors.add(RECORD_WRITER);
descriptors.addAll(super.getSupportedPropertyDescriptors());
descriptors.add(PREFIX);
descriptors.add(USE_GENERATIONS);
return Collections.unmodifiableList(descriptors);
return DESCRIPTORS;
}
private static final Set<Relationship> relationships = Collections.singleton(REL_SUCCESS);
private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
@Override
public Set<Relationship> getRelationships() {
return relationships;
return RELATIONSHIPS;
}
// State tracking

View File

@ -289,21 +289,28 @@ public class PutGCSObject extends AbstractGCSProcessor {
.allowableValues(CD_INLINE, CD_ATTACHMENT)
.build();
private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
GCP_CREDENTIALS_PROVIDER_SERVICE,
PROJECT_ID,
BUCKET,
KEY,
RESOURCE_TRANSFER_SOURCE,
FILE_RESOURCE_SERVICE,
CONTENT_TYPE,
CRC32C,
ACL,
ENCRYPTION_KEY,
OVERWRITE,
CONTENT_DISPOSITION_TYPE,
GZIPCONTENT,
STORAGE_API_URL,
RETRY_COUNT,
PROXY_CONFIGURATION_SERVICE
);
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
descriptors.add(BUCKET);
descriptors.add(KEY);
descriptors.add(RESOURCE_TRANSFER_SOURCE);
descriptors.add(FILE_RESOURCE_SERVICE);
descriptors.add(CONTENT_TYPE);
descriptors.add(CRC32C);
descriptors.add(ACL);
descriptors.add(ENCRYPTION_KEY);
descriptors.add(OVERWRITE);
descriptors.add(CONTENT_DISPOSITION_TYPE);
descriptors.add(GZIPCONTENT);
return Collections.unmodifiableList(descriptors);
return DESCRIPTORS;
}
@Override