NIFI-3739: This closes #1695. Added ConsumeKafkaRecord_0_10 and PublishKafkaRecord_0_10 processors

This commit is contained in:
Mark Payne 2017-04-25 17:19:41 -04:00 committed by joewitt
parent 946f4a1a28
commit 07989b8460
50 changed files with 2582 additions and 308 deletions

View File

@ -149,13 +149,16 @@ public class StandardPropertyValue implements PropertyValue {
}
@Override
@SuppressWarnings("unchecked")
public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map<String, String> additionalAttributes, AttributeValueDecorator decorator, Map<String, String> stateValues)
throws ProcessException {
if (rawValue == null || preparedQuery == null) {
return this;
}
final ValueLookup lookup = new ValueLookup(variableRegistry, flowFile, additionalAttributes);
return new StandardPropertyValue(preparedQuery.evaluateExpressions(lookup, decorator, stateValues), serviceLookup, null);
final String evaluated = preparedQuery.evaluateExpressions(lookup, decorator, stateValues);
return new StandardPropertyValue(evaluated, serviceLookup, new EmptyPreparedQuery(evaluated), null);
}
@Override

View File

@ -27,10 +27,18 @@ import java.util.Optional;
public class MapRecord implements Record {
private final RecordSchema schema;
private final Map<String, Object> values;
private final Optional<SerializedForm> serializedForm;
public MapRecord(final RecordSchema schema, final Map<String, Object> values) {
this.schema = Objects.requireNonNull(schema);
this.values = Objects.requireNonNull(values);
this.serializedForm = Optional.empty();
}
public MapRecord(final RecordSchema schema, final Map<String, Object> values, final SerializedForm serializedForm) {
this.schema = Objects.requireNonNull(schema);
this.values = Objects.requireNonNull(values);
this.serializedForm = Optional.ofNullable(serializedForm);
}
@Override
@ -144,19 +152,12 @@ public class MapRecord implements Record {
return convertToString(getValue(field), format);
}
private String getFormat(final String optionalFormat, final RecordFieldType fieldType) {
return (optionalFormat == null) ? fieldType.getDefaultFormat() : optionalFormat;
}
private String convertToString(final Object value, final String format) {
if (value == null) {
return null;
}
final String dateFormat = getFormat(format, RecordFieldType.DATE);
final String timestampFormat = getFormat(format, RecordFieldType.TIMESTAMP);
final String timeFormat = getFormat(format, RecordFieldType.TIME);
return DataTypeUtils.toString(value, dateFormat, timeFormat, timestampFormat);
return DataTypeUtils.toString(value, format);
}
@Override
@ -191,7 +192,7 @@ public class MapRecord implements Record {
@Override
public Date getAsDate(final String fieldName, final String format) {
return DataTypeUtils.toDate(getValue(fieldName), format, fieldName);
return DataTypeUtils.toDate(getValue(fieldName), DataTypeUtils.getDateFormat(format), fieldName);
}
@Override
@ -224,4 +225,9 @@ public class MapRecord implements Record {
public String toString() {
return "MapRecord[values=" + values + "]";
}
@Override
public Optional<SerializedForm> getSerializedForm() {
return serializedForm;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.nifi.serialization.record;
import java.util.Date;
import java.util.Optional;
public interface Record {
@ -61,4 +62,6 @@ public interface Record {
Date getAsDate(String fieldName, String format);
Object[] getAsArray(String fieldName);
Optional<SerializedForm> getSerializedForm();
}

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization.record;
import java.util.Objects;
public interface SerializedForm {
/**
* @return the serialized form of the record. This could be a byte[], String, ByteBuffer, etc.
*/
Object getSerialized();
/**
* @return the MIME type that the data is serialized in
*/
String getMimeType();
public static SerializedForm of(final java.util.function.Supplier<Object> serializedSupplier, final String mimeType) {
Objects.requireNonNull(serializedSupplier);
Objects.requireNonNull(mimeType);
return new SerializedForm() {
private volatile Object serialized = null;
@Override
public Object getSerialized() {
if (serialized != null) {
return serialized;
}
final Object supplied = serializedSupplier.get();
this.serialized = supplied;
return supplied;
}
@Override
public String getMimeType() {
return mimeType;
}
@Override
public int hashCode() {
return 31 + 17 * mimeType.hashCode() + 15 * getSerialized().hashCode();
}
@Override
public boolean equals(final Object obj) {
if (obj == this) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof SerializedForm)) {
return false;
}
final SerializedForm other = (SerializedForm) obj;
return other.getMimeType().equals(mimeType) && Objects.deepEquals(other.getSerialized(), getSerialized());
}
};
}
public static SerializedForm of(final Object serialized, final String mimeType) {
Objects.requireNonNull(serialized);
Objects.requireNonNull(mimeType);
return new SerializedForm() {
@Override
public Object getSerialized() {
return serialized;
}
@Override
public String getMimeType() {
return mimeType;
}
@Override
public int hashCode() {
return 31 + 17 * mimeType.hashCode() + 15 * serialized.hashCode();
}
@Override
public boolean equals(final Object obj) {
if (obj == this) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof SerializedForm)) {
return false;
}
final SerializedForm other = (SerializedForm) obj;
return other.getMimeType().equals(mimeType) && Objects.deepEquals(other.getSerialized(), getSerialized());
}
};
}
}

View File

@ -43,10 +43,25 @@ public class DataTypeUtils {
private static final TimeZone gmt = TimeZone.getTimeZone("gmt");
public static Object convertType(final Object value, final DataType dataType, final String fieldName) {
return convertType(value, dataType, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), fieldName);
return convertType(value, dataType, getDateFormat(RecordFieldType.DATE.getDefaultFormat()), getDateFormat(RecordFieldType.TIME.getDefaultFormat()),
getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()), fieldName);
}
public static Object convertType(final Object value, final DataType dataType, final String dateFormat, final String timeFormat, final String timestampFormat, final String fieldName) {
public static DateFormat getDateFormat(final RecordFieldType fieldType, final DateFormat dateFormat, final DateFormat timeFormat, final DateFormat timestampFormat) {
switch (fieldType) {
case DATE:
return dateFormat;
case TIME:
return timeFormat;
case TIMESTAMP:
return timestampFormat;
}
return null;
}
public static Object convertType(final Object value, final DataType dataType, final DateFormat dateFormat, final DateFormat timeFormat,
final DateFormat timestampFormat, final String fieldName) {
switch (dataType.getFieldType()) {
case BIGINT:
return toBigInt(value, fieldName);
@ -69,7 +84,7 @@ public class DataTypeUtils {
case SHORT:
return toShort(value, fieldName);
case STRING:
return toString(value, dateFormat, timeFormat, timestampFormat);
return toString(value, getDateFormat(dataType.getFieldType(), dateFormat, timeFormat, timestampFormat));
case TIME:
return toTime(value, timeFormat, fieldName);
case TIMESTAMP:
@ -273,7 +288,7 @@ public class DataTypeUtils {
}
public static String toString(final Object value, final String dateFormat, final String timeFormat, final String timestampFormat) {
public static String toString(final Object value, final DateFormat format) {
if (value == null) {
return null;
}
@ -282,17 +297,50 @@ public class DataTypeUtils {
return (String) value;
}
if (format == null && value instanceof java.util.Date) {
return String.valueOf(((java.util.Date) value).getTime());
}
if (value instanceof java.sql.Date) {
return getDateFormat(dateFormat).format((java.util.Date) value);
return format.format((java.util.Date) value);
}
if (value instanceof java.sql.Time) {
return getDateFormat(timeFormat).format((java.util.Date) value);
return format.format((java.util.Date) value);
}
if (value instanceof java.sql.Timestamp) {
return getDateFormat(timestampFormat).format((java.util.Date) value);
return format.format((java.util.Date) value);
}
if (value instanceof java.util.Date) {
return getDateFormat(timestampFormat).format((java.util.Date) value);
return format.format((java.util.Date) value);
}
return value.toString();
}
public static String toString(final Object value, final String format) {
if (value == null) {
return null;
}
if (value instanceof String) {
return (String) value;
}
if (format == null && value instanceof java.util.Date) {
return String.valueOf(((java.util.Date) value).getTime());
}
if (value instanceof java.sql.Date) {
return getDateFormat(format).format((java.util.Date) value);
}
if (value instanceof java.sql.Time) {
return getDateFormat(format).format((java.util.Date) value);
}
if (value instanceof java.sql.Timestamp) {
return getDateFormat(format).format((java.util.Date) value);
}
if (value instanceof java.util.Date) {
return getDateFormat(format).format((java.util.Date) value);
}
return value.toString();
@ -302,7 +350,7 @@ public class DataTypeUtils {
return value != null;
}
public static java.sql.Date toDate(final Object value, final String format, final String fieldName) {
public static java.sql.Date toDate(final Object value, final DateFormat format, final String fieldName) {
if (value == null) {
return null;
}
@ -318,9 +366,18 @@ public class DataTypeUtils {
if (value instanceof String) {
try {
final java.util.Date utilDate = getDateFormat(format).parse((String) value);
final String string = ((String) value).trim();
if (string.isEmpty()) {
return null;
}
if (format == null) {
return new Date(Long.parseLong(string));
}
final java.util.Date utilDate = format.parse(string);
return new Date(utilDate.getTime());
} catch (final ParseException e) {
} catch (final ParseException | NumberFormatException e) {
throw new IllegalTypeConversionException("Could not convert value [" + value
+ "] of type java.lang.String to Date because the value is not in the expected date format: " + format + " for field " + fieldName);
}
@ -350,7 +407,7 @@ public class DataTypeUtils {
return false;
}
public static Time toTime(final Object value, final String format, final String fieldName) {
public static Time toTime(final Object value, final DateFormat format, final String fieldName) {
if (value == null) {
return null;
}
@ -366,7 +423,16 @@ public class DataTypeUtils {
if (value instanceof String) {
try {
final java.util.Date utilDate = getDateFormat(format).parse((String) value);
final String string = ((String) value).trim();
if (string.isEmpty()) {
return null;
}
if (format == null) {
return new Time(Long.parseLong(string));
}
final java.util.Date utilDate = format.parse(string);
return new Time(utilDate.getTime());
} catch (final ParseException e) {
throw new IllegalTypeConversionException("Could not convert value [" + value
@ -377,7 +443,7 @@ public class DataTypeUtils {
throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Time for field " + fieldName);
}
private static DateFormat getDateFormat(final String format) {
public static DateFormat getDateFormat(final String format) {
final DateFormat df = new SimpleDateFormat(format);
df.setTimeZone(gmt);
return df;
@ -387,7 +453,7 @@ public class DataTypeUtils {
return isDateTypeCompatible(value, format);
}
public static Timestamp toTimestamp(final Object value, final String format, final String fieldName) {
public static Timestamp toTimestamp(final Object value, final DateFormat format, final String fieldName) {
if (value == null) {
return null;
}
@ -403,7 +469,16 @@ public class DataTypeUtils {
if (value instanceof String) {
try {
final java.util.Date utilDate = getDateFormat(format).parse((String) value);
final String string = ((String) value).trim();
if (string.isEmpty()) {
return null;
}
if (format == null) {
return new Timestamp(Long.parseLong(string));
}
final java.util.Date utilDate = format.parse(string);
return new Timestamp(utilDate.getTime());
} catch (final ParseException e) {
throw new IllegalTypeConversionException("Could not convert value [" + value

View File

@ -296,6 +296,8 @@ public class AvroTypeUtil {
}
return map;
} else if (rawValue instanceof Map) {
return rawValue;
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Map");
}
@ -358,7 +360,7 @@ public class AvroTypeUtil {
case ENUM:
return new GenericData.EnumSymbol(fieldSchema, rawValue);
case STRING:
return DataTypeUtils.toString(rawValue, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
return DataTypeUtils.toString(rawValue, (String) null);
}
return rawValue;

View File

@ -60,7 +60,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
final int protocolVersion = bb.get();
if (protocolVersion != 1) {
throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+ LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion);
+ LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format");
}
final long schemaId = bb.getLong();

View File

@ -18,33 +18,32 @@
package org.apache.nifi.serialization;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.serialization.record.RecordFieldType;
public class DateTimeUtils {
public static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
.name("Date Format")
.description("Specifies the format to use when reading/writing Date fields")
.description("Specifies the format to use when reading/writing Date fields. "
+ "If not specified, Date fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).")
.expressionLanguageSupported(false)
.defaultValue(RecordFieldType.DATE.getDefaultFormat())
.addValidator(new SimpleDateFormatValidator())
.required(true)
.required(false)
.build();
public static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder()
.name("Time Format")
.description("Specifies the format to use when reading/writing Time fields")
.description("Specifies the format to use when reading/writing Time fields. "
+ "If not specified, Time fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).")
.expressionLanguageSupported(false)
.defaultValue(RecordFieldType.TIME.getDefaultFormat())
.addValidator(new SimpleDateFormatValidator())
.required(true)
.required(false)
.build();
public static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
.name("Timestamp Format")
.description("Specifies the format to use when reading/writing Timestamp fields")
.description("Specifies the format to use when reading/writing Timestamp fields. "
+ "If not specified, Timestamp fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).")
.expressionLanguageSupported(false)
.defaultValue(RecordFieldType.TIMESTAMP.getDefaultFormat())
.addValidator(new SimpleDateFormatValidator())
.required(true)
.required(false)
.build();
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.controller;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
@ -162,7 +163,7 @@ public interface ProcessScheduler {
*
* @param service to enable
*/
void enableControllerService(ControllerServiceNode service);
CompletableFuture<Void> enableControllerService(ControllerServiceNode service);
/**
* Disables all of the given Controller Services in the order provided by the List

View File

@ -23,6 +23,7 @@ import org.apache.nifi.groups.ProcessGroup;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
public interface ControllerServiceNode extends ConfiguredComponent {
@ -94,8 +95,10 @@ public interface ControllerServiceNode extends ConfiguredComponent {
* initiate service enabling task as well as its re-tries
* @param administrativeYieldMillis
* the amount of milliseconds to wait for administrative yield
*
* @return a CompletableFuture that can be used to wait for the service to finish enabling
*/
void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis);
CompletableFuture<Void> enable(ScheduledExecutorService scheduler, long administrativeYieldMillis);
/**
* Will disable this service. Disabling of the service typically means

View File

@ -19,6 +19,7 @@ package org.apache.nifi.controller.service;
import java.net.URL;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Future;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.bundle.BundleCoordinate;
@ -65,11 +66,13 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
/**
* Enables the given controller service that it can be used by other
* components
* components. This method will asynchronously enable the service, returning
* immediately.
*
* @param serviceNode the service node
* @return a Future that can be used to wait for the service to finish being enabled.
*/
void enableControllerService(ControllerServiceNode serviceNode);
Future<Void> enableControllerService(ControllerServiceNode serviceNode);
/**
* Enables the collection of services. If a service in this collection

View File

@ -236,6 +236,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -3235,8 +3236,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
@Override
public void enableControllerService(final ControllerServiceNode serviceNode) {
controllerServiceProvider.enableControllerService(serviceNode);
public Future<Void> enableControllerService(final ControllerServiceNode serviceNode) {
return controllerServiceProvider.enableControllerService(serviceNode);
}
@Override

View File

@ -125,6 +125,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
private final AtomicLong schedulingNanos;
private final ProcessScheduler processScheduler;
private long runNanos = 0L;
private volatile long yieldNanos;
private final NiFiProperties nifiProperties;
private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
@ -518,7 +519,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public long getYieldPeriod(final TimeUnit timeUnit) {
return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
final TimeUnit unit = (timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
return unit.convert(yieldNanos, TimeUnit.NANOSECONDS);
}
@Override
@ -531,11 +533,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
if (isRunning()) {
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
}
final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS);
if (yieldMillis < 0) {
final long yieldNanos = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.NANOSECONDS);
if (yieldNanos < 0) {
throw new IllegalArgumentException("Yield duration must be positive");
}
this.yieldPeriod.set(yieldPeriod);
this.yieldNanos = yieldNanos;
}
/**

View File

@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNull;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
@ -535,8 +536,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
@Override
public void enableControllerService(final ControllerServiceNode service) {
service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis);
public CompletableFuture<Void> enableControllerService(final ControllerServiceNode service) {
return service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis);
}
@Override

View File

@ -54,6 +54,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -382,7 +383,9 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
* as it reached ENABLED state.
*/
@Override
public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis) {
public CompletableFuture<Void> enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis) {
final CompletableFuture<Void> future = new CompletableFuture<>();
if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) {
synchronized (active) {
this.active.set(true);
@ -396,6 +399,9 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getControllerServiceImplementation().getClass(), getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);
}
future.complete(null);
boolean shouldEnable = false;
synchronized (active) {
shouldEnable = active.get() && stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED);
@ -426,6 +432,8 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
}
});
}
return future;
}
/**

View File

@ -16,6 +16,21 @@
*/
package org.apache.nifi.controller.service;
import static java.util.Objects.requireNonNull;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnAdded;
@ -51,20 +66,6 @@ import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static java.util.Objects.requireNonNull;
public class StandardControllerServiceProvider implements ControllerServiceProvider {
private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class);
@ -322,9 +323,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
@Override
public void enableControllerService(final ControllerServiceNode serviceNode) {
public Future<Void> enableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanEnable();
processScheduler.enableControllerService(serviceNode);
return processScheduler.enableControllerService(serviceNode);
}
@Override
@ -349,7 +350,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
this.enableControllerServiceDependenciesFirst(controllerServiceNode);
}
} catch (Exception e) {
logger.error("Failed to enable " + controllerServiceNode + " due to " + e);
logger.error("Failed to enable " + controllerServiceNode, e);
if (this.bulletinRepo != null) {
this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service",
Severity.ERROR.name(), "Could not start " + controllerServiceNode + " due to " + e));
@ -359,16 +360,28 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
}
private void enableControllerServiceDependenciesFirst(ControllerServiceNode serviceNode) {
private Future<Void> enableControllerServiceDependenciesFirst(ControllerServiceNode serviceNode) {
final List<Future<Void>> futures = new ArrayList<>();
for (ControllerServiceNode depNode : serviceNode.getRequiredControllerServices()) {
if (!depNode.isActive()) {
this.enableControllerServiceDependenciesFirst(depNode);
futures.add(this.enableControllerServiceDependenciesFirst(depNode));
}
}
if (logger.isDebugEnabled()) {
logger.debug("Enabling " + serviceNode);
}
this.enableControllerService(serviceNode);
for (final Future<Void> future : futures) {
try {
future.get();
} catch (final Exception e) {
// Nothing we can really do. Will attempt to enable this service anyway.
}
}
return this.enableControllerService(serviceNode);
}
static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {

View File

@ -26,6 +26,14 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>

View File

@ -0,0 +1,346 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.10.x Consumer API. "
+ "The complementary NiFi processor for sending messages is PublishKafka_0_10. Please note that, at this time, the Processor assumes that "
+ "all records that are retrieved from a given partition have the same schema. If any of the Kafka messages are pulled but cannot be parsed or written with the "
+ "configured Record Reader or Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
+ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual messages within the single FlowFile. "
+ "A 'record.count' attribute is added to indicate how many messages are contained in the FlowFile.")
@Tags({"Kafka", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.10.x"})
@WritesAttributes({
@WritesAttribute(attribute = "record.count", description = "The number of records received"),
@WritesAttribute(attribute = "mime.type", description = "The MIME Type that is provided by the configured Record Writer"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the records are from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic records are from")
})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
@SeeAlso({ConsumeKafka_0_10.class, PublishKafka_0_10.class, PublishKafkaRecord_0_10.class})
public class ConsumeKafkaRecord_0_10 extends AbstractProcessor {
static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names");
static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax");
static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
.name("topic")
.displayName("Topic Name(s)")
.description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(true)
.build();
static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder()
.name("topic_type")
.displayName("Topic Name Format")
.description("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression")
.required(true)
.allowableValues(TOPIC_NAME, TOPIC_PATTERN)
.defaultValue(TOPIC_NAME.getValue())
.build();
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("The Record Reader to use for incoming FlowFiles")
.identifiesControllerService(RecordReaderFactory.class)
.expressionLanguageSupported(false)
.required(true)
.build();
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("record-writer")
.displayName("Record Writer")
.description("The Record Writer to use in order to serialize the data before sending to Kafka")
.identifiesControllerService(RecordSetWriterFactory.class)
.expressionLanguageSupported(false)
.required(true)
.build();
static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
.name("group.id")
.displayName("Group ID")
.description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(false)
.build();
static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
.name("auto.offset.reset")
.displayName("Offset Reset")
.description("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any "
+ "more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.")
.required(true)
.allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE)
.defaultValue(OFFSET_LATEST.getValue())
.build();
static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder()
.name("max.poll.records")
.displayName("Max Poll Records")
.description("Specifies the maximum number of records Kafka should return in a single poll.")
.required(false)
.defaultValue("10000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder()
.name("max-uncommit-offset-wait")
.displayName("Max Uncommitted Time")
.description("Specifies the maximum amount of time allowed to pass before offsets must be committed. "
+ "This value impacts how often offsets will be committed. Committing offsets less often increases "
+ "throughput but also increases the window of potential data duplication in the event of a rebalance "
+ "or JVM restart between commits. This value is also related to maximum poll records and the use "
+ "of a message demarcator. When using a message demarcator we can have far more uncommitted messages "
+ "than when we're not as there is much less for us to keep track of in memory.")
.required(false)
.defaultValue("1 secs")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles received from Kafka. Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
.build();
static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
.name("parse.failure")
.description("If a message from Kafka cannot be parsed using the configured Record Reader, the contents of the "
+ "message will be routed to this Relationship as its own individual FlowFile.")
.build();
static final List<PropertyDescriptor> DESCRIPTORS;
static final Set<Relationship> RELATIONSHIPS;
private volatile ConsumerPool consumerPool = null;
private final Set<ConsumerLease> activeLeases = Collections.synchronizedSet(new HashSet<>());
static {
List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS);
descriptors.add(TOPICS);
descriptors.add(TOPIC_TYPE);
descriptors.add(RECORD_READER);
descriptors.add(RECORD_WRITER);
descriptors.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
descriptors.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE);
descriptors.add(KafkaProcessorUtils.USER_PRINCIPAL);
descriptors.add(KafkaProcessorUtils.USER_KEYTAB);
descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
descriptors.add(GROUP_ID);
descriptors.add(AUTO_OFFSET_RESET);
descriptors.add(MAX_POLL_RECORDS);
descriptors.add(MAX_UNCOMMITTED_TIME);
DESCRIPTORS = Collections.unmodifiableList(descriptors);
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_PARSE_FAILURE);
RELATIONSHIPS = Collections.unmodifiableSet(rels);
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@OnStopped
public void close() {
final ConsumerPool pool = consumerPool;
consumerPool = null;
if (pool != null) {
pool.close();
}
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
.name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)).dynamic(true)
.build();
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
return KafkaProcessorUtils.validateCommonProperties(validationContext);
}
private synchronized ConsumerPool getConsumerPool(final ProcessContext context) {
ConsumerPool pool = consumerPool;
if (pool != null) {
return pool;
}
return consumerPool = createConsumerPool(context, getLogger());
}
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
final int maxLeases = context.getMaxConcurrentTasks();
final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
final Map<String, Object> props = new HashMap<>();
KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
final String topicListing = context.getProperty(ConsumeKafkaRecord_0_10.TOPICS).evaluateAttributeExpressions().getValue();
final String topicType = context.getProperty(ConsumeKafkaRecord_0_10.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
final List<String> topics = new ArrayList<>();
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
if (topicType.equals(TOPIC_NAME.getValue())) {
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
if (!trimmedName.isEmpty()) {
topics.add(trimmedName);
}
}
return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol, bootstrapServers, log);
} else if (topicType.equals(TOPIC_PATTERN.getValue())) {
final Pattern topicPattern = Pattern.compile(topicListing.trim());
return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol, bootstrapServers, log);
} else {
getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
return null;
}
}
@OnUnscheduled
public void interruptActiveThreads() {
// There are known issues with the Kafka client library that result in the client code hanging
// indefinitely when unable to communicate with the broker. In order to address this, we will wait
// up to 30 seconds for the Threads to finish and then will call Consumer.wakeup() to trigger the
// thread to wakeup when it is blocked, waiting on a response.
final long nanosToWait = TimeUnit.SECONDS.toNanos(5L);
final long start = System.nanoTime();
while (System.nanoTime() - start < nanosToWait && !activeLeases.isEmpty()) {
try {
Thread.sleep(100L);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}
if (!activeLeases.isEmpty()) {
int count = 0;
for (final ConsumerLease lease : activeLeases) {
getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", new Object[] {lease});
lease.wakeup();
count++;
}
getLogger().info("Woke up {} consumers", new Object[] {count});
}
activeLeases.clear();
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final ConsumerPool pool = getConsumerPool(context);
if (pool == null) {
context.yield();
return;
}
try (final ConsumerLease lease = pool.obtainConsumer(session, context)) {
if (lease == null) {
context.yield();
return;
}
activeLeases.add(lease);
try {
while (this.isScheduled() && lease.continuePolling()) {
lease.poll();
}
if (this.isScheduled() && !lease.commit()) {
context.yield();
}
} catch (final WakeupException we) {
getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
+ "Will roll back session and discard any partially received data.", new Object[] {lease});
} catch (final KafkaException kex) {
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
new Object[]{lease, kex}, kex);
} catch (final Throwable t) {
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
new Object[]{lease, t}, t);
} finally {
activeLeases.remove(lease);
}
}
}
}

View File

@ -166,9 +166,9 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles received from Kafka. Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
.build();
.name("success")
.description("FlowFiles received from Kafka. Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
.build();
static final List<PropertyDescriptor> DESCRIPTORS;
static final Set<Relationship> RELATIONSHIPS;
@ -305,7 +305,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
return;
}
try (final ConsumerLease lease = pool.obtainConsumer(session)) {
try (final ConsumerLease lease = pool.obtainConsumer(session, context)) {
if (lease == null) {
context.yield();
return;

View File

@ -16,15 +16,30 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_PARSE_FAILURE;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_SUCCESS;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -34,11 +49,19 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10.REL_SUCCESS;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
/**
* This class represents a lease to access a Kafka Consumer object. The lease is
@ -56,6 +79,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private final String keyEncoding;
private final String securityProtocol;
private final String bootstrapServers;
private final RecordSetWriterFactory writerFactory;
private final RecordReaderFactory readerFactory;
private boolean poisoned = false;
//used for tracking demarcated flowfiles to their TopicPartition so we can append
//to them on subsequent poll calls
@ -72,6 +97,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final String keyEncoding,
final String securityProtocol,
final String bootstrapServers,
final RecordReaderFactory readerFactory,
final RecordSetWriterFactory writerFactory,
final ComponentLog logger) {
this.maxWaitMillis = maxWaitMillis;
this.kafkaConsumer = kafkaConsumer;
@ -79,6 +106,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
this.keyEncoding = keyEncoding;
this.securityProtocol = securityProtocol;
this.bootstrapServers = bootstrapServers;
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.logger = logger;
}
@ -175,7 +204,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS);
}
getProcessSession().commit();
kafkaConsumer.commitSync(uncommittedOffsetsMap);
final Map<TopicPartition, OffsetAndMetadata> offsetsMap = uncommittedOffsetsMap;
kafkaConsumer.commitSync(offsetsMap);
resetInternalState();
return true;
} catch (final KafkaException kex) {
@ -269,8 +300,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
public abstract ProcessSession getProcessSession();
private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
public abstract void yield();
private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
records.partitions().stream().forEach(partition -> {
List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
if (!messages.isEmpty()) {
@ -279,17 +311,20 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
.mapToLong(record -> record.offset())
.max()
.getAsLong();
uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
//write records to content repository and session
if (demarcatorBytes == null) {
if (demarcatorBytes != null) {
writeDemarcatedData(getProcessSession(), messages, partition);
} else if (readerFactory != null && writerFactory != null) {
writeRecordData(getProcessSession(), messages, partition);
} else {
totalFlowFiles += messages.size();
messages.stream().forEach(message -> {
writeData(getProcessSession(), message, partition);
});
} else {
writeData(getProcessSession(), messages, partition);
}
uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
}
});
}
@ -329,7 +364,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
session.transfer(tracker.flowFile, REL_SUCCESS);
}
private void writeData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
private void writeDemarcatedData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
final boolean demarcateFirstRecord;
BundleTracker tracker = bundleMap.get(topicPartition);
@ -343,6 +378,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
demarcateFirstRecord = true; //have already been writing records for this topic/partition in this lease
}
flowFile = tracker.flowFile;
tracker.incrementRecordCount(records.size());
flowFile = session.append(flowFile, out -> {
boolean useDemarcator = demarcateFirstRecord;
@ -358,6 +394,115 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
bundleMap.put(topicPartition, tracker);
}
private void rollback(final TopicPartition topicPartition) {
OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
if (offsetAndMetadata == null) {
offsetAndMetadata = kafkaConsumer.committed(topicPartition);
}
final long offset = offsetAndMetadata.offset();
kafkaConsumer.seek(topicPartition, offset);
}
private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
FlowFile flowFile = session.create();
try {
final RecordSetWriter writer;
try {
writer = writerFactory.createWriter(logger, flowFile, new ByteArrayInputStream(new byte[0]));
} catch (final Exception e) {
logger.error(
"Failed to obtain a Record Writer for serializing Kafka messages. This generally happens because the "
+ "Record Writer cannot obtain the appropriate Schema, due to failure to connect to a remote Schema Registry "
+ "or due to the Schema Access Strategy being dependent upon FlowFile Attributes that are not available. "
+ "Will roll back the Kafka message offsets.", e);
try {
rollback(topicPartition);
} catch (final Exception rollbackException) {
logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
}
yield();
throw new ProcessException(e);
}
final FlowFile ff = flowFile;
final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
flowFile = session.write(flowFile, rawOut -> {
final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
final RecordSchema emptySchema = new SimpleRecordSchema(Collections.emptyList());
final RecordSet recordSet = new RecordSet() {
@Override
public RecordSchema getSchema() throws IOException {
return emptySchema;
}
@Override
public Record next() throws IOException {
while (itr.hasNext()) {
final ConsumerRecord<byte[], byte[]> consumerRecord = itr.next();
final InputStream in = new ByteArrayInputStream(consumerRecord.value());
try {
final RecordReader reader = readerFactory.createRecordReader(ff, in, logger);
final Record record = reader.nextRecord();
return record;
} catch (final Exception e) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
FlowFile failureFlowFile = session.create();
failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
session.transfer(failureFlowFile, REL_PARSE_FAILURE);
logger.error("Failed to parse message from Kafka using the configured Record Reader. "
+ "Will route message as its own FlowFile to the 'parse.failure' relationship", e);
session.adjustCounter("Parse Failures", 1, false);
}
}
return null;
}
};
try (final OutputStream out = new BufferedOutputStream(rawOut)) {
writeResult.set(writer.write(recordSet, out));
}
});
final WriteResult result = writeResult.get();
if (result.getRecordCount() > 0) {
final Map<String, String> attributes = new HashMap<>(result.getAttributes());
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.put("record.count", String.valueOf(result.getRecordCount()));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
flowFile = session.putAllAttributes(flowFile, attributes);
final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
session.adjustCounter("Records Received", result.getRecordCount(), false);
session.transfer(flowFile, REL_SUCCESS);
} else {
session.remove(flowFile);
}
} catch (final Exception e) {
session.remove(flowFile);
throw e;
}
}
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));

View File

@ -16,21 +16,24 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.nifi.logging.ComponentLog;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
/**
* A pool of Kafka Consumers for a given topic. Consumers can be obtained by
@ -49,6 +52,8 @@ public class ConsumerPool implements Closeable {
private final String keyEncoding;
private final String securityProtocol;
private final String bootstrapServers;
private final RecordReaderFactory readerFactory;
private final RecordSetWriterFactory writerFactory;
private final AtomicLong consumerCreatedCountRef = new AtomicLong();
private final AtomicLong consumerClosedCountRef = new AtomicLong();
private final AtomicLong leasesObtainedCountRef = new AtomicLong();
@ -93,6 +98,8 @@ public class ConsumerPool implements Closeable {
this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
this.topics = Collections.unmodifiableList(topics);
this.topicPattern = null;
this.readerFactory = null;
this.writerFactory = null;
}
public ConsumerPool(
@ -115,6 +122,56 @@ public class ConsumerPool implements Closeable {
this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
this.topics = null;
this.topicPattern = topics;
this.readerFactory = null;
this.writerFactory = null;
}
public ConsumerPool(
final int maxConcurrentLeases,
final RecordReaderFactory readerFactory,
final RecordSetWriterFactory writerFactory,
final Map<String, Object> kafkaProperties,
final Pattern topics,
final long maxWaitMillis,
final String securityProtocol,
final String bootstrapServers,
final ComponentLog logger) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
this.demarcatorBytes = null;
this.keyEncoding = null;
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.securityProtocol = securityProtocol;
this.bootstrapServers = bootstrapServers;
this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
this.topics = null;
this.topicPattern = topics;
}
public ConsumerPool(
final int maxConcurrentLeases,
final RecordReaderFactory readerFactory,
final RecordSetWriterFactory writerFactory,
final Map<String, Object> kafkaProperties,
final List<String> topics,
final long maxWaitMillis,
final String securityProtocol,
final String bootstrapServers,
final ComponentLog logger) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
this.demarcatorBytes = null;
this.keyEncoding = null;
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.securityProtocol = securityProtocol;
this.bootstrapServers = bootstrapServers;
this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
this.topics = topics;
this.topicPattern = null;
}
/**
@ -122,10 +179,12 @@ public class ConsumerPool implements Closeable {
* initializes a new one if deemed necessary.
*
* @param session the session for which the consumer lease will be
* associated
* associated
* @param processContext the ProcessContext for which the consumer
* lease will be associated
* @return consumer to use or null if not available or necessary
*/
public ConsumerLease obtainConsumer(final ProcessSession session) {
public ConsumerLease obtainConsumer(final ProcessSession session, final ProcessContext processContext) {
SimpleConsumerLease lease = pooledLeases.poll();
if (lease == null) {
final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
@ -150,7 +209,8 @@ public class ConsumerPool implements Closeable {
consumer.subscribe(topicPattern, lease);
}
}
lease.setProcessSession(session);
lease.setProcessSession(session, processContext);
leasesObtainedCountRef.incrementAndGet();
return lease;
}
@ -200,15 +260,24 @@ public class ConsumerPool implements Closeable {
private final Consumer<byte[], byte[]> consumer;
private volatile ProcessSession session;
private volatile ProcessContext processContext;
private volatile boolean closedConsumer;
private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, logger);
super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, readerFactory, writerFactory, logger);
this.consumer = consumer;
}
void setProcessSession(final ProcessSession session) {
void setProcessSession(final ProcessSession session, final ProcessContext context) {
this.session = session;
this.processContext = context;
}
@Override
public void yield() {
if (processContext != null) {
processContext.yield();
}
}
@Override
@ -229,7 +298,7 @@ public class ConsumerPool implements Closeable {
super.close();
if (session != null) {
session.rollback();
setProcessSession(null);
setProcessSession(null, null);
}
if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
closedConsumer = true;

View File

@ -0,0 +1,386 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.FlowFileFilters;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.RecordWriter;
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "0.10.x"})
@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 0.10.x Producer API. "
+ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
+ " Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring"
+ " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can. In the meantime"
+ " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on. The complementary NiFi "
+ "processor for fetching messages is ConsumeKafka_0_10_Record.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
+ "FlowFiles that are routed to success.")
@SeeAlso({PublishKafka_0_10.class, ConsumeKafka_0_10.class, ConsumeKafkaRecord_0_10.class})
public class PublishKafkaRecord_0_10 extends AbstractProcessor {
protected static final String MSG_COUNT = "msg.count";
static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
"FlowFile will be routed to failure unless the message is replicated to the appropriate "
+ "number of Kafka Nodes according to the Topic configuration");
static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
"FlowFile will be routed to success if the message is received by a single Kafka node, "
+ "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> "
+ "but can result in data loss if a Kafka node crashes");
static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
"FlowFile will be routed to success after successfully writing the content to a Kafka node, "
+ "without waiting for a response. This provides the best performance but may result in data loss.");
static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(),
Partitioners.RoundRobinPartitioner.class.getSimpleName(),
"Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
+ "the next Partition to Partition 2, and so on, wrapping as necessary.");
static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
"DefaultPartitioner", "Messages will be assigned to random partitions.");
static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
"The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
.name("topic")
.displayName("Topic Name")
.description("The name of the Kafka Topic to publish to.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(true)
.build();
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("The Record Reader to use for incoming FlowFiles")
.identifiesControllerService(RecordReaderFactory.class)
.expressionLanguageSupported(false)
.required(true)
.build();
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("record-writer")
.displayName("Record Writer")
.description("The Record Writer to use in order to serialize the data before sending to Kafka")
.identifiesControllerService(RecordSetWriterFactory.class)
.expressionLanguageSupported(false)
.required(true)
.build();
static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder()
.name("message-key-field")
.displayName("Message Key Field")
.description("The name of a field in the Input Records that should be used as the Key for the Kafka message.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.required(false)
.build();
static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
.name("acks")
.displayName("Delivery Guarantee")
.description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
.required(true)
.expressionLanguageSupported(false)
.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
.defaultValue(DELIVERY_BEST_EFFORT.getValue())
.build();
static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder()
.name("max.block.ms")
.displayName("Max Metadata Wait Time")
.description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the "
+ "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("5 sec")
.build();
static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder()
.name("ack.wait.time")
.displayName("Acknowledgment Wait Time")
.description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
+ "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(false)
.required(true)
.defaultValue("5 secs")
.build();
static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
.name("max.request.size")
.displayName("Max Request Size")
.description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.build();
static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
.name("partitioner.class")
.displayName("Partitioner class")
.description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
.allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
.defaultValue(RANDOM_PARTITIONING.getValue())
.required(false)
.build();
static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
.name("compression.type")
.displayName("Compression Type")
.description("This parameter allows you to specify the compression codec for all data generated by this producer.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues("none", "gzip", "snappy", "lz4")
.defaultValue("none")
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles for which all content was sent to Kafka.")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
.build();
private static final List<PropertyDescriptor> PROPERTIES;
private static final Set<Relationship> RELATIONSHIPS;
private volatile PublisherPool publisherPool = null;
static {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS);
properties.add(TOPIC);
properties.add(RECORD_READER);
properties.add(RECORD_WRITER);
properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
properties.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE);
properties.add(KafkaProcessorUtils.USER_PRINCIPAL);
properties.add(KafkaProcessorUtils.USER_KEYTAB);
properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
properties.add(DELIVERY_GUARANTEE);
properties.add(MESSAGE_KEY_FIELD);
properties.add(MAX_REQUEST_SIZE);
properties.add(ACK_WAIT_TIME);
properties.add(METADATA_WAIT_TIME);
properties.add(PARTITION_CLASS);
properties.add(COMPRESSION_CODEC);
PROPERTIES = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
RELATIONSHIPS = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
.name(propertyDescriptorName)
.addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
.dynamic(true)
.build();
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
return KafkaProcessorUtils.validateCommonProperties(validationContext);
}
private synchronized PublisherPool getPublisherPool(final ProcessContext context) {
PublisherPool pool = publisherPool;
if (pool != null) {
return pool;
}
return publisherPool = createPublisherPool(context);
}
protected PublisherPool createPublisherPool(final ProcessContext context) {
final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
final Map<String, Object> kafkaProperties = new HashMap<>();
KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis);
}
@OnStopped
public void closePool() {
if (publisherPool != null) {
publisherPool.close();
}
publisherPool = null;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 500));
if (flowFiles.isEmpty()) {
return;
}
final PublisherPool pool = getPublisherPool(context);
if (pool == null) {
context.yield();
return;
}
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
final long startTime = System.nanoTime();
try (final PublisherLease lease = pool.obtainPublisher()) {
// Send each FlowFile to Kafka asynchronously.
for (final FlowFile flowFile : flowFiles) {
if (!isScheduled()) {
// If stopped, re-queue FlowFile instead of sending it
session.transfer(flowFile);
continue;
}
final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
final RecordWriter writer;
try (final InputStream in = new BufferedInputStream(session.read(flowFile))) {
writer = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class).createWriter(getLogger(), flowFile, in);
} catch (final Exception e) {
getLogger().error("Failed to create a Record Writer for {}; routing to failure", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
continue;
}
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
final RecordReader reader = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class).createRecordReader(flowFile, in, getLogger());
lease.publish(flowFile, reader, writer, messageKeyField, topic);
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException(e);
}
}
});
} catch (final Exception e) {
// The FlowFile will be obtained and the error logged below, when calling publishResult.getFailedFlowFiles()
lease.getTracker().fail(flowFile, e);
continue;
}
}
// Complete the send
final PublishResult publishResult = lease.complete();
// Transfer any successful FlowFiles.
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
for (FlowFile success : publishResult.getSuccessfulFlowFiles()) {
final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
final int msgCount = publishResult.getSuccessfulMessageCount(success);
success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
session.adjustCounter("Messages Sent", msgCount, true);
final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
session.transfer(success, REL_SUCCESS);
}
// Transfer any failures.
for (final FlowFile failure : publishResult.getFailedFlowFiles()) {
final int successCount = publishResult.getSuccessfulMessageCount(failure);
if (successCount > 0) {
getLogger().error("Failed to send some messages for {} to Kafka, but {} messages were acknowledged by Kafka. Routing to failure due to {}",
new Object[] {failure, successCount, publishResult.getReasonForFailure(failure)});
} else {
getLogger().error("Failed to send all message for {} to Kafka; routing to failure due to {}",
new Object[] {failure, publishResult.getReasonForFailure(failure)});
}
session.transfer(failure, REL_FAILURE);
}
}
}
}

