eldest) {
+ return size() >= cacheSize;
+ }
+ }
+
+
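+ // Associates a RecordSchema with the System.nanoTime() at which it was cached, so that callers can determine
+ // whether a cached entry is older than a given cutoff.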
+ private static class CachedRecordSchema {
+ private final RecordSchema schema;
+ private final long cachedTimestamp;
+
+ public CachedRecordSchema(final RecordSchema schema) {
+ this(schema, System.nanoTime());
+ }
+
+ public CachedRecordSchema(final RecordSchema schema, final long timestamp) {
+ this.schema = schema;
+ this.cachedTimestamp = timestamp;
+ }
+
+ public RecordSchema getSchema() {
+ return schema;
+ }
+
+ public boolean isOlderThan(final long timestamp) {
+ return cachedTimestamp < timestamp;
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
new file mode 100644
index 0000000000..a19a90db57
--- /dev/null
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.confluent.schemaregistry.client;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.web.util.WebUtils;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+
+
+/**
+ *
+ * A client for interacting with the Confluent Schema Registry. Jersey Client is used to interact with the
+ * Confluent Schema Registry REST API, rather than the client library that Confluent provides, because the provided
+ * client offers no way to use HTTPS (it assumes that system properties will be used, instead of an SSLContext)
+ * and does not allow timeouts to be configured or used. Without timeouts, NiFi threads could block indefinitely
+ * if the Schema Registry crashes or is shut down, until NiFi itself is restarted. To avoid this, we use Jersey
+ * Client and set the timeouts appropriately.
+ *
+ */
+public class RestSchemaRegistryClient implements SchemaRegistryClient {
+ private final List<String> baseUrls;
+ private final Client client;
+
+ private static final String SUBJECT_FIELD_NAME = "subject";
+ private static final String VERSION_FIELD_NAME = "version";
+ private static final String ID_FIELD_NAME = "id";
+ private static final String SCHEMA_TEXT_FIELD_NAME = "schema";
+
+ private final ConcurrentMap<String, Integer> schemaNameToIdentifierMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, String> schemaIdentifierToNameMap = new ConcurrentHashMap<>();
+
+
+ public RestSchemaRegistryClient(final List<String> baseUrls, final int timeoutMillis, final SSLContext sslContext) {
+ this.baseUrls = new ArrayList<>(baseUrls);
+
+ final ClientConfig clientConfig = new DefaultClientConfig();
+ clientConfig.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
+ clientConfig.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
+ client = WebUtils.createClient(clientConfig, sslContext);
+ }
+
+
+ @Override
+ public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException {
+ final String pathSuffix = getSubjectPath(schemaName);
+ final JsonNode responseJson = fetchJsonResponse(pathSuffix, "name " + schemaName);
+
+ final RecordSchema recordSchema = createRecordSchema(responseJson);
+ return recordSchema;
+ }
+
+
+ @Override
+ public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotFoundException {
+ // The Confluent Schema Registry's REST API does not provide the 'subject' (name) of a Schema given its ID;
+ // it provides only the text of the Schema itself. Because the name is required to build a SchemaIdentifier,
+ // we must list all Schema names (subjects) and then fetch each schema until we find the one whose ID matches
+ // the requested ID. To avoid repeating this work, we cache the mapping between Schema Name and identifier.
+
+ // Check whether we have already cached the Identifier-to-Name mapping
+ final String schemaName = schemaIdentifierToNameMap.get(schemaId);
+ if (schemaName != null) {
+ return getSchema(schemaName);
+ }
+
+ final String schemaDescription = "identifier " + schemaId;
+ final JsonNode schemaNameArray = fetchJsonResponse("/subjects", schemaDescription);
+ if (!schemaNameArray.isArray()) {
+ throw new IOException("When determining Subjects that are available, expected a JSON Array but did not receive a valid response");
+ }
+
+ final ArrayNode arrayNode = (ArrayNode) schemaNameArray;
+ for (final JsonNode node : arrayNode) {
+ final String nodeName = node.getTextValue();
+
+ final String schemaPath = getSubjectPath(nodeName);
+ final JsonNode schemaNode = fetchJsonResponse(schemaPath, schemaDescription);
+
+ final int id = schemaNode.get(ID_FIELD_NAME).asInt();
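+ // Cache the name/id mapping in both directions so that future lookups by name or by id can skip this scan.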
+ schemaNameToIdentifierMap.put(nodeName, id);
+ schemaIdentifierToNameMap.put(id, nodeName);
+
+ if (id == schemaId) {
+ return createRecordSchema(schemaNode);
+ }
+ }
+
+ throw new SchemaNotFoundException("Could not find a schema with identifier " + schemaId);
+ }
+
+ private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException {
+ final String subject = schemaNode.get(SUBJECT_FIELD_NAME).getTextValue();
+ final int version = schemaNode.get(VERSION_FIELD_NAME).asInt();
+ final int id = schemaNode.get(ID_FIELD_NAME).asInt();
+ final String schemaText = schemaNode.get(SCHEMA_TEXT_FIELD_NAME).getTextValue();
+
+ try {
+ final Schema avroSchema = new Schema.Parser().parse(schemaText);
+ final SchemaIdentifier schemaId = SchemaIdentifier.of(subject, id, version);
+
+ final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
+ return recordSchema;
+ } catch (final SchemaParseException spe) {
+ throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + subject
+ + " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema");
+ }
+ }
+
+ private String getSubjectPath(final String schemaName) throws UnsupportedEncodingException {
+ return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + "/versions/latest";
+ }
+
+ private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaDescription) throws SchemaNotFoundException, IOException {
+ String errorMessage = null;
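+ // Try each configured base URL in turn: return the first successful (HTTP 200) JSON response, treat a 404 as
+ // "schema not found", and remember the first other error body so that it can be reported if every URL fails.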
+ for (final String baseUrl : baseUrls) {
+ final String path = pathSuffix.startsWith("/") ? pathSuffix : "/" + pathSuffix;
+ final String trimmedBase = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
+ final String url = trimmedBase + path;
+
+ final WebResource.Builder builder = client.resource(url).accept(MediaType.APPLICATION_JSON);
+ final ClientResponse response = builder.get(ClientResponse.class);
+ final int responseCode = response.getStatus();
+
+ if (responseCode == Response.Status.OK.getStatusCode()) {
+ final JsonNode responseJson = response.getEntity(JsonNode.class);
+ return responseJson;
+ }
+
+ if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
+ throw new SchemaNotFoundException("Could not find Schema with " + schemaDescription + " from the Confluent Schema Registry located at " + baseUrl);
+ }
+
+ if (errorMessage == null) {
+ errorMessage = response.getEntity(String.class);
+ }
+ }
+
+ throw new IOException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java
new file mode 100644
index 0000000000..3c8c0cb5aa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.confluent.schemaregistry.client;
+
+import java.io.IOException;
+
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public interface SchemaRegistryClient {
+
+ RecordSchema getSchema(String schemaName) throws IOException, SchemaNotFoundException;
+
+ RecordSchema getSchema(int schemaId) throws IOException, SchemaNotFoundException;
+}
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000000..65c5da8d08
--- /dev/null
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.confluent.schemaregistry.ConfluentSchemaRegistry
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/pom.xml b/nifi-nar-bundles/nifi-confluent-platform-bundle/pom.xml
new file mode 100644
index 0000000000..db57b04ca6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/pom.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+    file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+    to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+    License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+    applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+    governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.4.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-confluent-platform-bundle</artifactId>
+    <packaging>pom</packaging>
+    <description>A bundle of components that interact with the Confluent Platform</description>
+    <modules>
+        <module>nifi-confluent-schema-registry-service</module>
+        <module>nifi-confluent-platform-nar</module>
+    </modules>
+</project>
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
index b335b1156a..4f0f945ea0 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
@@ -16,20 +16,19 @@
*/
package org.apache.nifi.schema.access;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
import org.apache.nifi.avro.AvroSchemaValidator;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
public class SchemaAccessUtils {
public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property",
@@ -45,10 +44,13 @@ public class SchemaAccessUtils {
"The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
public static final AllowableValue INHERIT_RECORD_SCHEMA = new AllowableValue("inherit-record-schema", "Inherit Record Schema",
"The schema used to write records will be the same schema that was given to the Record when the Record was created.");
+ public static final AllowableValue CONFLUENT_ENCODED_SCHEMA = new AllowableValue("confluent-encoded", "Confluent Content-Encoded Schema Reference",
+ "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single "
+ + "'Magic Byte' followed by 4 bytes representing the identifier of the schema, as outlined at http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html. "
+ + "This is based on version 3.2.x of the Confluent Schema Registry.");
-
- public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
.name("schema-registry")
.displayName("Schema Registry")
.description("Specifies the Controller Service to use for the Schema Registry")
@@ -56,11 +58,11 @@ public class SchemaAccessUtils {
.required(false)
.build();
- public static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
.name("schema-access-strategy")
.displayName("Schema Access Strategy")
.description("Specifies how to obtain the schema that is to be used for interpreting the data.")
- .allowableValues(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
+ .allowableValues(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA)
.defaultValue(SCHEMA_NAME_PROPERTY.getValue())
.required(true)
.build();
@@ -115,42 +117,11 @@ public class SchemaAccessUtils {
private static boolean isSchemaRegistryRequired(final String schemaAccessValue) {
return HWX_CONTENT_ENCODED_SCHEMA.getValue().equalsIgnoreCase(schemaAccessValue) || SCHEMA_NAME_PROPERTY.getValue().equalsIgnoreCase(schemaAccessValue)
- || HWX_SCHEMA_REF_ATTRIBUTES.getValue().equalsIgnoreCase(schemaAccessValue);
+ || HWX_SCHEMA_REF_ATTRIBUTES.getValue().equalsIgnoreCase(schemaAccessValue) || CONFLUENT_ENCODED_SCHEMA.getValue().equalsIgnoreCase(schemaAccessValue);
}
- public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ProcessContext context) {
- if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
- return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
- } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
- return new InheritSchemaFromRecord();
- } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
- return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
- } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
- return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
- } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
- return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
- }
-
- return null;
- }
-
- public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
- if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
- return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
- } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
- return new InheritSchemaFromRecord();
- } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
- return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
- } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
- return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
- } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
- return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
- }
-
- return null;
- }
-
- public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
+
+ public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) {
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
} else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
@@ -161,6 +132,8 @@ public class SchemaAccessUtils {
return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
} else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+ } else if (allowableValue.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) {
+ return new ConfluentSchemaRegistryStrategy(schemaRegistry);
}
return null;
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java
new file mode 100644
index 0000000000..f892ab882e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.stream.io.StreamUtils;
+
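+// A SchemaAccessStrategy that resolves the schema by reading the Confluent wire-format header (a zero 'magic byte'
+// followed by a 4-byte schema id) from the beginning of the FlowFile content and looking the id up in the configured
+// Schema Registry.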
+public class ConfluentSchemaRegistryStrategy implements SchemaAccessStrategy {
+ private final Set<SchemaField> schemaFields;
+ private final SchemaRegistry schemaRegistry;
+
+ public ConfluentSchemaRegistryStrategy(final SchemaRegistry schemaRegistry) {
+ this.schemaRegistry = schemaRegistry;
+
+ schemaFields = new HashSet<>();
+ schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
+ schemaFields.add(SchemaField.SCHEMA_VERSION);
+ schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
+ }
+
+ @Override
+ public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+ final byte[] buffer = new byte[5];
+ try {
+ StreamUtils.fillBuffer(contentStream, buffer);
+ } catch (final IOException ioe) {
+ throw new SchemaNotFoundException("Could not read first 5 bytes from stream", ioe);
+ }
+
+ // This encoding follows the pattern that is provided for serializing data by the Confluent Schema Registry serializer
+ // as it is provided at:
+ // http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format
+ // The format consists of the first byte always being 0, to indicate a 'magic byte' followed by 4 bytes
+ // representing the schema id.
+ final ByteBuffer bb = ByteBuffer.wrap(buffer);
+ final int magicByte = bb.get();
+ if (magicByte != 0) {
+ throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. "
+ + "Expected stream to begin with a Magic Byte of 0 but first byte was " + magicByte);
+ }
+
+ final int schemaId = bb.getInt();
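+ // The wire format carries only the schema id (which uniquely identifies the schema), so a fixed version of 1 is passed here.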
+ return schemaRegistry.retrieveSchema(schemaId, 1);
+ }
+
+ @Override
+ public Set<SchemaField> getSuppliedSchemaFields() {
+ return schemaFields;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java
new file mode 100644
index 0000000000..3677b9fc0d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
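+// A SchemaAccessWriter that prepends the Confluent wire-format header (magic byte 0 followed by the 4-byte schema id)
+// to the serialized record content.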
+public class ConfluentSchemaRegistryWriter implements SchemaAccessWriter {
+ private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
+
+ @Override
+ public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
+ final SchemaIdentifier identifier = schema.getIdentifier();
+ final long id = identifier.getIdentifier().getAsLong();
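+ // The identifier is exposed as a long, but the Confluent wire format allots only 4 bytes for it, so it is narrowed to an int below.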
+
+ // This encoding follows the pattern that is provided for serializing data by the Confluent Schema Registry serializer
+ // as it is provided at:
+ // http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format
+ // The format consists of the first byte always being 0, to indicate a 'magic byte' followed by 4 bytes
+ // representing the schema id.
+ final ByteBuffer bb = ByteBuffer.allocate(5);
+ bb.put((byte) 0);
+ bb.putInt((int) id);
+
+ out.write(bb.array());
+ }
+
+ @Override
+ public Map<String, String> getAttributes(final RecordSchema schema) {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
+ final SchemaIdentifier identifier = schema.getIdentifier();
+ final OptionalLong identifierOption = identifier.getIdentifier();
+ if (!identifierOption.isPresent()) {
+ throw new SchemaNotFoundException("Cannot write Confluent Schema Registry Reference because the Schema Identifier is not known");
+ }
+
+ final OptionalInt versionOption = identifier.getVersion();
+ if (!versionOption.isPresent()) {
+ throw new SchemaNotFoundException("Cannot write Confluent Schema Registry Reference because the Schema Version is not known");
+ }
+ }
+
+ @Override
+ public Set<SchemaField> getRequiredSchemaFields() {
+ return requiredSchemaFields;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-registry-bundle/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/pom.xml
index 5944f446a9..636e30f649 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-registry-bundle/pom.xml
@@ -24,13 +24,4 @@
         <module>nifi-registry-service</module>
         <module>nifi-registry-nar</module>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-registry-processors</artifactId>
-            <version>1.2.0-SNAPSHOT</version>
-        </dependency>
-    </dependencies>
-
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index 3027e5fc03..ccb54b03b9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -60,12 +60,6 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
    private final ConcurrentMap<String, Tuple<SchemaVersionInfo, Long>> schemaVersionByNameCache = new ConcurrentHashMap<>();
    private final ConcurrentMap<SchemaVersionKey, Tuple<SchemaVersionInfo, Long>> schemaVersionByKeyCache = new ConcurrentHashMap<>();
- private static final String LOGICAL_TYPE_DATE = "date";
- private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis";
- private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros";
- private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis";
- private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
-
private volatile long versionInfoCacheNanos;
static final PropertyDescriptor URL = new PropertyDescriptor.Builder()
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
index a391a7b30a..14cd60d897 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
@@ -63,10 +63,11 @@ public class CSVUtils {
.build();
static final PropertyDescriptor SKIP_HEADER_LINE = new PropertyDescriptor.Builder()
.name("Skip Header Line")
- .description("Specifies whether or not the first line of CSV should be considered a Header and skipped. If the Schema Access Strategy "
+ .displayName("Treat First Line as Header")
+ .description("Specifies whether or not the first line of CSV should be considered a Header or should be considered a record. If the Schema Access Strategy "
+ "indicates that the columns must be defined in the header, then this property will be ignored, since the header must always be "
- + "present and won't be processed as a Record. Otherwise, this property should be 'true' if the first non-comment line of CSV "
- + "contains header information that needs to be ignored.")
+ + "present and won't be processed as a Record. Otherwise, if 'true', then the first line of CSV data will not be processed as a record and if 'false',"
+ + "then the first line will be interpreted as a record.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.allowableValues("true", "false")
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
index 0acf6ff951..fb28b17644 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
@@ -36,6 +36,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.schema.access.ConfluentSchemaRegistryWriter;
import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceWriter;
import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceWriter;
import org.apache.nifi.schema.access.NopSchemaAccessWriter;
@@ -63,6 +64,11 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
"The FlowFile will be given a set of 3 attributes to describe the schema: 'schema.identifier', 'schema.version', and 'schema.protocol.version'. Note that if "
+ "the schema for a record does not contain the necessary identifier and version, an Exception will be thrown when attempting to write the data.");
+ static final AllowableValue CONFLUENT_ENCODED_SCHEMA = new AllowableValue("confluent-encoded", "Confluent Schema Registry Reference",
+ "The content of the FlowFile will contain a reference to a schema in the Schema Registry service. The reference is encoded as a single "
+ + "'Magic Byte' followed by 4 bytes representing the identifier of the schema, as outlined at http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html. "
+ + "This will be prepended to each FlowFile. Note that if the schema for a record does not contain the necessary identifier and version, "
+ + "an Exception will be thrown when attempting to write the data. This is based on the encoding used by version 3.2.x of the Confluent Schema Registry.");
static final AllowableValue NO_SCHEMA = new AllowableValue("no-schema", "Do Not Write Schema", "Do not add any schema-related information to the FlowFile.");
/**
@@ -73,8 +79,6 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
private static final PropertyDescriptor SCHEMA_WRITE_STRATEGY = new PropertyDescriptor.Builder()
.name("Schema Write Strategy")
.description("Specifies how the schema for a Record should be added to the data.")
- .allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA)
- .defaultValue(SCHEMA_NAME_ATTRIBUTE.getValue())
.required(true)
.build();
@@ -83,7 +87,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
private volatile SchemaAccessWriter schemaAccessWriter;
    private final List<AllowableValue> schemaWriteStrategyList = Collections.unmodifiableList(Arrays.asList(
- SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA));
+ SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA, NO_SCHEMA));
    private final List<AllowableValue> schemaAccessStrategyList = Collections.unmodifiableList(Arrays.asList(
SCHEMA_NAME_PROPERTY, INHERIT_RECORD_SCHEMA, SCHEMA_TEXT_PROPERTY));
@@ -156,6 +160,8 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
return new HortonworksEncodedSchemaReferenceWriter();
} else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
return new HortonworksAttributeSchemaReferenceWriter();
+ } else if (allowableValue.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) {
+ return new ConfluentSchemaRegistryWriter();
} else if (allowableValue.equalsIgnoreCase(NO_SCHEMA.getValue())) {
return new NopSchemaAccessWriter();
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
index ddcfb0ce78..53b030a92a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
@@ -51,6 +51,7 @@ import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPER
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.CONFLUENT_ENCODED_SCHEMA;
public abstract class SchemaRegistryService extends AbstractControllerService {
@@ -59,7 +60,7 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
private static final InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);
    private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
- SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA));
+ SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA));
protected PropertyDescriptor getSchemaAcessStrategyDescriptor() {
return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 28ff134adc..06dc5ab4db 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -35,6 +35,7 @@
nifi-kafka-bundle
nifi-kite-bundle
nifi-solr-bundle
+ nifi-confluent-platform-bundle
nifi-aws-bundle
nifi-social-media-bundle
nifi-enrich-bundle
diff --git a/pom.xml b/pom.xml
index a66cbf1080..7d840dc4d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1276,6 +1276,12 @@
                 <version>1.4.0-SNAPSHOT</version>
                 <type>nar</type>