NIFI-4459: This closes #2240. Catch Exception if trying to iterate over many confluent schemas and unable to load one; in this case log a WARNING and continue on; also updated Jersey client to newest

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2017-10-30 11:59:55 -04:00 committed by joewitt
parent 73702004b9
commit 412b3fbbe2
3 changed files with 29 additions and 10 deletions

View File

@ -9,7 +9,8 @@
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@ -48,11 +49,16 @@
<artifactId>jersey-client</artifactId>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-utils</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -137,7 +137,7 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED);
}
final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext);
final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext, getLogger());
final int cacheSize = context.getProperty(CACHE_SIZE).asInteger();
final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS).longValue();

View File

@ -20,12 +20,14 @@ package org.apache.nifi.confluent.schemaregistry.client;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.web.util.WebUtils;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.node.ArrayNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
@ -53,8 +55,10 @@ import java.util.concurrent.ConcurrentMap;
* </p>
*/
public class RestSchemaRegistryClient implements SchemaRegistryClient {
private final List<String> baseUrls;
private final Client client;
private final ComponentLog logger;
private static final String SUBJECT_FIELD_NAME = "subject";
private static final String VERSION_FIELD_NAME = "version";
@ -65,13 +69,15 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
private final ConcurrentMap<Integer, String> schemaIdentifierToNameMap = new ConcurrentHashMap<>();
public RestSchemaRegistryClient(final List<String> baseUrls, final int timeoutMillis, final SSLContext sslContext) {
public RestSchemaRegistryClient(final List<String> baseUrls, final int timeoutMillis, final SSLContext sslContext, final ComponentLog logger) {
this.baseUrls = new ArrayList<>(baseUrls);
final ClientConfig clientConfig = new ClientConfig();
clientConfig.property(ClientProperties.CONNECT_TIMEOUT, timeoutMillis);
clientConfig.property(ClientProperties.READ_TIMEOUT, timeoutMillis);
client = WebUtils.createClient(clientConfig, sslContext);
this.logger = logger;
}
@ -107,10 +113,17 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final ArrayNode arrayNode = (ArrayNode) schemaNameArray;
for (final JsonNode node : arrayNode) {
final String nodeName = node.getTextValue();
final String nodeName = node.asText();
final String schemaPath = getSubjectPath(nodeName);
final JsonNode schemaNode = fetchJsonResponse(schemaPath, schemaDescription);
final JsonNode schemaNode;
try {
schemaNode = fetchJsonResponse(schemaPath, schemaDescription);
} catch (final SchemaNotFoundException | IOException e) {
logger.warn("Failed to fetch Schema with name '{}' from Confluent Schema Registry; "
+ "will skip this schema and continue attempting to retrieve other schemas", new Object[] {nodeName, e});
continue;
}
final int id = schemaNode.get(ID_FIELD_NAME).asInt();
schemaNameToIdentifierMap.put(nodeName, id);
@ -125,10 +138,10 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
}
private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException {
final String subject = schemaNode.get(SUBJECT_FIELD_NAME).getTextValue();
final String subject = schemaNode.get(SUBJECT_FIELD_NAME).asText();
final int version = schemaNode.get(VERSION_FIELD_NAME).asInt();
final int id = schemaNode.get(ID_FIELD_NAME).asInt();
final String schemaText = schemaNode.get(SCHEMA_TEXT_FIELD_NAME).getTextValue();
final String schemaText = schemaNode.get(SCHEMA_TEXT_FIELD_NAME).asText();
try {
final Schema avroSchema = new Schema.Parser().parse(schemaText);