View File

@ -17,11 +17,14 @@
package org.apache.nifi.processors.kafka.pubsub;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
@ -29,6 +32,10 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;
@ -38,6 +45,7 @@ public class PublisherLease implements Closeable {
private final int maxMessageSize;
private final long maxAckWaitMillis;
private volatile boolean poisoned = false;
private final AtomicLong messagesSent = new AtomicLong(0L);
private InFlightMessageTracker tracker;
@ -85,7 +93,42 @@ public class PublisherLease implements Closeable {
}
}
private void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
void publish(final FlowFile flowFile, final RecordReader reader, final RecordWriter writer, final String messageKeyField, final String topic) throws IOException {
if (tracker == null) {
tracker = new InFlightMessageTracker();
}
final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
Record record;
final RecordSet recordSet = reader.createRecordSet();
try {
while ((record = recordSet.next()) != null) {
baos.reset();
writer.write(record, baos);
final byte[] messageContent = baos.toByteArray();
final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
publish(flowFile, messageKey, messageContent, topic, tracker);
if (tracker.isFailed(flowFile)) {
// If we have a failure, don't try to send anything else.
return;
}
}
} catch (final TokenTooLargeException ttle) {
tracker.fail(flowFile, ttle);
} catch (final Exception e) {
tracker.fail(flowFile, e);
poison();
throw e;
}
}
protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
producer.send(record, new Callback() {
@Override
@ -99,11 +142,17 @@ public class PublisherLease implements Closeable {
}
});
messagesSent.incrementAndGet();
tracker.incrementSentCount(flowFile);
}
public PublishResult complete() {
if (tracker == null) {
if (messagesSent.get() == 0L) {
return PublishResult.EMPTY;
}
throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
}
@ -129,4 +178,8 @@ public class PublisherLease implements Closeable {
producer.close(maxAckWaitMillis, TimeUnit.MILLISECONDS);
tracker = null;
}
public InFlightMessageTracker getTracker() {
return tracker;
}
}

