NIFI-11544 Fixed REL_FAILURE usage in AbstractIcebergProcessor

This closes #7241.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
krisztina-zsihovszki 2023-05-12 17:09:59 +02:00 committed by Peter Turcsanyi
parent b9f4d02094
commit 3051b69a6c
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
3 changed files with 9 additions and 9 deletions

View File

@ -26,6 +26,7 @@ import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosUser;
@ -35,14 +36,13 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser;
import static org.apache.nifi.processors.iceberg.PutIceberg.REL_FAILURE;
/**
* Base Iceberg processor class.
*/
public abstract class AbstractIcebergProcessor extends AbstractProcessor {
static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
public static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
.name("catalog-service")
.displayName("Catalog Service")
.description("Specifies the Controller Service to use for handling references to tables metadata files.")
@ -50,13 +50,18 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor {
.required(true)
.build();
static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-user-service")
.displayName("Kerberos User Service")
.description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos.")
.identifiesControllerService(KerberosUserService.class)
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if the operation failed and retrying the operation will also fail, such as an invalid data or schema.")
.build();
private volatile KerberosUser kerberosUser;
private volatile UserGroupInformation ugi;

View File

@ -163,11 +163,6 @@ public class PutIceberg extends AbstractIcebergProcessor {
.description("A FlowFile is routed to this relationship after the data ingestion was successful.")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, such as an invalid data or schema.")
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
RECORD_READER,
CATALOG,

View File

@ -35,7 +35,7 @@ public class IcebergPartitionedWriter extends PartitionedFanoutWriter<Record> {
private final PartitionKey partitionKey;
private final InternalRecordWrapper wrapper;
IcebergPartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<Record> appenderFactory, OutputFileFactory fileFactory,
public IcebergPartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<Record> appenderFactory, OutputFileFactory fileFactory,
FileIO io, long targetFileSize, Schema schema) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.partitionKey = new PartitionKey(spec, schema);