NIFI-4003: Expose configuration option for cache size and duration
NIFI-4003: Addressed remaining spots where client does not cache information
This closes #1879. Signed-off-by: Bryan Bende <bbende@apache.org>
parent 2595d816c4
commit 067e9dfeb0
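What the change means in practice: the HortonworksSchemaRegistry controller service now exposes two properties, Cache Size (cache-size, default 1000) and Cache Expiration (cache-expiration, default 1 hour), which bound how many schemas are cached and how long a cached entry is trusted before the registry is contacted again. A minimal sketch of wiring these properties up, modeled on the unit test added in this commit and assuming the nifi-mock MockConfigurationContext; the sketch class name and the URL value are illustrative, not part of the commit:

    // Sketch only: placed in the same package so the package-private property descriptors are visible.
    package org.apache.nifi.schemaregistry.hortonworks;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.controller.ConfigurationContext;
    import org.apache.nifi.util.MockConfigurationContext;

    public class CacheConfigSketch {
        public static void main(final String[] args) throws Exception {
            final HortonworksSchemaRegistry registry = new HortonworksSchemaRegistry();

            final Map<PropertyDescriptor, String> props = new HashMap<>();
            props.put(HortonworksSchemaRegistry.URL, "http://localhost:9090/api/v1"); // illustrative URL
            props.put(HortonworksSchemaRegistry.CACHE_SIZE, "1000");                  // matches the new default
            props.put(HortonworksSchemaRegistry.CACHE_EXPIRATION, "1 hour");          // matches the new default

            // enable() reads CACHE_EXPIRATION into versionInfoCacheNanos and forwards both
            // values into the underlying SchemaRegistryClient configuration.
            final ConfigurationContext context = new MockConfigurationContext(props, null);
            registry.enable(context);
        }
    }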
@@ -196,5 +196,25 @@ limitations under the License.
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/empty-schema.avsc</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>
@@ -16,9 +16,7 @@
  */
 package org.apache.nifi.schemaregistry.hortonworks;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -64,15 +62,18 @@ import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
 public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
     private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT,
         SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
 
     private final ConcurrentMap<Tuple<SchemaIdentifier, String>, RecordSchema> schemaNameToSchemaMap = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Tuple<SchemaVersionInfo, Long>> schemaVersionCache = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Tuple<SchemaVersionInfo, Long>> schemaVersionByNameCache = new ConcurrentHashMap<>();
+    private final ConcurrentMap<SchemaVersionKey, Tuple<SchemaVersionInfo, Long>> schemaVersionByKeyCache = new ConcurrentHashMap<>();
 
     private static final String LOGICAL_TYPE_DATE = "date";
     private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis";
     private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros";
     private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis";
     private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
-    private static final long VERSION_INFO_CACHE_NANOS = TimeUnit.MINUTES.toNanos(1L);
+
+    private volatile long versionInfoCacheNanos;
+
     static final PropertyDescriptor URL = new PropertyDescriptor.Builder()
         .name("url")
@@ -83,33 +84,50 @@ public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
         .required(true)
         .build();
 
+    static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
+        .name("cache-size")
+        .displayName("Cache Size")
+        .description("Specifies how many Schemas should be cached from the Hortonworks Schema Registry")
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+        .defaultValue("1000")
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor CACHE_EXPIRATION = new PropertyDescriptor.Builder()
+        .name("cache-expiration")
+        .displayName("Cache Expiration")
+        .description("Specifies how long a Schema that is cached should remain in the cache. Once this time period elapses, a "
+            + "cached version of a schema will no longer be used, and the service will have to communicate with the "
+            + "Hortonworks Schema Registry again in order to obtain the schema.")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .defaultValue("1 hour")
+        .required(true)
+        .build();
 
-    private static final List<PropertyDescriptor> propertyDescriptors = Collections.singletonList(URL);
     private volatile SchemaRegistryClient schemaRegistryClient;
     private volatile boolean initialized;
     private volatile Map<String, Object> schemaRegistryConfig;
 
-    public HortonworksSchemaRegistry() {
-    }
-
 
     @OnEnabled
     public void enable(final ConfigurationContext context) throws InitializationException {
         schemaRegistryConfig = new HashMap<>();
 
+        versionInfoCacheNanos = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS);
+
         // The below properties may or may not need to be exposed to the end
         // user. We just need to watch usage patterns to see if sensible default
         // can satisfy NiFi requirements
         String urlValue = context.getProperty(URL).evaluateAttributeExpressions().getValue();
-        if (urlValue == null || urlValue.trim().length() == 0){
-            throw new IllegalArgumentException("'Schema Registry URL' must not be nul or empty.");
+        if (urlValue == null || urlValue.trim().isEmpty()) {
+            throw new IllegalArgumentException("'Schema Registry URL' must not be null or empty.");
         }
 
         schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), urlValue);
         schemaRegistryConfig.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name(), 10L);
-        schemaRegistryConfig.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name(), 5000L);
-        schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name(), 1000L);
-        schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name(), 60 * 60 * 1000L);
+        schemaRegistryConfig.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name(), context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.SECONDS));
+        schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name(), context.getProperty(CACHE_SIZE).asInteger());
+        schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name(), context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.SECONDS));
     }
 
 
@@ -126,11 +144,15 @@ public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return propertyDescriptors;
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(URL);
+        properties.add(CACHE_SIZE);
+        properties.add(CACHE_EXPIRATION);
+        return properties;
     }
 
 
-    private synchronized SchemaRegistryClient getClient() {
+    protected synchronized SchemaRegistryClient getClient() {
         if (!initialized) {
             schemaRegistryClient = new SchemaRegistryClient(schemaRegistryConfig);
             initialized = true;
@@ -142,14 +164,14 @@ public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
     private SchemaVersionInfo getLatestSchemaVersionInfo(final SchemaRegistryClient client, final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException {
         try {
             // Try to fetch the SchemaVersionInfo from the cache.
-            final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = schemaVersionCache.get(schemaName);
+            final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = schemaVersionByNameCache.get(schemaName);
 
             // Determine if the timestampedVersionInfo is expired
             boolean fetch = false;
             if (timestampedVersionInfo == null) {
                 fetch = true;
             } else {
-                final long minTimestamp = System.nanoTime() - VERSION_INFO_CACHE_NANOS;
+                final long minTimestamp = System.nanoTime() - versionInfoCacheNanos;
                 fetch = timestampedVersionInfo.getValue() < minTimestamp;
             }
 
@@ -166,7 +188,41 @@ public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
 
             // Store new version in cache.
             final Tuple<SchemaVersionInfo, Long> tuple = new Tuple<>(versionInfo, System.nanoTime());
-            schemaVersionCache.put(schemaName, tuple);
+            schemaVersionByNameCache.put(schemaName, tuple);
+            return versionInfo;
+        } catch (final SchemaNotFoundException e) {
+            throw new org.apache.nifi.schema.access.SchemaNotFoundException(e);
+        }
+    }
+
+    private SchemaVersionInfo getSchemaVersionInfo(final SchemaRegistryClient client, final SchemaVersionKey key) throws org.apache.nifi.schema.access.SchemaNotFoundException {
+        try {
+            // Try to fetch the SchemaVersionInfo from the cache.
+            final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = schemaVersionByKeyCache.get(key);
+
+            // Determine if the timestampedVersionInfo is expired
+            boolean fetch = false;
+            if (timestampedVersionInfo == null) {
+                fetch = true;
+            } else {
+                final long minTimestamp = System.nanoTime() - versionInfoCacheNanos;
+                fetch = timestampedVersionInfo.getValue() < minTimestamp;
+            }
+
+            // If not expired, use what we got from the cache
+            if (!fetch) {
+                return timestampedVersionInfo.getKey();
+            }
+
+            // schema version info was expired or not found in cache. Fetch from schema registry
+            final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(key);
+            if (versionInfo == null) {
+                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + key.getSchemaName() + "' and version " + key.getVersion());
+            }
+
+            // Store new version in cache.
+            final Tuple<SchemaVersionInfo, Long> tuple = new Tuple<>(versionInfo, System.nanoTime());
+            schemaVersionByKeyCache.put(key, tuple);
             return versionInfo;
         } catch (final SchemaNotFoundException e) {
             throw new org.apache.nifi.schema.access.SchemaNotFoundException(e);
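Both lookup paths above use the same expiration scheme: the fetched value is stored next to a System.nanoTime() stamp in a Tuple, and a later read falls through to the registry once the stamp is older than versionInfoCacheNanos. A generic sketch of that pattern, with hypothetical names that are not part of the commit:

    // Generic sketch of the timestamped-cache pattern used above; class and method names are hypothetical.
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Function;

    import org.apache.nifi.util.Tuple;

    class ExpiringLookupCache<K, V> {
        private final ConcurrentMap<K, Tuple<V, Long>> cache = new ConcurrentHashMap<>();
        private final long expirationNanos;

        ExpiringLookupCache(final long duration, final TimeUnit unit) {
            this.expirationNanos = unit.toNanos(duration);
        }

        V get(final K key, final Function<K, V> loader) {
            final Tuple<V, Long> entry = cache.get(key);

            // Use the cached value only if it exists and is younger than the expiration window.
            if (entry != null && entry.getValue() >= System.nanoTime() - expirationNanos) {
                return entry.getKey();
            }

            // Expired or missing: load from the source and re-cache with a fresh timestamp.
            final V value = loader.apply(key);
            cache.put(key, new Tuple<>(value, System.nanoTime()));
            return value;
        }
    }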
@@ -211,58 +267,50 @@ public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
 
 
     @Override
-    public String retrieveSchemaText(final long schemaId, final int version) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
-        try {
-            final SchemaRegistryClient client = getClient();
-            final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
-            if (info == null) {
-                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
-            }
-
-            final SchemaMetadata metadata = info.getSchemaMetadata();
-            final String schemaName = metadata.getName();
-
-            final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
-            final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(schemaVersionKey);
-            if (versionInfo == null) {
-                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
-            }
-
-            return versionInfo.getSchemaText();
-        } catch (final SchemaNotFoundException e) {
-            throw new org.apache.nifi.schema.access.SchemaNotFoundException(e);
+    public String retrieveSchemaText(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException {
+        final SchemaRegistryClient client = getClient();
+        final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
+        if (info == null) {
+            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
         }
+
+        final SchemaMetadata metadata = info.getSchemaMetadata();
+        final String schemaName = metadata.getName();
+
+        final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
+        final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
+        if (versionInfo == null) {
+            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
+        }
+
+        return versionInfo.getSchemaText();
     }
 
     @Override
-    public RecordSchema retrieveSchema(final long schemaId, final int version) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
-        try {
-            final SchemaRegistryClient client = getClient();
-            final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
-            if (info == null) {
-                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
-            }
-
-            final SchemaMetadata metadata = info.getSchemaMetadata();
-            final String schemaName = metadata.getName();
-
-            final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
-            final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(schemaVersionKey);
-            if (versionInfo == null) {
-                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
-            }
-
-            final String schemaText = versionInfo.getSchemaText();
-
-            final SchemaIdentifier schemaIdentifier = SchemaIdentifier.of(schemaName, schemaId, version);
-            final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText);
-            return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
-                final Schema schema = new Schema.Parser().parse(schemaText);
-                return createRecordSchema(schema, schemaText, schemaIdentifier);
-            });
-        } catch (final SchemaNotFoundException e) {
-            throw new org.apache.nifi.schema.access.SchemaNotFoundException(e);
+    public RecordSchema retrieveSchema(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException {
+        final SchemaRegistryClient client = getClient();
+        final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
+        if (info == null) {
+            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
         }
+
+        final SchemaMetadata metadata = info.getSchemaMetadata();
+        final String schemaName = metadata.getName();
+
+        final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
+        final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
+        if (versionInfo == null) {
+            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
+        }
+
+        final String schemaText = versionInfo.getSchemaText();
+
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.of(schemaName, schemaId, version);
+        final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText);
+        return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
+            final Schema schema = new Schema.Parser().parse(schemaText);
+            return createRecordSchema(schema, schemaText, schemaIdentifier);
+        });
     }
 
 
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schemaregistry.hortonworks;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.lang.reflect.Constructor;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.hortonworks.registries.schemaregistry.SchemaCompatibility;
+import com.hortonworks.registries.schemaregistry.SchemaMetadata;
+import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
+import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
+import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
+import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
+
+public class TestHortonworksSchemaRegistry {
+    private HortonworksSchemaRegistry registry;
+    private SchemaRegistryClient client;
+
+    private final Map<String, SchemaVersionInfo> schemaVersionInfoMap = new HashMap<>();
+    private final Map<String, SchemaMetadataInfo> schemaMetadataInfoMap = new HashMap<>();
+
+    @Before
+    public void setup() throws SchemaNotFoundException {
+        schemaVersionInfoMap.clear();
+        schemaMetadataInfoMap.clear();
+
+        client = mock(SchemaRegistryClient.class);
+        doAnswer(new Answer<SchemaVersionInfo>() {
+            @Override
+            public SchemaVersionInfo answer(final InvocationOnMock invocation) throws Throwable {
+                final String schemaName = invocation.getArgumentAt(0, String.class);
+                final SchemaVersionInfo info = schemaVersionInfoMap.get(schemaName);
+
+                if (info == null) {
+                    throw new SchemaNotFoundException();
+                }
+
+                return info;
+            }
+        }).when(client).getLatestSchemaVersionInfo(any(String.class));
+
+        doAnswer(new Answer<SchemaMetadataInfo>() {
+            @Override
+            public SchemaMetadataInfo answer(InvocationOnMock invocation) throws Throwable {
+                final String schemaName = invocation.getArgumentAt(0, String.class);
+                final SchemaMetadataInfo info = schemaMetadataInfoMap.get(schemaName);
+
+                if (info == null) {
+                    throw new SchemaNotFoundException();
+                }
+
+                return info;
+            }
+        }).when(client).getSchemaMetadataInfo(any(String.class));
+
+        registry = new HortonworksSchemaRegistry() {
+            @Override
+            protected synchronized SchemaRegistryClient getClient() {
+                return client;
+            }
+        };
+    }
+
+    @Test
+    public void testCacheUsed() throws Exception {
+        final String text = new String(Files.readAllBytes(Paths.get("src/test/resources/empty-schema.avsc")));
+        final SchemaVersionInfo info = new SchemaVersionInfo(1, text, 2L, "description");
+        schemaVersionInfoMap.put("unit-test", info);
+
+        final SchemaMetadata metadata = new SchemaMetadata.Builder("unit-test")
+            .compatibility(SchemaCompatibility.NONE)
+            .evolve(true)
+            .schemaGroup("group")
+            .type("AVRO")
+            .build();
+
+        final Constructor<SchemaMetadataInfo> ctr = SchemaMetadataInfo.class.getDeclaredConstructor(SchemaMetadata.class, Long.class, Long.class);
+        ctr.setAccessible(true);
+
+        final SchemaMetadataInfo schemaMetadataInfo = ctr.newInstance(metadata, 1L, System.currentTimeMillis());
+
+        schemaMetadataInfoMap.put("unit-test", schemaMetadataInfo);
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(HortonworksSchemaRegistry.URL, "http://localhost:44444");
+        properties.put(HortonworksSchemaRegistry.CACHE_EXPIRATION, "5 mins");
+        properties.put(HortonworksSchemaRegistry.CACHE_SIZE, "1000");
+
+        final ConfigurationContext configurationContext = new MockConfigurationContext(properties, null);
+        registry.enable(configurationContext);
+
+        for (int i = 0; i < 10000; i++) {
+            final RecordSchema schema = registry.retrieveSchema("unit-test");
+            assertNotNull(schema);
+        }
+
+        Mockito.verify(client, Mockito.times(1)).getLatestSchemaVersionInfo(any(String.class));
+    }
+
+    @Test
+    @Ignore("This can be useful for manual testing/debugging, but will keep ignored for now because we don't want automated builds to run this, since it depends on timing")
+    public void testCacheExpires() throws Exception {
+        final String text = new String(Files.readAllBytes(Paths.get("src/test/resources/empty-schema.avsc")));
+        final SchemaVersionInfo info = new SchemaVersionInfo(1, text, 2L, "description");
+        schemaVersionInfoMap.put("unit-test", info);
+
+        final SchemaMetadata metadata = new SchemaMetadata.Builder("unit-test")
+            .compatibility(SchemaCompatibility.NONE)
+            .evolve(true)
+            .schemaGroup("group")
+            .type("AVRO")
+            .build();
+
+        final Constructor<SchemaMetadataInfo> ctr = SchemaMetadataInfo.class.getDeclaredConstructor(SchemaMetadata.class, Long.class, Long.class);
+        ctr.setAccessible(true);
+
+        final SchemaMetadataInfo schemaMetadataInfo = ctr.newInstance(metadata, 1L, System.currentTimeMillis());
+
+        schemaMetadataInfoMap.put("unit-test", schemaMetadataInfo);
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(HortonworksSchemaRegistry.URL, "http://localhost:44444");
+        properties.put(HortonworksSchemaRegistry.CACHE_EXPIRATION, "1 sec");
+        properties.put(HortonworksSchemaRegistry.CACHE_SIZE, "1000");
+
+        final ConfigurationContext configurationContext = new MockConfigurationContext(properties, null);
+        registry.enable(configurationContext);
+
+        for (int i = 0; i < 2; i++) {
+            final RecordSchema schema = registry.retrieveSchema("unit-test");
+            assertNotNull(schema);
+        }
+
+        Mockito.verify(client, Mockito.times(1)).getLatestSchemaVersionInfo(any(String.class));
+
+        Thread.sleep(2000L);
+
+        for (int i = 0; i < 2; i++) {
+            final RecordSchema schema = registry.retrieveSchema("unit-test");
+            assertNotNull(schema);
+        }
+
+        Mockito.verify(client, Mockito.times(2)).getLatestSchemaVersionInfo(any(String.class));
+    }
+
+}
@@ -0,0 +1,6 @@
+{
+  "name": "unitTest",
+  "namespace": "org.apache.nifi",
+  "type": "record",
+  "fields": []
+}