View File

@ -13,4 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_10
org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10
org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_0_10
org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10
org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10

View File

@ -16,18 +16,8 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Before;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -35,15 +25,22 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
public class ConsumeKafkaTest {
Consumer<byte[], byte[]> mockConsumer = null;
ConsumerLease mockLease = null;
ConsumerPool mockConsumerPool = null;
@Before
public void setup() {
mockConsumer = mock(Consumer.class);
mockLease = mock(ConsumerLease.class);
mockConsumerPool = mock(ConsumerPool.class);
}
@ -106,7 +103,7 @@ public class ConsumeKafkaTest {
public void validateGetAllMessages() throws Exception {
String groupName = "validateGetAllMessages";
when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
when(mockLease.commit()).thenReturn(Boolean.TRUE);
@ -124,7 +121,7 @@ public class ConsumeKafkaTest {
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
runner.run(1, false);
verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
verify(mockLease, times(3)).continuePolling();
verify(mockLease, times(2)).poll();
verify(mockLease, times(1)).commit();
@ -137,7 +134,7 @@ public class ConsumeKafkaTest {
public void validateGetAllMessagesPattern() throws Exception {
String groupName = "validateGetAllMessagesPattern";
when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
when(mockLease.commit()).thenReturn(Boolean.TRUE);
@ -156,7 +153,7 @@ public class ConsumeKafkaTest {
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
runner.run(1, false);
verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
verify(mockLease, times(3)).continuePolling();
verify(mockLease, times(2)).poll();
verify(mockLease, times(1)).commit();
@ -169,7 +166,7 @@ public class ConsumeKafkaTest {
public void validateGetErrorMessages() throws Exception {
String groupName = "validateGetErrorMessages";
when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
when(mockLease.continuePolling()).thenReturn(true, false);
when(mockLease.commit()).thenReturn(Boolean.FALSE);
@ -187,7 +184,7 @@ public class ConsumeKafkaTest {
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
runner.run(1, false);
verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
verify(mockLease, times(2)).continuePolling();
verify(mockLease, times(1)).poll();
verify(mockLease, times(1)).commit();

View File

@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
@ -36,6 +37,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -44,14 +47,16 @@ import static org.mockito.Mockito.when;
public class ConsumerPoolTest {
Consumer<byte[], byte[]> consumer = null;
ProcessSession mockSession = null;
ProvenanceReporter mockReporter = null;
ConsumerPool testPool = null;
ConsumerPool testDemarcatedPool = null;
ComponentLog logger = null;
private Consumer<byte[], byte[]> consumer = null;
private ProcessSession mockSession = null;
private ProcessContext mockContext = Mockito.mock(ProcessContext.class);
private ProvenanceReporter mockReporter = null;
private ConsumerPool testPool = null;
private ConsumerPool testDemarcatedPool = null;
private ComponentLog logger = null;
@Before
@SuppressWarnings("unchecked")
public void setup() {
consumer = mock(Consumer.class);
logger = mock(ComponentLog.class);
@ -94,16 +99,16 @@ public class ConsumerPoolTest {
public void validatePoolSimpleCreateClose() throws Exception {
when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
}
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
}
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
}
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
}
testPool.close();
@ -125,7 +130,7 @@ public class ConsumerPoolTest {
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
lease.commit();
}
@ -142,7 +147,7 @@ public class ConsumerPoolTest {
public void validatePoolSimpleBatchCreateClose() throws Exception {
when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
for (int i = 0; i < 100; i++) {
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
for (int j = 0; j < 100; j++) {
lease.poll();
}
@ -167,7 +172,7 @@ public class ConsumerPoolTest {
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession)) {
try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
lease.commit();
}
@ -184,7 +189,7 @@ public class ConsumerPoolTest {
public void validatePoolConsumerFails() throws Exception {
when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
try {
lease.poll();
fail();

View File

@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
public class TestConsumeKafkaRecord_0_10 {
private ConsumerLease mockLease = null;
private ConsumerPool mockConsumerPool = null;
private TestRunner runner;
@Before
public void setup() throws InitializationException {
mockLease = mock(ConsumerLease.class);
mockConsumerPool = mock(ConsumerPool.class);
ConsumeKafkaRecord_0_10 proc = new ConsumeKafkaRecord_0_10() {
@Override
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
return mockConsumerPool;
}
};
runner = TestRunners.newTestRunner(proc);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
final String readerId = "record-reader";
final MockRecordParser readerService = new MockRecordParser();
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
runner.addControllerService(readerId, readerService);
runner.enableControllerService(readerService);
final String writerId = "record-writer";
final RecordSetWriterFactory writerService = new MockRecordWriter("name, age");
runner.addControllerService(writerId, writerService);
runner.enableControllerService(writerService);
runner.setProperty(ConsumeKafkaRecord_0_10.RECORD_READER, readerId);
runner.setProperty(ConsumeKafkaRecord_0_10.RECORD_WRITER, writerId);
}
@Test
public void validateCustomValidatorSettings() throws Exception {
runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);
runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
runner.assertValid();
runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo");
runner.assertNotValid();
runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
runner.assertValid();
runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
runner.assertValid();
runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
runner.assertNotValid();
}
@Test
public void validatePropertiesValidation() throws Exception {
runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);
runner.removeProperty(ConsumeKafkaRecord_0_10.GROUP_ID);
try {
runner.assertValid();
fail();
} catch (AssertionError e) {
assertTrue(e.getMessage().contains("invalid because Group ID is required"));
}
runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, "");
try {
runner.assertValid();
fail();
} catch (AssertionError e) {
assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
}
runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, " ");
try {
runner.assertValid();
fail();
} catch (AssertionError e) {
assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
}
}
@Test
public void validateGetAllMessages() throws Exception {
String groupName = "validateGetAllMessages";
when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
when(mockLease.commit()).thenReturn(Boolean.TRUE);
runner.setValidateExpressionUsage(false);
runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);
runner.run(1, false);
verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
verify(mockLease, times(3)).continuePolling();
verify(mockLease, times(2)).poll();
verify(mockLease, times(1)).commit();
verify(mockLease, times(1)).close();
verifyNoMoreInteractions(mockConsumerPool);
verifyNoMoreInteractions(mockLease);
}
@Test
public void validateGetAllMessagesPattern() throws Exception {
String groupName = "validateGetAllMessagesPattern";
when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
when(mockLease.commit()).thenReturn(Boolean.TRUE);
runner.setValidateExpressionUsage(false);
runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "(fo.*)|(ba)");
runner.setProperty(ConsumeKafkaRecord_0_10.TOPIC_TYPE, "pattern");
runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);
runner.run(1, false);
verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
verify(mockLease, times(3)).continuePolling();
verify(mockLease, times(2)).poll();
verify(mockLease, times(1)).commit();
verify(mockLease, times(1)).close();
verifyNoMoreInteractions(mockConsumerPool);
verifyNoMoreInteractions(mockLease);
}
@Test
public void validateGetErrorMessages() throws Exception {
String groupName = "validateGetErrorMessages";
when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
when(mockLease.continuePolling()).thenReturn(true, false);
when(mockLease.commit()).thenReturn(Boolean.FALSE);
runner.setValidateExpressionUsage(false);
runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);
runner.run(1, false);
verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
verify(mockLease, times(2)).continuePolling();
verify(mockLease, times(1)).poll();
verify(mockLease, times(1)).commit();
verify(mockLease, times(1)).close();
verifyNoMoreInteractions(mockConsumerPool);
verifyNoMoreInteractions(mockLease);
}
@Test
public void testJaasConfiguration() throws Exception {
runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);
runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
runner.assertNotValid();
runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
runner.assertValid();
runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
runner.assertNotValid();
runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
runner.assertNotValid();
runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
runner.assertValid();
}
}

View File

@ -0,0 +1,287 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.RecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestPublishKafkaRecord_0_10 {
private static final String TOPIC_NAME = "unit-test";
private PublisherPool mockPool;
private PublisherLease mockLease;
private TestRunner runner;
@Before
public void setup() throws InitializationException, IOException {
mockPool = mock(PublisherPool.class);
mockLease = mock(PublisherLease.class);
Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), any(String.class), any(String.class));
when(mockPool.obtainPublisher()).thenReturn(mockLease);
runner = TestRunners.newTestRunner(new PublishKafkaRecord_0_10() {
@Override
protected PublisherPool createPublisherPool(final ProcessContext context) {
return mockPool;
}
});
runner.setProperty(PublishKafkaRecord_0_10.TOPIC, TOPIC_NAME);
final String readerId = "record-reader";
final MockRecordParser readerService = new MockRecordParser();
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
runner.addControllerService(readerId, readerService);
runner.enableControllerService(readerService);
final String writerId = "record-writer";
final RecordSetWriterFactory writerService = new MockRecordWriter("name, age");
runner.addControllerService(writerId, writerService);
runner.enableControllerService(writerService);
runner.setProperty(PublishKafkaRecord_0_10.RECORD_READER, readerId);
runner.setProperty(PublishKafkaRecord_0_10.RECORD_WRITER, writerId);
}
@Test
public void testSingleSuccess() throws IOException {
final MockFlowFile flowFile = runner.enqueue("John Doe, 48");
when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFile, 1));
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
}
@Test
public void testMultipleSuccess() throws IOException {
final Set<FlowFile> flowFiles = new HashSet<>();
flowFiles.add(runner.enqueue("John Doe, 48"));
flowFiles.add(runner.enqueue("John Doe, 48"));
flowFiles.add(runner.enqueue("John Doe, 48"));
when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1));
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 3);
verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
}
@Test
public void testSingleFailure() throws IOException {
final MockFlowFile flowFile = runner.enqueue("John Doe, 48");
when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@Test
public void testMultipleFailures() throws IOException {
final Set<FlowFile> flowFiles = new HashSet<>();
flowFiles.add(runner.enqueue("John Doe, 48"));
flowFiles.add(runner.enqueue("John Doe, 48"));
flowFiles.add(runner.enqueue("John Doe, 48"));
when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 3);
verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@Test
public void testMultipleMessagesPerFlowFile() throws IOException {
final List<FlowFile> flowFiles = new ArrayList<>();
flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 47"));
flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 29"));
final Map<FlowFile, Integer> msgCounts = new HashMap<>();
msgCounts.put(flowFiles.get(0), 10);
msgCounts.put(flowFiles.get(1), 20);
final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap());
when(mockLease.complete()).thenReturn(result);
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
runner.assertAllFlowFilesContainAttribute("msg.count");
assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).stream()
.filter(ff -> ff.getAttribute("msg.count").equals("10"))
.count());
assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).stream()
.filter(ff -> ff.getAttribute("msg.count").equals("20"))
.count());
}
@Test
public void testSomeSuccessSomeFailure() throws IOException {
final List<FlowFile> flowFiles = new ArrayList<>();
flowFiles.add(runner.enqueue("John Doe, 48"));
flowFiles.add(runner.enqueue("John Doe, 48"));
flowFiles.add(runner.enqueue("John Doe, 48"));
flowFiles.add(runner.enqueue("John Doe, 48"));
final Map<FlowFile, Integer> msgCounts = new HashMap<>();
msgCounts.put(flowFiles.get(0), 10);
msgCounts.put(flowFiles.get(1), 20);
final Map<FlowFile, Exception> failureMap = new HashMap<>();
failureMap.put(flowFiles.get(2), new RuntimeException("Intentional Unit Test Exception"));
failureMap.put(flowFiles.get(3), new RuntimeException("Intentional Unit Test Exception"));
final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles.subList(0, 2)), failureMap);
when(mockLease.complete()).thenReturn(result);
runner.run();
runner.assertTransferCount(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
runner.assertTransferCount(PublishKafkaRecord_0_10.REL_FAILURE, 2);
verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).stream()
.filter(ff -> "10".equals(ff.getAttribute("msg.count")))
.count());
assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).stream()
.filter(ff -> "20".equals(ff.getAttribute("msg.count")))
.count());
assertTrue(runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_FAILURE).stream()
.noneMatch(ff -> ff.getAttribute("msg.count") != null));
}
private PublishResult createAllSuccessPublishResult(final FlowFile successfulFlowFile, final int msgCount) {
return createAllSuccessPublishResult(Collections.singleton(successfulFlowFile), msgCount);
}
private PublishResult createAllSuccessPublishResult(final Set<FlowFile> successfulFlowFiles, final int msgCountPerFlowFile) {
final Map<FlowFile, Integer> msgCounts = new HashMap<>();
for (final FlowFile ff : successfulFlowFiles) {
msgCounts.put(ff, msgCountPerFlowFile);
}
return createPublishResult(msgCounts, successfulFlowFiles, Collections.emptyMap());
}
private PublishResult createFailurePublishResult(final FlowFile failure) {
return createFailurePublishResult(Collections.singleton(failure));
}
private PublishResult createFailurePublishResult(final Set<FlowFile> failures) {
final Map<FlowFile, Exception> failureMap = failures.stream().collect(Collectors.toMap(ff -> ff, ff -> new RuntimeException("Intentional Unit Test Exception")));
return createPublishResult(Collections.emptyMap(), Collections.emptySet(), failureMap);
}
private PublishResult createPublishResult(final Map<FlowFile, Integer> msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> failures) {
// sanity check.
for (final FlowFile success : successFlowFiles) {
if (failures.containsKey(success)) {
throw new IllegalArgumentException("Found same FlowFile in both 'success' and 'failures' collections: " + success);
}
}
return new PublishResult() {
@Override
public Collection<FlowFile> getSuccessfulFlowFiles() {
return successFlowFiles;
}
@Override
public Collection<FlowFile> getFailedFlowFiles() {
return failures.keySet();
}
@Override
public int getSuccessfulMessageCount(FlowFile flowFile) {
Integer count = msgCounts.get(flowFile);
return count == null ? 0 : count.intValue();
}
@Override
public Exception getReasonForFailure(FlowFile flowFile) {
return failures.get(flowFile);
}
};
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub.util;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
private final List<Object[]> records = new ArrayList<>();
private final List<RecordField> fields = new ArrayList<>();
private final int failAfterN;
public MockRecordParser() {
this(-1);
}
public MockRecordParser(final int failAfterN) {
this.failAfterN = failAfterN;
}
public void addSchemaField(final String fieldName, final RecordFieldType type) {
fields.add(new RecordField(fieldName, type.getDataType()));
}
public void addRecord(Object... values) {
records.add(values);
}
@Override
public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
return new RecordReader() {
private int recordCount = 0;
@Override
public void close() throws IOException {
}
@Override
public Record nextRecord() throws IOException, MalformedRecordException {
if (failAfterN >= recordCount) {
throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
}
final String line = reader.readLine();
if (line == null) {
return null;
}
recordCount++;
final String[] values = line.split(",");
final Map<String, Object> valueMap = new HashMap<>();
int i = 0;
for (final RecordField field : fields) {
final String fieldName = field.getFieldName();
valueMap.put(fieldName, values[i++].trim());
}
return new MapRecord(new SimpleRecordSchema(fields), valueMap);
}
@Override
public RecordSchema getSchema() {
return new SimpleRecordSchema(fields);
}
};
}
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;
public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
private final String header;
private final int failAfterN;
private final boolean quoteValues;
public MockRecordWriter(final String header) {
this(header, true, -1);
}
public MockRecordWriter(final String header, final boolean quoteValues) {
this(header, quoteValues, -1);
}
public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN) {
this.header = header;
this.quoteValues = quoteValues;
this.failAfterN = failAfterN;
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final FlowFile flowFile, final InputStream in) {
return new RecordSetWriter() {
@Override
public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
out.write(header.getBytes());
out.write("\n".getBytes());
int recordCount = 0;
final int numCols = rs.getSchema().getFieldCount();
Record record = null;
while ((record = rs.next()) != null) {
if (++recordCount > failAfterN && failAfterN > -1) {
throw new IOException("Unit Test intentionally throwing IOException after " + failAfterN + " records were written");
}
int i = 0;
for (final String fieldName : record.getSchema().getFieldNames()) {
final String val = record.getAsString(fieldName);
if (quoteValues) {
out.write("\"".getBytes());
if (val != null) {
out.write(val.getBytes());
}
out.write("\"".getBytes());
} else if (val != null) {
out.write(val.getBytes());
}
if (i++ < numCols - 1) {
out.write(",".getBytes());
}
}
out.write("\n".getBytes());
}
return WriteResult.of(recordCount, Collections.emptyMap());
}
@Override
public String getMimeType() {
return "text/plain";
}
@Override
public WriteResult write(Record record, OutputStream out) throws IOException {
return null;
}
};
}
}

