diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml index 574831be00..79dbc84b50 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml @@ -196,5 +196,25 @@ limitations under the License. junit test + + org.apache.nifi + nifi-mock + test + + + + + + org.apache.rat + apache-rat-plugin + + + src/test/resources/empty-schema.avsc + + + + + + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java index a83327dcb2..d2289a2e0e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java @@ -16,9 +16,7 @@ */ package org.apache.nifi.schemaregistry.hortonworks; -import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -64,15 +62,18 @@ import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry { private static final Set schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); + private final ConcurrentMap, RecordSchema> schemaNameToSchemaMap = new ConcurrentHashMap<>(); - private final ConcurrentMap> schemaVersionCache = new ConcurrentHashMap<>(); + private final ConcurrentMap> schemaVersionByNameCache = new ConcurrentHashMap<>(); + private final ConcurrentMap> schemaVersionByKeyCache = new ConcurrentHashMap<>(); private static final String LOGICAL_TYPE_DATE = "date"; private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis"; private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros"; private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis"; private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros"; - private static final long VERSION_INFO_CACHE_NANOS = TimeUnit.MINUTES.toNanos(1L); + + private volatile long versionInfoCacheNanos; static final PropertyDescriptor URL = new PropertyDescriptor.Builder() .name("url") @@ -83,33 +84,50 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme .required(true) .build(); + static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder() + .name("cache-size") + .displayName("Cache Size") + .description("Specifies how many Schemas should be cached from the Hortonworks Schema Registry") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("1000") + .required(true) + .build(); + + static final PropertyDescriptor CACHE_EXPIRATION = new PropertyDescriptor.Builder() + .name("cache-expiration") + .displayName("Cache Expiration") + .description("Specifies how long a Schema that is cached should remain in the cache. Once this time period elapses, a " + + "cached version of a schema will no longer be used, and the service will have to communicate with the " + + "Hortonworks Schema Registry again in order to obtain the schema.") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("1 hour") + .required(true) + .build(); - private static final List propertyDescriptors = Collections.singletonList(URL); private volatile SchemaRegistryClient schemaRegistryClient; private volatile boolean initialized; private volatile Map schemaRegistryConfig; - public HortonworksSchemaRegistry() { - } - @OnEnabled public void enable(final ConfigurationContext context) throws InitializationException { schemaRegistryConfig = new HashMap<>(); + versionInfoCacheNanos = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS); + // The below properties may or may not need to be exposed to the end // user. We just need to watch usage patterns to see if sensible default // can satisfy NiFi requirements String urlValue = context.getProperty(URL).evaluateAttributeExpressions().getValue(); - if (urlValue == null || urlValue.trim().length() == 0){ - throw new IllegalArgumentException("'Schema Registry URL' must not be nul or empty."); + if (urlValue == null || urlValue.trim().isEmpty()) { + throw new IllegalArgumentException("'Schema Registry URL' must not be null or empty."); } schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), urlValue); schemaRegistryConfig.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name(), 10L); - schemaRegistryConfig.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name(), 5000L); - schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name(), 1000L); - schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name(), 60 * 60 * 1000L); + schemaRegistryConfig.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name(), context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.SECONDS)); + schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name(), context.getProperty(CACHE_SIZE).asInteger()); + schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name(), context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.SECONDS)); } @@ -126,11 +144,15 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme @Override protected List getSupportedPropertyDescriptors() { - return propertyDescriptors; + final List properties = new ArrayList<>(); + properties.add(URL); + properties.add(CACHE_SIZE); + properties.add(CACHE_EXPIRATION); + return properties; } - private synchronized SchemaRegistryClient getClient() { + protected synchronized SchemaRegistryClient getClient() { if (!initialized) { schemaRegistryClient = new SchemaRegistryClient(schemaRegistryConfig); initialized = true; @@ -142,14 +164,14 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme private SchemaVersionInfo getLatestSchemaVersionInfo(final SchemaRegistryClient client, final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException { try { // Try to fetch the SchemaVersionInfo from the cache. - final Tuple timestampedVersionInfo = schemaVersionCache.get(schemaName); + final Tuple timestampedVersionInfo = schemaVersionByNameCache.get(schemaName); // Determine if the timestampedVersionInfo is expired boolean fetch = false; if (timestampedVersionInfo == null) { fetch = true; } else { - final long minTimestamp = System.nanoTime() - VERSION_INFO_CACHE_NANOS; + final long minTimestamp = System.nanoTime() - versionInfoCacheNanos; fetch = timestampedVersionInfo.getValue() < minTimestamp; } @@ -166,7 +188,41 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme // Store new version in cache. final Tuple tuple = new Tuple<>(versionInfo, System.nanoTime()); - schemaVersionCache.put(schemaName, tuple); + schemaVersionByNameCache.put(schemaName, tuple); + return versionInfo; + } catch (final SchemaNotFoundException e) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException(e); + } + } + + private SchemaVersionInfo getSchemaVersionInfo(final SchemaRegistryClient client, final SchemaVersionKey key) throws org.apache.nifi.schema.access.SchemaNotFoundException { + try { + // Try to fetch the SchemaVersionInfo from the cache. + final Tuple timestampedVersionInfo = schemaVersionByKeyCache.get(key); + + // Determine if the timestampedVersionInfo is expired + boolean fetch = false; + if (timestampedVersionInfo == null) { + fetch = true; + } else { + final long minTimestamp = System.nanoTime() - versionInfoCacheNanos; + fetch = timestampedVersionInfo.getValue() < minTimestamp; + } + + // If not expired, use what we got from the cache + if (!fetch) { + return timestampedVersionInfo.getKey(); + } + + // schema version info was expired or not found in cache. Fetch from schema registry + final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(key); + if (versionInfo == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + key.getSchemaName() + "' and version " + key.getVersion()); + } + + // Store new version in cache. + final Tuple tuple = new Tuple<>(versionInfo, System.nanoTime()); + schemaVersionByKeyCache.put(key, tuple); return versionInfo; } catch (final SchemaNotFoundException e) { throw new org.apache.nifi.schema.access.SchemaNotFoundException(e); @@ -211,58 +267,50 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme @Override - public String retrieveSchemaText(final long schemaId, final int version) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException { - try { - final SchemaRegistryClient client = getClient(); - final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId); - if (info == null) { - throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); - } - - final SchemaMetadata metadata = info.getSchemaMetadata(); - final String schemaName = metadata.getName(); - - final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); - final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(schemaVersionKey); - if (versionInfo == null) { - throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); - } - - return versionInfo.getSchemaText(); - } catch (final SchemaNotFoundException e) { - throw new org.apache.nifi.schema.access.SchemaNotFoundException(e); + public String retrieveSchemaText(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException { + final SchemaRegistryClient client = getClient(); + final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId); + if (info == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); } + + final SchemaMetadata metadata = info.getSchemaMetadata(); + final String schemaName = metadata.getName(); + + final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); + final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, schemaVersionKey); + if (versionInfo == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); + } + + return versionInfo.getSchemaText(); } @Override - public RecordSchema retrieveSchema(final long schemaId, final int version) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException { - try { - final SchemaRegistryClient client = getClient(); - final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId); - if (info == null) { - throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); - } - - final SchemaMetadata metadata = info.getSchemaMetadata(); - final String schemaName = metadata.getName(); - - final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); - final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(schemaVersionKey); - if (versionInfo == null) { - throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); - } - - final String schemaText = versionInfo.getSchemaText(); - - final SchemaIdentifier schemaIdentifier = SchemaIdentifier.of(schemaName, schemaId, version); - final Tuple tuple = new Tuple<>(schemaIdentifier, schemaText); - return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> { - final Schema schema = new Schema.Parser().parse(schemaText); - return createRecordSchema(schema, schemaText, schemaIdentifier); - }); - } catch (final SchemaNotFoundException e) { - throw new org.apache.nifi.schema.access.SchemaNotFoundException(e); + public RecordSchema retrieveSchema(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException { + final SchemaRegistryClient client = getClient(); + final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId); + if (info == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); } + + final SchemaMetadata metadata = info.getSchemaMetadata(); + final String schemaName = metadata.getName(); + + final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); + final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, schemaVersionKey); + if (versionInfo == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); + } + + final String schemaText = versionInfo.getSchemaText(); + + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.of(schemaName, schemaId, version); + final Tuple tuple = new Tuple<>(schemaIdentifier, schemaText); + return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> { + final Schema schema = new Schema.Parser().parse(schemaText); + return createRecordSchema(schema, schemaText, schemaIdentifier); + }); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java new file mode 100644 index 0000000000..40f2ba7d9c --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.schemaregistry.hortonworks; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +import java.lang.reflect.Constructor; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.MockConfigurationContext; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.hortonworks.registries.schemaregistry.SchemaCompatibility; +import com.hortonworks.registries.schemaregistry.SchemaMetadata; +import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo; +import com.hortonworks.registries.schemaregistry.SchemaVersionInfo; +import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient; +import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; + +public class TestHortonworksSchemaRegistry { + private HortonworksSchemaRegistry registry; + private SchemaRegistryClient client; + + private final Map schemaVersionInfoMap = new HashMap<>(); + private final Map schemaMetadataInfoMap = new HashMap<>(); + + @Before + public void setup() throws SchemaNotFoundException { + schemaVersionInfoMap.clear(); + schemaMetadataInfoMap.clear(); + + client = mock(SchemaRegistryClient.class); + doAnswer(new Answer() { + @Override + public SchemaVersionInfo answer(final InvocationOnMock invocation) throws Throwable { + final String schemaName = invocation.getArgumentAt(0, String.class); + final SchemaVersionInfo info = schemaVersionInfoMap.get(schemaName); + + if (info == null) { + throw new SchemaNotFoundException(); + } + + return info; + } + }).when(client).getLatestSchemaVersionInfo(any(String.class)); + + doAnswer(new Answer() { + @Override + public SchemaMetadataInfo answer(InvocationOnMock invocation) throws Throwable { + final String schemaName = invocation.getArgumentAt(0, String.class); + final SchemaMetadataInfo info = schemaMetadataInfoMap.get(schemaName); + + if (info == null) { + throw new SchemaNotFoundException(); + } + + return info; + } + }).when(client).getSchemaMetadataInfo(any(String.class)); + + registry = new HortonworksSchemaRegistry() { + @Override + protected synchronized SchemaRegistryClient getClient() { + return client; + } + }; + } + + @Test + public void testCacheUsed() throws Exception { + final String text = new String(Files.readAllBytes(Paths.get("src/test/resources/empty-schema.avsc"))); + final SchemaVersionInfo info = new SchemaVersionInfo(1, text, 2L, "description"); + schemaVersionInfoMap.put("unit-test", info); + + final SchemaMetadata metadata = new SchemaMetadata.Builder("unit-test") + .compatibility(SchemaCompatibility.NONE) + .evolve(true) + .schemaGroup("group") + .type("AVRO") + .build(); + + final Constructor ctr = SchemaMetadataInfo.class.getDeclaredConstructor(SchemaMetadata.class, Long.class, Long.class); + ctr.setAccessible(true); + + final SchemaMetadataInfo schemaMetadataInfo = ctr.newInstance(metadata, 1L, System.currentTimeMillis()); + + schemaMetadataInfoMap.put("unit-test", schemaMetadataInfo); + + final Map properties = new HashMap<>(); + properties.put(HortonworksSchemaRegistry.URL, "http://localhost:44444"); + properties.put(HortonworksSchemaRegistry.CACHE_EXPIRATION, "5 mins"); + properties.put(HortonworksSchemaRegistry.CACHE_SIZE, "1000"); + + final ConfigurationContext configurationContext = new MockConfigurationContext(properties, null); + registry.enable(configurationContext); + + for (int i = 0; i < 10000; i++) { + final RecordSchema schema = registry.retrieveSchema("unit-test"); + assertNotNull(schema); + } + + Mockito.verify(client, Mockito.times(1)).getLatestSchemaVersionInfo(any(String.class)); + } + + @Test + @Ignore("This can be useful for manual testing/debugging, but will keep ignored for now because we don't want automated builds to run this, since it depends on timing") + public void testCacheExpires() throws Exception { + final String text = new String(Files.readAllBytes(Paths.get("src/test/resources/empty-schema.avsc"))); + final SchemaVersionInfo info = new SchemaVersionInfo(1, text, 2L, "description"); + schemaVersionInfoMap.put("unit-test", info); + + final SchemaMetadata metadata = new SchemaMetadata.Builder("unit-test") + .compatibility(SchemaCompatibility.NONE) + .evolve(true) + .schemaGroup("group") + .type("AVRO") + .build(); + + final Constructor ctr = SchemaMetadataInfo.class.getDeclaredConstructor(SchemaMetadata.class, Long.class, Long.class); + ctr.setAccessible(true); + + final SchemaMetadataInfo schemaMetadataInfo = ctr.newInstance(metadata, 1L, System.currentTimeMillis()); + + schemaMetadataInfoMap.put("unit-test", schemaMetadataInfo); + + final Map properties = new HashMap<>(); + properties.put(HortonworksSchemaRegistry.URL, "http://localhost:44444"); + properties.put(HortonworksSchemaRegistry.CACHE_EXPIRATION, "1 sec"); + properties.put(HortonworksSchemaRegistry.CACHE_SIZE, "1000"); + + final ConfigurationContext configurationContext = new MockConfigurationContext(properties, null); + registry.enable(configurationContext); + + for (int i = 0; i < 2; i++) { + final RecordSchema schema = registry.retrieveSchema("unit-test"); + assertNotNull(schema); + } + + Mockito.verify(client, Mockito.times(1)).getLatestSchemaVersionInfo(any(String.class)); + + Thread.sleep(2000L); + + for (int i = 0; i < 2; i++) { + final RecordSchema schema = registry.retrieveSchema("unit-test"); + assertNotNull(schema); + } + + Mockito.verify(client, Mockito.times(2)).getLatestSchemaVersionInfo(any(String.class)); + } + +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/resources/empty-schema.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/resources/empty-schema.avsc new file mode 100644 index 0000000000..67098d0aff --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/resources/empty-schema.avsc @@ -0,0 +1,6 @@ +{ + "name": "unitTest", + "namespace": "org.apache.nifi", + "type": "record", + "fields": [] +} \ No newline at end of file