View File

@ -23,6 +23,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.avro.LogicalType;
@ -53,6 +55,7 @@ import org.apache.nifi.serialization.record.SchemaIdentifier;
public class AvroSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
private final Map<String, String> schemaNameToSchemaMap;
private final ConcurrentMap<String, RecordSchema> recordSchemas = new ConcurrentHashMap<>();
private static final String LOGICAL_TYPE_DATE = "date";
private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis";
@ -64,6 +67,21 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
this.schemaNameToSchemaMap = new HashMap<>();
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (newValue == null) {
recordSchemas.remove(descriptor.getName());
} else {
try {
final Schema avroSchema = new Schema.Parser().parse(newValue);
final RecordSchema recordSchema = createRecordSchema(avroSchema, newValue, descriptor.getName());
recordSchemas.put(descriptor.getName(), recordSchema);
} catch (final Exception e) {
// not a problem - the service won't be valid and the validation message will indicate what is wrong.
}
}
}
@Override
public String retrieveSchemaText(final String schemaName) throws SchemaNotFoundException {
final String schemaText = schemaNameToSchemaMap.get(schemaName);
@ -76,9 +94,11 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
@Override
public RecordSchema retrieveSchema(final String schemaName) throws SchemaNotFoundException {
final String schemaText = retrieveSchemaText(schemaName);
final Schema schema = new Schema.Parser().parse(schemaText);
return createRecordSchema(schema, schemaText, schemaName);
final RecordSchema recordSchema = recordSchemas.get(schemaName);
if (recordSchema == null) {
throw new SchemaNotFoundException("Unable to find schema with name '" + schemaName + "'");
}
return recordSchema;
}
@Override

View File

@ -21,15 +21,11 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.Assert;
import org.junit.Test;
@ -69,44 +65,4 @@ public class TestAvroSchemaRegistry {
delegate.close();
}
@Test
public void validateRecordSchemaRetrieval() throws Exception {
String schemaName = "fooSchema";
ConfigurationContext configContext = mock(ConfigurationContext.class);
Map<PropertyDescriptor, String> properties = new HashMap<>();
PropertyDescriptor fooSchema = new PropertyDescriptor.Builder()
.name(schemaName)
.dynamic(true)
.build();
String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
+ "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, "
+ "{\"name\": \"favorite_number\", \"type\": \"int\"}, "
+ "{\"name\": \"foo\", \"type\": \"boolean\"}, "
+ "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}";
PropertyDescriptor barSchema = new PropertyDescriptor.Builder()
.name("barSchema")
.dynamic(false)
.build();
properties.put(fooSchema, fooSchemaText);
properties.put(barSchema, "");
when(configContext.getProperties()).thenReturn(properties);
AvroSchemaRegistry delegate = new AvroSchemaRegistry();
delegate.enable(configContext);
RecordSchema locatedSchema = delegate.retrieveSchema(schemaName);
List<RecordField> recordFields = locatedSchema.getFields();
assertEquals(4, recordFields.size());
assertEquals(RecordFieldType.STRING.getDataType(), recordFields.get(0).getDataType());
assertEquals("name", recordFields.get(0).getFieldName());
assertEquals(RecordFieldType.INT.getDataType(), recordFields.get(1).getDataType());
assertEquals("favorite_number", recordFields.get(1).getFieldName());
assertEquals(RecordFieldType.BOOLEAN.getDataType(), recordFields.get(2).getDataType());
assertEquals("foo", recordFields.get(2).getFieldName());
assertEquals(RecordFieldType.STRING.getDataType(), recordFields.get(3).getDataType());
assertEquals("favorite_color", recordFields.get(3).getFieldName());
delegate.close();
}
}

View File

@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.avro.LogicalType;
@ -64,13 +65,14 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT,
SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
private final ConcurrentMap<Tuple<SchemaIdentifier, String>, RecordSchema> schemaNameToSchemaMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Tuple<SchemaVersionInfo, Long>> schemaVersionCache = new ConcurrentHashMap<>();
private static final String LOGICAL_TYPE_DATE = "date";
private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis";
private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros";
private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis";
private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
private static final long VERSION_INFO_CACHE_NANOS = TimeUnit.MINUTES.toNanos(1L);
static final PropertyDescriptor URL = new PropertyDescriptor.Builder()
.name("url")
@ -137,60 +139,76 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
return schemaRegistryClient;
}
@Override
public String retrieveSchemaText(final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException {
private SchemaVersionInfo getLatestSchemaVersionInfo(final SchemaRegistryClient client, final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException {
try {
final SchemaVersionInfo latest = getClient().getLatestSchemaVersionInfo(schemaName);
if (latest == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
// Try to fetch the SchemaVersionInfo from the cache.
final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = schemaVersionCache.get(schemaName);
// Determine if the timestampedVersionInfo is expired
boolean fetch = false;
if (timestampedVersionInfo == null) {
fetch = true;
} else {
final long minTimestamp = System.nanoTime() - VERSION_INFO_CACHE_NANOS;
fetch = timestampedVersionInfo.getValue() < minTimestamp;
}
return latest.getSchemaText();
} catch (final SchemaNotFoundException e) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException(e);
}
}
@Override
public RecordSchema retrieveSchema(final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException {
try {
final SchemaRegistryClient client = getClient();
final SchemaMetadataInfo metadataInfo = client.getSchemaMetadataInfo(schemaName);
if (metadataInfo == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
// If not expired, use what we got from the cache
if (!fetch) {
return timestampedVersionInfo.getKey();
}
final Long schemaId = metadataInfo.getId();
if (schemaId == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
}
// schema version info was expired or not found in cache. Fetch from schema registry
final SchemaVersionInfo versionInfo = client.getLatestSchemaVersionInfo(schemaName);
if (versionInfo == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
}
final Integer version = versionInfo.getVersion();
if (version == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
}
final String schemaText = versionInfo.getSchemaText();
final SchemaIdentifier schemaIdentifier = (schemaId == null || version == null) ? SchemaIdentifier.ofName(schemaName) : SchemaIdentifier.of(schemaName, schemaId, version);
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText);
return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
final Schema schema = new Schema.Parser().parse(schemaText);
return createRecordSchema(schema, schemaText, schemaIdentifier);
});
// Store new version in cache.
final Tuple<SchemaVersionInfo, Long> tuple = new Tuple<>(versionInfo, System.nanoTime());
schemaVersionCache.put(schemaName, tuple);
return versionInfo;
} catch (final SchemaNotFoundException e) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException(e);
}
}
@Override
public String retrieveSchemaText(final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException {
final SchemaVersionInfo latest = getLatestSchemaVersionInfo(getClient(), schemaName);
return latest.getSchemaText();
}
@Override
public RecordSchema retrieveSchema(final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException {
final SchemaRegistryClient client = getClient();
final SchemaMetadataInfo metadataInfo = client.getSchemaMetadataInfo(schemaName);
if (metadataInfo == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
}
final Long schemaId = metadataInfo.getId();
if (schemaId == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
}
final SchemaVersionInfo versionInfo = getLatestSchemaVersionInfo(client, schemaName);
final Integer version = versionInfo.getVersion();
if (version == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
}
final String schemaText = versionInfo.getSchemaText();
final SchemaIdentifier schemaIdentifier = (schemaId == null || version == null) ? SchemaIdentifier.ofName(schemaName) : SchemaIdentifier.of(schemaName, schemaId, version);
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText);
return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
final Schema schema = new Schema.Parser().parse(schemaText);
return createRecordSchema(schema, schemaText, schemaIdentifier);
});
}
@Override
public String retrieveSchemaText(final long schemaId, final int version) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {

View File

@ -20,8 +20,12 @@ package org.apache.nifi.avro;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
@ -33,6 +37,7 @@ import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SchemaRegistryService;
import org.apache.nifi.serialization.record.RecordSchema;
@Tags({"avro", "parse", "record", "row", "reader", "delimited", "comma", "separated", "values"})
@CapabilityDescription("Parses Avro data and returns each Avro record as an separate Record object. The Avro data may contain the schema itself, "
@ -40,6 +45,14 @@ import org.apache.nifi.serialization.SchemaRegistryService;
public class AvroReader extends SchemaRegistryService implements RecordReaderFactory {
private final AllowableValue EMBEDDED_AVRO_SCHEMA = new AllowableValue("embedded-avro-schema",
"Use Embedded Avro Schema", "The FlowFile has the Avro Schema embedded within the content, and this schema will be used.");
private static final int MAX_AVRO_SCHEMA_CACHE_SIZE = 20;
private final Map<String, Schema> compiledAvroSchemaCache = new LinkedHashMap<String, Schema>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<String, Schema> eldest) {
return size() >= MAX_AVRO_SCHEMA_CACHE_SIZE;
}
};
@Override
@ -55,7 +68,48 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {
return new AvroReaderWithEmbeddedSchema(in);
} else {
return new AvroReaderWithExplicitSchema(in, getSchema(flowFile, in));
final RecordSchema recordSchema = getSchema(flowFile, in);
final Schema avroSchema;
try {
if (recordSchema.getSchemaFormat().isPresent() & recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) {
final Optional<String> textOption = recordSchema.getSchemaText();
if (textOption.isPresent()) {
avroSchema = compileAvroSchema(textOption.get());
} else {
avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
}
} else {
avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
}
} catch (final Exception e) {
throw new SchemaNotFoundException("Failed to compile Avro Schema", e);
}
return new AvroReaderWithExplicitSchema(in, recordSchema, avroSchema);
}
}
private Schema compileAvroSchema(final String text) {
// Access to the LinkedHashMap must be done while synchronized on this.
// However, if no compiled schema exists, we don't want to remain synchronized
// while we compile it, as compilation can be expensive. As a result, if there is
// not a compiled schema already, we will compile it outside of the synchronized
// block, and then re-synchronize to update the map. All of this is functionally
// equivalent to calling compiledAvroSchema.computeIfAbsent(text, t -> new Schema.Parser().parse(t));
// but does so without synchronizing when not necessary.
Schema compiled;
synchronized (this) {
compiled = compiledAvroSchemaCache.get(text);
}
if (compiled != null) {
return compiled;
}
final Schema newlyCompiled = new Schema.Parser().parse(text);
synchronized (this) {
return compiledAvroSchemaCache.computeIfAbsent(text, t -> newlyCompiled);
}
}

View File

@ -39,11 +39,11 @@ public class AvroReaderWithExplicitSchema extends AvroRecordReader {
private final BinaryDecoder decoder;
private GenericRecord genericRecord;
public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema) throws IOException, SchemaNotFoundException {
public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException, SchemaNotFoundException {
this.in = in;
this.recordSchema = recordSchema;
this.avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
this.avroSchema = avroSchema;
datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
decoder = DecoderFactory.get().binaryDecoder(in, null);
}

View File

@ -21,22 +21,22 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter;
@ -46,34 +46,42 @@ import org.apache.nifi.serialization.record.RecordSchema;
@CapabilityDescription("Writes the contents of a RecordSet in Binary Avro format.")
public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implements RecordSetWriterFactory {
private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
private static final int MAX_AVRO_SCHEMA_CACHE_SIZE = 20;
private final Map<String, Schema> compiledAvroSchemaCache = new LinkedHashMap<String, Schema>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<String, Schema> eldest) {
return size() >= MAX_AVRO_SCHEMA_CACHE_SIZE;
}
};
static final AllowableValue AVRO_EMBEDDED = new AllowableValue("avro-embedded", "Embed Avro Schema",
"The FlowFile will have the Avro schema embedded into the content, as is typical with Avro");
protected static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
.name("Schema Registry")
.description("Specifies the Controller Service to use for the Schema Registry")
.identifiesControllerService(SchemaRegistry.class)
.required(false)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY);
properties.add(SCHEMA_REGISTRY);
return properties;
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final FlowFile flowFile, final InputStream in) throws IOException {
final String strategyValue = getConfigurationContext().getProperty(SCHEMA_WRITE_STRATEGY).getValue();
try {
final RecordSchema recordSchema = getSchema(flowFile, in);
final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
final Schema avroSchema;
try {
if (recordSchema.getSchemaFormat().isPresent() & recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) {
final Optional<String> textOption = recordSchema.getSchemaText();
if (textOption.isPresent()) {
avroSchema = compileAvroSchema(textOption.get());
} else {
avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
}
} else {
avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
}
} catch (final Exception e) {
throw new SchemaNotFoundException("Failed to compile Avro Schema", e);
}
if (AVRO_EMBEDDED.getValue().equals(strategyValue)) {
return new WriteAvroResultWithSchema(avroSchema);
@ -85,6 +93,30 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
}
}
private Schema compileAvroSchema(final String text) {
// Access to the LinkedHashMap must be done while synchronized on this.
// However, if no compiled schema exists, we don't want to remain synchronized
// while we compile it, as compilation can be expensive. As a result, if there is
// not a compiled schema already, we will compile it outside of the synchronized
// block, and then re-synchronize to update the map. All of this is functionally
// equivalent to calling compiledAvroSchema.computeIfAbsent(text, t -> new Schema.Parser().parse(t));
// but does so without synchronizing when not necessary.
Schema compiled;
synchronized (this) {
compiled = compiledAvroSchemaCache.get(text);
}
if (compiled != null) {
return compiled;
}
final Schema newlyCompiled = new Schema.Parser().parse(text);
synchronized (this) {
return compiledAvroSchemaCache.computeIfAbsent(text, t -> newlyCompiled);
}
}
@Override
protected List<AllowableValue> getSchemaWriteStrategyValues() {
final List<AllowableValue> allowableValues = new ArrayList<>();

View File

@ -72,4 +72,23 @@ public class WriteAvroResultWithExternalSchema extends WriteAvroResult {
return WriteResult.of(nrOfRows, schemaAccessWriter.getAttributes(recordSchema));
}
@Override
public WriteResult write(final Record record, final OutputStream out) throws IOException {
final Schema schema = getSchema();
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
final BufferedOutputStream bufferedOut = new BufferedOutputStream(out);
schemaAccessWriter.writeHeader(recordSchema, bufferedOut);
final BinaryEncoder encoder = EncoderFactory.get().blockingBinaryEncoder(bufferedOut, null);
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
datumWriter.write(rec, encoder);
encoder.flush();
bufferedOut.flush();
return WriteResult.of(1, schemaAccessWriter.getAttributes(recordSchema));
}
}

View File

@ -59,4 +59,23 @@ public class WriteAvroResultWithSchema extends WriteAvroResult {
return WriteResult.of(nrOfRows, Collections.emptyMap());
}
@Override
public WriteResult write(final Record record, final OutputStream out) throws IOException {
if (record == null) {
return WriteResult.of(0, Collections.emptyMap());
}
final Schema schema = getSchema();
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
dataFileWriter.create(schema, out);
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
dataFileWriter.append(rec);
}
return WriteResult.of(1, Collections.emptyMap());
}
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.text.DateFormat;
import java.util.HashMap;
import java.util.Map;
@ -42,17 +43,17 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
public class CSVRecordReader implements RecordReader {
private final CSVParser csvParser;
private final RecordSchema schema;
private final String dateFormat;
private final String timeFormat;
private final String timestampFormat;
private final DateFormat dateFormat;
private final DateFormat timeFormat;
private final DateFormat timestampFormat;
public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat,
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
this.schema = schema;
this.dateFormat = dateFormat;
this.timeFormat = timeFormat;
this.timestampFormat = timestampFormat;
this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
final Reader reader = new InputStreamReader(new BOMInputStream(in));
final CSVFormat withHeader = csvFormat.withHeader(schema.getFieldNames().toArray(new String[0]));

View File

@ -70,6 +70,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final FlowFile flowFile, final InputStream in) throws SchemaNotFoundException, IOException {
final RecordSchema schema = getSchema(flowFile, in);
return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), getDateFormat(), getTimeFormat(), getTimestampFormat(), includeHeader);
return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema),
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader);
}
}

View File

@ -54,15 +54,15 @@ public class WriteCSVResult implements RecordSetWriter {
this.includeHeaderLine = includeHeaderLine;
}
private String getFormat(final Record record, final RecordField field) {
private String getFormat(final RecordField field) {
final DataType dataType = field.getDataType();
switch (dataType.getFieldType()) {
case DATE:
return dateFormat == null ? dataType.getFormat() : dateFormat;
return dateFormat;
case TIME:
return timeFormat == null ? dataType.getFormat() : timeFormat;
return timeFormat;
case TIMESTAMP:
return timestampFormat == null ? dataType.getFormat() : timestampFormat;
return timestampFormat;
}
return dataType.getFormat();
@ -87,7 +87,7 @@ public class WriteCSVResult implements RecordSetWriter {
final Object[] colVals = new Object[recordSchema.getFieldCount()];
int i = 0;
for (final RecordField recordField : recordSchema.getFields()) {
colVals[i++] = record.getAsString(recordField, getFormat(record, recordField));
colVals[i++] = record.getAsString(recordField, getFormat(recordField));
}
printer.printRecord(colVals);
@ -113,7 +113,7 @@ public class WriteCSVResult implements RecordSetWriter {
final Object[] colVals = new Object[schema.getFieldCount()];
int i = 0;
for (final RecordField recordField : schema.getFields()) {
colVals[i++] = record.getAsString(recordField, getFormat(record, recordField));
colVals[i++] = record.getAsString(recordField, getFormat(recordField));
}
printer.printRecord(colVals);

View File

@ -37,20 +37,20 @@ import org.codehaus.jackson.map.ObjectMapper;
public abstract class AbstractJsonRowRecordReader implements RecordReader {
private final ComponentLog logger;
private final JsonParser jsonParser;
private final JsonFactory jsonFactory;
private final boolean array;
private final JsonNode firstJsonNode;
private boolean firstObjectConsumed = false;
private static final JsonFactory jsonFactory = new JsonFactory();
private static final ObjectMapper codec = new ObjectMapper();
public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException {
this.logger = logger;
jsonFactory = new JsonFactory();
try {
jsonParser = jsonFactory.createJsonParser(in);
jsonParser.setCodec(new ObjectMapper());
jsonParser.setCodec(codec);
JsonToken token = jsonParser.nextToken();
if (token == JsonToken.START_ARRAY) {

View File

@ -19,6 +19,7 @@ package org.apache.nifi.json;
import java.io.IOException;
import java.io.InputStream;
import java.text.DateFormat;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@ -52,18 +53,18 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
private final LinkedHashMap<String, JsonPath> jsonPaths;
private final InputStream in;
private RecordSchema schema;
private final String dateFormat;
private final String timeFormat;
private final String timestampFormat;
private final DateFormat dateFormat;
private final DateFormat timeFormat;
private final DateFormat timestampFormat;
public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
final String dateFormat, final String timeFormat, final String timestampFormat)
throws MalformedRecordException, IOException {
super(in, logger);
this.dateFormat = dateFormat;
this.timeFormat = timeFormat;
this.timestampFormat = timestampFormat;
this.dateFormat = DataTypeUtils.getDateFormat(dateFormat);
this.timeFormat = DataTypeUtils.getDateFormat(timeFormat);
this.timestampFormat = DataTypeUtils.getDateFormat(timestampFormat);
this.schema = schema;
this.jsonPaths = jsonPaths;

View File

@ -66,7 +66,8 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final FlowFile flowFile, final InputStream flowFileContent) throws SchemaNotFoundException, IOException {
final RecordSchema schema = getSchema(flowFile, flowFileContent);
return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), prettyPrint, getDateFormat(), getTimeFormat(), getTimestampFormat());
return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), prettyPrint,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null));
}
}

View File

@ -19,11 +19,13 @@ package org.apache.nifi.json;
import java.io.IOException;
import java.io.InputStream;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
@ -34,6 +36,7 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SerializedForm;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
@ -44,24 +47,24 @@ import org.codehaus.jackson.node.ArrayNode;
public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
private final RecordSchema schema;
private final String dateFormat;
private final String timeFormat;
private final String timestampFormat;
private final DateFormat dateFormat;
private final DateFormat timeFormat;
private final DateFormat timestampFormat;
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException {
super(in, logger);
this.schema = schema;
this.dateFormat = dateFormat;
this.timeFormat = timeFormat;
this.timestampFormat = timestampFormat;
this.dateFormat = DataTypeUtils.getDateFormat(dateFormat);
this.timeFormat = DataTypeUtils.getDateFormat(timeFormat);
this.timestampFormat = DataTypeUtils.getDateFormat(timestampFormat);
}
@Override
protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema) throws IOException, MalformedRecordException {
return convertJsonNodeToRecord(jsonNode, schema, "");
return convertJsonNodeToRecord(jsonNode, schema, null);
}
private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix) throws IOException, MalformedRecordException {
@ -70,26 +73,35 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
}
final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
for (int i = 0; i < schema.getFieldCount(); i++) {
final RecordField field = schema.getField(i);
for (final RecordField field : schema.getFields()) {
final String fieldName = field.getFieldName();
JsonNode fieldNode = jsonNode.get(fieldName);
if (fieldNode == null) {
for (final String alias : field.getAliases()) {
fieldNode = jsonNode.get(alias);
if (fieldNode != null) {
break;
}
}
}
final JsonNode fieldNode = getJsonNode(jsonNode, field);
final DataType desiredType = field.getDataType();
final Object value = convertField(fieldNode, fieldNamePrefix + fieldName, desiredType);
final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
final Object value = convertField(fieldNode, fullFieldName, desiredType);
values.put(fieldName, value);
}
return new MapRecord(schema, values);
final Supplier<String> supplier = () -> jsonNode.toString();
return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"));
}
private JsonNode getJsonNode(final JsonNode parent, final RecordField field) {
JsonNode fieldNode = parent.get(field.getFieldName());
if (fieldNode != null) {
return fieldNode;
}
for (final String alias : field.getAliases()) {
fieldNode = parent.get(alias);
if (fieldNode != null) {
return fieldNode;
}
}
return fieldNode;
}
protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType) throws IOException, MalformedRecordException {
@ -115,7 +127,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
case SHORT:
return DataTypeUtils.toShort(getRawNodeValue(fieldNode), fieldName);
case STRING:
return DataTypeUtils.toString(getRawNodeValue(fieldNode), dateFormat, timeFormat, timestampFormat);
return DataTypeUtils.toString(getRawNodeValue(fieldNode), DataTypeUtils.getDateFormat(desiredType.getFieldType(), dateFormat, timeFormat, timestampFormat));
case DATE:
return DataTypeUtils.toDate(getRawNodeValue(fieldNode), dateFormat, fieldName);
case TIME:

View File

@ -21,8 +21,9 @@ import java.io.IOException;
import java.io.OutputStream;
import java.math.BigInteger;
import java.sql.SQLException;
import java.util.Collections;
import java.text.DateFormat;
import java.util.Map;
import java.util.Optional;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaAccessWriter;
@ -34,6 +35,7 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.serialization.record.SerializedForm;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
@ -50,9 +52,9 @@ public class WriteJsonResult implements RecordSetWriter {
private final SchemaAccessWriter schemaAccess;
private final RecordSchema recordSchema;
private final JsonFactory factory = new JsonFactory();
private final String dateFormat;
private final String timeFormat;
private final String timestampFormat;
private final DateFormat dateFormat;
private final DateFormat timeFormat;
private final DateFormat timestampFormat;
public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final boolean prettyPrint,
final String dateFormat, final String timeFormat, final String timestampFormat) {
@ -62,9 +64,9 @@ public class WriteJsonResult implements RecordSetWriter {
this.prettyPrint = prettyPrint;
this.schemaAccess = schemaAccess;
this.dateFormat = dateFormat;
this.timeFormat = timeFormat;
this.timestampFormat = timestampFormat;
this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
}
@Override
@ -96,6 +98,8 @@ public class WriteJsonResult implements RecordSetWriter {
@Override
public WriteResult write(final Record record, final OutputStream rawOut) throws IOException {
schemaAccess.writeHeader(recordSchema, rawOut);
try (final JsonGenerator generator = factory.createJsonGenerator(new NonCloseableOutputStream(rawOut))) {
if (prettyPrint) {
generator.useDefaultPrettyPrinter();
@ -106,12 +110,24 @@ public class WriteJsonResult implements RecordSetWriter {
throw new IOException("Failed to write records to stream", e);
}
return WriteResult.of(1, Collections.emptyMap());
return WriteResult.of(1, schemaAccess.getAttributes(recordSchema));
}
private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator, final GeneratorTask startTask, final GeneratorTask endTask)
throws JsonGenerationException, IOException, SQLException {
final Optional<SerializedForm> serializedForm = record.getSerializedForm();
if (serializedForm.isPresent()) {
final SerializedForm form = serializedForm.get();
if (form.getMimeType().equals(getMimeType()) && record.getSchema().equals(writeSchema)) {
final Object serialized = form.getSerialized();
if (serialized instanceof String) {
generator.writeRawValue((String) serialized);
return;
}
}
}
try {
startTask.apply(generator);
for (int i = 0; i < writeSchema.getFieldCount(); i++) {
@ -146,18 +162,40 @@ public class WriteJsonResult implements RecordSetWriter {
}
final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, dateFormat, timeFormat, timestampFormat, fieldName);
if (coercedValue == null) {
generator.writeNull();
return;
}
switch (chosenDataType.getFieldType()) {
case DATE:
case TIME:
case TIMESTAMP:
generator.writeString(DataTypeUtils.toString(coercedValue, dateFormat, timeFormat, timestampFormat));
case DATE: {
final String stringValue = DataTypeUtils.toString(coercedValue, dateFormat);
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
} else {
generator.writeString(stringValue);
}
break;
}
case TIME: {
final String stringValue = DataTypeUtils.toString(coercedValue, timeFormat);
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
} else {
generator.writeString(stringValue);
}
break;
}
case TIMESTAMP: {
final String stringValue = DataTypeUtils.toString(coercedValue, timestampFormat);
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
} else {
generator.writeString(stringValue);
}
break;
}
case DOUBLE:
generator.writeNumber(DataTypeUtils.toDouble(coercedValue, fieldName));
break;

View File

@ -19,6 +19,7 @@ package org.apache.nifi.serialization;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
@ -26,9 +27,9 @@ import org.apache.nifi.controller.ConfigurationContext;
public abstract class DateTimeTextRecordSetWriter extends SchemaRegistryRecordSetWriter {
private volatile String dateFormat;
private volatile String timeFormat;
private volatile String timestampFormat;
private volatile Optional<String> dateFormat;
private volatile Optional<String> timeFormat;
private volatile Optional<String> timestampFormat;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -41,20 +42,20 @@ public abstract class DateTimeTextRecordSetWriter extends SchemaRegistryRecordSe
@OnEnabled
public void captureValues(final ConfigurationContext context) {
this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
this.dateFormat = Optional.ofNullable(context.getProperty(DateTimeUtils.DATE_FORMAT).getValue());
this.timeFormat = Optional.ofNullable(context.getProperty(DateTimeUtils.TIME_FORMAT).getValue());
this.timestampFormat = Optional.ofNullable(context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue());
}
protected String getDateFormat() {
protected Optional<String> getDateFormat() {
return dateFormat;
}
protected String getTimeFormat() {
protected Optional<String> getTimeFormat() {
return timeFormat;
}
protected String getTimestampFormat() {
protected Optional<String> getTimestampFormat() {
return timestampFormat;
}
}

View File

@ -62,7 +62,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
.name("Schema Write Strategy")
.description("Specifies how the schema for a Record should be added to the data.")
.allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
.defaultValue(AVRO_SCHEMA_ATTRIBUTE.getValue())
.defaultValue(SCHEMA_NAME_ATTRIBUTE.getValue())
.required(true)
.build();
@ -89,7 +89,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
}
protected AllowableValue getDefaultSchemaWriteStrategy() {
return AVRO_SCHEMA_ATTRIBUTE;
return SCHEMA_NAME_ATTRIBUTE;
}
protected PropertyDescriptor getSchemaWriteStrategyDescriptor() {

View File

@ -21,15 +21,19 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.nifi.logging.ComponentLog;
@ -41,6 +45,7 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
@ -71,6 +76,64 @@ public class TestJsonTreeRowRecordReader {
return accountSchema;
}
@Test
@Ignore("Intended only for manual testing to determine performance before/after modifications")
public void testPerformanceOnLocalFile() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList());
final File file = new File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/prov/16812193969219289");
final byte[] data = Files.readAllBytes(file.toPath());
final ComponentLog logger = Mockito.mock(ComponentLog.class);
int recordCount = 0;
final int iterations = 1000;
for (int j = 0; j < 5; j++) {
final long start = System.nanoTime();
for (int i = 0; i < iterations; i++) {
try (final InputStream in = new ByteArrayInputStream(data);
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat)) {
while (reader.nextRecord() != null) {
recordCount++;
}
}
}
final long nanos = System.nanoTime() - start;
final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
System.out.println("Took " + millis + " millis to read " + recordCount + " records");
}
}
@Test
@Ignore("Intended only for manual testing to determine performance before/after modifications")
public void testPerformanceOnIndividualMessages() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList());
final File file = new File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/1.prov.json");
final byte[] data = Files.readAllBytes(file.toPath());
final ComponentLog logger = Mockito.mock(ComponentLog.class);
int recordCount = 0;
final int iterations = 1_000_000;
for (int j = 0; j < 5; j++) {
final long start = System.nanoTime();
for (int i = 0; i < iterations; i++) {
try (final InputStream in = new ByteArrayInputStream(data);
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat)) {
while (reader.nextRecord() != null) {
recordCount++;
}
}
}
final long nanos = System.nanoTime() - start;
final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
System.out.println("Took " + millis + " millis to read " + recordCount + " records");
}
}
@Test
public void testReadArray() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Date;
@ -31,6 +32,7 @@ import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -46,6 +48,7 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.serialization.record.SerializedForm;
import org.junit.Test;
import org.mockito.Mockito;
@ -112,4 +115,73 @@ public class TestWriteJsonResult {
assertEquals(expected, output);
}
@Test
public void testWriteSerializedForm() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values1 = new HashMap<>();
values1.put("name", "John Doe");
values1.put("age", 42);
final String serialized1 = "{ \"name\": \"John Doe\", \"age\": 42 }";
final SerializedForm serializedForm1 = SerializedForm.of(serialized1, "application/json");
final Record record1 = new MapRecord(schema, values1, serializedForm1);
final Map<String, Object> values2 = new HashMap<>();
values2.put("name", "Jane Doe");
values2.put("age", 43);
final String serialized2 = "{ \"name\": \"Jane Doe\", \"age\": 43 }";
final SerializedForm serializedForm2 = SerializedForm.of(serialized2, "application/json");
final Record record2 = new MapRecord(schema, values1, serializedForm2);
final RecordSet rs = RecordSet.of(schema, record1, record2);
final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), true, RecordFieldType.DATE.getDefaultFormat(),
RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
final byte[] data;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
writer.write(rs, baos);
data = baos.toByteArray();
}
final String expected = "[ " + serialized1 + ", " + serialized2 + " ]";
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(expected, output);
}
@Test
public void testTimestampWithNullFormat() throws IOException {
final Map<String, Object> values = new HashMap<>();
values.put("timestamp", new java.sql.Timestamp(37293723L));
values.put("time", new java.sql.Time(37293723L));
values.put("date", new java.sql.Date(37293723L));
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
fields.add(new RecordField("time", RecordFieldType.TIME.getDataType()));
fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Record record = new MapRecord(schema, values);
final RecordSet rs = RecordSet.of(schema, record);
final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), false, null, null, null);
final byte[] data;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
writer.write(rs, baos);
data = baos.toByteArray();
}
final String expected = "[{\"timestamp\":37293723,\"time\":37293723,\"date\":37293723}]";
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(expected, output);
}
}